diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java index 616e28b1db6..926159babc8 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/utils/MySqlSchemaUtils.java @@ -33,10 +33,15 @@ import javax.annotation.Nullable; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; -import java.util.Collections; +import java.util.Arrays; +import java.util.HashSet; import java.util.List; +import java.util.Set; import java.util.stream.Collectors; import static org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils.createMySqlConnection; @@ -45,6 +50,10 @@ /** Utilities for converting from debezium {@link Table} types to {@link Schema}. */ public class MySqlSchemaUtils { + private static final String[] TABLE_QUERY = {"TABLE"}; + private static final Set SYSTEM_DB_SET = + new HashSet<>( + Arrays.asList("information_schema", "mysql", "sys", "performance_schema")); private static final Logger LOG = LoggerFactory.getLogger(MySqlSchemaUtils.class); public static List listDatabases(MySqlSourceConfig sourceConfig) { @@ -58,16 +67,23 @@ public static List listDatabases(MySqlSourceConfig sourceConfig) { public static List listTables( MySqlSourceConfig sourceConfig, @Nullable String dbName) { try (MySqlConnection jdbc = createMySqlConnection(sourceConfig)) { - List databases = - dbName != null ? Collections.singletonList(dbName) : listDatabases(jdbc); - List tableIds = new ArrayList<>(); - for (String database : databases) { - tableIds.addAll(listTables(jdbc, database)); + try (Connection connection = jdbc.connection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet resultSet = metaData.getTables(dbName, null, "%", TABLE_QUERY)) { + while (resultSet.next()) { + String database = resultSet.getString("TABLE_CAT"); + if (SYSTEM_DB_SET.contains(database)) { + continue; + } + String tableName = resultSet.getString("TABLE_NAME"); + tableIds.add(TableId.tableId(database, tableName)); + } + } } return tableIds; } catch (SQLException e) { - throw new RuntimeException("Error to list databases: " + e.getMessage(), e); + throw new RuntimeException("Error to list tables: " + dbName, e); } } @@ -86,16 +102,22 @@ public static List listDatabases(JdbcConnection jdbc) throws SQLExceptio // ------------------- // Get the list of databases ... LOG.info("Read list of available databases"); - final List databaseNames = new ArrayList<>(); - jdbc.query( - "SHOW DATABASES WHERE `database` NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')", - rs -> { - while (rs.next()) { - databaseNames.add(rs.getString(1)); + try (Connection connection = jdbc.connection()) { + DatabaseMetaData metaData = connection.getMetaData(); + try (ResultSet resultSet = metaData.getCatalogs()) { + List databaseNames = new ArrayList<>(); + while (resultSet.next()) { + String dbName = resultSet.getString("TABLE_CAT"); + if (SYSTEM_DB_SET.contains(dbName)) { + continue; } - }); - LOG.info("\t list of available databases are: {}", databaseNames); - return databaseNames; + databaseNames.add(dbName); + } + + LOG.info("\t list of available databases are:{}", databaseNames); + return databaseNames; + } + } } public static List listTables(JdbcConnection jdbc, String dbName) throws SQLException { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java index dedeab2dd5e..2177a6972c8 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/utils/TableDiscoveryUtils.java @@ -29,6 +29,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; @@ -40,6 +43,7 @@ /** Utilities to discovery matched tables. */ public class TableDiscoveryUtils { + private static final String[] TABLE_QUERY = {"TABLE"}; private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class); public static List listTables( @@ -50,54 +54,26 @@ public static List listTables( // READ DATABASE NAMES // ------------------- // Get the list of databases ... - LOG.info("Read list of available databases"); - final List databaseNames = new ArrayList<>(); - - jdbc.query( - "SHOW DATABASES", - rs -> { - while (rs.next()) { - String databaseName = rs.getString(1); - if (databaseFilter.test(databaseName)) { - databaseNames.add(databaseName); - } + try (Connection connection = jdbc.connection()) { + DatabaseMetaData databaseMetaData = connection.getMetaData(); + try (ResultSet tableResult = databaseMetaData.getTables(null, null, "%", TABLE_QUERY)) { + while (tableResult.next()) { + String dbName = tableResult.getString("TABLE_CAT"); + if (!databaseFilter.test(dbName)) { + continue; + } + String tableName = tableResult.getString("TABLE_NAME"); + TableId tableId = new TableId(dbName, null, tableName); + if (tableFilter.test(tableId)) { + capturedTableIds.add(tableId); + LOG.info("\t including table '{}' for further processing", tableId); + } else { + LOG.info("\t '{}' is filtered out of table capturing", tableId); } - }); - LOG.info("\t list of available databases is: {}", databaseNames); + } - // ---------------- - // READ TABLE NAMES - // ---------------- - // Get the list of table IDs for each database. We can't use a prepared statement with - // MySQL, so we have to build the SQL statement each time. Although in other cases this - // might lead to SQL injection, in our case we are reading the database names from the - // database and not taking them from the user ... - LOG.info("Read list of available tables in each database"); - for (String dbName : databaseNames) { - try { - jdbc.query( - "SHOW FULL TABLES IN " - + StatementUtils.quote(dbName) - + " where Table_Type = 'BASE TABLE'", - rs -> { - while (rs.next()) { - TableId tableId = new TableId(dbName, null, rs.getString(1)); - if (tableFilter.test(tableId)) { - capturedTableIds.add(tableId); - LOG.info( - "\t including table '{}' for further processing", - tableId); - } else { - LOG.info("\t '{}' is filtered out of table capturing", tableId); - } - } - }); - } catch (SQLException e) { - // We were unable to execute the query or process the results, so skip this ... - LOG.warn( - "\t skipping database '{}' due to error reading tables: {}", - dbName, - e.getMessage()); + } catch (Exception e) { + LOG.warn("Failed to get list of tables: {}", e.getMessage()); } } return capturedTableIds;