Skip to content

Commit 7e61a26

Browse files
committed
address comments
1 parent 8743f21 commit 7e61a26

File tree

14 files changed

+215
-306
lines changed

14 files changed

+215
-306
lines changed

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,11 +180,11 @@ public void handleSplitsChanges(SplitsChange<TieringSplit> splitsChange) {
180180
}
181181
for (TieringSplit split : splitsChange.splits()) {
182182
LOG.info("add split {}", split.splitId());
183-
if (split.isForceIgnore()) {
183+
if (split.shouldSkipCurrentRound()) {
184184
// if the split is marked to skip the current round of tiering,
185185
// mark it as empty
186186
LOG.info(
187-
"ignore split {} since the split is set to force to ignore",
187+
"ignore split {} since the split is set to skip the current round of tiering.",
188188
split.splitId());
189189
currentEmptySplits.add(split);
190190
continue;
@@ -551,6 +551,7 @@ private void finishCurrentTable() throws IOException {
551551
public void handleTableReachTieringMaxDuration(long tableId) {
552552
if ((currentTableId != null && currentTableId.equals(tableId))
553553
|| pendingTieringSplits.containsKey(tableId)) {
554+
LOG.info("Table {} reach tiering max duration, will force to complete.", tableId);
554555
reachTieringMaxDurationTables.add(tableId);
555556
}
556557
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/enumerator/TieringSourceEnumerator.java

Lines changed: 18 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -149,8 +149,8 @@ public TieringSourceEnumerator(
149149
this.pollTieringTableIntervalMs = pollTieringTableIntervalMs;
150150
this.tieringTableDurationMaxMs = tieringTableDurationMaxMs;
151151
this.tieringTableDurationDetectIntervalMs = tieringTableDurationDetectIntervalMs;
152-
this.pendingSplits = new ArrayList<>();
153-
this.readersAwaitingSplit = new TreeSet<>();
152+
this.pendingSplits = Collections.synchronizedList(new ArrayList<>());
153+
this.readersAwaitingSplit = Collections.synchronizedSet(new TreeSet<>());
154154
this.tieringTableEpochs = MapUtils.newConcurrentHashMap();
155155
this.finishedTableEpochs = MapUtils.newConcurrentHashMap();
156156
this.failedTableEpochs = MapUtils.newConcurrentHashMap();
@@ -350,10 +350,9 @@ private void handleReachMaxTieringDurationTables(
350350
for (Long reachMaxDurationTable : tieringReachMaxDurationTables) {
351351
for (TieringSplit tieringSplit : pendingSplits) {
352352
if (tieringSplit.getTableBucket().getTableId() == reachMaxDurationTable) {
353-
// force ignore this tiering split since the tiering for this table is timeout,
354-
// we have to force to set to ignore the tiering split so that the
355-
// tiering source reader can ignore them directly
356-
tieringSplit.forceIgnore();
353+
// mark this tiering split to skip the current round since the tiering for
354+
// this table has timed out, so the tiering source reader can skip them directly
355+
tieringSplit.skipCurrentRound();
357356
} else {
358357
// we can break directly as soon as we find a split whose table id is not equal to the
359358
// timeout
@@ -362,7 +361,7 @@ private void handleReachMaxTieringDurationTables(
362361
}
363362
}
364363

365-
LOG.debug("Found the table {} reach max tiering duration.", reachMaxDurationTable);
364+
LOG.info("Found the table {} reach max tiering duration.", reachMaxDurationTable);
366365

367366
// broadcast the tiering reach max duration event to all readers,
368367
// we broadcast all for simplicity
@@ -386,25 +385,21 @@ private void generateAndAssignSplits(
386385
}
387386

388387
private void assignSplits() {
389-
// we don't assign splits during failovering
388+
// we don't assign splits during failover
390389
if (isFailOvering) {
391390
return;
392391
}
393-
/* This method may be called from both addSplitsBack and handleSplitRequest, make it thread safe. */
394-
// todo: do we need to add lock?
395-
synchronized (readersAwaitingSplit) {
396-
if (!readersAwaitingSplit.isEmpty()) {
397-
final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]);
398-
for (Integer nextAwaitingReader : readers) {
399-
if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
400-
readersAwaitingSplit.remove(nextAwaitingReader);
401-
continue;
402-
}
403-
if (!pendingSplits.isEmpty()) {
404-
TieringSplit tieringSplit = pendingSplits.remove(0);
405-
context.assignSplit(tieringSplit, nextAwaitingReader);
406-
readersAwaitingSplit.remove(nextAwaitingReader);
407-
}
392+
if (!readersAwaitingSplit.isEmpty()) {
393+
final Integer[] readers = readersAwaitingSplit.toArray(new Integer[0]);
394+
for (Integer nextAwaitingReader : readers) {
395+
if (!context.registeredReaders().containsKey(nextAwaitingReader)) {
396+
readersAwaitingSplit.remove(nextAwaitingReader);
397+
continue;
398+
}
399+
if (!pendingSplits.isEmpty()) {
400+
TieringSplit tieringSplit = pendingSplits.remove(0);
401+
context.assignSplit(tieringSplit, nextAwaitingReader);
402+
readersAwaitingSplit.remove(nextAwaitingReader);
408403
}
409404
}
410405
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringLogSplit.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public TieringLogSplit(
6060
partitionName,
6161
startingOffset,
6262
stoppingOffset,
63-
false,
64-
numberOfSplits);
63+
numberOfSplits,
64+
false);
6565
}
6666

6767
public TieringLogSplit(
@@ -70,9 +70,9 @@ public TieringLogSplit(
7070
@Nullable String partitionName,
7171
long startingOffset,
7272
long stoppingOffset,
73-
boolean forceIgnore,
74-
int numberOfSplits) {
75-
super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
73+
int numberOfSplits,
74+
boolean skipCurrentRound) {
75+
super(tablePath, tableBucket, partitionName, numberOfSplits, skipCurrentRound);
7676
this.startingOffset = startingOffset;
7777
this.stoppingOffset = stoppingOffset;
7878
}
@@ -102,8 +102,8 @@ public String toString() {
102102
+ '\''
103103
+ ", numberOfSplits="
104104
+ numberOfSplits
105-
+ ", forceIgnore="
106-
+ forceIgnore
105+
+ ", skipCurrentRound="
106+
+ skipCurrentRound
107107
+ ", startingOffset="
108108
+ startingOffset
109109
+ ", stoppingOffset="
@@ -119,8 +119,8 @@ public TieringLogSplit copy(int numberOfSplits) {
119119
partitionName,
120120
startingOffset,
121121
stoppingOffset,
122-
forceIgnore,
123-
numberOfSplits);
122+
numberOfSplits,
123+
skipCurrentRound);
124124
}
125125

126126
@Override

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSnapshotSplit.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ public TieringSnapshotSplit(
6262
long snapshotId,
6363
long logOffsetOfSnapshot,
6464
int numberOfSplits,
65-
boolean forceIgnore) {
66-
super(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
65+
boolean skipCurrentRound) {
66+
super(tablePath, tableBucket, partitionName, numberOfSplits, skipCurrentRound);
6767
this.snapshotId = snapshotId;
6868
this.logOffsetOfSnapshot = logOffsetOfSnapshot;
6969
}
@@ -93,8 +93,8 @@ public String toString() {
9393
+ '\''
9494
+ ", numberOfSplits="
9595
+ numberOfSplits
96-
+ ", forceIgnore="
97-
+ forceIgnore
96+
+ ", skipCurrentRound="
97+
+ skipCurrentRound
9898
+ ", snapshotId="
9999
+ snapshotId
100100
+ ", logOffsetOfSnapshot="
@@ -111,7 +111,7 @@ public TieringSnapshotSplit copy(int numberOfSplits) {
111111
snapshotId,
112112
logOffsetOfSnapshot,
113113
numberOfSplits,
114-
forceIgnore);
114+
skipCurrentRound);
115115
}
116116

117117
@Override

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplit.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,21 @@ public abstract class TieringSplit implements SourceSplit {
3838
protected final TableBucket tableBucket;
3939
@Nullable protected final String partitionName;
4040

41-
protected boolean forceIgnore;
4241
// the total number of splits in one round of tiering
4342
protected final int numberOfSplits;
4443

44+
/**
45+
* Indicates whether to skip tiering data for this split in the current round of tiering. When
46+
* set to true, the split will not be processed and tiering for the split will be skipped.
47+
*/
48+
protected boolean skipCurrentRound;
49+
4550
public TieringSplit(
4651
TablePath tablePath,
4752
TableBucket tableBucket,
4853
@Nullable String partitionName,
49-
boolean forceIgnore,
50-
int numberOfSplits) {
54+
int numberOfSplits,
55+
boolean skipCurrentRound) {
5156
this.tablePath = tablePath;
5257
this.tableBucket = tableBucket;
5358
this.partitionName = partitionName;
@@ -56,8 +61,8 @@ public TieringSplit(
5661
throw new IllegalArgumentException(
5762
"Partition name and partition id must be both null or both not null.");
5863
}
59-
this.forceIgnore = forceIgnore;
6064
this.numberOfSplits = numberOfSplits;
65+
this.skipCurrentRound = skipCurrentRound;
6166
}
6267

6368
/** Checks whether this split is a primary key table split to tier. */
@@ -75,12 +80,21 @@ public final boolean isTieringLogSplit() {
7580
return getClass() == TieringLogSplit.class;
7681
}
7782

78-
public void forceIgnore() {
79-
this.forceIgnore = true;
83+
/**
84+
* Marks this split to skip reading data in the current round. Once called, the split will not
85+
* be processed and data reading will be skipped.
86+
*/
87+
public void skipCurrentRound() {
88+
this.skipCurrentRound = true;
8089
}
8190

82-
public boolean isForceIgnore() {
83-
return forceIgnore;
91+
/**
92+
* Returns whether this split should skip tiering data in the current round of tiering.
93+
*
94+
* @return true if the split should skip tiering data, false otherwise
95+
*/
96+
public boolean shouldSkipCurrentRound() {
97+
return skipCurrentRound;
8498
}
8599

86100
/** Casts this split into a {@link TieringLogSplit}. */
@@ -139,12 +153,13 @@ public boolean equals(Object object) {
139153
return Objects.equals(tablePath, that.tablePath)
140154
&& Objects.equals(tableBucket, that.tableBucket)
141155
&& Objects.equals(partitionName, that.partitionName)
142-
&& forceIgnore == that.forceIgnore
143-
&& numberOfSplits == that.numberOfSplits;
156+
&& numberOfSplits == that.numberOfSplits
157+
&& skipCurrentRound == that.skipCurrentRound;
144158
}
145159

146160
@Override
147161
public int hashCode() {
148-
return Objects.hash(tablePath, tableBucket, partitionName, forceIgnore, numberOfSplits);
162+
return Objects.hash(
163+
tablePath, tableBucket, partitionName, numberOfSplits, skipCurrentRound);
149164
}
150165
}

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/split/TieringSplitSerializer.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.fluss.flink.tiering.source.split;
1919

20+
import org.apache.fluss.flink.tiering.source.TieringSource;
21+
import org.apache.fluss.flink.tiering.source.enumerator.TieringSourceEnumerator;
2022
import org.apache.fluss.metadata.TableBucket;
2123
import org.apache.fluss.metadata.TablePath;
2224

@@ -26,7 +28,13 @@
2628

2729
import java.io.IOException;
2830

29-
/** A serializer for the {@link TieringSplit}. */
31+
/**
32+
* A serializer for the {@link TieringSplit}.
33+
*
34+
* <p>This serializer is only used to serialize and deserialize splits sent from {@link
35+
* TieringSourceEnumerator} to {@link TieringSource} for network transmission. Therefore, it does
36+
* not need to consider compatibility between serializer versions.
37+
*/
3038
public class TieringSplitSerializer implements SimpleVersionedSerializer<TieringSplit> {
3139

3240
public static final TieringSplitSerializer INSTANCE = new TieringSplitSerializer();
@@ -74,11 +82,10 @@ public byte[] serialize(TieringSplit split) throws IOException {
7482
out.writeBoolean(false);
7583
}
7684

77-
// write force ignore
78-
out.writeBoolean(split.isForceIgnore());
79-
8085
// write number of splits
8186
out.writeInt(split.getNumberOfSplits());
87+
// write skipCurrentRound
88+
out.writeBoolean(split.shouldSkipCurrentRound());
8289
if (split.isTieringSnapshotSplit()) {
8390
// Snapshot split
8491
TieringSnapshotSplit tieringSnapshotSplit = split.asTieringSnapshotSplit();
@@ -129,10 +136,9 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti
129136
}
130137
TableBucket tableBucket = new TableBucket(tableId, partitionId, bucketId);
131138

132-
boolean forceIgnore = in.readBoolean();
133-
134139
// deserialize number of splits
135140
int numberOfSplits = in.readInt();
141+
boolean skipCurrentRound = in.readBoolean();
136142

137143
if (splitKind == TIERING_SNAPSHOT_SPLIT_FLAG) {
138144
// deserialize snapshot id
@@ -146,7 +152,7 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti
146152
snapshotId,
147153
logOffsetOfSnapshot,
148154
numberOfSplits,
149-
forceIgnore);
155+
skipCurrentRound);
150156
} else {
151157
// deserialize starting offset
152158
long startingOffset = in.readLong();
@@ -158,8 +164,8 @@ public TieringSplit deserialize(int version, byte[] serialized) throws IOExcepti
158164
partitionName,
159165
startingOffset,
160166
stoppingOffset,
161-
forceIgnore,
162-
numberOfSplits);
167+
numberOfSplits,
168+
skipCurrentRound);
163169
}
164170
}
165171
}

0 commit comments

Comments
 (0)