Skip to content

Commit c2230d5

Browse files
authored
[FLINK-36742][cdc-base] Filter unacked split for no capture tables when task restore from state
This closes #3739
1 parent b89d84f commit c2230d5

File tree

3 files changed

+379
-8
lines changed

3 files changed

+379
-8
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceReader.java

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.flink.api.connector.source.SourceEvent;
2121
import org.apache.flink.cdc.common.annotation.Experimental;
22+
import org.apache.flink.cdc.common.annotation.VisibleForTesting;
2223
import org.apache.flink.cdc.connectors.base.config.SourceConfig;
2324
import org.apache.flink.cdc.connectors.base.dialect.DataSourceDialect;
2425
import org.apache.flink.cdc.connectors.base.source.meta.events.FinishedSnapshotSplitsAckEvent;
@@ -261,13 +262,14 @@ private void addSplits(List<SourceSplitBase> splits, boolean checkTableChangeFor
261262
for (SourceSplitBase split : splits) {
262263
if (split.isSnapshotSplit()) {
263264
SnapshotSplit snapshotSplit = split.asSnapshotSplit();
264-
if (snapshotSplit.isSnapshotReadFinished()) {
265-
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
266-
} else if (dialect.isIncludeDataCollection(
267-
sourceConfig, snapshotSplit.getTableId())) {
268-
unfinishedSplits.add(split);
265+
if (dialect.isIncludeDataCollection(sourceConfig, snapshotSplit.getTableId())) {
266+
if (snapshotSplit.isSnapshotReadFinished()) {
267+
finishedUnackedSplits.put(snapshotSplit.splitId(), snapshotSplit);
268+
} else {
269+
unfinishedSplits.add(split);
270+
}
269271
} else {
270-
LOG.debug(
272+
LOG.info(
271273
"The subtask {} is skipping split {} because it does not match new table filter.",
272274
subtaskId,
273275
split.splitId());
@@ -320,8 +322,9 @@ private void addSplits(List<SourceSplitBase> splits, boolean checkTableChangeFor
320322
// add all un-finished splits (including binlog split) to SourceReaderBase
321323
if (!unfinishedSplits.isEmpty()) {
322324
super.addSplits(unfinishedSplits);
323-
} else if (suspendedStreamSplit
324-
!= null) { // only request new snapshot split if the stream split is suspended
325+
} else if (suspendedStreamSplit != null
326+
|| getNumberOfCurrentlyAssignedSplits()
327+
<= 1) { // only request new snapshot split if the stream split is suspended
325328
context.sendSplitRequest();
326329
}
327330
}
@@ -541,4 +544,9 @@ private void logCurrentStreamOffsets(List<SourceSplitBase> splits, long checkpoi
541544
LOG.info("Stream split offset on checkpoint {}: {}", checkpointId, offset);
542545
}
543546
}
547+
548+
@VisibleForTesting
549+
public Map<String, SnapshotSplit> getFinishedUnackedSplits() {
550+
return finishedUnackedSplits;
551+
}
544552
}

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oracle-cdc/pom.xml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,12 @@ limitations under the License.
164164
<artifactId>commons-lang3</artifactId>
165165
<version>${commons-lang3.version}</version>
166166
</dependency>
167+
<dependency>
168+
<groupId>org.apache.flink</groupId>
169+
<artifactId>flink-connector-test-utils</artifactId>
170+
<version>${flink.version}</version>
171+
<scope>test</scope>
172+
</dependency>
167173
</dependencies>
168174

169175
<build>

0 commit comments

Comments
 (0)