
[FLINK-37479][postgres] Add support for PARTITIONED TABLE #4004


Open · wants to merge 6 commits into master
12 changes: 12 additions & 0 deletions docs/content.zh/docs/connectors/flink-sources/postgres-cdc.md
@@ -378,6 +378,18 @@ The following options are available only when `scan.incremental.snapshot.enabled=true`
The table chunks use an even-distribution calculation optimization when the data distribution is even, and a splitting query is used when it is uneven.
The distribution factor is calculated as (MAX(id) - MIN(id) + 1) / rowCount.</td>
</tr>
<tr>
<td>scan.publish-via-partition-root.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to enable reading partitioned tables via the partition root.<br>
If enabled:
(1) the PUBLICATION must be created beforehand with the parameter publish_via_partition_root=true;
(2) the table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.
</td>
</tr>
</tbody>
</table>
</div>
12 changes: 12 additions & 0 deletions docs/content/docs/connectors/flink-sources/postgres-cdc.md
@@ -265,6 +265,18 @@ SELECT * FROM shipments;
For example, updating an already updated value in the snapshot, or deleting an already deleted entry in the snapshot. These replayed change log events should be handled specially.
</td>
</tr>
<tr>
<td>scan.publish-via-partition-root.enabled</td>
<td>optional</td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>
Whether to enable reading partitioned tables via the partition root.<br>
If enabled:
(1) the PUBLICATION must be created beforehand with the parameter publish_via_partition_root=true;
(2) the table list (regex or predefined list) should match only the parent table name; if it matches both parent and child tables, snapshot data will be read twice.
</td>
</tr>
</tbody>
</table>
</div>
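For prerequisite (1), the publication has to exist before the connector starts; publish_via_partition_root requires PostgreSQL 13 or later. Below is a minimal JDBC sketch of that setup step; the URL, credentials, publication name, and table name are illustrative assumptions, not values from this PR:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class CreateRootPublication {
    public static void main(String[] args) throws Exception {
        // Assumed connection details; adjust to your environment.
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            // publish_via_partition_root=true makes changes on child partitions
            // appear under the parent (root) table's identity in the stream.
            stmt.execute(
                    "CREATE PUBLICATION dbz_publication "
                            + "FOR TABLE inventory_partitioned.products "
                            + "WITH (publish_via_partition_root = true)");
        }
    }
}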
PostgresDialect.java
@@ -176,9 +176,14 @@ public ChunkSplitter createChunkSplitter(
@Override
public List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig) {
try (JdbcConnection jdbc = openJdbcConnection(sourceConfig)) {
boolean publishViaPartitionRoot =
((PostgresSourceConfig) sourceConfig).getPublishViaPartitionRoot();
return TableDiscoveryUtils.listTables(
// there is always a single database provided
sourceConfig.getDatabaseList().get(0), jdbc, sourceConfig.getTableFilters());
sourceConfig.getDatabaseList().get(0),
jdbc,
sourceConfig.getTableFilters(),
publishViaPartitionRoot);
} catch (SQLException e) {
throw new FlinkRuntimeException("Error to discover tables: " + e.getMessage(), e);
}
PostgresSourceBuilder.java
@@ -289,6 +289,12 @@ public PostgresSourceBuilder<T> lsnCommitCheckpointsDelay(int lsnCommitDelay) {
return this;
}

/** Whether the connector should read partitioned tables via the partition root. */
public PostgresSourceBuilder<T> publishViaPartitionRoot(boolean publishViaPartitionRoot) {
this.configFactory.setPublishViaPartitionRoot(publishViaPartitionRoot);
return this;
}

/**
* Build the {@link PostgresIncrementalSource}.
*
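For DataStream users, the new builder method composes with the existing options. A minimal usage sketch, assuming the standard incremental-source builder API; hostname, credentials, slot, and table names are placeholders:

import org.apache.flink.cdc.connectors.base.source.jdbc.JdbcIncrementalSource;
import org.apache.flink.cdc.connectors.postgres.source.PostgresSourceBuilder;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

JdbcIncrementalSource<String> source =
        PostgresSourceBuilder.PostgresIncrementalSource.<String>builder()
                .hostname("localhost")
                .port(5432)
                .database("postgres")
                .schemaList("inventory_partitioned")
                .tableList("inventory_partitioned.products") // parent table only
                .username("postgres")
                .password("postgres")
                .slotName("flink")
                .decodingPluginName("pgoutput")
                .deserializer(new JsonDebeziumDeserializationSchema())
                .publishViaPartitionRoot(true) // option added in this PR
                .build();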
PostgresSourceConfig.java
@@ -38,6 +38,7 @@ public class PostgresSourceConfig extends JdbcSourceConfig {

private final int subtaskId;
private final int lsnCommitCheckpointsDelay;
private final boolean publishViaPartitionRoot;

public PostgresSourceConfig(
int subtaskId,
@@ -67,7 +68,8 @@ public PostgresSourceConfig(
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
boolean publishViaPartitionRoot) {
super(
startupOptions,
databaseList,
@@ -97,6 +99,7 @@ public PostgresSourceConfig(
assignUnboundedChunkFirst);
this.subtaskId = subtaskId;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.publishViaPartitionRoot = publishViaPartitionRoot;
}

/**
@@ -117,6 +120,15 @@ public int getLsnCommitCheckpointsDelay() {
return this.lsnCommitCheckpointsDelay;
}

/**
 * Returns the {@code publishViaPartitionRoot} value.
 *
 * @return whether partitioned tables are read via the partition root
 */
public boolean getPublishViaPartitionRoot() {
return publishViaPartitionRoot;
}

/**
* Returns the slot name for backfill task.
*
PostgresSourceConfigFactory.java
@@ -52,6 +52,8 @@ public class PostgresSourceConfigFactory extends JdbcSourceConfigFactory {

private int lsnCommitCheckpointsDelay;

private boolean publishViaPartitionRoot;

/** Creates a new {@link PostgresSourceConfig} for the given subtask {@code subtaskId}. */
@Override
public PostgresSourceConfig create(int subtaskId) {
@@ -133,7 +135,8 @@ public PostgresSourceConfig create(int subtaskId) {
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
publishViaPartitionRoot);
}

/**
@@ -182,4 +185,9 @@ public void heartbeatInterval(Duration heartbeatInterval) {
public void setLsnCommitCheckpointsDelay(int lsnCommitCheckpointsDelay) {
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
}

/** Enable reading partitioned tables via the partition root. */
public void setPublishViaPartitionRoot(boolean publishViaPartitionRoot) {
this.publishViaPartitionRoot = publishViaPartitionRoot;
}
}
PostgresSourceOptions.java
@@ -88,4 +88,14 @@ public class PostgresSourceOptions extends JdbcSourceOptions {
+ "By setting this to higher value, the offset that is consumed by global slot will be "
+ "committed after multiple checkpoint delays instead of after each checkpoint completion.\n"
+ "This allows continuous recycle of log files in stream phase.");

public static final ConfigOption<Boolean> SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED =
ConfigOptions.key("scan.publish-via-partition-root.enabled")
.booleanType()
.defaultValue(Boolean.FALSE)
.withDescription(
"Enable reading from partitioned table via partition root.\n"
+ "If enabled:\n"
+ "(1) PUBLICATION must be created beforehand with parameter publish_via_partition_root=true\n"
+ "(2) Table list (regex or predefined list) should only match the parent table name, if table list matches both parent and child tables, snapshot data will be read twice.");
}
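In Flink SQL, the new option is just another entry in the WITH clause. A sketch via the Table API; the connection values and schema are placeholders, and only the parent table is listed, per the description above:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PartitionRootExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        tEnv.executeSql(
                "CREATE TABLE products ("
                        + " id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED"
                        + ") WITH ("
                        + " 'connector' = 'postgres-cdc',"
                        + " 'hostname' = 'localhost',"
                        + " 'port' = '5432',"
                        + " 'username' = 'postgres',"
                        + " 'password' = 'postgres',"
                        + " 'database-name' = 'postgres',"
                        + " 'schema-name' = 'inventory_partitioned',"
                        + " 'table-name' = 'products'," // parent table only
                        + " 'slot.name' = 'flink',"
                        + " 'scan.incremental.snapshot.enabled' = 'true',"
                        + " 'scan.publish-via-partition-root.enabled' = 'true'"
                        + ")");
        tEnv.executeSql("SELECT * FROM products").print();
    }
}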
TableDiscoveryUtils.java
@@ -34,11 +34,17 @@ public class TableDiscoveryUtils {
private static final Logger LOG = LoggerFactory.getLogger(TableDiscoveryUtils.class);

public static List<TableId> listTables(
String database, JdbcConnection jdbc, RelationalTableFilters tableFilters)
String database,
JdbcConnection jdbc,
RelationalTableFilters tableFilters,
boolean publishViaPartitionRoot)
throws SQLException {

Set<TableId> allTableIds =
jdbc.readTableNames(database, null, null, new String[] {"TABLE"});
loserwang1024 (Contributor) commented on Apr 27, 2025:

I hope we can add a param such as `partitioned` (maybe we can check whether Debezium already has one, so we can reuse it). When this param is enabled:

  1. discover partitioned tables here.
  2. add publish_via_partition_root=true when creating the publication. We could add an initRootPublication like what io.debezium.connector.postgresql.connection.PostgresReplicationConnection#initPublication does, or just modify that method. (But this class is copied from Debezium, so diverging too much is not recommended.)

Reading partitioned tables already works, but users have to create the publication in advance. To be honest, I hope Debezium implements this; if it lacks it, we can do it ourselves to make things easier for users.

Just my own thought. @phamvinh1712 @leonardBang, what do you think?

phamvinh1712 (Contributor, Author) replied on Apr 27, 2025:

I know that Debezium doesn't create the publication with publish_via_partition_root=true, so users need to create it themselves in advance. I agree that doing it for them would help users, but I'm afraid it would make it hard for us to upgrade the Debezium version (which I believe we need to do at some point, since Flink CDC is using the old Debezium 1.9.6).

loserwang1024 (Contributor) replied:

There is also another concern: if the user supplies a regex as the table name, both the parent table and its child tables will be captured, and snapshot data will be read twice.

Thus, an option description can also warn users about this: if they enable partitioned tables, they should pay attention to it.

phamvinh1712 (Contributor, Author) replied:

> There is also another concern: if the user supplies a regex as the table name, both the parent table and its child tables will be captured, and snapshot data will be read twice. Thus, an option description can also warn users about this.

Do you have any suggestions for this? Should we add a section to the Flink CDC docs for it?

String[] tableTypes = new String[] {"TABLE"};
if (publishViaPartitionRoot) {
tableTypes = new String[] {"TABLE", "PARTITIONED TABLE"};
}
Set<TableId> allTableIds = jdbc.readTableNames(database, null, null, tableTypes);

Set<TableId> capturedTables =
allTableIds.stream()
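Background on the two-element type filter: the PostgreSQL JDBC driver reports a declaratively partitioned parent (relkind 'p') with table type "PARTITIONED TABLE" rather than "TABLE", so discovery that filters on "TABLE" alone never sees the root. A standalone sketch against DatabaseMetaData; connection details are placeholders:

import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.DriverManager;
import java.sql.ResultSet;

public class ListPartitionedTables {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                DriverManager.getConnection(
                        "jdbc:postgresql://localhost:5432/postgres", "postgres", "postgres")) {
            DatabaseMetaData meta = conn.getMetaData();
            // Ask for both types; with {"TABLE"} alone the partitioned parent is skipped.
            try (ResultSet rs =
                    meta.getTables(null, "inventory_partitioned", "%",
                            new String[] {"TABLE", "PARTITIONED TABLE"})) {
                while (rs.next()) {
                    System.out.println(
                            rs.getString("TABLE_NAME") + " -> " + rs.getString("TABLE_TYPE"));
                }
            }
        }
    }
}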
PostgreSQLTableFactory.java
@@ -59,6 +59,7 @@
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_LSN_COMMIT_CHECKPOINTS_DELAY;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_SNAPSHOT_FETCH_SIZE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SCAN_STARTUP_MODE;
import static org.apache.flink.cdc.connectors.postgres.source.config.PostgresSourceOptions.SLOT_NAME;
@@ -117,6 +118,7 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
boolean skipSnapshotBackfill = config.get(SCAN_INCREMENTAL_SNAPSHOT_BACKFILL_SKIP);
boolean isScanNewlyAddedTableEnabled = config.get(SCAN_NEWLY_ADDED_TABLE_ENABLED);
int lsnCommitCheckpointsDelay = config.get(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
boolean publishViaPartitionRoot = config.get(SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED);
boolean assignUnboundedChunkFirst =
config.get(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);

@@ -165,7 +167,8 @@ public DynamicTableSource createDynamicTableSource(DynamicTableFactory.Context c
skipSnapshotBackfill,
isScanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
publishViaPartitionRoot);
}

@Override
@@ -209,6 +212,7 @@ public Set<ConfigOption<?>> optionalOptions() {
options.add(SCAN_NEWLY_ADDED_TABLE_ENABLED);
options.add(SCAN_LSN_COMMIT_CHECKPOINTS_DELAY);
options.add(SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED);
options.add(SCAN_PUBLISH_VIA_PARTITION_ROOT_ENABLED);
return options;
}

PostgreSQLTableSource.java
@@ -86,6 +86,7 @@ public class PostgreSQLTableSource implements ScanTableSource, SupportsReadingMetadata
private final boolean scanNewlyAddedTableEnabled;
private final int lsnCommitCheckpointsDelay;
private final boolean assignUnboundedChunkFirst;
private final boolean publishViaPartitionRoot;

// --------------------------------------------------------------------------------------------
// Mutable attributes
@@ -126,7 +127,8 @@ public PostgreSQLTableSource(
boolean skipSnapshotBackfill,
boolean isScanNewlyAddedTableEnabled,
int lsnCommitCheckpointsDelay,
boolean assignUnboundedChunkFirst) {
boolean assignUnboundedChunkFirst,
boolean publishViaPartitionRoot) {
this.physicalSchema = physicalSchema;
this.port = port;
this.hostname = checkNotNull(hostname);
@@ -159,6 +161,7 @@ public PostgreSQLTableSource(
this.scanNewlyAddedTableEnabled = isScanNewlyAddedTableEnabled;
this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
this.assignUnboundedChunkFirst = assignUnboundedChunkFirst;
this.publishViaPartitionRoot = publishViaPartitionRoot;
}

@Override
@@ -222,6 +225,7 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
.scanNewlyAddedTableEnabled(scanNewlyAddedTableEnabled)
.lsnCommitCheckpointsDelay(lsnCommitCheckpointsDelay)
.assignUnboundedChunkFirst(assignUnboundedChunkFirst)
.publishViaPartitionRoot(publishViaPartitionRoot)
.build();
return SourceProvider.of(parallelSource);
} else {
@@ -291,7 +295,8 @@ public DynamicTableSource copy() {
skipSnapshotBackfill,
scanNewlyAddedTableEnabled,
lsnCommitCheckpointsDelay,
assignUnboundedChunkFirst);
assignUnboundedChunkFirst,
publishViaPartitionRoot);
source.metadataKeys = metadataKeys;
source.producedDataType = producedDataType;
return source;
PostgresDialectTest.java
@@ -46,6 +46,14 @@ class PostgresDialectTest extends PostgresTestBase {
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());

private final UniqueDatabase inventoryPartitionedDatabase =
new UniqueDatabase(
POSTGRES_CONTAINER,
"postgres3",
"inventory_partitioned",
POSTGRES_CONTAINER.getUsername(),
POSTGRES_CONTAINER.getPassword());

@Test
void testDiscoverDataCollectionsInMultiDatabases() {

@@ -88,4 +96,23 @@ void testDiscoverDataCollectionsInMultiDatabases() {
configFactoryOfInventoryDatabase2.create(0));
Assertions.assertThat(tableIdsOfInventoryDatabase2).isEmpty();
}

@Test
void testDiscoverDataCollectionsForPartitionedTable() {
// initialize the database with a partitioned table
inventoryPartitionedDatabase.createAndInitialize();

// get table named 'inventory_partitioned.products' from inventoryPartitionedDatabase
PostgresSourceConfigFactory configFactoryOfInventoryPartitionedDatabase =
getMockPostgresSourceConfigFactory(
inventoryPartitionedDatabase, "inventory_partitioned", "products", 10);
configFactoryOfInventoryPartitionedDatabase.setPublishViaPartitionRoot(true);
PostgresDialect dialectOfInventoryPartitionedDatabase =
new PostgresDialect(configFactoryOfInventoryPartitionedDatabase.create(0));
List<TableId> tableIdsOfInventoryPartitionedDatabase =
dialectOfInventoryPartitionedDatabase.discoverDataCollections(
configFactoryOfInventoryPartitionedDatabase.create(0));
Assertions.assertThat(tableIdsOfInventoryPartitionedDatabase.get(0))
.hasToString("inventory_partitioned.products");
}
}
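The postgres3 inventory_partitioned fixture itself is not part of this diff; below is a hypothetical minimal equivalent, expressed as JDBC DDL so the test's expectation (only inventory_partitioned.products is discovered) is concrete. All names and connection values are assumptions:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class InitPartitionedFixture {
    public static void main(String[] args) throws Exception {
        try (Connection conn =
                        DriverManager.getConnection(
                                "jdbc:postgresql://localhost:5432/postgres3", "postgres", "postgres");
                Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE SCHEMA inventory_partitioned");
            // Parent (root) table: relkind 'p', reported as "PARTITIONED TABLE".
            stmt.execute(
                    "CREATE TABLE inventory_partitioned.products ("
                            + " id INT, name VARCHAR(255), PRIMARY KEY (id)"
                            + ") PARTITION BY RANGE (id)");
            // Child partition: a plain "TABLE"; with the root-publish option on,
            // discovery should match only the parent.
            stmt.execute(
                    "CREATE TABLE inventory_partitioned.products_p1 "
                            + "PARTITION OF inventory_partitioned.products "
                            + "FOR VALUES FROM (1) TO (1000)");
        }
    }
}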
MockPostgreSQLTableSource.java
@@ -66,7 +66,8 @@ public MockPostgreSQLTableSource(PostgreSQLTableSource postgreSQLTableSource) {
(boolean) get(postgreSQLTableSource, "skipSnapshotBackfill"),
(boolean) get(postgreSQLTableSource, "scanNewlyAddedTableEnabled"),
(int) get(postgreSQLTableSource, "lsnCommitCheckpointsDelay"),
(boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"));
(boolean) get(postgreSQLTableSource, "assignUnboundedChunkFirst"),
(boolean) get(postgreSQLTableSource, "publishViaPartitionRoot"));
}

@Override