Skip to content

Commit ba3df7e

Browse files
committed
Reduce Common Usage between APOC Extended and APOC Core
1 parent c7102fd commit ba3df7e

File tree

63 files changed

+319
-181
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

63 files changed

+319
-181
lines changed

common/src/main/java/apoc/ApocConfig.java

Lines changed: 42 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
*/
1919
package apoc;
2020

21-
import static apoc.util.FileUtils.isFile;
2221
import static java.lang.String.format;
2322
import static org.neo4j.configuration.BootloaderSettings.lib_directory;
2423
import static org.neo4j.configuration.BootloaderSettings.run_directory;
@@ -35,6 +34,8 @@
3534
import java.io.File;
3635
import java.io.IOException;
3736
import java.lang.reflect.Field;
37+
import java.net.MalformedURLException;
38+
import java.net.URI;
3839
import java.net.URL;
3940
import java.nio.file.Path;
4041
import java.time.Duration;
@@ -44,6 +45,10 @@
4445
import java.util.List;
4546
import java.util.Map;
4647
import java.util.stream.Stream;
48+
49+
import apoc.util.FileUtils;
50+
import apoc.util.SupportedProtocols;
51+
import apoc.util.Util;
4752
import org.apache.commons.configuration2.CombinedConfiguration;
4853
import org.apache.commons.configuration2.Configuration;
4954
import org.apache.commons.configuration2.EnvironmentConfiguration;
@@ -336,6 +341,42 @@ public void checkWriteAllowed(ExportConfig exportConfig, String fileName) {
336341
}
337342
}
338343
}
344+
345+
public static boolean isFile(String fileName) {
346+
return from(fileName) == SupportedProtocols.file;
347+
}
348+
349+
350+
public static SupportedProtocols from(URL url) {
351+
return FileUtils.of(url.getProtocol());
352+
}
353+
354+
public static SupportedProtocols from(String source) {
355+
try {
356+
final URL url = new URL(source);
357+
return from(url);
358+
} catch (MalformedURLException e) {
359+
if (!e.getMessage().contains("no protocol")) {
360+
try {
361+
// in case new URL(source) throw e.g. unknown protocol: hdfs, because of missing jar,
362+
// we retrieve the related enum and throw the associated MissingDependencyException(..)
363+
// otherwise we return unknown protocol: yyyyy
364+
return SupportedProtocols.valueOf(new URI(source).getScheme());
365+
} catch (Exception ignored) {
366+
}
367+
368+
// in case a Windows user write an url like `C:/User/...`
369+
if (e.getMessage().contains("unknown protocol") && Util.isWindows()) {
370+
throw new RuntimeException(e.getMessage()
371+
+ "\n Please note that for Windows absolute paths they have to be explicit by prepending `file:` or supplied without the drive, "
372+
+ "\n e.g. `file:C:/my/path/file` or `/my/path/file`, instead of `C:/my/path/file`");
373+
}
374+
throw new RuntimeException(e);
375+
}
376+
return SupportedProtocols.file;
377+
}
378+
}
379+
339380

340381
public static ApocConfig apocConfig() {
341382
return theInstance;

common/src/main/java/apoc/result/VirtualPath.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,15 @@
1818
*/
1919
package apoc.result;
2020

21-
import apoc.util.CollectionUtils;
2221
import java.util.ArrayList;
2322
import java.util.Arrays;
2423
import java.util.Iterator;
2524
import java.util.List;
2625
import java.util.Objects;
2726
import java.util.concurrent.atomic.AtomicReference;
2827
import javax.annotation.Nonnull;
28+
29+
import apoc.util.Util;
2930
import org.neo4j.graphdb.Entity;
3031
import org.neo4j.graphdb.Node;
3132
import org.neo4j.graphdb.Path;
@@ -131,7 +132,7 @@ public String toString() {
131132

132133
private void requireConnected(Relationship relationship) {
133134
final List<Node> previousNodes = getPreviousNodes();
134-
boolean isRelConnectedToPrevious = CollectionUtils.containsAny(previousNodes, relationship.getNodes());
135+
boolean isRelConnectedToPrevious = Util.containsAny(previousNodes, relationship.getNodes());
135136
if (!isRelConnectedToPrevious) {
136137
throw new IllegalArgumentException("Relationship is not part of current path.");
137138
}

common/src/main/java/apoc/util/FileUtils.java

Lines changed: 152 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@
2020

2121
import static apoc.ApocConfig.APOC_IMPORT_FILE_ALLOW__READ__FROM__FILESYSTEM;
2222
import static apoc.ApocConfig.apocConfig;
23+
import static apoc.export.util.LimitedSizeInputStream.toLimitedIStream;
2324
import static apoc.util.Util.ERROR_BYTES_OR_STRING;
2425
import static apoc.util.Util.REDIRECT_LIMIT;
25-
import static apoc.util.Util.readHttpInputStream;
26+
import static apoc.util.Util.isRedirect;
2627

2728
import apoc.ApocConfig;
2829
import apoc.export.util.CountingInputStream;
@@ -32,22 +33,33 @@
3233
import apoc.util.s3.S3URLConnection;
3334
import apoc.util.s3.S3UploadUtils;
3435
import java.io.BufferedOutputStream;
36+
import java.io.BufferedWriter;
37+
import java.io.ByteArrayInputStream;
3538
import java.io.File;
3639
import java.io.FileOutputStream;
3740
import java.io.IOException;
41+
import java.io.InputStream;
3842
import java.io.OutputStream;
43+
import java.io.OutputStreamWriter;
44+
import java.io.StringWriter;
45+
import java.net.HttpURLConnection;
3946
import java.net.MalformedURLException;
4047
import java.net.URI;
4148
import java.net.URISyntaxException;
4249
import java.net.URL;
50+
import java.net.URLConnection;
4351
import java.net.URLStreamHandler;
4452
import java.net.URLStreamHandlerFactory;
4553
import java.nio.file.NoSuchFileException;
4654
import java.nio.file.Path;
4755
import java.nio.file.Paths;
4856
import java.util.Map;
4957
import java.util.Optional;
58+
59+
import org.apache.commons.compress.archivers.ArchiveEntry;
60+
import org.apache.commons.compress.archivers.ArchiveInputStream;
5061
import org.apache.commons.io.FilenameUtils;
62+
import org.apache.commons.io.IOUtils;
5163
import org.apache.commons.lang3.StringUtils;
5264
import org.neo4j.graphdb.security.URLAccessChecker;
5365
import org.neo4j.graphdb.security.URLAccessValidationError;
@@ -187,7 +199,7 @@ public static CountingInputStream inputStreamFor(
187199
if (input instanceof String) {
188200
String fileName = (String) input;
189201
fileName = changeFileUrlIfImportDirectoryConstrained(fileName, urlAccessChecker);
190-
return Util.openInputStream(fileName, headers, payload, compressionAlgo, urlAccessChecker);
202+
return FileUtils.openInputStream(fileName, headers, payload, compressionAlgo, urlAccessChecker);
191203
} else if (input instanceof byte[]) {
192204
return getInputStreamFromBinary((byte[]) input, compressionAlgo);
193205
} else {
@@ -345,4 +357,142 @@ public static File getLogDirectory() {
345357
public static CountingInputStream getInputStreamFromBinary(byte[] urlOrBinary, String compressionAlgo) {
346358
return CompressionAlgo.valueOf(compressionAlgo).toInputStream(urlOrBinary);
347359
}
360+
361+
public static StreamConnection readHttpInputStream(
362+
String urlAddress,
363+
Map<String, Object> headers,
364+
String payload,
365+
int redirectLimit,
366+
URLAccessChecker urlAccessChecker)
367+
throws IOException {
368+
URL url = ApocConfig.apocConfig().checkAllowedUrlAndPinToIP(urlAddress, urlAccessChecker);
369+
URLConnection con = openUrlConnection(url, headers);
370+
writePayload(con, payload);
371+
String newUrl = handleRedirect(con, urlAddress);
372+
if (newUrl != null && !urlAddress.equals(newUrl)) {
373+
con.getInputStream().close();
374+
if (redirectLimit == 0) {
375+
throw new IOException("Redirect limit exceeded");
376+
}
377+
return readHttpInputStream(newUrl, headers, payload, --redirectLimit, urlAccessChecker);
378+
}
379+
380+
return new StreamConnection.UrlStreamConnection(con);
381+
}
382+
383+
public static URLConnection openUrlConnection(URL src, Map<String, Object> headers) throws IOException {
384+
URLConnection con = src.openConnection();
385+
con.setRequestProperty("User-Agent", "APOC Procedures for Neo4j");
386+
if (con instanceof HttpURLConnection) {
387+
HttpURLConnection http = (HttpURLConnection) con;
388+
http.setInstanceFollowRedirects(false);
389+
if (headers != null) {
390+
Object method = headers.get("method");
391+
if (method != null) {
392+
http.setRequestMethod(method.toString());
393+
http.setChunkedStreamingMode(1024 * 1024);
394+
}
395+
headers.forEach((k, v) -> con.setRequestProperty(k, v == null ? "" : v.toString()));
396+
}
397+
}
398+
399+
con.setConnectTimeout(apocConfig().getInt("apoc.http.timeout.connect", 10_000));
400+
con.setReadTimeout(apocConfig().getInt("apoc.http.timeout.read", 60_000));
401+
return con;
402+
}
403+
404+
private static void writePayload(URLConnection con, String payload) throws IOException {
405+
if (payload == null) return;
406+
con.setDoOutput(true);
407+
BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(con.getOutputStream(), "UTF-8"));
408+
writer.write(payload);
409+
writer.close();
410+
}
411+
412+
private static String handleRedirect(URLConnection con, String url) throws IOException {
413+
if (!(con instanceof HttpURLConnection)) return url;
414+
if (!isRedirect(((HttpURLConnection) con))) return url;
415+
return con.getHeaderField("Location");
416+
}
417+
418+
public static CountingInputStream openInputStream(
419+
Object input,
420+
Map<String, Object> headers,
421+
String payload,
422+
String compressionAlgo,
423+
URLAccessChecker urlAccessChecker)
424+
throws IOException, URISyntaxException, URLAccessValidationError {
425+
if (input instanceof String) {
426+
String urlAddress = (String) input;
427+
final ArchiveType archiveType = ArchiveType.from(urlAddress);
428+
if (archiveType.isArchive()) {
429+
return getStreamCompressedFile(urlAddress, headers, payload, archiveType, urlAccessChecker);
430+
}
431+
432+
StreamConnection sc = getStreamConnection(urlAddress, headers, payload, urlAccessChecker);
433+
return sc.toCountingInputStream(compressionAlgo);
434+
} else if (input instanceof byte[]) {
435+
return FileUtils.getInputStreamFromBinary((byte[]) input, compressionAlgo);
436+
} else {
437+
throw new RuntimeException(ERROR_BYTES_OR_STRING);
438+
}
439+
}
440+
441+
private static CountingInputStream getStreamCompressedFile(
442+
String urlAddress,
443+
Map<String, Object> headers,
444+
String payload,
445+
ArchiveType archiveType,
446+
URLAccessChecker urlAccessChecker)
447+
throws IOException, URISyntaxException, URLAccessValidationError {
448+
StreamConnection sc;
449+
InputStream stream;
450+
String[] tokens = urlAddress.split("!");
451+
urlAddress = tokens[0];
452+
String zipFileName;
453+
if (tokens.length == 2) {
454+
zipFileName = tokens[1];
455+
sc = getStreamConnection(urlAddress, headers, payload, urlAccessChecker);
456+
stream = getFileStreamIntoCompressedFile(sc.getInputStream(), zipFileName, archiveType);
457+
stream = toLimitedIStream(stream, sc.getLength());
458+
} else throw new IllegalArgumentException("filename can't be null or empty");
459+
460+
return new CountingInputStream(stream, sc.getLength());
461+
}
462+
463+
public static StreamConnection getStreamConnection(
464+
String urlAddress, Map<String, Object> headers, String payload, URLAccessChecker urlAccessChecker)
465+
throws IOException, URISyntaxException, URLAccessValidationError {
466+
return FileUtils.getStreamConnection(
467+
FileUtils.from(urlAddress), urlAddress, headers, payload, urlAccessChecker);
468+
}
469+
470+
private static InputStream getFileStreamIntoCompressedFile(InputStream is, String fileName, ArchiveType archiveType)
471+
throws IOException {
472+
try (ArchiveInputStream archive = archiveType.getInputStream(is)) {
473+
ArchiveEntry archiveEntry;
474+
475+
while ((archiveEntry = archive.getNextEntry()) != null) {
476+
if (!archiveEntry.isDirectory() && archiveEntry.getName().equals(fileName)) {
477+
return new ByteArrayInputStream(IOUtils.toByteArray(archive));
478+
}
479+
}
480+
}
481+
482+
return null;
483+
}
484+
485+
public static Object getStringOrCompressedData(StringWriter writer, ExportConfig config) {
486+
try {
487+
final String compression = config.getCompressionAlgo();
488+
final String writerString = writer.toString();
489+
Object data = compression.equals(CompressionAlgo.NONE.name())
490+
? writerString
491+
: CompressionAlgo.valueOf(compression).compress(writerString, config.getCharset());
492+
writer.getBuffer().setLength(0);
493+
return data;
494+
} catch (Exception e) {
495+
throw new RuntimeException(e);
496+
}
497+
}
348498
}

0 commit comments

Comments
 (0)