 package me.itzg.helpers.files;
 
-import java.io.IOException;
+import io.netty.buffer.ByteBuf;
 import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.time.Instant;
 import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
-import reactor.core.Exceptions;
 import reactor.core.publisher.Mono;
 import reactor.core.scheduler.Schedulers;
 import reactor.netty.ByteBufFlux;
+import reactor.util.function.Tuple2;
 
 @Slf4j
 public class ReactiveFileUtils {
@@ -42,36 +42,46 @@ public static Mono<Path> createDirectories(Path dir) {
     }
 
     public static Mono<Long> writeByteBufFluxToFile(ByteBufFlux byteBufFlux, Path file) {
-        return Mono.fromCallable(() -> {
-                log.trace("Opening {} for writing", file);
-                return FileChannel.open(file,
-                    StandardOpenOption.WRITE,
-                    StandardOpenOption.CREATE,
-                    StandardOpenOption.TRUNCATE_EXISTING
-                );
-            }
-        )
-            .subscribeOn(Schedulers.boundedElastic())
-            .flatMap(outChannel ->
-                byteBufFlux
-                    .asByteBuffer()
-                    .subscribeOn(Schedulers.boundedElastic())
-                    .<Integer>handle((byteBuffer, sink) -> {
-                        try {
-                            sink.next(outChannel.write(byteBuffer));
-                        } catch (IOException e) {
-                            sink.error(Exceptions.propagate(e));
+        final ByteBufQueue byteBufQueue = new ByteBufQueue();
+
+        // Separate this into a pair of concurrent mono's
+        return Mono.zip(
+            // ...file writer
+            Mono.fromCallable(() -> {
+                try (FileChannel channel = FileChannel.open(file,
+                    StandardOpenOption.WRITE,
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.TRUNCATE_EXISTING
+                )) {
+                    ByteBuf byteBuf;
+                    while ((byteBuf = byteBufQueue.take()) != null) {
+                        try {
+                            //noinspection ResultOfMethodCallIgnored
+                            channel.write(byteBuf.nioBuffer());
+                        } finally {
+                            byteBuf.release();
+                        }
+                    }
+
+                    return file;
                 }
             })
-                .doOnTerminate(() -> {
-                    try {
-                        outChannel.close();
-                        log.trace("Closed {}", file);
-                    } catch (IOException e) {
-                        log.warn("Failed to close {}", file, e);
-                    }
+                // ...which runs in a separate thread
+                .subscribeOn(Schedulers.boundedElastic()),
+            // ...and the network consumer flux
+            byteBufFlux
+                // Mark the bytebufs as retained so they can be released after
+                // they are written by the mono above
+                .retain()
+                .map(byteBuf -> {
+                    final int amount = byteBuf.readableBytes();
+                    byteBufQueue.add(byteBuf);
+                    return amount;
                })
+                .doOnTerminate(byteBufQueue::finish)
                 .collect(Collectors.<Integer>summingLong(value -> value))
-            );
+        )
+            // Just expose the total bytes read from network
+            .map(Tuple2::getT2);
     }
 }
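The new file-writer half blocks on a ByteBufQueue that is defined elsewhere in the repository and not shown in this diff. Based only on how it is used above (add from the network side, take until null in the writer, finish on termination), a minimal hand-off queue could look like the sketch below. This is an illustrative reconstruction, assuming a single writer and an empty buffer as an end-of-stream sentinel; it is not the repository's actual implementation.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch of a hand-off queue between the network consumer
// flux and the blocking file writer; the real ByteBufQueue may differ.
class ByteBufQueue {

    // Empty buffer used as an end-of-stream sentinel ("poison pill")
    private static final ByteBuf END = Unpooled.EMPTY_BUFFER;

    private final BlockingQueue<ByteBuf> queue = new LinkedBlockingQueue<>();

    // Enqueue a retained buffer for the writer to consume
    void add(ByteBuf byteBuf) {
        queue.add(byteBuf);
    }

    // Signal that no more buffers will arrive
    void finish() {
        queue.add(END);
    }

    // Block until the next buffer is available; returns null once finish()
    // has been called and all queued buffers have been drained
    ByteBuf take() throws InterruptedException {
        final ByteBuf byteBuf = queue.take();
        return byteBuf == END ? null : byteBuf;
    }
}

The InterruptedException from take() is acceptable inside the writer because Mono.fromCallable accepts a Callable, which may throw checked exceptions.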
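For context, a hedged usage sketch: writeByteBufFluxToFile pairs naturally with reactor-netty's HttpClient, whose responseContent() yields a ByteBufFlux. The URL, destination path, and class name below are placeholders, not part of this change.

import java.nio.file.Path;
import java.nio.file.Paths;
import reactor.netty.ByteBufFlux;
import reactor.netty.http.client.HttpClient;

public class DownloadExample {
    public static void main(String[] args) {
        // Placeholder destination and URL, purely illustrative
        final Path dest = Paths.get("server.jar");

        // Response body exposed as a ByteBufFlux by reactor-netty
        final ByteBufFlux content = HttpClient.create()
            .get()
            .uri("https://example.com/server.jar")
            .responseContent();

        // Streams the response body to disk and reports the bytes consumed
        final Long bytesWritten = ReactiveFileUtils.writeByteBufFluxToFile(content, dest)
            .block();

        System.out.println("Wrote " + bytesWritten + " bytes to " + dest);
    }
}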