Skip to content

Commit 88c65ca

Browse files
committed
address comment & remove unnecessary option
1 parent 7e61a26 commit 88c65ca

File tree

8 files changed

+65
-165
lines changed

8 files changed

+65
-165
lines changed

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1851,23 +1851,15 @@ public class ConfigOptions {
18511851
+ ConfigOptions.TABLE_DATALAKE_AUTO_EXPIRE_SNAPSHOT
18521852
+ " is false.");
18531853

1854-
public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_MAX =
1855-
key("lake.tiering.table.duration.max")
1854+
public static final ConfigOption<Duration> LAKE_TIERING_TABLE_MAX_DURATION =
1855+
key("lake.tiering.table.max-duration")
18561856
.durationType()
18571857
.defaultValue(Duration.ofMinutes(30))
18581858
.withDescription(
18591859
"The maximum duration for tiering a single table. If tiering a table exceeds this duration, "
18601860
+ "it will be force completed: the tiering will be finalized and committed to the data lake "
18611861
+ "(e.g., Paimon) immediately, even if they haven't reached their desired stopping offsets.");
18621862

1863-
public static final ConfigOption<Duration> LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL =
1864-
key("lake.tiering.table.duration.detect-interval")
1865-
.durationType()
1866-
.defaultValue(Duration.ofSeconds(30))
1867-
.withDescription(
1868-
"The interval to check if a table tiering operation has reached the maximum duration. "
1869-
+ "The enumerator will periodically check tiering tables and force complete those that exceed the maximum duration.");
1870-
18711863
// ------------------------------------------------------------------------
18721864
// ConfigOptions for fluss kafka
18731865
// ------------------------------------------------------------------------

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/LakeTieringJobBuilder.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,8 +34,7 @@
3434
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
3535
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
3636

37-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
38-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
37+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_MAX_DURATION;
3938
import static org.apache.fluss.flink.tiering.source.TieringSource.TIERING_SOURCE_TRANSFORMATION_UID;
4039
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
4140
import static org.apache.fluss.utils.Preconditions.checkNotNull;
@@ -92,14 +91,9 @@ public JobClient build() throws Exception {
9291
flussConfig.get(POLL_TIERING_TABLE_INTERVAL).toMillis());
9392
}
9493

95-
if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX) != null) {
96-
tieringSourceBuilder.withTieringTableDurationMax(
97-
lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_MAX).toMillis());
98-
}
99-
100-
if (lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL) != null) {
101-
tieringSourceBuilder.withTieringTableDurationDetectInterval(
102-
lakeTieringConfig.get(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL).toMillis());
94+
if (lakeTieringConfig.get(LAKE_TIERING_TABLE_MAX_DURATION) != null) {
95+
tieringSourceBuilder.withTieringTableMaxDurationMs(
96+
lakeTieringConfig.get(LAKE_TIERING_TABLE_MAX_DURATION).toMillis());
10397
}
10498

10599
TieringSource<?> tieringSource = tieringSourceBuilder.build();

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSource.java

Lines changed: 9 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444

4545
import java.nio.charset.StandardCharsets;
4646

47-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
48-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
47+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_MAX_DURATION;
4948
import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
5049

5150
/**
@@ -66,19 +65,16 @@ public class TieringSource<WriteResult>
6665
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
6766
private final long pollTieringTableIntervalMs;
6867
private final long tieringTableDurationMaxMs;
69-
private final long tieringTableDurationDetectIntervalMs;
7068

7169
public TieringSource(
7270
Configuration flussConf,
7371
LakeTieringFactory<WriteResult, ?> lakeTieringFactory,
7472
long pollTieringTableIntervalMs,
75-
long tieringTableDurationMaxMs,
76-
long tieringTableDurationDetectIntervalMs) {
73+
long tieringTableDurationMaxMs) {
7774
this.flussConf = flussConf;
7875
this.lakeTieringFactory = lakeTieringFactory;
7976
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
8077
this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
81-
this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
8278
}
8379

8480
@Override
@@ -93,8 +89,7 @@ public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> createEnumera
9389
flussConf,
9490
splitEnumeratorContext,
9591
pollTieringTableIntervalMs,
96-
tieringTableDurationMaxMs,
97-
tieringTableDurationDetectIntervalMs);
92+
tieringTableDurationMaxMs);
9893
}
9994

10095
@Override
@@ -106,8 +101,7 @@ public SplitEnumerator<TieringSplit, TieringSourceEnumeratorState> restoreEnumer
106101
flussConf,
107102
splitEnumeratorContext,
108103
pollTieringTableIntervalMs,
109-
tieringTableDurationMaxMs,
110-
tieringTableDurationDetectIntervalMs);
104+
tieringTableDurationMaxMs);
111105
}
112106

113107
@Override
@@ -146,10 +140,8 @@ public static class Builder<WriteResult> {
146140
private final LakeTieringFactory<WriteResult, ?> lakeTieringFactory;
147141
private long pollTieringTableIntervalMs =
148142
POLL_TIERING_TABLE_INTERVAL.defaultValue().toMillis();
149-
private long tieringTableDurationMaxMs =
150-
LAKE_TIERING_TABLE_DURATION_MAX.defaultValue().toMillis();
151-
private long tieringTableDurationDetectIntervalMs =
152-
LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL.defaultValue().toMillis();
143+
private long tieringTableMaxDurationMs =
144+
LAKE_TIERING_TABLE_MAX_DURATION.defaultValue().toMillis();
153145

154146
public Builder(
155147
Configuration flussConf, LakeTieringFactory<WriteResult, ?> lakeTieringFactory) {
@@ -162,14 +154,8 @@ public Builder<WriteResult> withPollTieringTableIntervalMs(long pollTieringTable
162154
return this;
163155
}
164156

165-
public Builder<WriteResult> withTieringTableDurationMax(long tieringTableDurationMaxMs) {
166-
this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
167-
return this;
168-
}
169-
170-
public Builder<WriteResult> withTieringTableDurationDetectInterval(
171-
long tieringTableDurationDetectIntervalMs) {
172-
this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
157+
public Builder<WriteResult> withTieringTableMaxDurationMs(long tieringTableDurationMaxMs) {
158+
this.tieringTableMaxDurationMs = tieringTableDurationMaxMs;
173159
return this;
174160
}
175161

@@ -178,8 +164,7 @@ public TieringSource<WriteResult> build() {
178164
flussConf,
179165
lakeTieringFactory,
180166
pollTieringTableIntervalMs,
181-
tieringTableDurationMaxMs,
182-
tieringTableDurationDetectIntervalMs);
167+
tieringTableMaxDurationMs);
183168
}
184169
}
185170
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -533,7 +533,7 @@ private void finishCurrentTable() throws IOException {
533533
} catch (Exception e) {
534534
throw new IOException("Fail to finish current table.", e);
535535
}
536-
536+
reachTieringMaxDurationTables.remove(currentTableId);
537537
// before switch to a new table, mark all as empty or null
538538
currentTableId = null;
539539
currentTablePath = null;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 29 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
4141
import org.apache.fluss.rpc.metrics.ClientMetricGroup;
4242
import org.apache.fluss.utils.MapUtils;
43-
import org.apache.fluss.utils.clock.Clock;
44-
import org.apache.fluss.utils.clock.SystemClock;
4543

4644
import org.apache.flink.api.connector.source.ReaderInfo;
4745
import org.apache.flink.api.connector.source.SourceEvent;
@@ -65,6 +63,8 @@
6563
import java.util.Set;
6664
import java.util.TreeSet;
6765
import java.util.concurrent.CompletableFuture;
66+
import java.util.concurrent.Executors;
67+
import java.util.concurrent.ScheduledExecutorService;
6868
import java.util.concurrent.TimeUnit;
6969
import java.util.stream.Collectors;
7070

@@ -95,20 +95,17 @@ public class TieringSourceEnumerator
9595

9696
private final Configuration flussConf;
9797
private final SplitEnumeratorContext<TieringSplit> context;
98+
private final ScheduledExecutorService timerService;
9899
private final SplitEnumeratorMetricGroup enumeratorMetricGroup;
99100
private final long pollTieringTableIntervalMs;
100-
private final long tieringTableDurationMaxMs;
101-
private final long tieringTableDurationDetectIntervalMs;
101+
private final long tieringTableMaxDurationMs;
102102
private final List<TieringSplit> pendingSplits;
103103
private final Set<Integer> readersAwaitingSplit;
104104

105-
private final Map<Long, Long> tieringTablesDeadline;
106105
private final Map<Long, Long> tieringTableEpochs;
107106
private final Map<Long, Long> failedTableEpochs;
108107
private final Map<Long, Long> finishedTableEpochs;
109108

110-
private final Clock clock;
111-
112109
// lazily instantiated
113110
private RpcClient rpcClient;
114111
private CoordinatorGateway coordinatorGateway;
@@ -125,37 +122,20 @@ public TieringSourceEnumerator(
125122
Configuration flussConf,
126123
SplitEnumeratorContext<TieringSplit> context,
127124
long pollTieringTableIntervalMs,
128-
long tieringTableDurationMaxMs,
129-
long tieringTableDurationDetectIntervalMs) {
130-
this(
131-
flussConf,
132-
context,
133-
pollTieringTableIntervalMs,
134-
tieringTableDurationMaxMs,
135-
tieringTableDurationDetectIntervalMs,
136-
SystemClock.getInstance());
137-
}
138-
139-
public TieringSourceEnumerator(
140-
Configuration flussConf,
141-
SplitEnumeratorContext<TieringSplit> context,
142-
long pollTieringTableIntervalMs,
143-
long tieringTableDurationMaxMs,
144-
long tieringTableDurationDetectIntervalMs,
145-
Clock clock) {
125+
long tieringTableMaxDurationMs) {
146126
this.flussConf = flussConf;
147127
this.context = context;
128+
this.timerService =
129+
Executors.newSingleThreadScheduledExecutor(
130+
r -> new Thread(r, "Tiering-Timer-Thread"));
148131
this.enumeratorMetricGroup = context.metricGroup();
149132
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
150-
this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
151-
this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
133+
this.tieringTableMaxDurationMs = tieringTableMaxDurationMs;
152134
this.pendingSplits = Collections.synchronizedList(new ArrayList<>());
153135
this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>());
154136
this.tieringTableEpochs = MapUtils.newConcurrentHashMap();
155137
this.finishedTableEpochs = MapUtils.newConcurrentHashMap();
156138
this.failedTableEpochs = MapUtils.newConcurrentHashMap();
157-
this.tieringTablesDeadline = MapUtils.newConcurrentHashMap();
158-
this.clock = clock;
159139
}
160140

161141
@Override
@@ -191,12 +171,6 @@ public void start() {
191171
this::generateAndAssignSplits,
192172
0,
193173
pollTieringTableIntervalMs);
194-
195-
this.context.callAsync(
196-
this::checkTableReachMaxTieringDuration,
197-
this::handleReachMaxTieringDurationTables,
198-
0,
199-
tieringTableDurationDetectIntervalMs);
200174
}
201175

202176
@Override
@@ -282,7 +256,6 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
282256
} else {
283257
finishedTableEpochs.put(finishedTableId, tieringEpoch);
284258
}
285-
tieringTablesDeadline.remove(finishedTableId);
286259
}
287260

288261
if (sourceEvent instanceof FailedTieringEvent) {
@@ -301,7 +274,6 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
301274
} else {
302275
failedTableEpochs.put(failedTableId, tieringEpoch);
303276
}
304-
tieringTablesDeadline.remove(failedTableId);
305277
}
306278

307279
if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
@@ -327,29 +299,17 @@ private void handleSourceReaderFailOver() {
327299
}
328300
}
329301

330-
private Set<Long> checkTableReachMaxTieringDuration() {
331-
Set<Long> tieringReachMaxDurationTables = new HashSet<>();
332-
long currentTime = clock.milliseconds();
333-
for (Map.Entry<Long, Long> tieringTableDeadline : tieringTablesDeadline.entrySet()) {
334-
long tableId = tieringTableDeadline.getKey();
335-
long deadline = tieringTableDeadline.getValue();
336-
if (deadline < currentTime) {
337-
tieringReachMaxDurationTables.add(tableId);
338-
}
339-
}
340-
return tieringReachMaxDurationTables;
341-
}
342-
343-
private void handleReachMaxTieringDurationTables(
344-
Set<Long> tieringReachMaxDurationTables, Throwable throwable) {
345-
if (throwable != null) {
346-
LOG.error("Fail to check tiering timeout tables.", throwable);
347-
return;
348-
}
302+
@VisibleForTesting
303+
protected void handleTableTieringReachMaxDuration(long tableId, long tieringEpoch) {
304+
Long currentEpoch = tieringTableEpochs.get(tableId);
305+
if (currentEpoch != null && currentEpoch.equals(tieringEpoch)) {
306+
LOG.info(
307+
"Table {} reached max duration ({}ms). Force completing.",
308+
tableId,
309+
tieringTableMaxDurationMs);
349310

350-
for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
351311
for (TieringSplit tieringSplit : pendingSplits) {
352-
if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
312+
if (tieringSplit.getTableBucket().getTableId() == tableId) {
353313
// mark this tiering split to skip the current round since the tiering for
354314
// this table has timed out, so the tiering source reader can skip them directly
355315
tieringSplit.skipCurrentRound();
@@ -361,14 +321,11 @@ private void handleReachMaxTieringDurationTables(
361321
}
362322
}
363323

364-
LOG.info("Found the table {} reach max tiering duration.", reachMaxDurationTable);
365-
366324
// broadcast the tiering reach max duration event to all readers,
367325
// we broadcast all for simplicity
368326
Set<Integer> readers = new HashSet<>(context.registeredReaders().keySet());
369327
for (int reader : readers) {
370-
context.sendEventToSourceReader(
371-
reader, new TieringReachMaxDurationEvent(reachMaxDurationTable));
328+
context.sendEventToSourceReader(reader, new TieringReachMaxDurationEvent(tableId));
372329
}
373330
}
374331
}
@@ -478,8 +435,15 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
478435
} else {
479436
tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
480437
pendingSplits.addAll(tieringSplits);
481-
tieringTablesDeadline.put(
482-
tieringTable.f0, clock.milliseconds() + tieringTableDurationMaxMs);
438+
439+
timerService.schedule(
440+
() ->
441+
context.runInCoordinatorThread(
442+
() ->
443+
handleTableTieringReachMaxDuration(
444+
tieringTable.f0, tieringTable.f1)),
445+
tieringTableMaxDurationMs,
446+
TimeUnit.MILLISECONDS);
483447
}
484448
} catch (Exception e) {
485449
LOG.warn("Fail to generate Tiering splits for table {}.", tieringTable.f2, e);
@@ -503,6 +467,7 @@ public TieringSourceEnumeratorState snapshotState(long checkpointId) throws Exce
503467
@Override
504468
public void close() throws IOException {
505469
closed = true;
470+
timerService.shutdownNow();
506471
if (rpcClient != null) {
507472
failedTableEpochs.putAll(tieringTableEpochs);
508473
tieringTableEpochs.clear();

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/FlinkTieringTestBase.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,6 @@ static void afterAll() throws Exception {
9797
conn.close();
9898
conn = null;
9999
}
100-
101-
System.out.println("after all");
102100
}
103101

104102
@BeforeEach

fluss-flink/fluss-flink-common/src/test/java/org/apache/fluss/flink/tiering/TieringITCase.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,7 @@
4444
import java.util.Map;
4545
import java.util.Optional;
4646

47-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL;
48-
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_DURATION_MAX;
47+
import static org.apache.fluss.config.ConfigOptions.LAKE_TIERING_TABLE_MAX_DURATION;
4948
import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
5049
import static org.assertj.core.api.Assertions.assertThat;
5150

@@ -63,6 +62,7 @@ protected static void afterAll() throws Exception {
6362
}
6463

6564
@BeforeEach
65+
@Override
6666
void beforeEach() {
6767
execEnv =
6868
StreamExecutionEnvironment.getExecutionEnvironment()
@@ -96,8 +96,7 @@ void testTieringReachMaxDuration() throws Exception {
9696

9797
// set tiering duration to a small value for testing purpose
9898
Configuration lakeTieringConfig = new Configuration();
99-
lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_MAX, Duration.ofSeconds(1));
100-
lakeTieringConfig.set(LAKE_TIERING_TABLE_DURATION_DETECT_INTERVAL, Duration.ofMillis(100));
99+
lakeTieringConfig.set(LAKE_TIERING_TABLE_MAX_DURATION, Duration.ofSeconds(1));
101100
JobClient jobClient = buildTieringJob(execEnv, lakeTieringConfig);
102101

103102
try {
@@ -117,7 +116,7 @@ void testTieringReachMaxDuration() throws Exception {
117116
}
118117
}
119118

120-
private long countTieredRecords(LakeSnapshot lakeSnapshot) throws Exception {
119+
private long countTieredRecords(LakeSnapshot lakeSnapshot) {
121120
return lakeSnapshot.getTableBucketsOffset().values().stream()
122121
.mapToLong(Long::longValue)
123122
.sum();

0 commit comments

Comments
 (0)