Skip to content

Commit d42b9ec

Browse files
authored
Use hc5 for file downloads (#597)
1 parent e48605d commit d42b9ec

File tree

5 files changed

+201
-51
lines changed

5 files changed

+201
-51
lines changed

src/main/java/me/itzg/helpers/http/FetchBuilderBase.java

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import me.itzg.helpers.errors.GenericException;
3030
import me.itzg.helpers.http.SharedFetch.Options;
3131
import me.itzg.helpers.json.ObjectMappers;
32+
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
33+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
3234
import org.slf4j.Logger;
3335
import reactor.core.publisher.Mono;
3436
import reactor.netty.ByteBufMono;
@@ -197,6 +199,25 @@ protected <R> R useReactiveClient(ReactiveClientUser<R> user) {
197199
}
198200
}
199201

202+
@FunctionalInterface
203+
protected interface HcAsyncClientUser<R> {
204+
void use(CloseableHttpAsyncClient client, Runnable closer);
205+
}
206+
207+
protected <R> void useHcAsyncClient(HcAsyncClientUser<R> user) {
208+
if (state.sharedFetch != null) {
209+
user.use(state.sharedFetch.getHcAsyncClient(), () -> {
210+
}
211+
);
212+
}
213+
else {
214+
//noinspection resource since close callback will handle it
215+
SharedFetch sharedFetch = new SharedFetch(state.userAgentCommand, Options.builder().build());
216+
217+
user.use(sharedFetch.getHcAsyncClient(), sharedFetch::close);
218+
}
219+
}
220+
200221
protected static BiConsumer<? super HttpClientRequest, ? super Connection> debugLogRequest(
201222
Logger log, String operation
202223
) {
@@ -291,6 +312,25 @@ protected void applyHeaders(io.netty.handler.codec.http.HttpHeaders headers) {
291312
state.requestHeaders.forEach(headers::set);
292313
}
293314

315+
protected void applyHeaders(SimpleRequestBuilder requestBuilder) {
316+
final Set<String> contentTypes = getAcceptContentTypes();
317+
if (contentTypes != null && !contentTypes.isEmpty()) {
318+
contentTypes.forEach(s -> requestBuilder.addHeader(ACCEPT.toString(), s));
319+
}
320+
321+
if (state.userInfo != null) {
322+
requestBuilder.setHeader(
323+
AUTHORIZATION.toString(),
324+
"Basic " +
325+
Base64.getEncoder().encodeToString(
326+
state.userInfo.getBytes(StandardCharsets.UTF_8)
327+
)
328+
);
329+
}
330+
331+
state.requestHeaders.forEach(requestBuilder::setHeader);
332+
}
333+
294334
static String formatDuration(long millis) {
295335
final StringBuilder sb = new StringBuilder();
296336
final long minutes = millis / 60000;
src/main/java/me/itzg/helpers/http/MonoSinkFutureCallbackAdapter.java

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package me.itzg.helpers.http;
2+
3+
import org.apache.hc.core5.concurrent.FutureCallback;
4+
import reactor.core.publisher.MonoSink;
5+
6+
class MonoSinkFutureCallbackAdapter<T> implements FutureCallback<T> {
7+
8+
private final MonoSink<T> sink;
9+
10+
public MonoSinkFutureCallbackAdapter(MonoSink<T> sink) {
11+
this.sink = sink;
12+
}
13+
14+
@Override
15+
public void completed(T result) {
16+
sink.success(result);
17+
}
18+
19+
@Override
20+
public void failed(Exception ex) {
21+
sink.error(ex);
22+
}
23+
24+
@Override
25+
public void cancelled() {
26+
sink.success();
27+
}
28+
}

src/main/java/me/itzg/helpers/http/OutputToDirectoryFetchBuilder.java

Lines changed: 95 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,27 @@
11
package me.itzg.helpers.http;
22

3-
import static io.netty.handler.codec.http.HttpHeaderNames.IF_MODIFIED_SINCE;
43
import static io.netty.handler.codec.http.HttpHeaderNames.LAST_MODIFIED;
54
import static java.util.Objects.requireNonNull;
65

76
import io.netty.handler.codec.http.HttpHeaderNames;
8-
import io.netty.handler.codec.http.HttpResponseStatus;
97
import java.io.IOException;
8+
import java.nio.ByteBuffer;
9+
import java.nio.channels.SeekableByteChannel;
1010
import java.nio.file.Files;
1111
import java.nio.file.Path;
12+
import java.nio.file.StandardOpenOption;
1213
import java.time.Instant;
1314
import lombok.Setter;
1415
import lombok.experimental.Accessors;
1516
import lombok.extern.slf4j.Slf4j;
1617
import me.itzg.helpers.files.ReactiveFileUtils;
18+
import org.apache.hc.client5.http.async.methods.AbstractBinResponseConsumer;
19+
import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder;
20+
import org.apache.hc.client5.http.async.methods.SimpleRequestProducer;
21+
import org.apache.hc.core5.http.ContentType;
22+
import org.apache.hc.core5.http.HttpException;
23+
import org.apache.hc.core5.http.HttpResponse;
24+
import org.apache.hc.core5.http.HttpStatus;
1725
import org.jetbrains.annotations.NotNull;
1826
import reactor.core.publisher.Mono;
1927
import reactor.core.scheduler.Schedulers;
@@ -202,40 +210,27 @@ else if (skipUpToDate) {
202210

203211
return alreadyUpToDateMono
204212
.filter(alreadyUpToDate -> !alreadyUpToDate)
205-
.flatMap(notUsed -> client
206-
.headers(this::applyHeaders)
207-
.headersWhen(headers ->
208-
skipUpToDate ?
209-
fileLastModifiedMono
210-
.map(outputLastModified -> headers.set(
211-
IF_MODIFIED_SINCE,
212-
httpDateTimeFormatter.format(outputLastModified)
213-
))
214-
.defaultIfEmpty(headers)
215-
: Mono.just(headers)
216-
)
217-
.followRedirect(true)
218-
.doOnRequest(debugLogRequest(log, "file fetch"))
219-
.doOnRequest(
220-
(httpClientRequest, connection) -> statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile))
221-
.get()
222-
.uri(resourceUrl)
223-
.response((resp, byteBufFlux) -> {
224-
if (skipUpToDate && resp.status() == HttpResponseStatus.NOT_MODIFIED) {
225-
log.debug("The file {} is already up to date", outputFile);
226-
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
227-
return Mono.just(outputFile);
228-
}
229-
230-
if (notSuccess(resp)) {
231-
return failedRequestMono(resp, byteBufFlux.aggregate(), "Downloading file");
232-
}
233-
234-
return copyBodyInputStreamToFile(byteBufFlux, outputFile);
235-
})
236-
.last()
237-
.checkpoint("Fetching file into directory")
238-
)
213+
.flatMap(notUsed -> {
214+
final SimpleRequestBuilder reqBuilder = SimpleRequestBuilder.get(resourceUrl);
215+
applyHeaders(reqBuilder);
216+
217+
return
218+
fileLastModifiedMono
219+
.map(instant -> reqBuilder.setHeader("If-Modified-Since", httpDateTimeFormatter.format(instant)))
220+
.then(
221+
Mono.<Path>create(sink -> {
222+
useHcAsyncClient((hcClient, close) -> {
223+
sink.onDispose(close::run);
224+
225+
hcClient.execute(
226+
SimpleRequestProducer.create(reqBuilder.build()),
227+
new ResponseToFileConsumer(outputFile),
228+
new MonoSinkFutureCallbackAdapter<>(sink)
229+
);
230+
});
231+
})
232+
);
233+
})
239234
.defaultIfEmpty(outputFile);
240235
}
241236

@@ -261,4 +256,68 @@ private String extractFilename(HttpClientResponse resp) {
261256
return resp.path().substring(pos + 1);
262257
}
263258

259+
private class ResponseToFileConsumer extends AbstractBinResponseConsumer<Path> {
260+
261+
private final Path outputFile;
262+
263+
private SeekableByteChannel channel;
264+
private long amount;
265+
266+
public ResponseToFileConsumer(Path outputFile) {
267+
this.outputFile = outputFile;
268+
}
269+
270+
@Override
271+
public void releaseResources() {
272+
if (channel != null) {
273+
try {
274+
channel.close();
275+
} catch (IOException e) {
276+
throw new RuntimeException(e);
277+
}
278+
}
279+
}
280+
281+
@Override
282+
protected int capacityIncrement() {
283+
return 4096;
284+
}
285+
286+
@Override
287+
protected void data(ByteBuffer src, boolean endOfStream) throws IOException {
288+
if (channel != null) {
289+
amount += channel.write(src);
290+
if (endOfStream) {
291+
channel.close();
292+
293+
statusHandler.call(FileDownloadStatus.DOWNLOADED, uri(), outputFile);
294+
downloadedHandler.call(uri(), outputFile, amount);
295+
}
296+
}
297+
}
298+
299+
@Override
300+
protected void start(HttpResponse response,
301+
ContentType contentType
302+
) throws HttpException, IOException {
303+
if (skipUpToDate && response.getCode() == HttpStatus.SC_NOT_MODIFIED) {
304+
log.debug("The file {} is already up to date", outputFile);
305+
statusHandler.call(FileDownloadStatus.SKIP_FILE_UP_TO_DATE, uri(), outputFile);
306+
return;
307+
}
308+
309+
statusHandler.call(FileDownloadStatus.DOWNLOADING, uri(), outputFile);
310+
311+
channel = Files.newByteChannel(outputFile,
312+
StandardOpenOption.WRITE,
313+
StandardOpenOption.CREATE,
314+
StandardOpenOption.TRUNCATE_EXISTING
315+
);
316+
}
317+
318+
@Override
319+
protected Path buildResult() {
320+
return outputFile;
321+
}
322+
}
264323
}

src/main/java/me/itzg/helpers/http/SharedFetch.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package me.itzg.helpers.http;
22

33
import io.netty.handler.codec.http.HttpHeaderNames;
4+
import java.io.IOException;
45
import java.net.URI;
56
import java.time.Duration;
67
import java.util.HashMap;
@@ -12,6 +13,8 @@
1213
import lombok.extern.slf4j.Slf4j;
1314
import me.itzg.helpers.McImageHelper;
1415
import me.itzg.helpers.errors.GenericException;
16+
import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
17+
import org.apache.hc.client5.http.impl.async.HttpAsyncClients;
1518
import reactor.netty.http.Http11SslContextSpec;
1619
import reactor.netty.http.client.HttpClient;
1720
import reactor.netty.resources.ConnectionProvider;
@@ -32,6 +35,8 @@ public class SharedFetch implements AutoCloseable {
3235
final LatchingUrisInterceptor latchingUrisInterceptor = new LatchingUrisInterceptor();
3336

3437
private final HttpClient reactiveClient;
38+
private final CloseableHttpAsyncClient hcAsyncClient;
39+
private boolean hcAsyncClientStarted = false;
3540

3641
private final URI filesViaUrl;
3742

@@ -78,6 +83,16 @@ public SharedFetch(String forCommand, Options options) {
7883
headers.put("x-fetch-session", fetchSessionId);
7984

8085
this.filesViaUrl = options.getFilesViaUrl();
86+
87+
hcAsyncClient = HttpAsyncClients.createSystem();
88+
}
89+
90+
public synchronized CloseableHttpAsyncClient getHcAsyncClient() {
91+
if (!hcAsyncClientStarted) {
92+
hcAsyncClient.start();
93+
hcAsyncClientStarted = true;
94+
}
95+
return hcAsyncClient;
8196
}
8297

8398
public FetchBuilderBase<?> fetch(URI uri) {
@@ -92,6 +107,11 @@ public SharedFetch addHeader(String name, String value) {
92107

93108
@Override
94109
public void close() {
110+
try {
111+
hcAsyncClient.close();
112+
} catch (IOException e) {
113+
log.warn("Failed to close async client for shared fetch", e);
114+
}
95115
}
96116

97117
@Builder

src/main/java/me/itzg/helpers/sync/MulitCopyCommand.java

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import me.itzg.helpers.http.FailedRequestException;
2121
import me.itzg.helpers.http.Fetch;
2222
import me.itzg.helpers.http.SharedFetch;
23+
import me.itzg.helpers.http.SharedFetch.Options;
2324
import me.itzg.helpers.http.Uris;
2425
import org.jetbrains.annotations.Blocking;
2526
import org.reactivestreams.Publisher;
@@ -82,13 +83,15 @@ public Integer call() throws Exception {
8283

8384
Files.createDirectories(dest);
8485

85-
Flux.fromIterable(sources)
86-
.map(String::trim)
87-
.filter(s -> !s.isEmpty())
88-
.flatMap(source -> processSource(source, fileIsListingOption))
89-
.collectList()
90-
.flatMap(this::cleanupAndSaveManifest)
91-
.block();
86+
try (SharedFetch sharedFetch = Fetch.sharedFetch("mcopy", Options.builder().build())) {
87+
Flux.fromIterable(sources)
88+
.map(String::trim)
89+
.filter(s -> !s.isEmpty())
90+
.flatMap(source -> processSource(sharedFetch, source, fileIsListingOption))
91+
.collectList()
92+
.flatMap(this::cleanupAndSaveManifest)
93+
.block();
94+
}
9295

9396
return ExitCode.OK;
9497
}
@@ -113,9 +116,9 @@ private Mono<?> cleanupAndSaveManifest(List<Path> paths) {
113116
});
114117
}
115118

116-
private Publisher<Path> processSource(String source, boolean fileIsListing) {
119+
private Publisher<Path> processSource(SharedFetch sharedFetch, String source, boolean fileIsListing) {
117120
if (Uris.isUri(source)) {
118-
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(source);
121+
return fileIsListing ? processRemoteListingFile(source) : processRemoteSource(sharedFetch, source);
119122
} else {
120123
final Path path = Paths.get(source);
121124
if (!Files.exists(path)) {
@@ -125,12 +128,12 @@ private Publisher<Path> processSource(String source, boolean fileIsListing) {
125128
if (Files.isDirectory(path)) {
126129
return processDirectory(path);
127130
} else {
128-
return fileIsListing ? processListingFile(path) : processFile(path);
131+
return fileIsListing ? processListingFile(sharedFetch, path) : processFile(path);
129132
}
130133
}
131134
}
132135

133-
private Flux<Path> processListingFile(Path listingFile) {
136+
private Flux<Path> processListingFile(SharedFetch sharedFetch, Path listingFile) {
134137
return Mono.just(listingFile)
135138
.publishOn(Schedulers.boundedElastic())
136139
.flatMapMany(path -> {
@@ -139,7 +142,7 @@ private Flux<Path> processListingFile(Path listingFile) {
139142
final List<String> lines = Files.readAllLines(path);
140143
return Flux.fromIterable(lines)
141144
.filter(this::isListingLine)
142-
.flatMap(src -> processSource(src,
145+
.flatMap(src -> processSource(sharedFetch, src,
143146
// avoid recursive file-listing processing
144147
false));
145148
} catch (IOException e) {
@@ -230,8 +233,8 @@ private Flux<Path> processDirectory(Path srcDir) {
230233
});
231234
}
232235

233-
private Mono<Path> processRemoteSource(String source) {
234-
return Fetch.fetch(URI.create(source))
236+
private Mono<Path> processRemoteSource(SharedFetch sharedFetch, String source) {
237+
return sharedFetch.fetch(URI.create(source))
235238
.userAgentCommand("mcopy")
236239
.toDirectory(dest)
237240
.skipUpToDate(skipUpToDate)
@@ -273,7 +276,7 @@ private Flux<Path> processRemoteListingFile(String source) {
273276
.flatMapMany(content -> Flux.just(content.split("\\r?\\n")))
274277
.filter(this::isListingLine)
275278
)
276-
.flatMap(this::processRemoteSource)
279+
.flatMap(sourceInListing -> processRemoteSource(sharedFetch, sourceInListing))
277280
.doOnTerminate(sharedFetch::close)
278281
.checkpoint("Processing remote listing at " + source, true);
279282
}

0 commit comments

Comments
 (0)