diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
index 42dfc6c1..8921b5fa 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/ProxySinkTask.java
@@ -24,6 +24,7 @@
 import java.util.Timer;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
 import java.util.stream.Collectors;
 
 public class ProxySinkTask {
@@ -86,13 +87,24 @@ public void put(final Collection<SinkRecord> records) throws IOException, Execut
         LOGGER.trace(String.format("Got %d records from put API.", records.size()));
         ExecutionTimer processingTime = ExecutionTimer.start();
+        Function<Record, String> keyMapper;
+
+        if (clickHouseSinkConfig.isEnableDbTopicSplit()) {
+            // In this case topic == table name, so it cannot be the grouping key: it would cause records
+            // for different databases to be written to the same one. Grouping by partition is not possible either.
+            keyMapper = Record::getDatabase;
+        } else if (!clickHouseSinkConfig.isExactlyOnce() && clickHouseSinkConfig.isIgnorePartitionsWhenBatching()) {
+            keyMapper = Record::getTopic;
+        } else {
+            keyMapper = Record::getTopicAndPartition;
+        }
+
         Map<String, List<Record>> dataRecords = records.stream()
                 .map(v -> Record.convert(v,
                         clickHouseSinkConfig.isEnableDbTopicSplit(),
                         clickHouseSinkConfig.getDbTopicSplitChar(),
                         clickHouseSinkConfig.getDatabase() ))
-                .collect(Collectors.groupingBy(!clickHouseSinkConfig.isExactlyOnce() && clickHouseSinkConfig.isIgnorePartitionsWhenBatching()
-                        ? Record::getTopic : Record::getTopicAndPartition));
+                .collect(Collectors.groupingBy(keyMapper));
         statistics.recordProcessingTime(processingTime);
         // TODO - Multi process???
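The hunk above replaces an inline ternary with an explicit keyMapper so a third grouping strategy fits cleanly. A minimal standalone sketch of how the three strategies batch records (Java 16+; Rec and the boolean flags are simplified stand-ins for the connector's Record and ClickHouseSinkConfig, not its actual API):

    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;

    public class GroupingSketch {
        // Simplified stand-in for the connector's Record type
        record Rec(String database, String topic, int partition) {
            String topicAndPartition() { return topic + "-" + partition; }
        }

        public static void main(String[] args) {
            boolean enableDbTopicSplit = true;           // assumed flag values
            boolean exactlyOnce = false;
            boolean ignorePartitionsWhenBatching = false;

            Function<Rec, String> keyMapper;
            if (enableDbTopicSplit) {
                keyMapper = Rec::database;               // topic == table name, so key on database
            } else if (!exactlyOnce && ignorePartitionsWhenBatching) {
                keyMapper = Rec::topic;
            } else {
                keyMapper = Rec::topicAndPartition;
            }

            List<Rec> records = List.of(
                    new Rec("tenant_1", "events", 0),
                    new Rec("tenant_2", "events", 0),
                    new Rec("tenant_1", "events", 1));

            // With enableDbTopicSplit this yields two batches, keyed tenant_1 and tenant_2;
            // keying on topic would collapse all three records into one "events" batch.
            Map<String, List<Rec>> batches = records.stream().collect(Collectors.groupingBy(keyMapper));
            System.out.println(batches.keySet());
        }
    }
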
diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
index 3af7b99a..4597ff83 100644
--- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
+++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java
@@ -427,11 +427,11 @@ public Table describeTableV2(String database, String tableName) {
         }
         return table;
     }
-    public List<Table> extractTablesMapping(String database, Map<String, Table> cache) {
+    public List<Table> extractTablesMapping(String tableDatabase, Map<String, Table> cache) {
         List<Table> tableList = new ArrayList<>();
-        for (Table table : showTables(database) ) {
+        for (Table table : showTables(tableDatabase) ) {
             // (Full) Table names are escaped in the cache
-            String escapedTableName = Utils.escapeTableName(database, table.getCleanName());
+            String escapedTableName = Utils.escapeTableName(tableDatabase, table.getCleanName());
             // Read from cache if we already described this table before
             // This means we won't pick up edited table configs until the connector is restarted
@@ -444,7 +444,7 @@ public List<Table> extractTablesMapping(String database, Map<String, Table> cach
                     continue;
                 }
             }
-            Table tableDescribed = describeTable(this.database, table.getCleanName());
+            Table tableDescribed = describeTable(tableDatabase, table.getCleanName());
             if (tableDescribed != null) {
                 tableDescribed.setNumColumns(table.getNumColumns());
                 tableList.add(tableDescribed);
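The second hunk above is the substantive fix: describeTable(this.database, ...) pinned every description to the connector's configured database, even when the method was enumerating tables of another one. A hypothetical, stripped-down illustration of that failure mode and why the parameter rename removes it:

    // Hypothetical minimal reproduction of the bug class fixed above.
    class Helper {
        private final String database = "default";   // connector-wide setting

        String describeTable(String db, String table) {
            return "DESCRIBE TABLE `" + db + "`.`" + table + "`";
        }

        String buggy(String database, String table) {
            // Parameter and field share a name; this.database always resolves
            // to "default", regardless of what the caller passed in.
            return describeTable(this.database, table);
        }

        String fixed(String tableDatabase, String table) {
            // Distinct parameter name: no shadowing, the caller's database wins.
            return describeTable(tableDatabase, table);
        }
    }
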
diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
index 9ed3c9fe..5deb6f43 100644
--- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
+++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java
@@ -1100,4 +1100,44 @@ public void exactlyOnceStateMismatchTest(int split, int batch) {
         assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
         assertTrue(ClickHouseTestHelpers.validateRows(chc, topic, sr));
     }
+
+    @Test
+    public void splitDBTopicTest() throws Exception {
+        Map<String, String> props = createProps();
+        ClickHouseHelperClient chc = createClient(props);
+        String topic1 = "tenant_1__events";
+        String topic2 = "tenant_2__events";
+        ClickHouseTestHelpers.dropDatabase(chc, "tenant_1");
+        ClickHouseTestHelpers.dropDatabase(chc, "tenant_2");
+        ClickHouseTestHelpers.createDatabase(chc, "tenant_1");
+        ClickHouseTestHelpers.createDatabase(chc, "tenant_2");
+
+        ClickHouseTestHelpers.query(chc, "CREATE TABLE `tenant_1`.`events` (" +
+                "`off16` Int16," +
+                "`string` String" +
+                ") Engine = MergeTree ORDER BY `off16`");
+        ClickHouseTestHelpers.query(chc, "CREATE TABLE `tenant_2`.`events` (" +
+                "`off16` Int16," +
+                "`string` String" +
+                ") Engine = MergeTree ORDER BY `off16`");
+
+        Collection<SinkRecord> sr1 = SchemaTestData.createSimpleData(topic1, 1, 5);
+        Collection<SinkRecord> sr2 = SchemaTestData.createSimpleData(topic2, 1, 10);
+
+        List<SinkRecord> records = new ArrayList<>();
+        records.addAll(sr1);
+        records.addAll(sr2);
+        Collections.shuffle(records);
+
+        ClickHouseSinkTask chst = new ClickHouseSinkTask();
+        props.put(ClickHouseSinkConfig.DB_TOPIC_SPLIT_CHAR, "__");
+        props.put(ClickHouseSinkConfig.ENABLE_DB_TOPIC_SPLIT, "true");
+        props.put(ClickHouseSinkConfig.DATABASE, "tenant_1");
+        chst.start(props);
+        chst.put(records);
+        chst.stop();
+
+        assertEquals(sr1.size(), ClickHouseTestHelpers.countRows(chc, "events", "tenant_1"));
+        assertEquals(sr2.size(), ClickHouseTestHelpers.countRows(chc, "events", "tenant_2"));
+    }
 }
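The integration test above relies on the topic naming convention <database><separator><table>, e.g. tenant_1__events with separator "__". A minimal sketch of how such a topic name decomposes, mirroring the split in RecordConvertorTest below (it assumes the separator occurs exactly once in the topic name):

    import java.util.regex.Pattern;

    public class TopicSplitSketch {
        public static void main(String[] args) {
            String dbTopicSplitChar = "__";
            String topic = "tenant_2__events";

            // Pattern.quote keeps any regex metacharacters in the separator literal.
            String[] parts = topic.split(Pattern.quote(dbTopicSplitChar));
            System.out.println(parts[0]);   // tenant_2 -> target database
            System.out.println(parts[1]);   // events   -> target table
        }
    }
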
"tenant_A__telemetry", "__", "tenant_A" }, + }; + } + +} \ No newline at end of file diff --git a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java index be3fa198..f9d545d9 100644 --- a/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java +++ b/src/testFixtures/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java @@ -12,12 +12,14 @@ import com.clickhouse.client.api.query.QuerySettings; import com.clickhouse.client.api.query.Records; import com.clickhouse.data.ClickHouseFormat; +import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; import com.clickhouse.kafka.connect.sink.db.mapping.Column; import com.clickhouse.kafka.connect.sink.db.mapping.Type; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; +import jdk.dynalink.Operation; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.sink.SinkRecord; import org.json.JSONObject; @@ -65,13 +67,26 @@ public static boolean isCloud() { } public static void query(ClickHouseHelperClient chc, String query) { - if (chc.isUseClientV2()) { - chc.queryV2(query); - } else { - chc.queryV1(query); + try { + if (chc.isUseClientV2()) { + chc.queryV2(query).close(); + } else { + chc.queryV1(query).close(); + } + } catch (Exception e) { + LOGGER.error("Error while executing query", e); + throw new RuntimeException(e); } } + public static void createDatabase(ClickHouseHelperClient chc, String database) { + query(chc, "CREATE DATABASE " + database); + } + + public static void dropDatabase(ClickHouseHelperClient chc, String database) { + query(chc, "DROP DATABASE IF EXISTS " + database); + } + public static OperationMetrics dropTable(ClickHouseHelperClient chc, String tableName) { for (int i = 0; i < 5; i++) { try { @@ -185,7 +200,11 @@ public static OperationMetrics optimizeTable(ClickHouseHelperClient chc, String } public static int countRows(ClickHouseHelperClient chc, String tableName) { - String queryCount = String.format("SELECT COUNT(*) FROM `%s` SETTINGS select_sequential_consistency = 1", tableName); + return countRows(chc, tableName, chc.getDatabase()); + } + + public static int countRows(ClickHouseHelperClient chc, String tableName, String database) { + String queryCount = String.format("SELECT COUNT(*) FROM `%s`.`%s` SETTINGS select_sequential_consistency = 1", database, tableName); try { optimizeTable(chc, tableName);