
Commit 02bde17

proletarians authored and wuzexian committed
[FLINK-38045] Build createTableEventCache using TableSchemas from split and Make transform operator stateless (apache#4056)
Co-authored-by: wuzexian <[email protected]>
1 parent 652068d commit 02bde17
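For orientation before the per-file diffs: the change stops regenerating table schemas (and checkpointing them in operator state) and instead fills the record emitter's CreateTableEvent cache from the TableSchemas already carried by each split. A minimal, self-contained sketch of that pattern follows; TableSchema and SplitSchemaCacheSketch are simplified stand-ins for illustration only, not the CDC classes touched below (the real logic is in MySqlPipelineRecordEmitter#applySplit further down).

import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: simplified stand-in types, not the Flink CDC classes changed in this commit.
final class SplitSchemaCacheSketch {

    static final class TableSchema {
        final String tableId;
        final String ddl;

        TableSchema(String tableId, String ddl) {
            this.tableId = tableId;
            this.ddl = ddl;
        }
    }

    private final Map<String, String> createTableEventCache = new HashMap<>();

    // Mirrors the idea behind applySplit(MySqlSplit): every schema shipped with the split becomes a
    // cached "create table" entry, so the emitter neither re-reads schemas from the database nor
    // keeps them in checkpointed operator state.
    void applySplit(List<TableSchema> tableSchemasFromSplit) {
        for (TableSchema schema : tableSchemasFromSplit) {
            createTableEventCache.putIfAbsent(schema.tableId, schema.ddl);
        }
    }

    Map<String, String> cache() {
        return createTableEventCache;
    }
}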

File tree: 16 files changed, +331 −130 lines


flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java

Lines changed: 7 additions & 3 deletions
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.composer.flink;
 
 import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
 import org.apache.flink.cdc.common.configuration.Configuration;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.PipelineOptions;
@@ -186,9 +187,7 @@ private void translate(StreamExecutionEnvironment env, PipelineDef pipelineDef)
                         pipelineDef.getTransforms(),
                         pipelineDef.getUdfs(),
                         pipelineDef.getModels(),
-                        dataSource.supportedMetadataColumns(),
-                        !isParallelMetadataSource && !isBatchMode,
-                        operatorUidGenerator);
+                        dataSource.supportedMetadataColumns());
 
         // PreTransform ---> PostTransform
         stream =
@@ -290,4 +289,9 @@ private Optional<URL> getContainingJar(Class<?> clazz) throws Exception {
         }
         return Optional.of(container);
     }
+
+    @VisibleForTesting
+    public StreamExecutionEnvironment getEnv() {
+        return env;
+    }
 }

flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/TransformTranslator.java

Lines changed: 6 additions & 16 deletions
@@ -50,30 +50,21 @@ public DataStream<Event> translatePreTransform(
             List<TransformDef> transforms,
             List<UdfDef> udfFunctions,
             List<ModelDef> models,
-            SupportedMetadataColumn[] supportedMetadataColumns,
-            boolean shouldStoreSchemasInState,
-            OperatorUidGenerator operatorUidGenerator) {
+            SupportedMetadataColumn[] supportedMetadataColumns) {
         if (transforms.isEmpty()) {
             return input;
         }
         return input.transform(
-                        "Transform:Schema",
-                        new EventTypeInfo(),
-                        generatePreTransform(
-                                transforms,
-                                udfFunctions,
-                                models,
-                                supportedMetadataColumns,
-                                shouldStoreSchemasInState))
-                .uid(operatorUidGenerator.generateUid("pre-transform"));
+                "Transform:Schema",
+                new EventTypeInfo(),
+                generatePreTransform(transforms, udfFunctions, models, supportedMetadataColumns));
     }
 
     private PreTransformOperator generatePreTransform(
             List<TransformDef> transforms,
             List<UdfDef> udfFunctions,
             List<ModelDef> models,
-            SupportedMetadataColumn[] supportedMetadataColumns,
-            boolean shouldStoreSchemasInState) {
+            SupportedMetadataColumn[] supportedMetadataColumns) {
 
         PreTransformOperatorBuilder preTransformFunctionBuilder = PreTransformOperator.newBuilder();
         for (TransformDef transform : transforms) {
@@ -94,8 +85,7 @@ private PreTransformOperator generatePreTransform(
                                 .map(this::udfDefToUDFTuple)
                                 .collect(Collectors.toList()))
                 .addUdfFunctions(
-                        models.stream().map(this::modelToUDFTuple).collect(Collectors.toList()))
-                .shouldStoreSchemasInState(shouldStoreSchemasInState);
+                        models.stream().map(this::modelToUDFTuple).collect(Collectors.toList()));
 
         return preTransformFunctionBuilder.build();
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java

Lines changed: 36 additions & 2 deletions
@@ -26,11 +26,14 @@
 import org.apache.flink.cdc.connectors.mysql.schema.MySqlTableDefinition;
 import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
 import org.apache.flink.cdc.connectors.mysql.source.metrics.MySqlSourceReaderMetrics;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSnapshotSplit;
+import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplit;
 import org.apache.flink.cdc.connectors.mysql.source.split.MySqlSplitState;
 import org.apache.flink.cdc.connectors.mysql.source.utils.StatementUtils;
 import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
 import org.apache.flink.cdc.connectors.mysql.utils.MySqlTypeUtils;
 import org.apache.flink.cdc.debezium.DebeziumDeserializationSchema;
+import org.apache.flink.cdc.debezium.event.DebeziumEventDeserializationSchema;
 import org.apache.flink.connector.base.source.reader.RecordEmitter;
 
 import io.debezium.connector.mysql.antlr.MySqlAntlrDdlParser;
@@ -40,6 +43,7 @@
 import io.debezium.relational.Table;
 import io.debezium.relational.TableId;
 import io.debezium.relational.Tables;
+import io.debezium.relational.history.TableChanges;
 import io.debezium.text.ParsingException;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.connect.source.SourceRecord;
@@ -77,7 +81,9 @@ public class MySqlPipelineRecordEmitter extends MySqlRecordEmitter<Event> {
     private boolean shouldEmitAllCreateTableEventsInSnapshotMode = true;
     private boolean isBounded = false;
 
-    private Map<TableId, CreateTableEvent> createTableEventCache;
+    private final DebeziumDeserializationSchema<Event> debeziumDeserializationSchema;
+
+    private final Map<TableId, CreateTableEvent> createTableEventCache;
 
     public MySqlPipelineRecordEmitter(
             DebeziumDeserializationSchema<Event> debeziumDeserializationSchema,
@@ -87,12 +93,32 @@ public MySqlPipelineRecordEmitter(
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
                 sourceConfig.isIncludeSchemaChanges());
+        this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceConfig = sourceConfig;
         this.alreadySendCreateTableTables = new HashSet<>();
-        this.createTableEventCache = generateCreateTableEvent(sourceConfig);
+        this.createTableEventCache =
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .getCreateTableEventCache();
        this.isBounded = StartupOptions.snapshot().equals(sourceConfig.getStartupOptions());
     }
 
+    @Override
+    public void applySplit(MySqlSplit split) {
+        if ((isBounded) && createTableEventCache.isEmpty() && split instanceof MySqlSnapshotSplit) {
+            // TableSchemas in MySqlSnapshotSplit only contains one table.
+            createTableEventCache.putAll(generateCreateTableEvent(sourceConfig));
+        } else {
+            for (TableChanges.TableChange tableChange : split.getTableSchemas().values()) {
+                CreateTableEvent createTableEvent =
+                        new CreateTableEvent(
+                                toCdcTableId(tableChange.getId()),
+                                buildSchemaFromTable(tableChange.getTable()));
+                ((DebeziumEventDeserializationSchema) debeziumDeserializationSchema)
+                        .applyChangeEvent(createTableEvent);
+            }
+        }
+    }
+
     @Override
     protected void processElement(
             SourceRecord element, SourceOutput<Event> output, MySqlSplitState splitState)
@@ -130,6 +156,11 @@ protected void processElement(
         super.processElement(element, output, splitState);
     }
 
+    private org.apache.flink.cdc.common.event.TableId toCdcTableId(TableId dbzTableId) {
+        return org.apache.flink.cdc.common.event.TableId.tableId(
+                dbzTableId.catalog(), dbzTableId.table());
+    }
+
     private void sendCreateTableEvent(
             JdbcConnection jdbc, TableId tableId, SourceOutput<Event> output) {
         Schema schema = getSchema(jdbc, tableId);
@@ -204,7 +235,10 @@ private String describeTable(JdbcConnection jdbc, TableId tableId) {
 
     private Schema parseDDL(String ddlStatement, TableId tableId) {
         Table table = parseDdl(ddlStatement, tableId);
+        return buildSchemaFromTable(table);
+    }
 
+    private Schema buildSchemaFromTable(Table table) {
         List<Column> columns = table.columns();
         Schema.Builder tableBuilder = Schema.newBuilder();
         for (int i = 0; i < columns.size(); i++) {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlTypeUtils.java

Lines changed: 2 additions & 1 deletion
@@ -129,7 +129,8 @@ private static DataType convertFromColumn(Column column, boolean tinyInt1isBit)
         String typeName = column.typeName();
         switch (typeName) {
             case BIT:
-                return column.length() == 1
+                // column.length() might be -1
+                return column.length() <= 1
                         ? DataTypes.BOOLEAN()
                         : DataTypes.BINARY((column.length() + 7) / 8);
             case BOOL:
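For quick reference, the arithmetic behind the BIT fix above (a standalone sketch, not code from this commit): Debezium's Column#length() is -1 when the DDL declares no width, so plain BIT and BIT(1) now both map to BOOLEAN, while wider BIT(n) columns map to BINARY of ceil(n / 8) bytes.

// Standalone illustration of the BIT length mapping fixed above; not part of the commit itself.
public final class BitMappingExample {

    // "length" plays the role of io.debezium.relational.Column#length(), which may be -1 when unset.
    static String mapBit(int length) {
        return length <= 1 ? "BOOLEAN" : "BINARY(" + ((length + 7) / 8) + ")";
    }

    public static void main(String[] args) {
        System.out.println(mapBit(-1)); // BOOLEAN  (BIT with no declared width)
        System.out.println(mapBit(1));  // BOOLEAN  (BIT(1))
        System.out.println(mapBit(9));  // BINARY(2)
        System.out.println(mapBit(64)); // BINARY(8)
    }
}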

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineITCase.java

Lines changed: 127 additions & 1 deletion
@@ -60,6 +60,8 @@
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.testcontainers.lifecycle.Startables;
 
 import java.sql.Connection;
@@ -399,7 +401,131 @@ void testSqlInjection() throws Exception {
     }
 
     @Test
-    void testExcludeTables() throws Exception {
+    void testLatestOffsetStartupMode() throws Exception {
+        inventoryDatabase.createAndInitialize();
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(MYSQL8_CONTAINER.getHost())
+                        .port(MYSQL8_CONTAINER.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(inventoryDatabase.getDatabaseName())
+                        .tableList(inventoryDatabase.getDatabaseName() + "\\.products")
+                        .startupOptions(StartupOptions.latest())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC")
+                        .includeSchemaChanges(SCHEMA_CHANGE_ENABLED.defaultValue());
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+        Thread.sleep(10_000);
+        TableId tableId = TableId.tableId(inventoryDatabase.getDatabaseName(), "products");
+
+        List<Event> expectedBinlog = new ArrayList<>();
+        try (Connection connection = inventoryDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            expectedBinlog.addAll(executeAlterAndProvideExpected(tableId, statement));
+
+            RowType rowType =
+                    RowType.of(
+                            new DataType[] {
+                                DataTypes.INT().notNull(),
+                                DataTypes.VARCHAR(255).notNull(),
+                                DataTypes.FLOAT(),
+                                DataTypes.VARCHAR(45),
+                                DataTypes.VARCHAR(55)
+                            },
+                            new String[] {"id", "name", "weight", "col1", "col2"});
+            BinaryRecordDataGenerator generator = new BinaryRecordDataGenerator(rowType);
+            // insert more data
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`products` VALUES (default,'scooter',5.5,'c-10','c-20');",
+                            inventoryDatabase.getDatabaseName())); // 110
+            expectedBinlog.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        110,
+                                        BinaryStringData.fromString("scooter"),
+                                        5.5f,
+                                        BinaryStringData.fromString("c-10"),
+                                        BinaryStringData.fromString("c-20")
+                                    })));
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`products` VALUES (default,'football',6.6,'c-11','c-21');",
+                            inventoryDatabase.getDatabaseName())); // 111
+            expectedBinlog.add(
+                    DataChangeEvent.insertEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        111,
+                                        BinaryStringData.fromString("football"),
+                                        6.6f,
+                                        BinaryStringData.fromString("c-11"),
+                                        BinaryStringData.fromString("c-21")
+                                    })));
+            statement.execute(
+                    String.format(
+                            "UPDATE `%s`.`products` SET `col1`='c-12', `col2`='c-22' WHERE id=110;",
+                            inventoryDatabase.getDatabaseName()));
+            expectedBinlog.add(
+                    DataChangeEvent.updateEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        110,
+                                        BinaryStringData.fromString("scooter"),
+                                        5.5f,
+                                        BinaryStringData.fromString("c-10"),
+                                        BinaryStringData.fromString("c-20")
+                                    }),
+                            generator.generate(
+                                    new Object[] {
+                                        110,
+                                        BinaryStringData.fromString("scooter"),
+                                        5.5f,
+                                        BinaryStringData.fromString("c-12"),
+                                        BinaryStringData.fromString("c-22")
+                                    })));
+            statement.execute(
+                    String.format(
+                            "DELETE FROM `%s`.`products` WHERE `id` = 111;",
+                            inventoryDatabase.getDatabaseName()));
+            expectedBinlog.add(
+                    DataChangeEvent.deleteEvent(
+                            tableId,
+                            generator.generate(
+                                    new Object[] {
+                                        111,
+                                        BinaryStringData.fromString("football"),
+                                        6.6f,
+                                        BinaryStringData.fromString("c-11"),
+                                        BinaryStringData.fromString("c-21")
+                                    })));
+        }
+        // In this configuration, several subtasks might emit their corresponding CreateTableEvent
+        // to downstream. Since it is not possible to predict how many CreateTableEvents should we
+        // expect, we simply filter them out from expected sets, and assert there's at least one.
+
+        Event createTableEvent = getProductsCreateTableEvent(tableId);
+        List<Event> actual = fetchResultsExcept(events, expectedBinlog.size(), createTableEvent);
+        assertThat(actual).isEqualTo(expectedBinlog);
+    }
+
+    @ParameterizedTest(name = "batchEmit: {0}")
+    @ValueSource(booleans = {true, false})
+    void testExcludeTables(boolean inBatch) throws Exception {
         inventoryDatabase.createAndInitialize();
         String databaseName = inventoryDatabase.getDatabaseName();
         MySqlSourceConfigFactory configFactory =
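The new test leans on a fetchResultsExcept helper that lives elsewhere in MySqlPipelineITCase and is not part of this diff. Below is a hypothetical sketch of what such a helper might look like, matching the comment in the test (collect the expected number of events while skipping the unpredictable CreateTableEvents and checking that at least one was seen); the name and signature are illustrative only.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch; the real fetchResultsExcept in MySqlPipelineITCase is not shown in this diff.
final class FetchResultsExceptSketch {

    static <T> List<T> fetchResultsExcept(Iterator<T> iterator, int size, T except) {
        List<T> results = new ArrayList<>(size);
        int skipped = 0;
        while (results.size() < size && iterator.hasNext()) {
            T event = iterator.next();
            if (except.equals(event)) {
                skipped++; // number of CreateTableEvents depends on parallelism, so just skip them
            } else {
                results.add(event);
            }
        }
        if (skipped == 0) {
            throw new AssertionError("expected at least one CreateTableEvent to be emitted");
        }
        return results;
    }
}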

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/ValuesDatabase.java

Lines changed: 10 additions & 1 deletion
@@ -81,8 +81,15 @@ public static class ValuesMetadataApplier implements MetadataApplier {
 
         private Set<SchemaChangeEventType> enabledSchemaEvolutionTypes;
 
+        private final boolean materializedInMemory;
+
         public ValuesMetadataApplier() {
+            this(true);
+        }
+
+        public ValuesMetadataApplier(boolean materializedInMemory) {
             this.enabledSchemaEvolutionTypes = getSupportedSchemaEvolutionTypes();
+            this.materializedInMemory = materializedInMemory;
         }
 
         @Override
@@ -109,7 +116,9 @@ public Set<SchemaChangeEventType> getSupportedSchemaEvolutionTypes() {
 
         @Override
         public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-            applySchemaChangeEvent(schemaChangeEvent);
+            if (materializedInMemory) {
+                applySchemaChangeEvent(schemaChangeEvent);
+            }
         }
     }
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSink.java

Lines changed: 1 addition & 1 deletion
@@ -80,7 +80,7 @@ public MetadataApplier getMetadataApplier() {
         if (errorOnSchemaChange) {
             return new ValuesDatabase.ErrorOnChangeMetadataApplier();
         } else {
-            return new ValuesDatabase.ValuesMetadataApplier();
+            return new ValuesDatabase.ValuesMetadataApplier(materializedInMemory);
         }
     }

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-values/src/main/java/org/apache/flink/cdc/connectors/values/sink/ValuesDataSinkOptions.java

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ public class ValuesDataSinkOptions {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "True if the DataChangeEvent need to be materialized in memory.");
+                            "True if the SchemaChangeEvent and DataChangeEvent need to be materialized in memory.");
 
     public static final ConfigOption<Boolean> PRINT_ENABLED =
             ConfigOptions.key("print.enabled")
