Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -256,19 +256,33 @@ public LakeTieringTableInfo requestTable() {
return inLock(
lock,
() -> {
Long tableId = pendingTieringTables.poll();
// no any pending table, return directly
if (tableId == null) {
return null;
while (true) {
Long tableId = pendingTieringTables.poll();
// no any pending table, return directly
if (tableId == null) {
return null;
}

TablePath tablePath = tablePaths.get(tableId);
// the table has been dropped, request again
if (tablePath == null) {
continue;
}

TieringState state = tieringStates.get(tableId);
if (state != TieringState.Pending) {
// stale queue entry, ignore
LOG.debug(
"requestTable: skipping table {} because state is {}",
tableId,
state);
continue;
}

doHandleStateChange(tableId, TieringState.Tiering);
long tieringEpoch = tableTierEpoch.get(tableId);
return new LakeTieringTableInfo(tableId, tablePath, tieringEpoch);
}
TablePath tablePath = tablePaths.get(tableId);
// the table has been dropped, request again
if (tablePath == null) {
return requestTable();
}
doHandleStateChange(tableId, TieringState.Tiering);
long tieringEpoch = tableTierEpoch.get(tableId);
return new LakeTieringTableInfo(tableId, tablePath, tieringEpoch);
});
}

Expand Down Expand Up @@ -374,6 +388,8 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
targetState);
return;
}

doStateChange(tableId, currentState, targetState);
switch (targetState) {
case New:
case Initialized:
Expand All @@ -399,7 +415,6 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
// do nothing
break;
}
doStateChange(tableId, currentState, targetState);
}

private boolean isValidStateTransition(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.server.entity.LakeTieringTableInfo;
import org.apache.fluss.server.utils.timer.DefaultTimer;
import org.apache.fluss.server.utils.timer.Timer;
import org.apache.fluss.server.utils.timer.TimerTask;
import org.apache.fluss.testutils.common.ManuallyTriggeredScheduledExecutorService;
import org.apache.fluss.types.DataTypes;
import org.apache.fluss.utils.clock.ManualClock;
Expand All @@ -34,10 +36,13 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

import static org.apache.fluss.server.coordinator.LakeTableTieringManager.TIERING_SERVICE_TIMEOUT_MS;
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
Expand Down Expand Up @@ -238,6 +243,102 @@ void testTieringFail() {
assertRequestTable(tableId1, tablePath1, 2);
}

@Test
void testRequestTableSkipsStaleQueueEntries() throws Exception {
long tableId1 = 1L;
TablePath tablePath1 = TablePath.of("db", "table1");
TableInfo tableInfo1 = createTableInfo(tableId1, tablePath1, Duration.ofSeconds(10));
tableTieringManager.addNewLakeTable(tableInfo1);

long tableId2 = 2L;
TablePath tablePath2 = TablePath.of("db", "table2");
TableInfo tableInfo2 = createTableInfo(tableId2, tablePath2, Duration.ofSeconds(10));
tableTieringManager.addNewLakeTable(tableInfo2);

manualClock.advanceTime(Duration.ofSeconds(10));

// Wait until we actually get the first assignable table
LakeTieringTableInfo first =
waitValue(
() -> Optional.ofNullable(tableTieringManager.requestTable()),
Duration.ofSeconds(10),
"First requestTable() timed out");

long tieringTableId = first.tableId();
long pendingTableId = (tieringTableId == tableId1) ? tableId2 : tableId1;
TablePath pendingPath = (pendingTableId == tableId1) ? tablePath1 : tablePath2;

// Verify first table is tiering (so it will be stale if queued again)
tableTieringManager.renewTieringHeartbeat(tieringTableId, 1L);

// Force a stale entry to the head of the pending queue
ArrayDeque<Long> pendingDeque =
getPrivateField(tableTieringManager, "pendingTieringTables");
pendingDeque.addFirst(tieringTableId);

// Now the next request should skip the stale tiering id and return the other pending table.
LakeTieringTableInfo second =
waitValue(
() -> Optional.ofNullable(tableTieringManager.requestTable()),
Duration.ofSeconds(10),
"Second requestTable() timed out");

assertThat(second).isEqualTo(new LakeTieringTableInfo(pendingTableId, pendingPath, 1L));
}

@Test
void testScheduledStateRecordedBeforeTimerCallbackRuns() throws Exception {
Timer immediateTimer = new ImmediateTimer();

LakeTableTieringManager manager =
new LakeTableTieringManager(
immediateTimer, lakeTieringServiceTimeoutChecker, manualClock);

try {
long tableId = 1L;
TablePath tablePath = TablePath.of("db", "table1");
TableInfo tableInfo = createTableInfo(tableId, tablePath, Duration.ofMinutes(3));

// lastTieredTime is older than freshness -> computed delay is negative
long lastTieredTime = manualClock.milliseconds() - Duration.ofMinutes(3).toMillis();

manager.initWithLakeTables(List.of(Tuple2.of(tableInfo, lastTieredTime)));

// Table should be immediately requestable
LakeTieringTableInfo table = manager.requestTable();
assertThat(table).isEqualTo(new LakeTieringTableInfo(tableId, tablePath, 1L));
} finally {
manager.close();
}
}

/**
* Timer implementation used for testing that immediately executes in the caller thread when
* added.
*/
private static final class ImmediateTimer implements Timer {
@Override
public void add(TimerTask timerTask) {
timerTask.run();
}

@Override
public boolean advanceClock(long waitMs) throws InterruptedException {
TimeUnit.MILLISECONDS.sleep(Math.min(waitMs, 5));
return false;
}

@Override
public int numOfTimerTasks() {
return 0;
}

@Override
public void shutdown() {
// no-op
}
}

private TableInfo createTableInfo(long tableId, TablePath tablePath, Duration freshness) {
TableDescriptor tableDescriptor =
TableDescriptor.builder()
Expand All @@ -264,4 +365,11 @@ private void assertRequestTable(long tableId, TablePath tablePath, long tieredEp
"Request tiering table timout");
assertThat(table).isEqualTo(new LakeTieringTableInfo(tableId, tablePath, tieredEpoch));
}

@SuppressWarnings("unchecked")
private static <T> T getPrivateField(Object target, String fieldName) throws Exception {
Field f = target.getClass().getDeclaredField(fieldName);
f.setAccessible(true);
return (T) f.get(target);
}
}
Loading