From 82c4e791990674e0451e12471c4f5fc449cf0a03 Mon Sep 17 00:00:00 2001 From: zhuyayun Date: Fri, 30 May 2025 16:24:58 +0800 Subject: [PATCH 1/2] Reduce the time cost in MySqlSchemaUtils#listTables --- .../mysql/utils/MySqlSchemaUtils.java | 51 +++++++++----- .../source/utils/TableDiscoveryUtils.java | 68 ++++++------------- 2 files changed, 57 insertions(+), 62 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index 616e28b1db6..8186b43a28d 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -33,9 +33,12 @@ import javax.annotation.Nullable; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; import java.util.List; import java.util.stream.Collectors; @@ -45,6 +48,9 @@ /** Utilities for converting from debezium {@link Table} types to {@link Schema}. */ public class MySqlSchemaUtils { + private static final String[] TABLE_QUERY = {"TABLE"}; + private static final List DB_LIST = + Arrays.asList("information_schema", "mysql", "sys", "performance_schema"); private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class); public static List listDatabases(MySqlSourceConfig sourceConfig) { @@ -58,16 +64,23 @@ public static List listDatabases(MySqlSourceConfig sourceConfig) { public static List listTables( MySqlSourceConfig sourceConfig, @Nullable String dbName) { try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) { - List databases = - dbName != null ? Collections.singletonList(dbName) : listDatabases(jdbc); - List tableIds = new ArrayList<>(); - for (String database : databases) { - tableIds.addAll(listTables(jdbc, database)); + try (Connection connection = jdbc.connection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet resultSet = metaData.getTables(dbName, null, "%", TABLE_QUERY)) { + while (resultSet.next()) { + String database = resultSet.getString("TABLE_CAT"); + if (dbName == null && DB_LIST.contains(database)) { + continue; + } + String tableName = resultSet.getString("TABLE_NAME"); + tableIds.add(TableId.tableId(database, tableName)); + } + } } return tableIds; } catch (SQLException e) { - throw new RuntimeException("Error to list databases: " + e.getMessage(), e); + throw new RuntimeException("Error to list tables: " + e.getMessage(), e); } } @@ -86,16 +99,22 @@ public static List listDatabases(JdbcConnection jdbc) throws SQLExceptio // ------------------- // Get the list of databases ... LOG.info("Read list of available databases"); - final List databaseNames = new ArrayList<>(); - jdbc.query( - "SHOW DATABASES WHERE `database` NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')", - rs -> { - while (rs.next()) { - databaseNames.add(rs.getString(1)); + try (Connection connection = jdbc.connection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet resultSet = metaData.getCatalogs()) { + List databaseNames = new ArrayList<>(); + while (resultSet.next()) { + String dbName = resultSet.getString("TABLE_CAT"); + if (DB_LIST.contains(dbName)) { + continue; } - }); - LOG.info("\t list of available databases are: {}", databaseNames); - return databaseNames; + databaseNames.add(dbName); + } + + LOG.info("\t list of available databases are:{}", databaseNames); + return databaseNames; + } + } } public static List listTables(JdbcConnection jdbc, String dbName) throws SQLException { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index dedeab2dd5e..2177a6972c8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -29,6 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -40,6 +43,7 @@ /** Utilities to discovery matched tables. */ public class TableDiscoveryUtils { + private static final String[] TABLE_QUERY = {"TABLE"}; private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class); public static List listTables( @@ -50,54 +54,26 @@ public static List listTables( // READ DATABASE NAMES // ------------------- // Get the list of databases ... - LOG.info("Read list of available databases"); - final List databaseNames = new ArrayList<>(); - - jdbc.query( - "SHOW DATABASES", - rs -> { - while (rs.next()) { - String databaseName = rs.getString(1); - if (databaseFilter.test(databaseName)) { - databaseNames.add(databaseName); - } + try (Connection connection = jdbc.connection()) { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + try (ResultSet tableResult = databaseMetaData.getTables(null, null, "%", TABLE_QUERY)) { + while (tableResult.next()) { + String dbName = tableResult.getString("TABLE_CAT"); + if (!databaseFilter.test(dbName)) { + continue; + } + String tableName = tableResult.getString("TABLE_NAME"); + TableId tableId = new TableId(dbName, null, tableName); + if (tableFilter.test(tableId)) { + capturedTableIds.add(tableId); + LOG.info("\t including table '{}' for further processing", tableId); + } else { + LOG.info("\t '{}' is filtered out of table capturing", tableId); } - }); - LOG.info("\t list of available databases is: {}", databaseNames); + } - // ---------------- - // READ TABLE NAMES - // ---------------- - // Get the list of table IDs for each database. We can't use a prepared statement with - // MySQL, so we have to build the SQL statement each time. Although in other cases this - // might lead to SQL injection, in our case we are reading the database names from the - // database and not taking them from the user ... - LOG.info("Read list of available tables in each database"); - for (String dbName : databaseNames) { - try { - jdbc.query( - "SHOW FULL TABLES IN " - + StatementUtils.quote(dbName) - + " where Table_Type = 'BASE TABLE'", - rs -> { - while (rs.next()) { - TableId tableId = new TableId(dbName, null, rs.getString(1)); - if (tableFilter.test(tableId)) { - capturedTableIds.add(tableId); - LOG.info( - "\t including table '{}' for further processing", - tableId); - } else { - LOG.info("\t '{}' is filtered out of table capturing", tableId); - } - } - }); - } catch (SQLException e) { - // We were unable to execute the query or process the results, so skip this ... - LOG.warn( - "\t skipping database '{}' due to error reading tables: {}", - dbName, - e.getMessage()); + } catch (Exception e) { + LOG.warn("Failed to get list of tables: {}", e.getMessage()); } } return capturedTableIds; From 161fff97c32d9bc20914af7e3399a2fc0966a811 Mon Sep 17 00:00:00 2001 From: lvyanquan Date: Mon, 9 Jun 2025 14:12:54 +0800 Subject: [PATCH 2/2] Address comments. --- .../connectors/mysql/utils/MySqlSchemaUtils.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index 8186b43a28d..926159babc8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -39,7 +39,9 @@ import java.sql.SQLException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; @@ -49,8 +51,9 @@ public class MySqlSchemaUtils { private static final String[] TABLE_QUERY = {"TABLE"}; - private static final List DB_LIST = - Arrays.asList("information_schema", "mysql", "sys", "performance_schema"); + private static final Set SYSTEM_DB_SET = + new HashSet<>( + Arrays.asList("information_schema", "mysql", "sys", "performance_schema")); private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class); public static List listDatabases(MySqlSourceConfig sourceConfig) { @@ -70,7 +73,7 @@ public static List listTables( try (ResultSet resultSet = metaData.getTables(dbName, null, "%", TABLE_QUERY)) { while (resultSet.next()) { String database = resultSet.getString("TABLE_CAT"); - if (dbName == null && DB_LIST.contains(database)) { + if (SYSTEM_DB_SET.contains(database)) { continue; } String tableName = resultSet.getString("TABLE_NAME"); @@ -80,7 +83,7 @@ public static List listTables( } return tableIds; } catch (SQLException e) { - throw new RuntimeException("Error to list tables: " + e.getMessage(), e); + throw new RuntimeException("Error to list tables: " + dbName, e); } } @@ -105,7 +108,7 @@ public static List listDatabases(JdbcConnection jdbc) throws SQLExceptio List databaseNames = new ArrayList<>(); while (resultSet.next()) { String dbName = resultSet.getString("TABLE_CAT"); - if (DB_LIST.contains(dbName)) { + if (SYSTEM_DB_SET.contains(dbName)) { continue; } databaseNames.add(dbName);