Skip to content

[FLINK-37905] Fix transform failure with non-ascii string literals #4038

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2725,6 +2725,109 @@ void testShadeOriginalColumnsWithDifferentType() throws Exception {
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2.5, ], after=[2.5, x], op=UPDATE, meta=({op_ts=5})}");
}

// Non-ASCII string literals used to verify that transform expressions survive
// characters outside the ASCII range (regression coverage for FLINK-37905).
// Includes one plain-ASCII control case plus samples from several scripts:
// CJK (Traditional/Simplified Chinese), Japanese kana, Korean Hangul,
// accented Latin, Greek, Hebrew, Cyrillic, Thai, and Latin with diacritics.
private static final String[] UNICODE_STRINGS = {
    "ascii test!?",
    "大五",
    "测试数据",
    "ひびぴ",
    "죠주쥬",
    "ÀÆÉ",
    "ÓÔŐÖ",
    "αβγδε",
    "בבקשה",
    "твой",
    "ภาษาไทย",
    "piedzimst brīvi"
};

@ParameterizedTest
@EnumSource
void testTransformProjectionWithUnicodeCharacters(ValuesDataSink.SinkApi sinkApi)
        throws Exception {
    // Expected-event templates; {UNICODE_STRING} is substituted with the literal
    // under test on each iteration.
    String[] expectedTemplates = {
        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`prefix` STRING,`id` INT NOT NULL,`name` STRING,`age` INT,`suffix` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[{UNICODE_STRING} -> 1, 1, Alice, 18, 1 <- {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[{UNICODE_STRING} -> 2, 2, Bob, 20, 2 <- {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[{UNICODE_STRING} -> 2, 2, Bob, 20, 2 <- {UNICODE_STRING}], after=[{UNICODE_STRING} -> 2, 2, Bob, 30, 2 <- {UNICODE_STRING}], op=UPDATE, meta=()}",
        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`prefix` STRING,`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`suffix` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[{UNICODE_STRING} -> 3, 3, Carol, 15, student, 3 <- {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[{UNICODE_STRING} -> 4, 4, Derrida, 25, student, 4 <- {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[{UNICODE_STRING} -> 4, 4, Derrida, 25, student, 4 <- {UNICODE_STRING}], after=[], op=DELETE, meta=()}"
    };

    // Run the same projection pipeline once per Unicode literal, embedding the
    // literal into both a prefix and a suffix expression.
    for (String literal : UNICODE_STRINGS) {
        List<String> expected = new ArrayList<>();
        for (String template : expectedTemplates) {
            expected.add(template.replace("{UNICODE_STRING}", literal));
        }

        runGenericTransformTest(
                sinkApi,
                Collections.singletonList(
                        new TransformDef(
                                "\\.*.\\.*.\\.*",
                                String.format(
                                        "'%s' || ' -> ' || id AS prefix, *, id || ' <- ' || '%s' AS suffix",
                                        literal, literal),
                                null,
                                null,
                                "id",
                                null,
                                null,
                                null)),
                expected);
        // Clear captured output between iterations so assertions stay isolated.
        outCaptor.reset();
    }
}

@ParameterizedTest
@EnumSource
void testTransformFilterWithUnicodeCharacters(ValuesDataSink.SinkApi sinkApi) throws Exception {
    // Expected-event templates; {UNICODE_STRING} is substituted with the literal
    // under test on each iteration.
    String[] expectedTemplates = {
        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, {UNICODE_STRING}], after=[2, Bob, 30, {UNICODE_STRING}], op=UPDATE, meta=()}",
        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, {UNICODE_STRING}], op=INSERT, meta=()}",
        "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, {UNICODE_STRING}], after=[], op=DELETE, meta=()}"
    };

    for (String literal : UNICODE_STRINGS) {
        List<String> expected = new ArrayList<>();
        for (String template : expectedTemplates) {
            expected.add(template.replace("{UNICODE_STRING}", literal));
        }

        // Case 1: equality filter on the Unicode literal matches every row, so
        // all data-change events pass through.
        runGenericTransformTest(
                sinkApi,
                Collections.singletonList(
                        new TransformDef(
                                "\\.*.\\.*.\\.*",
                                String.format("*, '%s' AS extras", literal),
                                String.format("extras = '%s'", literal),
                                null,
                                "id",
                                null,
                                null,
                                null)),
                expected);
        outCaptor.reset();

        // Case 2: inequality filter matches no rows, so only the schema
        // (CreateTableEvent) records are emitted.
        runGenericTransformTest(
                sinkApi,
                Collections.singletonList(
                        new TransformDef(
                                "\\.*.\\.*.\\.*",
                                String.format("*, '%s' AS extras", literal),
                                String.format("extras <> '%s'", literal),
                                null,
                                "id",
                                null,
                                null,
                                null)),
                Arrays.asList(
                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
                        "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}"));
        outCaptor.reset();
    }
}

private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
List<Event> events = new ArrayList<>();
Schema schema =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,138 @@ void testTransformWildcardSuffixWithSchemaEvolution() throws Exception {
"DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3010 <- id, Beginning, 3010, 10, 10, 97, Lemon], op=INSERT, meta=()}");
}

// Non-ASCII string literals used to verify that transform expressions survive
// characters outside the ASCII range (regression coverage for FLINK-37905).
// Includes one plain-ASCII control case plus samples from several scripts:
// CJK (Traditional/Simplified Chinese), Japanese kana, Korean Hangul,
// accented Latin, Greek, Hebrew, Cyrillic, Thai, and Latin with diacritics.
private static final String[] UNICODE_STRINGS = {
    "ascii test!?",
    "大五",
    "测试数据",
    "ひびぴ",
    "죠주쥬",
    "ÀÆÉ",
    "ÓÔŐÖ",
    "αβγδε",
    "בבקשה",
    "твой",
    "ภาษาไทย",
    "piedzimst brīvi"
};

// End-to-end check that a projection containing Unicode string literals is
// handled correctly across all pipeline stages: initial snapshot, incremental
// binlog reading, and schema evolution (FLINK-37905 regression).
@Test
void testTransformWithUnicodeLiterals() throws Exception {
    // Build a projection of the form: \*,'<s0>' AS col_0,'<s1>' AS col_1,...
    // appending one extra constant STRING column per UNICODE_STRINGS entry.
    StringBuilder projectionExpression = new StringBuilder("\\*,");
    for (int i = 0; i < UNICODE_STRINGS.length; i++) {
        projectionExpression
                .append('\'')
                .append(UNICODE_STRINGS[i])
                .append('\'')
                .append(" AS col_")
                .append(i)
                .append(",");
    }
    // Drop the trailing comma left by the loop above.
    projectionExpression.deleteCharAt(projectionExpression.length() - 1);

    String pipelineJob =
            String.format(
                    "source:\n"
                            + " type: mysql\n"
                            + " hostname: %s\n"
                            + " port: 3306\n"
                            + " username: %s\n"
                            + " password: %s\n"
                            + " tables: %s.TABLEALPHA\n"
                            + " server-id: 5400-5404\n"
                            + " server-time-zone: UTC\n"
                            + "sink:\n"
                            + " type: values\n"
                            + "transform:\n"
                            + " - source-table: %s.\\.*\n"
                            + " projection: %s\n"
                            + " filter: ID > 1008\n"
                            + "pipeline:\n"
                            + " parallelism: %d\n"
                            + " schema.change.behavior: evolve",
                    INTER_CONTAINER_MYSQL_ALIAS,
                    MYSQL_TEST_USER,
                    MYSQL_TEST_PASSWORD,
                    transformTestDatabase.getDatabaseName(),
                    transformTestDatabase.getDatabaseName(),
                    projectionExpression,
                    parallelism);
    submitPipelineJob(pipelineJob);
    waitUntilJobRunning(Duration.ofSeconds(30));
    LOG.info("Pipeline job is running");

    // Snapshot phase: every emitted row should carry all twelve Unicode
    // columns verbatim; rows with ID <= 1008 are filtered out.
    validateResult(
            dbNameFormatter,
            "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`col_0` STRING,`col_1` STRING,`col_2` STRING,`col_3` STRING,`col_4` STRING,`col_5` STRING,`col_6` STRING,`col_7` STRING,`col_8` STRING,`col_9` STRING,`col_10` STRING,`col_11` STRING}, primaryKeys=ID, options=()}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");

    LOG.info("Begin incremental reading stage.");
    // generate binlogs
    String mysqlJdbcUrl =
            String.format(
                    "jdbc:mysql://%s:%s/%s",
                    MYSQL.getHost(),
                    MYSQL.getDatabasePort(),
                    transformTestDatabase.getDatabaseName());
    try (Connection conn =
                    DriverManager.getConnection(
                            mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
            Statement stat = conn.createStatement()) {
        stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
        stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 16, 'IINA');");
        // NOTE(review): TABLEBETA is touched too, though only TABLEALPHA events
        // are asserted below — presumably to advance the binlog; confirm intent.
        stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
    } catch (SQLException e) {
        LOG.error("Update table for CDC failed.", e);
        throw e;
    }

    // Incremental phase: UPDATE and INSERT events still carry the Unicode
    // projection columns in both before- and after-images.
    validateResult(
            dbNameFormatter,
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], after=[1009, 100, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=UPDATE, meta=()}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");

    LOG.info("Start schema evolution.");
    try (Connection conn =
                    DriverManager.getConnection(
                            mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
            Statement stmt = conn.createStatement()) {

        // triggers AddColumnEvent
        stmt.execute("ALTER TABLE TABLEALPHA ADD COLUMN CODENAME TINYINT AFTER VERSION;");
        stmt.execute("ALTER TABLE TABLEALPHA ADD COLUMN FIRST VARCHAR(17) FIRST;");
        stmt.execute("INSERT INTO TABLEALPHA VALUES ('First', 3008, '8', 8, 80, 17, 'Jazz');");

        // triggers AlterColumnTypeEvent and RenameColumnEvent
        stmt.execute("ALTER TABLE TABLEALPHA CHANGE COLUMN CODENAME CODE_NAME DOUBLE;");

        // triggers RenameColumnEvent
        stmt.execute("ALTER TABLE TABLEALPHA RENAME COLUMN CODE_NAME TO CODE_NAME_EX;");
        stmt.execute("INSERT INTO TABLEALPHA VALUES ('1st', 3009, '9', 9, 90, 18, 'Keka');");

        // triggers DropColumnEvent
        stmt.execute("ALTER TABLE TABLEALPHA DROP COLUMN CODE_NAME_EX");
        stmt.execute(
                "INSERT INTO TABLEALPHA VALUES ('Beginning', 3010, '10', 10, 97, 'Lemon');");
    } catch (SQLException e) {
        LOG.error("Update table for CDC failed.", e);
        throw e;
    }

    // Schema-evolution phase: schema-change events propagate and subsequent
    // rows keep the Unicode projection columns after each DDL step.
    validateResult(
            dbNameFormatter,
            "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}",
            "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[First, 3008, 8, 8, 80, 17, Jazz, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
            "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
            "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}",
            "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1st, 3009, 9, 9.0, 90, 18, Keka, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
            "DropColumnEvent{tableId=%s.TABLEALPHA, droppedColumnNames=[CODE_NAME_EX]}",
            "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[Beginning, 3010, 10, 10, 97, Lemon, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
}

private void validateEventsWithPattern(String... patterns) throws Exception {
for (String pattern : patterns) {
waitUntilSpecificEventWithPattern(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.calcite.sql.SqlNumericLiteral;
import org.apache.calcite.sql.fun.SqlCase;
import org.apache.calcite.sql.type.SqlTypeName;
import org.apache.calcite.util.NlsString;
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.commons.compiler.Location;
import org.codehaus.janino.ExpressionEvaluator;
Expand Down Expand Up @@ -158,10 +159,10 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
if (sqlLiteral.getValue() == null) {
return new Java.NullLiteral(Location.NOWHERE);
}
String value = sqlLiteral.getValue().toString();
Object value = sqlLiteral.getValue();
if (sqlLiteral instanceof SqlCharStringLiteral) {
// Double quotation marks represent strings in Janino.
value = "\"" + value.substring(1, value.length() - 1) + "\"";
value = "\"" + sqlLiteral.getValueAs(NlsString.class).getValue() + "\"";
} else if (sqlLiteral instanceof SqlNumericLiteral) {
if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
long longValue = sqlLiteral.longValue(true);
Expand All @@ -173,7 +174,7 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) {
value = "\"" + value + "\"";
}
return new Java.AmbiguousName(Location.NOWHERE, new String[] {value});
return new Java.AmbiguousName(Location.NOWHERE, new String[] {value.toString()});
}

private static Java.Rvalue translateSqlBasicCall(
Expand Down
16 changes: 16 additions & 0 deletions flink-cdc-runtime/src/main/resources/saffron.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

calcite.default.charset = utf8
Loading
Loading