From 13e4586c59605cff0fe695e8936a99aa3ce454f2 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 5 Jun 2025 21:24:45 +0800
Subject: [PATCH 1/2] [FLINK-37905] Fix transform failure with non-ascii string literals

---
 .../flink/FlinkPipelineTransformITCase.java   | 103 ++++++++++++++
 .../pipeline/tests/TransformE2eITCase.java    | 132 ++++++++++++++++++
 .../cdc/runtime/parser/JaninoCompiler.java    |   7 +-
 .../src/main/resources/saffron.properties     |  16 +++
 .../runtime/parser/TransformParserTest.java   |  46 ++++++
 5 files changed, 301 insertions(+), 3 deletions(-)
 create mode 100644 flink-cdc-runtime/src/main/resources/saffron.properties

diff --git a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
index 08c6da71d78..23f2aceab04 100644
--- a/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
+++ b/flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java
@@ -2725,6 +2725,109 @@ void testShadeOriginalColumnsWithDifferentType() throws Exception {
                 "DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2.5, ], after=[2.5, x], op=UPDATE, meta=({op_ts=5})}");
     }
 
+    private static final String[] UNICODE_STRINGS = {
+        "ascii test!?",
+        "大五",
+        "测试数据",
+        "ひびぴ",
+        "죠주쥬",
+        "ÀÆÉ",
+        "ÓÔŐÖ",
+        "αβγδε",
+        "בבקשה",
+        "твой",
+        "ภาษาไทย",
+        "piedzimst brīvi"
+    };
+
+    @ParameterizedTest
+    @EnumSource
+    void testTransformProjectionWithUnicodeCharacters(ValuesDataSink.SinkApi sinkApi)
+            throws Exception {
+        for (String unicodeString : UNICODE_STRINGS) {
+            List<String> expected =
+                    Stream.of(
+                                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`prefix` STRING,`id` INT NOT NULL,`name` STRING,`age` INT,`suffix` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[{UNICODE_STRING} -> 1, 1, Alice, 18, 1 <- {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[{UNICODE_STRING} -> 2, 2, Bob, 20, 2 <- {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[{UNICODE_STRING} -> 2, 2, Bob, 20, 2 <- {UNICODE_STRING}], after=[{UNICODE_STRING} -> 2, 2, Bob, 30, 2 <- {UNICODE_STRING}], op=UPDATE, meta=()}",
+                                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`prefix` STRING,`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`suffix` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[{UNICODE_STRING} -> 3, 3, Carol, 15, student, 3 <- {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[{UNICODE_STRING} -> 4, 4, Derrida, 25, student, 4 <- {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[{UNICODE_STRING} -> 4, 4, Derrida, 25, student, 4 <- {UNICODE_STRING}], after=[], op=DELETE, meta=()}")
+                            .map(tpl -> tpl.replace("{UNICODE_STRING}", unicodeString))
+                            .collect(Collectors.toList());
+
+            runGenericTransformTest(
+                    sinkApi,
+                    Collections.singletonList(
+                            new TransformDef(
+                                    "\\.*.\\.*.\\.*",
+                                    String.format(
+                                            "'%s' || ' -> ' || id AS prefix, *, id || ' <- ' || '%s' AS suffix",
+                                            unicodeString, unicodeString),
+                                    null,
+                                    null,
+                                    "id",
+                                    null,
+                                    null,
+                                    null)),
+                    expected);
+            outCaptor.reset();
+        }
+    }
+
+    @ParameterizedTest
+    @EnumSource
+    void testTransformFilterWithUnicodeCharacters(ValuesDataSink.SinkApi sinkApi) throws Exception {
+        for (String unicodeString : UNICODE_STRINGS) {
+            List<String> expected =
+                    Stream.of(
+                                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[1, Alice, 18, {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[], after=[2, Bob, 20, {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable1, before=[2, Bob, 20, {UNICODE_STRING}], after=[2, Bob, 30, {UNICODE_STRING}], op=UPDATE, meta=()}",
+                                    "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[3, Carol, 15, student, {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[], after=[4, Derrida, 25, student, {UNICODE_STRING}], op=INSERT, meta=()}",
+                                    "DataChangeEvent{tableId=default_namespace.default_schema.mytable2, before=[4, Derrida, 25, student, {UNICODE_STRING}], after=[], op=DELETE, meta=()}")
+                            .map(tpl -> tpl.replace("{UNICODE_STRING}", unicodeString))
+                            .collect(Collectors.toList());
+
+            runGenericTransformTest(
+                    sinkApi,
+                    Collections.singletonList(
+                            new TransformDef(
+                                    "\\.*.\\.*.\\.*",
+                                    String.format("*, '%s' AS extras", unicodeString),
+                                    String.format("extras = '%s'", unicodeString),
+                                    null,
+                                    "id",
+                                    null,
+                                    null,
+                                    null)),
+                    expected);
+            outCaptor.reset();
+
+            runGenericTransformTest(
+                    sinkApi,
+                    Collections.singletonList(
+                            new TransformDef(
+                                    "\\.*.\\.*.\\.*",
+                                    String.format("*, '%s' AS extras", unicodeString),
+                                    String.format("extras <> '%s'", unicodeString),
+                                    null,
+                                    "id",
+                                    null,
+                                    null,
+                                    null)),
+                    Arrays.asList(
+                            "CreateTableEvent{tableId=default_namespace.default_schema.mytable1, schema=columns={`id` INT NOT NULL,`name` STRING,`age` INT,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}",
+                            "CreateTableEvent{tableId=default_namespace.default_schema.mytable2, schema=columns={`id` BIGINT NOT NULL,`name` VARCHAR(255),`age` TINYINT,`description` STRING,`extras` STRING}, primaryKeys=id, partitionKeys=id, options=()}"));
+            outCaptor.reset();
+        }
+    }
+
     private List<Event> generateFloorCeilAndRoundEvents(TableId tableId) {
         List<Event> events = new ArrayList<>();
         Schema schema =
diff --git a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
index d685d283b05..ca04db12e42 100644
--- a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
+++ b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/TransformE2eITCase.java
@@ -1091,6 +1091,138 @@ void testTransformWildcardSuffixWithSchemaEvolution() throws Exception {
                 "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3010 <- id, Beginning, 3010, 10, 10, 97, Lemon], op=INSERT, meta=()}");
     }
 
+    private static final String[] UNICODE_STRINGS = {
+        "ascii test!?",
+        "大五",
+        "测试数据",
+        "ひびぴ",
+        "죠주쥬",
+        "ÀÆÉ",
+        "ÓÔŐÖ",
+        "αβγδε",
+        "בבקשה",
+        "твой",
+        "ภาษาไทย",
+        "piedzimst brīvi"
+    };
+
+    @Test
+    void testTransformWithUnicodeLiterals() throws Exception {
+        StringBuilder projectionExpression = new StringBuilder("\\*,");
+        for (int i = 0; i < UNICODE_STRINGS.length; i++) {
+            projectionExpression
+                    .append('\'')
+                    .append(UNICODE_STRINGS[i])
+                    .append('\'')
+                    .append(" AS col_")
+                    .append(i)
+                    .append(",");
+        }
+        projectionExpression.deleteCharAt(projectionExpression.length() - 1);
+
+        String pipelineJob =
+                String.format(
+                        "source:\n"
+                                + "  type: mysql\n"
+                                + "  hostname: %s\n"
+                                + "  port: 3306\n"
+                                + "  username: %s\n"
+                                + "  password: %s\n"
+                                + "  tables: %s.TABLEALPHA\n"
+                                + "  server-id: 5400-5404\n"
+                                + "  server-time-zone: UTC\n"
+                                + "sink:\n"
+                                + "  type: values\n"
+                                + "transform:\n"
+                                + "  - source-table: %s.\\.*\n"
+                                + "    projection: %s\n"
+                                + "    filter: ID > 1008\n"
+                                + "pipeline:\n"
+                                + "  parallelism: %d\n"
+                                + "  schema.change.behavior: evolve",
+                        INTER_CONTAINER_MYSQL_ALIAS,
+                        MYSQL_TEST_USER,
+                        MYSQL_TEST_PASSWORD,
+                        transformTestDatabase.getDatabaseName(),
+                        transformTestDatabase.getDatabaseName(),
+                        projectionExpression,
+                        parallelism);
+        submitPipelineJob(pipelineJob);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+        LOG.info("Pipeline job is running");
+
+        validateResult(
+                dbNameFormatter,
+                "CreateTableEvent{tableId=%s.TABLEALPHA, schema=columns={`ID` INT NOT NULL,`VERSION` VARCHAR(17),`PRICEALPHA` INT,`AGEALPHA` INT,`NAMEALPHA` VARCHAR(128),`col_0` STRING,`col_1` STRING,`col_2` STRING,`col_3` STRING,`col_4` STRING,`col_5` STRING,`col_6` STRING,`col_7` STRING,`col_8` STRING,`col_9` STRING,`col_10` STRING,`col_11` STRING}, primaryKeys=ID, options=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1010, 10, 99, 19, Carol, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1009, 8.1, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1011, 11, 59, 20, Dave, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
+
+        LOG.info("Begin incremental reading stage.");
+        // generate binlogs
+        String mysqlJdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        MYSQL.getHost(),
+                        MYSQL.getDatabasePort(),
+                        transformTestDatabase.getDatabaseName());
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute("UPDATE TABLEALPHA SET VERSION='100' WHERE id=1009;");
+            stat.execute("INSERT INTO TABLEALPHA VALUES (3007, '7', 79, 16, 'IINA');");
+            stat.execute("DELETE FROM TABLEBETA WHERE id=2011;");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateResult(
+                dbNameFormatter,
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[1009, 8.1, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], after=[1009, 100, 0, 18, Bob, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=UPDATE, meta=()}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[3007, 7, 79, 16, IINA, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
+
+        LOG.info("Start schema evolution.");
+        try (Connection conn =
+                        DriverManager.getConnection(
+                                mysqlJdbcUrl, MYSQL_TEST_USER, MYSQL_TEST_PASSWORD);
+                Statement stmt = conn.createStatement()) {
+
+            // triggers AddColumnEvent
+            stmt.execute("ALTER TABLE TABLEALPHA ADD COLUMN CODENAME TINYINT AFTER VERSION;");
+            stmt.execute("ALTER TABLE TABLEALPHA ADD COLUMN FIRST VARCHAR(17) FIRST;");
+            stmt.execute("INSERT INTO TABLEALPHA VALUES ('First', 3008, '8', 8, 80, 17, 'Jazz');");
+
+            // triggers AlterColumnTypeEvent and RenameColumnEvent
+            stmt.execute("ALTER TABLE TABLEALPHA CHANGE COLUMN CODENAME CODE_NAME DOUBLE;");
+
+            // triggers RenameColumnEvent
+            stmt.execute("ALTER TABLE TABLEALPHA RENAME COLUMN CODE_NAME TO CODE_NAME_EX;");
+            stmt.execute("INSERT INTO TABLEALPHA VALUES ('1st', 3009, '9', 9, 90, 18, 'Keka');");
+
+            // triggers DropColumnEvent
+            stmt.execute("ALTER TABLE TABLEALPHA DROP COLUMN CODE_NAME_EX");
+            stmt.execute(
+                    "INSERT INTO TABLEALPHA VALUES ('Beginning', 3010, '10', 10, 97, 'Lemon');");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        validateResult(
+                dbNameFormatter,
+                "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`CODENAME` TINYINT, position=AFTER, existedColumnName=VERSION}]}",
+                "AddColumnEvent{tableId=%s.TABLEALPHA, addedColumns=[ColumnWithPosition{column=`FIRST` VARCHAR(17), position=BEFORE, existedColumnName=ID}]}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[First, 3008, 8, 8, 80, 17, Jazz, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
+                "AlterColumnTypeEvent{tableId=%s.TABLEALPHA, typeMapping={CODENAME=DOUBLE}, oldTypeMapping={CODENAME=TINYINT}}",
+                "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODENAME=CODE_NAME}}",
+                "RenameColumnEvent{tableId=%s.TABLEALPHA, nameMapping={CODE_NAME=CODE_NAME_EX}}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[1st, 3009, 9, 9.0, 90, 18, Keka, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}",
+                "DropColumnEvent{tableId=%s.TABLEALPHA, droppedColumnNames=[CODE_NAME_EX]}",
+                "DataChangeEvent{tableId=%s.TABLEALPHA, before=[], after=[Beginning, 3010, 10, 10, 97, Lemon, ascii test!?, 大五, 测试数据, ひびぴ, 죠주쥬, ÀÆÉ, ÓÔŐÖ, αβγδε, בבקשה, твой, ภาษาไทย, piedzimst brīvi], op=INSERT, meta=()}");
+    }
+
     private void validateEventsWithPattern(String... patterns) throws Exception {
         for (String pattern : patterns) {
             waitUntilSpecificEventWithPattern(
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 0fec90d39fa..821c3914091 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -34,6 +34,7 @@ import org.apache.calcite.sql.SqlNumericLiteral;
 import org.apache.calcite.sql.fun.SqlCase;
 import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.calcite.util.NlsString;
 import org.codehaus.commons.compiler.CompileException;
 import org.codehaus.commons.compiler.Location;
 import org.codehaus.janino.ExpressionEvaluator;
@@ -158,10 +159,10 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
         if (sqlLiteral.getValue() == null) {
             return new Java.NullLiteral(Location.NOWHERE);
         }
-        String value = sqlLiteral.getValue().toString();
+        Object value = sqlLiteral.getValue();
         if (sqlLiteral instanceof SqlCharStringLiteral) {
             // Double quotation marks represent strings in Janino.
-            value = "\"" + value.substring(1, value.length() - 1) + "\"";
+            value = "\"" + ((NlsString) value).getValue() + "\"";
         } else if (sqlLiteral instanceof SqlNumericLiteral) {
             if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
                 long longValue = sqlLiteral.longValue(true);
@@ -173,7 +174,7 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
         if (SQL_TYPE_NAME_IGNORE.contains(sqlLiteral.getTypeName())) {
             value = "\"" + value + "\"";
         }
-        return new Java.AmbiguousName(Location.NOWHERE, new String[] {value});
+        return new Java.AmbiguousName(Location.NOWHERE, new String[] {value.toString()});
     }
 
     private static Java.Rvalue translateSqlBasicCall(
diff --git a/flink-cdc-runtime/src/main/resources/saffron.properties b/flink-cdc-runtime/src/main/resources/saffron.properties
new file mode 100644
index 00000000000..f88324f17de
--- /dev/null
+++ b/flink-cdc-runtime/src/main/resources/saffron.properties
@@ -0,0 +1,16 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+calcite.default.charset = utf8
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index bebb533baf6..6aa6a626bad 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -729,6 +729,52 @@ public void testProjectionColumnsWithColumnNameMap() {
         Assertions.assertThat(result).hasToString("[" + String.join(", ", expected) + "]");
     }
 
+    private static final String[] UNICODE_STRINGS = {
+        "ascii test!?",
+        "大五",
+        "测试数据",
+        "ひびぴ",
+        "죠주쥬",
+        "ÀÆÉ",
+        "ÓÔŐÖ",
+        "αβγδε",
+        "בבקשה",
+        "твой",
+        "ภาษาไทย",
+        "piedzimst brīvi"
+    };
+
+    @Test
+    void testParsingExpressionWithUnicodeLiterals() {
+        List<Column> columns =
+                Arrays.asList(
+                        Column.physicalColumn("a", DataTypes.STRING(), "a"),
+                        Column.physicalColumn("b", DataTypes.INT(), "b"));
+
+        for (String unicodeString : UNICODE_STRINGS) {
+            Assertions.assertThat(
+                            TransformParser.generateProjectionColumns(
+                                    "a, b, a = '{UNICODE_STRING}' AS c1, a <> '{UNICODE_STRING}' AS c2, b = '{UNICODE_STRING}' AS c3, b <> '{UNICODE_STRING}' AS c4"
+                                            .replace("{UNICODE_STRING}", unicodeString),
+                                    columns,
+                                    Collections.emptyList(),
+                                    new SupportedMetadataColumn[] {}))
+                    .map(ProjectionColumn::getScriptExpression)
+                    .containsExactly(
+                            "$0",
+                            "$0",
+                            "valueEquals($0, \"" + unicodeString + "\")",
+                            "!valueEquals($0, \"" + unicodeString + "\")",
+                            "valueEquals($0, castToInteger(\"" + unicodeString + "\"))",
+                            "!valueEquals($0, castToInteger(\"" + unicodeString + "\"))");
+
+            testFilterExpression(
+                    "a = '" + unicodeString + "'", "valueEquals(a, \"" + unicodeString + "\")");
+            testFilterExpression(
+                    "a <> '" + unicodeString + "'", "!valueEquals(a, \"" + unicodeString + "\")");
+        }
+    }
+
     private void testFilterExpression(String expression, String expressionExpect) {
         String janinoExpression =
                 TransformParser.translateFilterExpressionToJaninoExpression(

From b901d421a6227784dd6d81269c4508ce33bd2b79 Mon Sep 17 00:00:00 2001
From: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
Date: Thu, 7 Aug 2025 14:47:57 +0800
Subject: [PATCH 2/2] address comments

Signed-off-by: yuxiqian <34335406+yuxiqian@users.noreply.github.com>
---
 .../org/apache/flink/cdc/runtime/parser/JaninoCompiler.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
index 821c3914091..74dceba596b 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/JaninoCompiler.java
@@ -162,7 +162,7 @@ private static Java.Rvalue translateSqlSqlLiteral(SqlLiteral sqlLiteral) {
         Object value = sqlLiteral.getValue();
         if (sqlLiteral instanceof SqlCharStringLiteral) {
             // Double quotation marks represent strings in Janino.
-            value = "\"" + ((NlsString) value).getValue() + "\"";
+            value = "\"" + sqlLiteral.getValueAs(NlsString.class).getValue() + "\"";
         } else if (sqlLiteral instanceof SqlNumericLiteral) {
             if (((SqlNumericLiteral) sqlLiteral).isInteger()) {
                 long longValue = sqlLiteral.longValue(true);