@@ -18,8 +18,10 @@
 package org.apache.fluss.client.table.scanner.log;
 
 import org.apache.fluss.annotation.Internal;
+import org.apache.fluss.exception.FetchException;
 import org.apache.fluss.exception.WakeupException;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.utils.ExceptionUtils;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,17 +75,19 @@ public class LogFetchBuffer implements AutoCloseable {
     @GuardedBy("lock")
     private @Nullable CompletedFetch nextInLineFetch;
 
+    @GuardedBy("lock")
+    private @Nullable Throwable throwable;
+
     public LogFetchBuffer() {
         this.completedFetches = new LinkedList<>();
     }
 
     /**
      * Returns {@code true} if there are no completed fetches pending to return to the user.
      *
-     * @return {@code true} if the buffer is empty, {@code false} otherwise
+     * @return {@code true} if there are no completed fetches pending to return to the user and no
+     *     error has occurred, {@code false} otherwise
      */
     boolean isEmpty() {
-        return inLock(lock, completedFetches::isEmpty);
+        return inLock(lock, () -> completedFetches.isEmpty() && throwable == null);
     }
 
     void pend(PendingFetch pendingFetch) {
@@ -109,10 +113,20 @@ void tryComplete(TableBucket tableBucket) {
         while (pendings != null && !pendings.isEmpty()) {
             PendingFetch pendingFetch = pendings.peek();
             if (pendingFetch.isCompleted()) {
-                CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
-                completedFetches.add(completedFetch);
-                pendings.poll();
-                hasCompleted = true;
+                try {
+                    CompletedFetch completedFetch = pendingFetch.toCompletedFetch();
+                    completedFetches.add(completedFetch);
+                    pendings.poll();
+                    hasCompleted = true;
+                } catch (Throwable t) {
+                    LOG.error("Failed to complete fetch for tableBucket: {}", tableBucket, t);
+                    throwable = t;
+                    return;
+                }
             } else {
                 break;
             }
@@ -157,12 +171,22 @@ void setNextInLineFetch(@Nullable CompletedFetch nextInLineFetch) {
         inLock(lock, () -> this.nextInLineFetch = nextInLineFetch);
     }
 
-    CompletedFetch peek() {
-        return inLock(lock, completedFetches::peek);
+    CompletedFetch peek() throws FetchException {
+        return inLock(
+                lock,
+                () -> {
+                    checkException();
+                    return completedFetches.peek();
+                });
     }
 
-    CompletedFetch poll() {
-        return inLock(lock, completedFetches::poll);
+    CompletedFetch poll() throws FetchException {
+        return inLock(
+                lock,
+                () -> {
+                    checkException();
+                    return completedFetches.poll();
+                });
     }
 
     /**
@@ -273,6 +297,12 @@ Set<TableBucket> pendedBuckets() {
         return inLock(lock, pendingFetches::keySet);
     }
 
+    void checkException() throws FetchException {
+        if (throwable != null) {
+            throw new FetchException(ExceptionUtils.stripCompletionException(throwable));
+        }
+    }
+
     @Override
     public void close() throws Exception {
         inLock(lock, () -> retainAll(Collections.emptySet()));
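Taken together, the LogFetchBuffer changes above implement a latch-on-first-error pattern: tryComplete records the first Throwable under the lock, isEmpty reports the buffer as non-empty while an error is pending (so blocked readers wake up), and the next peek or poll rethrows it as a FetchException via checkException. A minimal self-contained sketch of the same pattern; ErrorLatchingQueue and its members are toy names, not the Fluss API:

import java.util.ArrayDeque;
import java.util.Queue;

/** Toy illustration of the latch-on-first-error pattern; names are hypothetical. */
final class ErrorLatchingQueue<E> {
    private final Object lock = new Object();
    private final Queue<E> elements = new ArrayDeque<>();
    private Throwable error; // latched under the lock

    void add(E element) {
        synchronized (lock) {
            elements.add(element);
        }
    }

    void fail(Throwable t) {
        synchronized (lock) {
            if (error == null) { // keep the first failure
                error = t;
            }
        }
    }

    // Non-empty while an error is pending, so a waiting consumer returns to read and gets the rethrow.
    boolean isEmpty() {
        synchronized (lock) {
            return elements.isEmpty() && error == null;
        }
    }

    // Hands out the next element, or rethrows the latched failure at the read site.
    E poll() {
        synchronized (lock) {
            if (error != null) {
                throw new IllegalStateException("fetch failed", error);
            }
            return elements.poll();
        }
    }
}

Surfacing the error from peek/poll rather than from the background fetch thread moves the failure onto the caller's thread, where it can be handled or rethrown instead of being lost.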
@@ -26,6 +26,7 @@
 import org.apache.fluss.cluster.BucketLocation;
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FetchException;
 import org.apache.fluss.exception.InvalidMetadataException;
 import org.apache.fluss.exception.LeaderNotAvailableException;
 import org.apache.fluss.fs.FsPath;
@@ -154,7 +155,7 @@ public boolean hasAvailableFetches() {
         return !logFetchBuffer.isEmpty();
     }
 
-    public Map<TableBucket, List<ScanRecord>> collectFetch() {
+    public Map<TableBucket, List<ScanRecord>> collectFetch() throws FetchException {
         return logFetchCollector.collectFetch(logFetchBuffer);
     }
 
@@ -62,6 +62,7 @@ public class RemoteLogDownloader implements Closeable {
     private static final Logger LOG = LoggerFactory.getLogger(RemoteLogDownloader.class);
 
     private static final long POLL_TIMEOUT = 5000L;
+    private static final int MAX_RETRY_COUNT = 5;
 
     private final Path localLogDir;
 
@@ -151,51 +152,56 @@ void fetchOnce() throws Exception {
             return;
         }
 
-        try {
-            // 1. cleanup the finished logs first to free up disk space
-            cleanupRemoteLogs();
-
-            // 2. do the actual download work
-            FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
-            scannerMetricGroup.remoteFetchRequestCount().inc();
-
-            long startTime = System.currentTimeMillis();
-            // download the remote file to local
-            remoteFileDownloader
-                    .downloadFileAsync(fsPathAndFileName, localLogDir)
-                    .whenComplete(
-                            (bytes, throwable) -> {
-                                if (throwable != null) {
-                                    LOG.error(
-                                            "Failed to download remote log segment file {}.",
-                                            fsPathAndFileName.getFileName(),
-                                            ExceptionUtils.stripExecutionException(throwable));
-                                    // release the semaphore for the failed request
-                                    prefetchSemaphore.release();
-                                    // add back the request to the queue,
-                                    // so we do not complete the request.future here
-                                    segmentsToFetch.add(request);
-                                    scannerMetricGroup.remoteFetchErrorCount().inc();
-                                } else {
-                                    LOG.info(
-                                            "Successfully downloaded remote log segment file {} to local cost {} ms.",
-                                            fsPathAndFileName.getFileName(),
-                                            System.currentTimeMillis() - startTime);
-                                    File localFile =
-                                            new File(
-                                                    localLogDir.toFile(),
-                                                    fsPathAndFileName.getFileName());
-                                    scannerMetricGroup.remoteFetchBytes().inc(bytes);
-                                    request.future.complete(localFile);
-                                }
-                            });
-        } catch (Throwable t) {
-            prefetchSemaphore.release();
-            // add back the request to the queue
-            segmentsToFetch.add(request);
-            scannerMetricGroup.remoteFetchErrorCount().inc();
-            // log the error and continue instead of shutdown the download thread
-            LOG.error("Failed to download remote log segment.", t);
-        }
+        // the semaphore will only be released after the remote file is downloaded to local
+        // successfully.
+        downloadRemoteLog(request, MAX_RETRY_COUNT, System.currentTimeMillis());
+    }
+
+    private void downloadRemoteLog(
+            RemoteLogDownloadRequest request, int retryCount, long startTime) {
+        // 1. cleanup the finished logs first to free up disk space
+        cleanupRemoteLogs();
+
+        // 2. do the actual download work
+        FsPathAndFileName fsPathAndFileName = request.getFsPathAndFileName();
+        scannerMetricGroup.remoteFetchRequestCount().inc();
+        remoteFileDownloader
+                .downloadFileAsync(fsPathAndFileName, localLogDir)
+                .whenComplete(
+                        (bytes, throwable) -> {
+                            if (throwable != null) {
+                                handleFetchException(request, throwable, retryCount, startTime);
+                            } else {
+                                LOG.info(
+                                        "Successfully downloaded remote log segment file {} to local in {} ms.",
+                                        fsPathAndFileName.getFileName(),
+                                        System.currentTimeMillis() - startTime);
+                                File localFile =
+                                        new File(
+                                                localLogDir.toFile(),
+                                                fsPathAndFileName.getFileName());
+                                scannerMetricGroup.remoteFetchBytes().inc(bytes);
+                                request.future.complete(localFile);
+                            }
+                        });
+    }
+
+    private void handleFetchException(
+            RemoteLogDownloadRequest request, Throwable throwable, int retryCount, long startTime) {
+        LOG.error(
+                "Failed to download remote log segment file {}.",
+                request.getFsPathAndFileName().getFileName(),
+                ExceptionUtils.stripExecutionException(throwable));
+        scannerMetricGroup.remoteFetchErrorCount().inc();
+        // retry while budget remains; fail the request only after MAX_RETRY_COUNT attempts
+        if (retryCount > 0) {
+            downloadRemoteLog(request, retryCount - 1, startTime);
+        } else {
+            request.future.completeExceptionally(
+                    new IOException(
+                            String.format(
+                                    "Failed to download remote log segment file %s after %d retries",
+                                    request.getFsPathAndFileName().getFileName(), MAX_RETRY_COUNT),
+                            throwable));
+        }
     }
 
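The reworked RemoteLogDownloader path above is a bounded immediate-retry loop: each failed asynchronous download re-enters downloadRemoteLog with a decremented budget, and the request future completes exceptionally only once the MAX_RETRY_COUNT attempts are exhausted (the retry branch fires while retryCount > 0, the failure branch at zero). The same control flow in a compact, self-contained sketch; BoundedRetryDemo, attemptOnce, and the file name are hypothetical stand-ins, not Fluss code:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

final class BoundedRetryDemo {
    static final int MAX_RETRY_COUNT = 5;

    /** Kicks off one attempt; on failure, retries until the budget runs out. */
    static void download(String file, int retriesLeft, CompletableFuture<String> result) {
        attemptOnce(file)
                .whenComplete(
                        (path, t) -> {
                            if (t == null) {
                                result.complete(path);
                            } else if (retriesLeft > 0) {
                                download(file, retriesLeft - 1, result); // retry with one fewer attempt
                            } else {
                                result.completeExceptionally(
                                        new IOException(
                                                "Failed to download " + file + " after "
                                                        + MAX_RETRY_COUNT + " retries",
                                                t));
                            }
                        });
    }

    /** Stand-in for the real async transfer; always fails to exercise the retry path. */
    private static CompletableFuture<String> attemptOnce(String file) {
        return CompletableFuture.supplyAsync(
                () -> {
                    throw new RuntimeException("simulated transfer failure for " + file);
                });
    }

    public static void main(String[] args) {
        CompletableFuture<String> result = new CompletableFuture<>();
        download("segment-0001.log", MAX_RETRY_COUNT, result);
        // Block for the demo; prints the IOException message once retries are exhausted.
        System.out.println(result.handle((p, t) -> t != null ? t.getMessage() : p).join());
    }
}

Recursing through the completion callback, rather than blocking in a loop, keeps the download thread free while an attempt is in flight.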
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.client.table.scanner.log;
 
+import org.apache.fluss.exception.FetchException;
 import org.apache.fluss.exception.WakeupException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecordReadContext;
@@ -26,6 +27,8 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import javax.annotation.Nullable;
+
 import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
@@ -44,6 +47,7 @@
 import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link LogFetchBuffer}. */
 public class LogFetchBufferTest {
@@ -251,6 +255,31 @@ void testPendFetches() throws Exception {
         }
     }
 
+    @Test
+    void testFetchException() throws Exception {
+        try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
+            AtomicBoolean completed = new AtomicBoolean(false);
+            PendingFetch pendingFetch =
+                    makePendingFetch(
+                            tableBucket1, completed, new FetchException("Test fetch exception"));
+
+            logFetchBuffer.tryComplete(pendingFetch.tableBucket());
+            assertThat(logFetchBuffer.isEmpty()).isTrue();
+            logFetchBuffer.pend(pendingFetch);
+            assertThat(logFetchBuffer.isEmpty()).isTrue();
+
+            completed.set(true);
+            logFetchBuffer.tryComplete(pendingFetch.tableBucket());
+            assertThat(logFetchBuffer.isEmpty()).isFalse();
+            assertThatThrownBy(logFetchBuffer::poll)
+                    .isExactlyInstanceOf(FetchException.class)
+                    .hasMessageContaining("Test fetch exception");
+            assertThatThrownBy(logFetchBuffer::peek)
+                    .isExactlyInstanceOf(FetchException.class)
+                    .hasMessageContaining("Test fetch exception");
+        }
+    }
+
     private boolean await(LogFetchBuffer buffer, Duration waitTime) throws InterruptedException {
         return buffer.awaitNotEmpty(System.nanoTime() + waitTime.toNanos());
     }
@@ -271,6 +300,12 @@ private PendingFetch makePendingFetch(TableBucket tableBucket) throws Exception {
 
     private PendingFetch makePendingFetch(TableBucket tableBucket, AtomicBoolean completed)
             throws Exception {
+        return makePendingFetch(tableBucket, completed, null);
+    }
+
+    private PendingFetch makePendingFetch(
+            TableBucket tableBucket, AtomicBoolean completed, @Nullable RuntimeException exception)
+            throws Exception {
         DefaultCompletedFetch completedFetch = makeCompletedFetch(tableBucket);
         return new PendingFetch() {
             @Override
@@ -285,6 +320,9 @@ public boolean isCompleted() {
 
             @Override
             public CompletedFetch toCompletedFetch() {
+                if (exception != null) {
+                    throw exception;
+                }
                 return completedFetch;
             }
         };
@@ -21,6 +21,7 @@
 import org.apache.fluss.client.metadata.TestingMetadataUpdater;
 import org.apache.fluss.client.table.scanner.ScanRecord;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FetchException;
 import org.apache.fluss.metadata.TableBucket;
 import org.apache.fluss.record.LogRecordReadContext;
 import org.apache.fluss.rpc.entity.FetchLogResultForBucket;
@@ -42,6 +43,7 @@
 import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
 import static org.apache.fluss.testutils.DataTestUtils.genMemoryLogRecordsByObject;
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Test for {@link LogFetchCollector}. */
 public class LogFetchCollectorTest {
@@ -155,6 +157,35 @@ void testCollectAfterUnassign() throws Exception {
         assertThat(bucketAndRecords.size()).isEqualTo(0);
     }
 
+    @Test
+    void testFetchException() throws Exception {
+        TableBucket tb = new TableBucket(DATA1_TABLE_ID, 0);
+        PendingFetch pendingFetch =
+                new PendingFetch() {
+                    @Override
+                    public TableBucket tableBucket() {
+                        return tb;
+                    }
+
+                    @Override
+                    public boolean isCompleted() {
+                        return true;
+                    }
+
+                    @Override
+                    public CompletedFetch toCompletedFetch() {
+                        throw new FetchException("test fetch exception");
+                    }
+                };
+
+        try (LogFetchBuffer logFetchBuffer = new LogFetchBuffer()) {
+            logFetchBuffer.pend(pendingFetch);
+            logFetchBuffer.tryComplete(tb);
+            assertThatThrownBy(() -> logFetchCollector.collectFetch(logFetchBuffer))
+                    .hasMessageContaining("test fetch exception");
+        }
+    }
+
     private DefaultCompletedFetch makeCompletedFetch(
             TableBucket tableBucket, FetchLogResultForBucket resultForBucket, long offset) {
         return new DefaultCompletedFetch(
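The two tests above pin down the end-to-end contract: an error latched in LogFetchBuffer resurfaces from collectFetch instead of being swallowed, which means the failure arrives on the application's read path. A hedged consumer-loop sketch, assuming FetchException is unchecked, that LogScanner lives in the same package as the classes above, and that its poll(Duration) result is iterable over ScanRecord; the loop and handling are illustrative, not part of this patch:

import java.time.Duration;

import org.apache.fluss.client.table.scanner.ScanRecord;
import org.apache.fluss.client.table.scanner.log.LogScanner;
import org.apache.fluss.exception.FetchException;

final class ScanLoopSketch {
    // Polls until interrupted; a latched fetch failure surfaces here rather than dying in the fetch thread.
    static void run(LogScanner scanner) {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                for (ScanRecord record : scanner.poll(Duration.ofSeconds(1))) {
                    System.out.println(record); // application-specific processing
                }
            }
        } catch (FetchException e) {
            // e.g. a remote log segment download exhausted its retry budget
            System.err.println("Log scanning failed: " + e.getMessage());
        }
    }
}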