@@ -17,13 +17,6 @@
 
 package org.apache.fluss.flink.tiering.source.enumerator;
 
-import org.apache.flink.api.connector.source.ReaderInfo;
-import org.apache.flink.api.connector.source.SourceEvent;
-import org.apache.flink.api.connector.source.SplitEnumerator;
-import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
-import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.fluss.annotation.VisibleForTesting;
 import org.apache.fluss.client.Connection;
 import org.apache.fluss.client.ConnectionFactory;
@@ -48,10 +41,19 @@
 import org.apache.fluss.rpc.messages.PbLakeTieringTableInfo;
 import org.apache.fluss.rpc.metrics.ClientMetricGroup;
 import org.apache.fluss.utils.MapUtils;
+
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.metrics.groups.SplitEnumeratorMetricGroup;
+import org.apache.flink.util.FlinkRuntimeException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -102,7 +104,7 @@ public class TieringSourceEnumerator
 
     private final Map<Long, Long> tieringTableEpochs;
     private final Map<Long, Long> failedTableEpochs;
-    private final Map<Long, Long> finishedTableEpochs;
+    private final Map<Long, TieringFinishState> finishedTables;
     private final Set<Long> tieringReachMaxDurationsTables;
 
     // lazily instantiated
@@ -131,7 +133,7 @@ public TieringSourceEnumerator(
         this.pendingSplits = Collections.synchronizedList(new ArrayList<>());
         this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>());
         this.tieringTableEpochs = MapUtils.newConcurrentHashMap();
-        this.finishedTableEpochs = MapUtils.newConcurrentHashMap();
+        this.finishedTables = MapUtils.newConcurrentHashMap();
         this.failedTableEpochs = MapUtils.newConcurrentHashMap();
         this.tieringReachMaxDurationsTables = Collections.synchronizedSet(new TreeSet<>());
     }
@@ -179,8 +181,24 @@ public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname
         }
         LOG.info("TieringSourceReader {} requests split.", subtaskId);
         readersAwaitingSplit.add(subtaskId);
-        this.context.callAsync(
-                this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
+
+        // If pending splits exist, assign them directly to the requesting reader
+        if (!pendingSplits.isEmpty()) {
+            assignSplits();
+        } else {
+            // Note: Ideally, only one table should be tiering at a time.
+            // Here we block to request a tiering table synchronously to avoid multiple threads
+            // requesting tiering tables concurrently, which would cause the enumerator to contain
+            // multiple tiering tables simultaneously. This is not optimal for tiering performance.
+            Tuple3<Long, Long, TablePath> tieringTable = null;
+            Throwable throwable = null;
+            try {
+                tieringTable = this.requestTieringTableSplitsViaHeartBeat();
+            } catch (Throwable t) {
+                throwable = t;
+            }
+            this.generateAndAssignSplits(tieringTable, throwable);
+        }
     }
 
     @Override
@@ -252,7 +270,11 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                         "The finished table {} is not in tiering table, won't report it to Fluss to mark as finished.",
                         finishedTableId);
             } else {
-                finishedTableEpochs.put(finishedTableId, tieringEpoch);
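+                // remove() reports whether this table was flagged for reaching the max
+                // tiering duration, i.e. whether this finish is a forced completion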
+                boolean isForceComplete = tieringReachMaxDurationsTables.remove(finishedTableId);
+                finishedTables.put(
+                        finishedTableId, TieringFinishState.from(tieringEpoch, isForceComplete));
             }
         }
 
@@ -274,7 +294,7 @@ public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
             }
         }
 
-        if (!finishedTableEpochs.isEmpty() || !failedTableEpochs.isEmpty()) {
+        if (!finishedTables.isEmpty() || !failedTableEpochs.isEmpty()) {
             // call one round of heartbeat to notify table has been finished or failed
             this.context.callAsync(
                     this::requestTieringTableSplitsViaHeartBeat, this::generateAndAssignSplits);
@@ -288,6 +308,8 @@ private void handleSourceReaderFailOver() {
         // we need to make all as failed
         failedTableEpochs.putAll(new HashMap<>(tieringTableEpochs));
         tieringTableEpochs.clear();
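+        // force-complete markers from the failed round are stale, clear them as well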
+        tieringReachMaxDurationsTables.clear();
         // also clean all pending splits since we mark all as failed
         pendingSplits.clear();
         if (!failedTableEpochs.isEmpty()) {
@@ -298,22 +319,20 @@ private void handleSourceReaderFailOver() {
     }
 
     @VisibleForTesting
-    protected void handleTableTieringReachMaxDuration(long tableId, long tieringEpoch) {
+    protected void handleTableTieringReachMaxDuration(
+            TablePath tablePath, long tableId, long tieringEpoch) {
         Long currentEpoch = tieringTableEpochs.get(tableId);
         if (currentEpoch != null && currentEpoch.equals(tieringEpoch)) {
-            LOG.info("Table {} reached max duration. Force completing.", tableId);
+            LOG.info("Table {}-{} reached max duration. Force completing.", tablePath, tableId);
             tieringReachMaxDurationsTables.add(tableId);
 
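+            // splits of the timed-out table may be interleaved with splits of other tables,
+            // so scan the whole pending list rather than breaking early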
             for (TieringSplit tieringSplit : pendingSplits) {
                 if (tieringSplit.getTableBucket().getTableId() == tableId) {
                     // mark this tiering split to skip the current round since the tiering for
                     // this table has timed out, so the tiering source reader can skip them directly
                     tieringSplit.skipCurrentRound();
-                } else {
-                    // we can break directly, if found any one split's table id is not equal to the
-                    // timeout
-                    // table, the following split must be not equal to the table id
-                    break;
                 }
             }
 
@@ -362,13 +379,13 @@ private void assignSplits() {
         if (closed) {
             return null;
         }
-        Map<Long, Long> currentFinishedTableEpochs = new HashMap<>(this.finishedTableEpochs);
+        Map<Long, TieringFinishState> currentFinishedTables = new HashMap<>(this.finishedTables);
         Map<Long, Long> currentFailedTableEpochs = new HashMap<>(this.failedTableEpochs);
         LakeTieringHeartbeatRequest tieringHeartbeatRequest =
                 tieringTableHeartBeat(
                         basicHeartBeat(),
                         this.tieringTableEpochs,
-                        currentFinishedTableEpochs,
+                        currentFinishedTables,
                         currentFailedTableEpochs,
                         this.flussCoordinatorEpoch);
 
@@ -397,9 +414,9 @@ private void assignSplits() {
             waitHeartbeatResponse(coordinatorGateway.lakeTieringHeartbeat(tieringHeartbeatRequest));
         }
 
-        // if come to here, we can remove currentFinishedTableEpochs/failedTableEpochs to avoid send
+        // if we come to here, we can remove currentFinishedTables/failedTableEpochs to avoid send
         // in next round
-        currentFinishedTableEpochs.forEach(finishedTableEpochs::remove);
+        currentFinishedTables.forEach(finishedTables::remove);
         currentFailedTableEpochs.forEach(failedTableEpochs::remove);
         return lakeTieringInfo;
     }
@@ -428,7 +445,7 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
             LOG.info(
                     "Generate Tiering splits for table {} is empty, no need to tier data.",
                     tieringTable.f2.getTableName());
-            finishedTableEpochs.put(tieringTable.f0, tieringTable.f1);
+            finishedTables.put(tieringTable.f0, TieringFinishState.from(tieringTable.f1));
         } else {
             tieringTableEpochs.put(tieringTable.f0, tieringTable.f1);
             pendingSplits.addAll(tieringSplits);
@@ -438,7 +455,9 @@ private void generateTieringSplits(Tuple3<Long, Long, TablePath> tieringTable)
                 context.runInCoordinatorThread(
                         () ->
                                 handleTableTieringReachMaxDuration(
-                                        tieringTable.f0, tieringTable.f1)),
+                                        tablePath,
+                                        tieringTable.f0,
+                                        tieringTable.f1)),
 
                 // for simplicity, we use the freshness as
                 tableInfo.getTableConfig().getDataLakeFreshness().toMillis(),
@@ -537,16 +556,30 @@ static LakeTieringHeartbeatRequest heartBeatWithRequestNewTieringTable(
     static LakeTieringHeartbeatRequest tieringTableHeartBeat(
             LakeTieringHeartbeatRequest heartbeatRequest,
             Map<Long, Long> tieringTableEpochs,
-            Map<Long, Long> finishedTableEpochs,
+            Map<Long, TieringFinishState> finishedTables,
             Map<Long, Long> failedTableEpochs,
             int coordinatorEpoch) {
         if (!tieringTableEpochs.isEmpty()) {
             heartbeatRequest.addAllTieringTables(
                     toPbHeartbeatReqForTable(tieringTableEpochs, coordinatorEpoch));
         }
-        if (!finishedTableEpochs.isEmpty()) {
+        if (!finishedTables.isEmpty()) {
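+            // split each finish state into the epoch map used for the finished-table entries
+            // and the set of table ids whose finish must be flagged as forced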
+            Map<Long, Long> finishTieringEpochs = new HashMap<>();
+            Set<Long> forceFinishedTables = new HashSet<>();
+            finishedTables.forEach(
+                    (tableId, tieringFinishState) -> {
+                        finishTieringEpochs.put(tableId, tieringFinishState.tieringEpoch);
+                        if (tieringFinishState.isForceToFinish) {
+                            forceFinishedTables.add(tableId);
+                        }
+                    });
             heartbeatRequest.addAllFinishedTables(
-                    toPbHeartbeatReqForTable(finishedTableEpochs, coordinatorEpoch));
+                    toPbHeartbeatReqForTable(finishTieringEpochs, coordinatorEpoch));
+            for (long forceFinishedTableId : forceFinishedTables) {
+                heartbeatRequest.addForceFinishedTable(forceFinishedTableId);
+            }
         }
         // add failed tiering table to heart beat request
         return failedTableHeartBeat(heartbeatRequest, failedTableEpochs, coordinatorEpoch);
@@ -590,4 +621,23 @@ static LakeTieringHeartbeatResponse waitHeartbeatResponse(
             }
         }
     }
+
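+    /** Finish state of a tiered table: its tiering epoch plus whether the finish was forced. */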
+    private static class TieringFinishState {
+        final long tieringEpoch;
+        final boolean isForceToFinish;
+
+        public static TieringFinishState from(long tieringEpoch) {
+            return new TieringFinishState(tieringEpoch, false);
+        }
+
+        public static TieringFinishState from(long tieringEpoch, boolean isForceToFinish) {
+            return new TieringFinishState(tieringEpoch, isForceToFinish);
+        }
+
+        private TieringFinishState(long tieringEpoch, boolean isForceToFinish) {
+            this.tieringEpoch = tieringEpoch;
+            this.isForceToFinish = isForceToFinish;
+        }
+    }
 }