Skip to content

Commit bb1d150

Browse files
committed
[FLINK-38188][cdc-postgres] Fix database name validation logic in PostgresDataSourceFactory(apache#4075)
1 parent 998e023 commit bb1d150

File tree

2 files changed

+59
-13
lines changed

2 files changed

+59
-13
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactory.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -369,23 +369,21 @@ private Optional<String> getValidateDatabaseName(String tables) {
369369
String.format(
370370
"Tables format must db.schema.table, can not 'tables' = %s",
371371
TABLES.key()));
372-
if (tableNameParts.length == 3) {
373-
String currentDbName = tableNameParts[0];
372+
String currentDbName = tableNameParts[0];
374373

374+
checkState(
375+
isValidPostgresDbName(currentDbName),
376+
String.format(
377+
"The value of option %s does not conform to PostgresSQL database name naming conventions",
378+
TABLES.key()));
379+
if (dbName == null) {
380+
dbName = currentDbName;
381+
} else {
375382
checkState(
376-
isValidPostgresDbName(currentDbName),
383+
dbName.equals(currentDbName),
377384
String.format(
378-
"The value of option %s does not conform to PostgresSQL database name naming conventions",
385+
"The value of option %s all table names must have the same database name",
379386
TABLES.key()));
380-
if (dbName == null) {
381-
dbName = currentDbName;
382-
} else {
383-
checkState(
384-
!dbName.equals(currentDbName),
385-
String.format(
386-
"The value of option %s all table names must have the same database name",
387-
TABLES.key()));
388-
}
389387
}
390388
}
391389

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/test/java/org/apache/flink/cdc/connectors/postgres/factory/PostgresDataSourceFactoryTest.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,54 @@ public void testPrefixRequireOption() {
257257
.isEqualTo(Arrays.asList("inventory.products"));
258258
}
259259

260+
@Test
261+
public void testTableValidationWithDifferentDatabases() {
262+
Map<String, String> options = new HashMap<>();
263+
options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
264+
options.put(
265+
PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
266+
options.put(USERNAME.key(), TEST_USER);
267+
options.put(PASSWORD.key(), TEST_PASSWORD);
268+
options.put(
269+
TABLES.key(),
270+
"aia_test.public.aia_t_icc_jjdb,different_db.public.aia_t_icc_jjdb_extend");
271+
options.put(SLOT_NAME.key(), slotName);
272+
273+
PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
274+
Factory.Context context = new MockContext(Configuration.fromMap(options));
275+
276+
assertThatThrownBy(() -> factory.createDataSource(context))
277+
.isInstanceOf(IllegalStateException.class)
278+
.hasMessageContaining(
279+
"The value of option tables all table names must have the same database name");
280+
}
281+
282+
@Test
283+
public void testTableValidationWithOriginalBugScenario() {
284+
Map<String, String> options = new HashMap<>();
285+
options.put(HOSTNAME.key(), POSTGRES_CONTAINER.getHost());
286+
options.put(
287+
PG_PORT.key(), String.valueOf(POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT)));
288+
options.put(USERNAME.key(), TEST_USER);
289+
options.put(PASSWORD.key(), TEST_PASSWORD);
290+
String tables =
291+
POSTGRES_CONTAINER.getDatabaseName()
292+
+ ".public.aia_t_icc_jjdb,"
293+
+ POSTGRES_CONTAINER.getDatabaseName()
294+
+ ".public.aia_t_icc_jjdb_\\\\d{6},"
295+
+ POSTGRES_CONTAINER.getDatabaseName()
296+
+ ".public.aia_t_icc_jjdb_extend";
297+
options.put(TABLES.key(), tables);
298+
options.put(SLOT_NAME.key(), slotName);
299+
300+
PostgresDataSourceFactory factory = new PostgresDataSourceFactory();
301+
Factory.Context context = new MockContext(Configuration.fromMap(options));
302+
303+
assertThatThrownBy(() -> factory.createDataSource(context))
304+
.isInstanceOf(IllegalArgumentException.class)
305+
.hasMessageContaining("Cannot find any table by the option 'tables'");
306+
}
307+
260308
class MockContext implements Factory.Context {
261309

262310
Configuration factoryConfiguration;

0 commit comments

Comments
 (0)