diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java index cbbd03b315..3de94bd072 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/AbstractSqlSinkBuilder.java @@ -23,6 +23,7 @@ import org.dinky.cdc.AbstractSinkBuilder; import org.dinky.cdc.convert.DataTypeConverter; import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.data.flink.table.FlinkTableObjectIdentifier; import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Schema; @@ -54,6 +55,8 @@ import java.util.Map; import java.util.stream.Collectors; +import cn.hutool.core.util.StrUtil; + public abstract class AbstractSqlSinkBuilder extends AbstractSinkBuilder implements Serializable { protected AbstractSqlSinkBuilder() {} @@ -220,19 +223,20 @@ protected void executeCatalogStatement() {} * @return view name */ public static String replaceViewNameMiddleLineToUnderLine(String viewName) { - if (!viewName.isEmpty() && viewName.contains("-")) { + if (StrUtil.isNotBlank(viewName) && viewName.contains("-")) { logger.warn("the view name [{}] contains '-', replace '-' to '_' for flink use view name", viewName); return viewName.replaceAll("-", "_"); } return viewName; } - protected List createInsertOperations(Table table, String viewName, String tableName) { - String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, tableName, viewName, config); + protected List createInsertOperations( + Table table, FlinkTableObjectIdentifier sourceTable, FlinkTableObjectIdentifier targetTable) { + String cdcSqlInsert = FlinkStatementUtil.getCDCInsertSql(table, targetTable, sourceTable, config); logger.info(cdcSqlInsert); List operations = customTableEnvironment.getParser().parse(cdcSqlInsert); - logger.info("Create {} FlinkSQL insert into successful...", tableName); + logger.info("Create {} FlinkSQL insert into successful...", targetTable); if (operations.isEmpty()) { return operations; } diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java index b869276bd7..4702bc0c2b 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/SQLSinkBuilder.java @@ -21,6 +21,7 @@ import org.dinky.cdc.SinkBuilder; import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.data.flink.table.FlinkTableObjectIdentifier; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; @@ -44,47 +45,51 @@ private SQLSinkBuilder(FlinkCDCConfig config) { super(config); } - private String addSourceTableView(DataStream rowDataDataStream, Table table) { + private FlinkTableObjectIdentifier addSourceTableView(DataStream rowDataDataStream, Table table) { // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_ String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline()); customTableEnvironment.createTemporaryView( viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream)); logger.info("Create {} temporaryView successful...", viewName); - return viewName; + return FlinkTableObjectIdentifier.of(viewName); } @Override protected void addTableSink(DataStream rowDataDataStream, Table table) { - final String viewName = addSourceTableView(rowDataDataStream, table); + final FlinkTableObjectIdentifier viewName = addSourceTableView(rowDataDataStream, table); final String sinkSchemaName = getSinkSchemaName(table); final String sinkTableName = getSinkTableName(table); + final FlinkTableObjectIdentifier sinkTable = FlinkTableObjectIdentifier.of(sinkTableName); // Multiple sinks and single sink if (CollectionUtils.isEmpty(config.getSinks())) { - addSinkInsert(table, viewName, sinkTableName, sinkSchemaName, sinkTableName); + addSinkInsert(table, viewName, sinkTable, sinkSchemaName, sinkTable); } else { for (int index = 0; index < config.getSinks().size(); index++) { - String tableName = sinkTableName; + FlinkTableObjectIdentifier newSinkTable = sinkTable; if (config.getSinks().size() != 1) { - tableName = sinkTableName + "_" + index; + newSinkTable = FlinkTableObjectIdentifier.of(sinkTable + "_" + index); } config.setSink(config.getSinks().get(index)); - addSinkInsert(table, viewName, tableName, sinkSchemaName, sinkTableName); + addSinkInsert(table, viewName, newSinkTable, sinkSchemaName, sinkTable); } } } private List addSinkInsert( - Table table, String viewName, String tableName, String sinkSchemaName, String sinkTableName) { + Table table, + FlinkTableObjectIdentifier sourceTable, + FlinkTableObjectIdentifier targetTable, + String sinkSchemaName, + FlinkTableObjectIdentifier sinkTable) { String pkList = StringUtils.join(getPKList(table), "."); - String flinkDDL = - FlinkStatementUtil.getFlinkDDL(table, tableName, config, sinkSchemaName, sinkTableName, pkList); + String flinkDDL = FlinkStatementUtil.getFlinkDDL(table, targetTable, config, sinkSchemaName, sinkTable, pkList); logger.info(flinkDDL); customTableEnvironment.executeSql(flinkDDL); - logger.info("Create {} FlinkSQL DDL successful...", tableName); - return createInsertOperations(table, viewName, tableName); + logger.info("Create {} FlinkSQL DDL successful...", targetTable); + return createInsertOperations(table, sourceTable, targetTable); } @Override diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java index bc59a989a4..adc396cd50 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/sql/catalog/SQLCatalogSinkBuilder.java @@ -22,6 +22,7 @@ import org.dinky.cdc.SinkBuilder; import org.dinky.cdc.sql.AbstractSqlSinkBuilder; import org.dinky.cdc.utils.FlinkStatementUtil; +import org.dinky.data.flink.table.FlinkTableObjectIdentifier; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; @@ -46,7 +47,6 @@ public void addTableSink(DataStream rowDataDataStream, Table table) { String catalogName = config.getSink().get("catalog.name"); String sinkSchemaName = getSinkSchemaName(table); String tableName = getSinkTableName(table); - String sinkTableName = catalogName + ".`" + sinkSchemaName + "`.`" + tableName + "`"; // Because the name of the view on Flink is not allowed to have -, it needs to be replaced with - here_ String viewName = replaceViewNameMiddleLineToUnderLine("VIEW_" + table.getSchemaTableNameWithUnderline()); @@ -54,7 +54,10 @@ public void addTableSink(DataStream rowDataDataStream, Table table) { viewName, customTableEnvironment.fromChangelogStream(rowDataDataStream)); logger.info("Create {} temporaryView successful...", viewName); - createInsertOperations(table, viewName, sinkTableName); + createInsertOperations( + table, + FlinkTableObjectIdentifier.of(viewName), + FlinkTableObjectIdentifier.of(catalogName, sinkSchemaName, tableName)); } @Override diff --git a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java index cff6504718..7cc2f9bfd9 100644 --- a/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java +++ b/dinky-cdc/dinky-cdc-core/src/main/java/org/dinky/cdc/utils/FlinkStatementUtil.java @@ -19,6 +19,7 @@ package org.dinky.cdc.utils; +import org.dinky.data.flink.table.FlinkTableObjectIdentifier; import org.dinky.data.model.Column; import org.dinky.data.model.FlinkCDCConfig; import org.dinky.data.model.Table; @@ -34,9 +35,13 @@ public class FlinkStatementUtil { private FlinkStatementUtil() {} - public static String getCDCInsertSql(Table table, String targetName, String sourceName, FlinkCDCConfig config) { + public static String getCDCInsertSql( + Table table, + FlinkTableObjectIdentifier targetTable, + FlinkTableObjectIdentifier sourceTable, + FlinkCDCConfig config) { StringBuilder sb = new StringBuilder("INSERT INTO "); - sb.append(targetName); + sb.append(targetTable.toTablePath()); sb.append(" SELECT\n"); for (int i = 0; i < table.getColumns().size(); i++) { sb.append(" "); @@ -45,9 +50,8 @@ public static String getCDCInsertSql(Table table, String targetName, String sour } sb.append(getColumnProcessing(table.getColumns().get(i), config)).append(" \n"); } - sb.append(" FROM `"); - sb.append(sourceName); - sb.append("`"); + sb.append(" FROM "); + sb.append(sourceTable.toTablePath()); return sb.toString(); } @@ -65,19 +69,19 @@ public static String getColumnProcessing(Column column, FlinkCDCConfig config) { public static String getFlinkDDL( Table table, - String tableName, + FlinkTableObjectIdentifier flinkTable, FlinkCDCConfig config, String sinkSchemaName, - String sinkTableName, + FlinkTableObjectIdentifier sinkTableName, String pkList) { StringBuilder sb = new StringBuilder(); if (Integer.parseInt(EnvironmentInformation.getVersion().split("\\.")[1]) < 13) { - sb.append("CREATE TABLE `"); + sb.append("CREATE TABLE "); } else { - sb.append("CREATE TABLE IF NOT EXISTS `"); + sb.append("CREATE TABLE IF NOT EXISTS "); } - sb.append(tableName); - sb.append("` (\n"); + sb.append(flinkTable.toTablePath()); + sb.append(" (\n"); List pks = new ArrayList<>(); for (int i = 0; i < table.getColumns().size(); i++) { String type = table.getColumns().get(i).getFlinkType(); @@ -109,7 +113,7 @@ public static String getFlinkDDL( sb.append(pksb); } sb.append(") WITH (\n"); - sb.append(getSinkConfigurationString(config, sinkSchemaName, sinkTableName, pkList)); + sb.append(getSinkConfigurationString(config, sinkSchemaName, sinkTableName.getObjectName(), pkList)); sb.append(")\n"); return sb.toString(); } diff --git a/dinky-common/src/main/java/org/dinky/data/flink/table/FlinkTableObjectIdentifier.java b/dinky-common/src/main/java/org/dinky/data/flink/table/FlinkTableObjectIdentifier.java new file mode 100644 index 0000000000..43cc244503 --- /dev/null +++ b/dinky-common/src/main/java/org/dinky/data/flink/table/FlinkTableObjectIdentifier.java @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.dinky.data.flink.table; + +import lombok.Getter; + +@Getter +public class FlinkTableObjectIdentifier { + private final String catalogName; + private final String databaseName; + private final String objectName; + + public static FlinkTableObjectIdentifier of(String catalogName, String databaseName, String objectName) { + return new FlinkTableObjectIdentifier(catalogName, databaseName, objectName); + } + + public static FlinkTableObjectIdentifier of(String objectName) { + return of(null, null, objectName); + } + + private FlinkTableObjectIdentifier(String catalogName, String databaseName, String objectName) { + this.catalogName = catalogName; + this.databaseName = databaseName; + this.objectName = objectName; + if (objectName == null) { + throw new IllegalArgumentException("objectName can not be null"); + } + } + + /** + * + * @return catalogName.`databaseName`.`objectName` + */ + public String toTablePath() { + StringBuilder sb = new StringBuilder(); + if (catalogName != null) { + sb.append(catalogName); + } + if (databaseName != null) { + sb.append(".`"); + sb.append(databaseName); + sb.append("`."); + } + sb.append("`"); + sb.append(objectName); + sb.append("`"); + return sb.toString(); + } + + @Override + public String toString() { + return toTablePath(); + } +}