diff --git a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
index 557e1d69771..8ec6e0de50e 100644
--- a/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -280,6 +280,18 @@ Connector Options
false (default): All types of messages are sent as is.
+
+      <tr>
+        <td>scan.include-partitioned-tables.enabled</td>
+        <td>optional</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable reading partitioned tables via the partition root.
+            If enabled:
+            (1) The PUBLICATION must be created beforehand with publish_via_partition_root=true.
+            (2) The table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.
+        </td>
+      </tr>
+
diff --git a/docs/content/docs/connectors/flink-sources/postgres-cdc.md b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
index 11b0a275f57..0131d2524cf 100644
--- a/docs/content/docs/connectors/flink-sources/postgres-cdc.md
+++ b/docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -277,6 +277,18 @@ SELECT * FROM shipments;
false (default): All types of messages are sent as is.
+
+      <tr>
+        <td>scan.include-partitioned-tables.enabled</td>
+        <td>optional</td>
+        <td style="word-wrap: break-word;">false</td>
+        <td>Boolean</td>
+        <td>Whether to enable reading partitioned tables via the partition root.
+            If enabled:
+            (1) The PUBLICATION must be created beforehand with publish_via_partition_root=true.
+            (2) The table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.
+        </td>
+      </tr>
+
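For reference, the publication that both option descriptions require can be prepared before the job starts. Below is a minimal JDBC sketch mirroring the IT case later in this patch; the connection URL, credentials, publication name, and table name are placeholders:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class PreparePublicationExample {
    public static void main(String[] args) throws Exception {
        // Placeholder connection settings; adjust for your environment.
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // Publish changes of all child partitions under the parent table's name,
            // which is what scan.include-partitioned-tables.enabled relies on.
            stmt.execute(
                    "CREATE PUBLICATION dbz_publication FOR TABLE inventory.products"
                            + " WITH (publish_via_partition_root = true)");
            // A logical replication slot for the pgoutput plugin can be created the same way.
            stmt.execute("SELECT pg_create_logical_replication_slot('flink', 'pgoutput')");
        }
    }
}
```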
diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
index f3540ddd4a2..3faddcf94c6 100644
--- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
+++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-postgres/src/main/java/org/apache/flink/cdc/connectors/postgres/source/reader/PostgresPipelineRecordEmitter.java
@@ -130,7 +130,8 @@ private void generateCreateTableEvent(PostgresSourceConfig sourceConfig) {
TableDiscoveryUtils.listTables(
sourceConfig.getDatabaseList().get(0),
jdbc,
- sourceConfig.getTableFilters());
+ sourceConfig.getTableFilters(),
+ sourceConfig.includePartitionedTables());
for (TableId tableId : capturedTableIds) {
Schema schema = PostgresSchemaUtils.getTableSchema(tableId, sourceConfig, jdbc);
createTableEventCache.add(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
index 000cfa9c4da..1c99f651cd2 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialect.java
@@ -176,9 +176,14 @@ public ChunkSplitter createChunkSplitter(
@Override
    public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
+ boolean includePartitionedTables =
+ ((PostgresSourceConfig) sourceConfig).includePartitionedTables();
return TableDiscoveryUtils.listTables(
// there is always a single database provided
- sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters());
+ sourceConfig.getDatabaseList().get(0),
+ jdbc,
+ sourceConfig.getTableFilters(),
+ includePartitionedTables);
} catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
index 5ab50c7bc4c..bbd65b5e9ab 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/PostgresSourceBuilder.java
@@ -299,6 +299,12 @@ public PostgresSourceBuilder lsnCommitCheckpointsDelay(int lsnCommitDelay) {
return this;
}
+    /** Whether the connector should read partitioned tables via their partition root. */
+    public PostgresSourceBuilder<T> includePartitionedTables(boolean includePartitionedTables) {
+ this.configFactory.setIncludePartitionedTables(includePartitionedTables);
+ return this;
+ }
+
/**
* Build the {@link PostgresIncrementalSource}.
*
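For DataStream users, the new builder method slots in alongside the existing options. A sketch based on the builder methods shown in the existing Postgres CDC DataStream example; host, credentials, slot name, and table names are placeholders, and only `includePartitionedTables` is new in this patch:

```java
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder.PostgresIncrementalSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

PostgresIncrementalSource<String> source =
        PostgresIncrementalSource.<String>builder()
                .hostname("localhost")                  // placeholder
                .port(5432)
                .database("postgres")
                .schemaList("inventory")
                .tableList("inventory.products")        // match the parent table only
                .username("postgres")
                .password("postgres")
                .slotName("flink")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .includePartitionedTables(true)         // new option from this patch
                .build();
```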
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
index a3d77491933..30271612800 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfig.java
@@ -38,6 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {
private final int subtaskId;
private final int lsnCommitCheckpointsDelay;
+ private final boolean includePartitionedTables;
public PostgresSourceConfig(
int subtaskId,
@@ -67,7 +68,8 @@ public PostgresSourceConfig(
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
- boolean assignUnboundedChunkFirst) {
+ boolean assignUnboundedChunkFirst,
+ boolean includePartitionedTables) {
super(
startupOptions,
databaseList,
@@ -97,6 +99,7 @@ public PostgresSourceConfig(
assignUnboundedChunkFirst);
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
+ this.includePartitionedTables = includePartitionedTables;
}
/**
@@ -117,6 +120,15 @@ public int getLsnCommitCheckpointsDelay() {
return this.lsnCommitCheckpointsDelay;
}
+    /**
+     * Returns whether partitioned tables are read via their partition root.
+     *
+     * @return {@code true} if partitioned tables are included in table discovery
+     */
+    public boolean includePartitionedTables() {
+        return includePartitionedTables;
+    }
+
/**
* Returns the slot name for backfill task.
*
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
index 7febb0743dd..670d4f37a56 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceConfigFactory.java
@@ -52,6 +52,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {
private int lsnCommitCheckpointsDelay;
+ private boolean includePartitionedTables;
+
/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -133,7 +135,8 @@ public PostgresSourceConfig create(int subtaskId) {
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
- assignUnboundedChunkFirst);
+ assignUnboundedChunkFirst,
+ includePartitionedTables);
}
/**
@@ -181,4 +184,9 @@ public void heartbeatInterval(Duration heartbeatInterval) {
public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}
+
+    /** Whether to include partitioned tables in table discovery. */
+ public void setIncludePartitionedTables(boolean includePartitionedTables) {
+ this.includePartitionedTables = includePartitionedTables;
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
index 6d3a6159323..f498c264532 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/config/PostgresSourceOptions.java
@@ -87,4 +87,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
+ "By setting this to higher value, the offset that is consumed by global slot will be "
+ "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
+ "This allows continuous recycle of log files in stream phase.");
+
+    public static final ConfigOption<Boolean> SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED =
+ ConfigOptions.key("scan.include-partitioned-tables.enabled")
+ .booleanType()
+ .defaultValue(Boolean.FALSE)
+                    .withDescription(
+                            "Whether to enable reading partitioned tables via the partition root.\n"
+                                    + "If enabled:\n"
+                                    + "(1) The PUBLICATION must be created beforehand with publish_via_partition_root=true.\n"
+                                    + "(2) The table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.");
}
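On the SQL side, the new ConfigOption surfaces as a table property. A sketch of a source DDL registered through a TableEnvironment; all connection values are placeholders:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
tEnv.executeSql(
        "CREATE TABLE products_src ("
                + "  id INT NOT NULL,"
                + "  name STRING,"
                + "  country STRING,"
                + "  PRIMARY KEY (id, country) NOT ENFORCED"
                + ") WITH ("
                + "  'connector' = 'postgres-cdc',"
                + "  'hostname' = 'localhost',"
                + "  'port' = '5432',"
                + "  'username' = 'postgres',"
                + "  'password' = 'postgres',"
                + "  'database-name' = 'postgres',"
                + "  'schema-name' = 'inventory',"
                // Match only the parent table to avoid reading snapshot data twice.
                + "  'table-name' = 'products',"
                + "  'slot.name' = 'flink',"
                + "  'decoding.plugin.name' = 'pgoutput',"
                + "  'scan.include-partitioned-tables.enabled' = 'true'"
                + ")");
```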
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java
index 181c2cd0d01..2ff3e66d207 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/source/utils/TableDiscoveryUtils.java
@@ -34,11 +34,17 @@ public class TableDiscoveryUtils {
private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);
    public static List<TableId> listTables(
- String database, JdbcConnection jdbc, RelationalTableFilters tableFilters)
+ String database,
+ JdbcConnection jdbc,
+ RelationalTableFilters tableFilters,
+ boolean includePartitionedTables)
throws SQLException {
-        Set<TableId> allTableIds =
-                jdbc.readTableNames(database, null, null, new String[] {"TABLE"});
+ String[] tableTypes = new String[] {"TABLE"};
+ if (includePartitionedTables) {
+ tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"};
+ }
+        Set<TableId> allTableIds = jdbc.readTableNames(database, null, null, tableTypes);
        Set<TableId> capturedTables =
allTableIds.stream()
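The widened type filter works because the PostgreSQL JDBC driver reports declaratively partitioned parent tables under the table type "PARTITIONED TABLE", and Debezium's JdbcConnection.readTableNames ultimately delegates to DatabaseMetaData.getTables. A standalone sketch of the same lookup, with placeholder connection details:

```java
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class PartitionedTableDiscoveryExample {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
            DatabaseMetaData meta = conn.getMetaData();
            // relkind 'r' surfaces as "TABLE"; relkind 'p' (a partitioned parent)
            // surfaces as "PARTITIONED TABLE" in the PostgreSQL JDBC driver.
            try (ResultSet rs =
                    meta.getTables(null, null, "%", new String[] {"TABLE", "PARTITIONED TABLE"})) {
                while (rs.next()) {
                    System.out.printf(
                            "%s.%s -> %s%n",
                            rs.getString("TABLE_SCHEM"),
                            rs.getString("TABLE_NAME"),
                            rs.getString("TABLE_TYPE"));
                }
            }
        }
    }
}
```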
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
index 7748ed6b3ea..876f04a83bf 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactory.java
@@ -55,6 +55,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.DECODING_PLUGIN_NAME;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.PG_PORT;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
@@ -117,6 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
+ boolean includePartitionedTables = config.get(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED);
boolean assignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
boolean appendOnly = config.get(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
@@ -167,7 +169,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
isScanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ includePartitionedTables);
}
@Override
@@ -212,6 +215,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED);
+ options.add(SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED);
return options;
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
index 892a2c5f322..7ebb43f9b4e 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/main/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableSource.java
@@ -87,6 +87,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMe
private final int lsnCommitCheckpointsDelay;
private final boolean assignUnboundedChunkFirst;
private final boolean appendOnly;
+ private final boolean includePartitionedTables;
// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -128,7 +129,8 @@ public PostgreSQLTableSource(
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst,
- boolean appendOnly) {
+ boolean appendOnly,
+ boolean includePartitionedTables) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -162,6 +164,7 @@ public PostgreSQLTableSource(
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.appendOnly = appendOnly;
+ this.includePartitionedTables = includePartitionedTables;
}
@Override
@@ -230,6 +233,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
+ .includePartitionedTables(includePartitionedTables)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -300,7 +304,8 @@ public DynamicTableSource copy() {
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ includePartitionedTables);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
@@ -345,7 +350,8 @@ public boolean equals(Object o) {
&& Objects.equals(skipSnapshotBackfill, that.skipSnapshotBackfill)
&& Objects.equals(scanNewlyAddedTableEnabled, that.scanNewlyAddedTableEnabled)
&& Objects.equals(assignUnboundedChunkFirst, that.assignUnboundedChunkFirst)
- && Objects.equals(appendOnly, that.appendOnly);
+ && Objects.equals(appendOnly, that.appendOnly)
+ && Objects.equals(includePartitionedTables, that.includePartitionedTables);
}
@Override
@@ -381,7 +387,8 @@ public int hashCode() {
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
assignUnboundedChunkFirst,
- appendOnly);
+ appendOnly,
+ includePartitionedTables);
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
index 898cec1fb08..0fb9e800743 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/source/PostgresDialectTest.java
@@ -46,6 +46,14 @@ class PostgresDialectTest extends PostgresTestBase {
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());
+ private final UniqueDatabase inventoryPartitionedDatabase =
+ new UniqueDatabase(
+ POSTGRES_CONTAINER,
+ "postgres3",
+ "inventory_partitioned",
+ POSTGRES_CONTAINER.getUsername(),
+ POSTGRES_CONTAINER.getPassword());
+
@Test
void testDiscoverDataCollectionsInMultiDatabases() {
@@ -88,4 +96,23 @@ void testDiscoverDataCollectionsInMultiDatabases() {
configFactoryOfInventoryDatabase2.create(0));
Assertions.assertThat(tableIdsOfInventoryDatabase2).isEmpty();
}
+
+ @Test
+ void testDiscoverDataCollectionsForPartitionedTable() {
+        // initialize the database with a partitioned table
+ inventoryPartitionedDatabase.createAndInitialize();
+
+ // get table named 'inventory_partitioned.products' from inventoryPartitionedDatabase
+ PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase =
+ getMockPostgresSourceConfigFactory(
+ inventoryPartitionedDatabase, "inventory_partitioned", "products", 10);
+ configFactoryOfInventoryPartitionedDatabase.setIncludePartitionedTables(true);
+ PostgresDialect dialectOfInventoryPartitionedDatabase =
+ new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0));
+        List<TableId> tableIdsOfInventoryPartitionedDatabase =
+ dialectOfInventoryPartitionedDatabase.discoverDataCollections(
+ configFactoryOfInventoryPartitionedDatabase.create(0));
+ Assertions.assertThat(tableIdsOfInventoryPartitionedDatabase.get(0))
+ .hasToString("inventory_partitioned.products");
+ }
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
index 8177c948ebd..ecd049b6f19 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/MockPostgreSQLTableSource.java
@@ -67,7 +67,8 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) {
(boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"),
(int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"),
(boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"),
- (boolean) get(postgreSQLTableSource, "appendOnly"));
+ (boolean) get(postgreSQLTableSource, "appendOnly"),
+ (boolean) get(postgreSQLTableSource, "includePartitionedTables"));
}
@Override
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
index 44b59312058..7f4329be082 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLConnectorITCase.java
@@ -232,6 +232,124 @@ void testConsumingAllEvents(boolean parallelismSnapshot)
result.getJobClient().get().cancel().get();
}
+ @ParameterizedTest
+ @ValueSource(booleans = {true, false})
+ void testConsumingAllEventsForPartitionedTable(boolean parallelismSnapshot)
+ throws SQLException, ExecutionException, InterruptedException {
+ setup(parallelismSnapshot);
+ initializePostgresTable(POSTGRES_CONTAINER, "inventory_partitioned");
+ String publicationName = "dbz_publication_" + new Random().nextInt(1000);
+ String slotName = getSlotName();
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "CREATE PUBLICATION %s FOR TABLE inventory_partitioned.products "
+ + " WITH (publish_via_partition_root=true)",
+ publicationName));
+ statement.execute(
+ String.format(
+ "select pg_create_logical_replication_slot('%s','pgoutput');",
+ slotName));
+ }
+
+ String sourceDDL =
+ String.format(
+ "CREATE TABLE debezium_source ("
+ + " id INT NOT NULL,"
+ + " name STRING,"
+ + " description STRING,"
+ + " weight DECIMAL(10,3),"
+ + " country STRING"
+ + ") WITH ("
+ + " 'connector' = 'postgres-cdc',"
+ + " 'hostname' = '%s',"
+ + " 'port' = '%s',"
+ + " 'username' = '%s',"
+ + " 'password' = '%s',"
+ + " 'database-name' = '%s',"
+ + " 'schema-name' = '%s',"
+ + " 'table-name' = '%s',"
+ + " 'scan.incremental.snapshot.enabled' = '%s',"
+ + " 'scan.include-partitioned-tables.enabled' = 'true',"
+ + " 'decoding.plugin.name' = 'pgoutput', "
+ + " 'debezium.publication.name' = '%s',"
+ + " 'slot.name' = '%s'"
+ + ")",
+ POSTGRES_CONTAINER.getHost(),
+ POSTGRES_CONTAINER.getMappedPort(POSTGRESQL_PORT),
+ POSTGRES_CONTAINER.getUsername(),
+ POSTGRES_CONTAINER.getPassword(),
+ POSTGRES_CONTAINER.getDatabaseName(),
+ "inventory_partitioned",
+ "products",
+ parallelismSnapshot,
+ publicationName,
+ slotName);
+ String sinkDDL =
+ "CREATE TABLE sink ("
+ + " id INT NOT NULL,"
+ + " name STRING,"
+ + " description STRING,"
+ + " weight DECIMAL(10,3),"
+ + " country STRING,"
+ + " PRIMARY KEY (id, country) NOT ENFORCED"
+ + ") WITH ("
+ + " 'connector' = 'values',"
+ + " 'sink-insert-only' = 'false',"
+ + " 'sink-expected-messages-num' = '20'"
+ + ")";
+ tEnv.executeSql(sourceDDL);
+ tEnv.executeSql(sinkDDL);
+
+ // async submit job
+ TableResult result =
+ tEnv.executeSql(
+ "INSERT INTO sink SELECT id, name, description, weight, country FROM debezium_source");
+
+ waitForSnapshotStarted("sink");
+
+ // wait a bit to make sure the replication slot is ready
+ Thread.sleep(5000);
+
+ // generate WAL
+ try (Connection connection = getJdbcConnection(POSTGRES_CONTAINER);
+ Statement statement = connection.createStatement()) {
+ statement.execute(
+ "INSERT INTO inventory_partitioned.products VALUES (default,'jacket','water resistent white wind breaker',0.2, 'us');"); // 110
+ statement.execute(
+ "INSERT INTO inventory_partitioned.products VALUES (default,'scooter','Big 2-wheel scooter ',5.18, 'uk');");
+ statement.execute(
+ "CREATE TABLE inventory_partitioned.products_china PARTITION OF inventory_partitioned.products FOR VALUES IN ('china');");
+ statement.execute(
+ "INSERT INTO inventory_partitioned.products VALUES (default,'bike','Big 2-wheel bycicle ',6.18, 'china');");
+ }
+
+ waitForSinkSize("sink", 11);
+
+ // consume both snapshot and wal events
+ String[] expected =
+ new String[] {
+ "101,scooter,Small 2-wheel scooter,3.140,us",
+ "102,car battery,12V car battery,8.100,us",
+ "103,12-pack drill bits,12-pack of drill bits with sizes ranging from #40 to #3,0.800,us",
+ "104,hammer,12oz carpenter's hammer,0.750,us",
+ "105,hammer,14oz carpenter's hammer,0.875,us",
+ "106,hammer,16oz carpenter's hammer,1.000,uk",
+ "107,rocks,box of assorted rocks,5.300,uk",
+ "108,jacket,water resistent black wind breaker,0.100,uk",
+ "109,spare tire,24 inch spare tire,22.200,uk",
+ "110,jacket,water resistent white wind breaker,0.200,us",
+ "111,scooter,Big 2-wheel scooter ,5.180,uk",
+ "112,bike,Big 2-wheel bycicle ,6.180,china"
+ };
+
+ List actual = TestValuesTableFactory.getResultsAsStrings("sink");
+ Assertions.assertThat(actual).containsExactlyInAnyOrder(expected);
+
+ result.getJobClient().get().cancel().get();
+ }
+
@ParameterizedTest
@ValueSource(booleans = {true})
void testStartupFromLatestOffset(boolean parallelismSnapshot) throws Exception {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
index e06ff4d9a91..4a4cf4d4027 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/java/org/apache/flink/cdc/connectors/postgres/table/PostgreSQLTableFactoryTest.java
@@ -64,6 +64,7 @@
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND;
import static org.apache.flink.cdc.connectors.base.options.SourceOptions.SPLIT_KEY_EVEN_DISTRIBUTION_FACTOR_UPPER_BOUND;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.HEARTBEAT_INTERVAL;
+import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.utils.AssertUtils.assertProducedTypeOfSourceFunction;
@@ -155,7 +156,8 @@ void testCommonProperties() {
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(),
- SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue());
+ SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(),
+ SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -204,7 +206,8 @@ void testOptionalProperties() {
true,
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(),
- true);
+ true,
+ SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -250,7 +253,8 @@ void testMetadataColumns() {
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(),
- SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue());
+ SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(),
+ SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue());
expectedSource.producedDataType = SCHEMA_WITH_METADATA.toSourceRowDataType();
expectedSource.metadataKeys =
Arrays.asList("row_kind", "op_ts", "database_name", "schema_name", "table_name");
@@ -306,7 +310,8 @@ void testEnableParallelReadSource() {
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(),
- SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue());
+ SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(),
+ SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
@@ -352,7 +357,8 @@ void testStartupFromLatestOffset() {
SCAN_NEWLY_ADDED_TABLE_ENABLED.defaultValue(),
SCAN_LSN_COMMIT_CHECKPOINTS_DELAY.defaultValue(),
SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED.defaultValue(),
- SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue());
+ SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.defaultValue(),
+ SCAN_INCLUDE_PARTITIONED_TABLES_ENABLED.defaultValue());
Assertions.assertThat(actualSource).isEqualTo(expectedSource);
}
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql
new file mode 100644
index 00000000000..1cffc45c23d
--- /dev/null
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-postgres-cdc/src/test/resources/ddl/inventory_partitioned.sql
@@ -0,0 +1,48 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- Create the schema that we'll use to populate data and watch the effect in the WAL
+DROP SCHEMA IF EXISTS inventory_partitioned CASCADE;
+CREATE SCHEMA inventory_partitioned;
+SET search_path TO inventory_partitioned;
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id SERIAL NOT NULL,
+ name VARCHAR(255) NOT NULL DEFAULT 'flink',
+ description VARCHAR(512),
+ weight FLOAT,
+ country VARCHAR(20) NOT NULL,
+ PRIMARY KEY (id, country)
+) PARTITION BY LIST(country);
+ALTER SEQUENCE products_id_seq RESTART WITH 101;
+ALTER TABLE products REPLICA IDENTITY FULL;
+
+CREATE TABLE products_uk PARTITION OF products
+ FOR VALUES IN ('uk');
+
+CREATE TABLE products_us PARTITION OF products
+ FOR VALUES IN ('us');
+
+INSERT INTO products
+VALUES (default,'scooter','Small 2-wheel scooter',3.14, 'us'),
+ (default,'car battery','12V car battery',8.1, 'us'),
+ (default,'12-pack drill bits','12-pack of drill bits with sizes ranging from #40 to #3',0.8, 'us'),
+ (default,'hammer','12oz carpenter''s hammer',0.75, 'us'),
+ (default,'hammer','14oz carpenter''s hammer',0.875, 'us'),
+ (default,'hammer','16oz carpenter''s hammer',1.0, 'uk'),
+ (default,'rocks','box of assorted rocks',5.3, 'uk'),
+ (default,'jacket','water resistent black wind breaker',0.1, 'uk'),
+ (default,'spare tire','24 inch spare tire',22.2, 'uk');
\ No newline at end of file