diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java index ab6a900f7a..ace93767c7 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java @@ -89,7 +89,7 @@ private boolean closed = false; public StreamReader(long streamId, long nextReadOffset, EventLoop eventLoop, ObjectManager objectManager, - ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) { + ObjectReaderFactory objectReaderFactory, DataBlockCache dataBlockCache) { this.streamId = streamId; this.nextReadOffset = nextReadOffset; this.readahead = new Readahead(); @@ -125,7 +125,7 @@ CompletableFuture read(long startOffset, long endOffset, int maxB Throwable cause = FutureUtil.cause(ex); if (cause != null) { readContext.records.forEach(StreamRecordBatch::release); - for (Block block : readContext.blocks) { + for (ExternalCacheBlock block : readContext.blocks) { block.release(); } if (leftRetries > 0 && isRecoverable(cause)) { @@ -163,10 +163,10 @@ public void close() { void read0(ReadContext ctx, final long startOffset, final long endOffset, final int maxBytes) { // 1. get blocks - CompletableFuture> getBlocksCf = getBlocks(startOffset, endOffset, maxBytes, false); + CompletableFuture> getBlocksCf = getBlocks(startOffset, endOffset, maxBytes, false); // 2. wait block's data loaded - List blocks = new ArrayList<>(); + List blocks = new ArrayList<>(); CompletableFuture loadBlocksCf = getBlocksCf.thenCompose(blockList -> { blocks.addAll(blockList); return CompletableFuture.allOf(blockList.stream().map(block -> block.loadCf).toArray(CompletableFuture[]::new)); @@ -180,7 +180,7 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final // 3. extract records from blocks loadBlocksCf.thenAccept(nil -> { ctx.blocks.addAll(blocks); - Optional failedBlock = blocks.stream().filter(block -> block.exception != null).findAny(); + Optional failedBlock = blocks.stream().filter(block -> block.exception != null).findAny(); if (failedBlock.isPresent()) { ctx.cf.completeExceptionally(failedBlock.get().exception); return; @@ -193,7 +193,7 @@ void read0(ReadContext ctx, final long startOffset, final long endOffset, final long nextStartOffset = startOffset; long nextEndOffset; boolean fulfill = false; - for (Block block : blocks) { + for (ExternalCacheBlock block : blocks) { DataBlockIndex index = block.index; if (nextStartOffset < index.startOffset() || nextStartOffset >= index.endOffset()) { String msg = String.format("[BUG] nextStartOffset:%d is not in the range of index:%d-%d", nextStartOffset, index.startOffset(), index.endOffset()); @@ -275,7 +275,7 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) { break; } } - for (Block block : ctx.blocks) { + for (ExternalCacheBlock block : ctx.blocks) { block.release(); } // try readahead to speed up the next read @@ -290,8 +290,8 @@ void afterRead(ReadDataBlock readDataBlock, ReadContext ctx) { }); } - private CompletableFuture> getBlocks(long startOffset, long endOffset, int maxBytes, - boolean readahead) { + private CompletableFuture> getBlocks(long startOffset, long endOffset, int maxBytes, + boolean readahead) { GetBlocksContext context = new GetBlocksContext(readahead); try { getBlocks0(context, startOffset, endOffset, maxBytes); @@ -299,7 +299,7 @@ private CompletableFuture> getBlocks(long startOffset, long endOffse context.cf.completeExceptionally(ex); } context.cf.exceptionally(ex -> { - context.blocks.forEach(Block::release); + context.blocks.forEach(ExternalCacheBlock::release); return null; }); return context.cf; @@ -340,8 +340,8 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset, firstBlock = false; } // after read the data will be return to the cache, so we need to reload the data every time - block = block.newBlockWithData(ctx.readahead); - ctx.blocks.add(block); + ExternalCacheBlock externalBlock = block.newBlockWithData(ctx.readahead); + ctx.blocks.add(externalBlock); if ((endOffset != -1L && index.endOffset() >= endOffset) || remainingSize <= 0) { fulfill = true; break; @@ -512,8 +512,8 @@ private static boolean isRecoverable(Throwable cause) { } static class GetBlocksContext { - final List blocks = new ArrayList<>(); - final CompletableFuture> cf = new CompletableFuture<>(); + final List blocks = new ArrayList<>(); + final CompletableFuture> cf = new CompletableFuture<>(); final boolean readahead; public GetBlocksContext(boolean readahead) { @@ -523,21 +523,42 @@ public GetBlocksContext(boolean readahead) { static class ReadContext { final List records = new LinkedList<>(); - final List blocks = new ArrayList<>(); + final List blocks = new ArrayList<>(); final CompletableFuture cf = new CompletableFuture<>(); CacheAccessType accessType = BLOCK_CACHE_HIT; final TimerUtil start = new TimerUtil(); } + public static class ExternalCacheBlock { + final S3ObjectMetadata metadata; + final DataBlockIndex index; + + CompletableFuture loadCf; + + DataBlock data; + Throwable exception; + + public ExternalCacheBlock(S3ObjectMetadata metadata, DataBlockIndex index) { + this.metadata = metadata; + this.index = index; + } + + public void release() { + loadCf.whenComplete((v, ex) -> { + if (data != null) { + data.release(); + data = null; + } + }); + } + } + class Block { final S3ObjectMetadata metadata; final DataBlockIndex index; DataBlock data; DataBlock.FreeListenerHandle freeListenerHandle; - CompletableFuture loadCf; - Throwable exception; - boolean released = false; boolean readCompleted = false; public Block(S3ObjectMetadata metadata, DataBlockIndex index) { @@ -545,43 +566,34 @@ public Block(S3ObjectMetadata metadata, DataBlockIndex index) { this.index = index; } - // TODO: use different Block type, cause of the returned Block shouldn't have markReadCompleted method - public Block newBlockWithData(boolean readahead) { + public ExternalCacheBlock newBlockWithData(boolean readahead) { // We need to create a new block with consistent data to avoid duplicated release or leak, // cause of the loaded data maybe evicted and reloaded. - Block newBlock = new Block(metadata, index); + ExternalCacheBlock externalBlock = new ExternalCacheBlock(metadata, index); ObjectReader objectReader = objectReaderFactory.get(metadata); DataBlockCache.GetOptions getOptions = DataBlockCache.GetOptions.builder().readahead(readahead).build(); - loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> { - newBlock.data = newData; - if (!readCompleted && data != newData) { - // the data block is first loaded or evict & reload - if (data != null) { - freeListenerHandle.close(); - } - data = newData; - newData.markUnread(); - freeListenerHandle = data.registerFreeListener(b -> handleBlockFree(this)); - } + // the data block is first loaded or evict & reload + externalBlock.loadCf = dataBlockCache.getBlock(getOptions, objectReader, index).thenAccept(newData -> { + externalBlock.data = newData; + handleDataBlockChange(newData); }).exceptionally(ex -> { - exception = ex; - newBlock.exception = ex; + externalBlock.exception = ex; return null; }).whenComplete((nil, ex) -> objectReader.release()); - newBlock.loadCf = loadCf; - return newBlock; + + return externalBlock; } - public void release() { - if (released) { - return; - } - released = true; - loadCf.whenComplete((nil, ex) -> { + private void handleDataBlockChange(DataBlock newData) { + if (!readCompleted && data != newData) { + // the data block is first loaded or evict & reload if (data != null) { - data.release(); + freeListenerHandle.close(); } - }); + data = newData; + newData.markUnread(); + freeListenerHandle = data.registerFreeListener(b -> handleBlockFree(this)); + } } /** @@ -601,8 +613,7 @@ public String toString() { "metadata=" + metadata + ", index=" + index + ", data=" + data + - ", exception=" + exception + - ", released=" + released + + ", readCompleted=" + readCompleted + '}'; } } @@ -645,7 +656,7 @@ public void tryReadahead(boolean cacheMiss) { readaheadMarkOffset = nextReadaheadOffset; inflightReadaheadCf = getBlocks(nextReadaheadOffset, -1L, nextReadaheadSize, true).thenAccept(blocks -> { nextReadaheadOffset = blocks.isEmpty() ? nextReadaheadOffset : blocks.get(blocks.size() - 1).index.endOffset(); - blocks.forEach(Block::release); + blocks.forEach(ExternalCacheBlock::release); }); // For get block indexes and load data block are sync success, // the whenComplete will invoke first before assign CompletableFuture to inflightReadaheadCf