Skip to content

Commit 9f16356

Browse files
committed
[FLINK-38218]: Add a MySqlSnapshotSplitAssigner test
1 parent f8873a5 commit 9f16356

File tree

1 file changed

+92
-0
lines changed

1 file changed

+92
-0
lines changed

flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/assigners/MySqlSnapshotSplitAssignerTest.java

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@
3636
import org.assertj.core.api.Assertions;
3737
import org.junit.jupiter.api.BeforeAll;
3838
import org.junit.jupiter.api.Test;
39+
import org.junit.jupiter.params.ParameterizedTest;
40+
import org.junit.jupiter.params.provider.Arguments;
41+
import org.junit.jupiter.params.provider.MethodSource;
42+
43+
import javax.annotation.Nullable;
3944

4045
import java.time.ZoneId;
4146
import java.util.ArrayList;
@@ -47,6 +52,7 @@
4752
import java.util.Map;
4853
import java.util.Optional;
4954
import java.util.stream.Collectors;
55+
import java.util.stream.Stream;
5056

5157
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
5258
import static org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
@@ -528,6 +534,92 @@ void testSplitEvenlySizedChunksEndingFirst() {
528534
assertThat(splits).isEqualTo(expected);
529535
}
530536

537+
@ParameterizedTest
538+
@MethodSource
539+
void testFinishedSnapshotSplitInfosAreInOrderOfAssignment(
540+
String table1Name, String table2Name) {
541+
List<String> tableNames = new ArrayList<>();
542+
tableNames.add(table1Name);
543+
544+
SnapshotPendingSplitsState state;
545+
546+
try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, null)) {
547+
state = processAllSplitsAndSnapshotState(assigner, 1L);
548+
}
549+
550+
tableNames.add(table2Name);
551+
552+
try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, state)) {
553+
state = processAllSplitsAndSnapshotState(assigner, 2L);
554+
}
555+
556+
try (MySqlSnapshotSplitAssigner assigner = createAssigner(tableNames, state)) {
557+
List<String> finishedSnapshotSplitTableNames =
558+
assigner.getFinishedSplitInfos().stream()
559+
.map(i -> i.getTableId().table())
560+
.collect(Collectors.toList());
561+
562+
assertThat(finishedSnapshotSplitTableNames).isEqualTo(tableNames);
563+
}
564+
}
565+
566+
/**
567+
* Use various combinations of table names to ensure that the finished snapshot split infos are
568+
* in the order of assignment, not the order of table names.
569+
*/
570+
public static Stream<Arguments> testFinishedSnapshotSplitInfosAreInOrderOfAssignment() {
571+
String table1Name = "customers";
572+
String table2Name = "customers_1";
573+
574+
return Stream.of(
575+
Arguments.of(table1Name, table2Name), Arguments.of(table2Name, table1Name));
576+
}
577+
578+
private MySqlSnapshotSplitAssigner createAssigner(
579+
List<String> tableNames, @Nullable SnapshotPendingSplitsState state) {
580+
int currentParallelism = 1;
581+
582+
if (state == null) {
583+
return new MySqlSnapshotSplitAssigner(
584+
createConfiguration(tableNames),
585+
currentParallelism,
586+
new ArrayList<>(),
587+
true,
588+
getMySqlSplitEnumeratorContext());
589+
}
590+
591+
return new MySqlSnapshotSplitAssigner(
592+
createConfiguration(tableNames),
593+
currentParallelism,
594+
state,
595+
getMySqlSplitEnumeratorContext());
596+
}
597+
598+
private MySqlSourceConfig createConfiguration(List<String> tableNames) {
599+
return getConfig(
600+
customerDatabase,
601+
Integer.MAX_VALUE,
602+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND.defaultValue(),
603+
CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue(),
604+
tableNames.toArray(new String[0]),
605+
"id",
606+
true,
607+
false);
608+
}
609+
610+
private SnapshotPendingSplitsState processAllSplitsAndSnapshotState(
611+
MySqlSnapshotSplitAssigner assigner, long checkpointId) {
612+
assigner.open();
613+
614+
Optional<MySqlSplit> optional;
615+
while ((optional = assigner.getNext()).isPresent()) {
616+
assigner.onFinishedSplits(
617+
Collections.singletonMap(optional.get().splitId(), BinlogOffset.ofLatest()));
618+
}
619+
620+
return assigner.snapshotState(checkpointId);
621+
}
622+
531623
private List<String> getTestAssignSnapshotSplits(
532624
int splitSize,
533625
double distributionFactorUpper,

0 commit comments

Comments
 (0)