Skip to content

Conversation

codluca
Copy link
Member

@codluca codluca commented Oct 8, 2025

Add support for table_changes function for Lakehouse

Fixes #26756

Description

The table_changes function is supported by DeltaLake and Iceberg modules.

The TableChangesFunction does the initial analysis.
The Lakehouse TableChangesFunction will delegate to the DeltaLake TableChangesFunction, or, if the table is not found, to the Iceberg TableChangesFunction.

The LakehouseFunctionProvider returns the TableFunctionProcessorProviderFactory from DeltaLake or Iceberg, depending on the previously functionHandle generated by the TableChangesFunction.

The function for DeltaLake has 3 parameters:
schema_name, table_name, since_version (optional)

The function for Iceberg has 4 parameters (all required):
schema_name, table_name, start_snapshot_id, end_snapshot_id

The function for Lakehouse will have 5 parameters:
schema_name, table_name, start_snapshot_id, end_snapshot_id, since_version (optional)

start_snapshot_id and end_snapshot_id are optional for DeltaLake, but required for Iceberg module

For DeltaLake, the function is called like this:

system.table_changes(
  schema_name => 'tpch',
  table_name => 'region',
  since_version => 0
)

or

system.table_changes('tpch', 'region', null, null, 0)

For Iceberg, the function is called like this:

system.table_changes(
  schema_name => 'tpch',
  table_name => 'region',
  start_snapshot_id => 12345,
  end_snapshot_id => 67890
)

or

system.table_changes('tpch', 'region', 12345, 678909)

Additional context and related issues

Release notes

( ) This is not user-visible or is docs only, and no release notes are required.
(X) Release notes are required. Please propose a release note for me.
( ) Release notes are required, with the following suggested text:

## Section
* Fix some things. ({issue}`26756`)

Summary by Sourcery

Add support for the system.table_changes function to the Lakehouse connector by delegating SQL analysis and execution to the DeltaLake or Iceberg implementations.

New Features:

  • Expose a table_changes table function in the Lakehouse connector that routes to DeltaLake or Iceberg based on the target table.

Enhancements:

  • Extend LakehouseConnector to register table functions and provide a FunctionProvider for delegation.
  • Configure dependency injection in LakehouseModule, LakehouseDeltaModule, and LakehouseIcebergModule to bind function providers and processor factories.

Tests:

  • Add integration smoke tests for table_changes in both Iceberg and DeltaLake connectors to validate insert, update, and delete change streams.

@cla-bot cla-bot bot added the cla-signed label Oct 8, 2025
Copy link

sourcery-ai bot commented Oct 8, 2025

Reviewer's Guide

This PR adds a generic table_changes function to the Lakehouse connector by introducing a delegating TableChangesFunction, registering it via Guice in the connector and modules, and wiring a FunctionProvider to route calls to the appropriate DeltaLake or Iceberg implementation, with new smoke tests validating its behavior.

File-Level Changes

Change Details Files
Introduce a delegating TableChangesFunction
  • Create TableChangesFunction extending AbstractConnectorTableFunction
  • Inject DeltaLake and Iceberg table changes implementations
  • Implement analyze() to try DeltaLake then fall back to Iceberg
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunction.java
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/functions/tablechanges/TableChangesFunctionProvider.java
Register table_changes in the Lakehouse connector
  • Bind ConnectorTableFunction and FunctionProvider in LakehouseModule
  • Inject tableFunctions and functionProvider into LakehouseConnector
  • Override getTableFunctions() and getFunctionProvider()
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseModule.java
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseConnector.java
Delegate function processing based on handle type
  • Implement LakehouseFunctionProvider to route to DeltaLake or Iceberg provider
  • Bind DeltaLakeFunctionProvider and IcebergFunctionProvider in their modules
  • Include TableChangesFunctionProcessorProviderFactory binding
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseFunctionProvider.java
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseDeltaModule.java
plugin/trino-lakehouse/src/main/java/io/trino/plugin/lakehouse/LakehouseIcebergModule.java
Add smoke tests for table_changes on Lakehouse Iceberg
  • Extend TestLakehouseIcebergConnectorSmokeTest with snapshots-based test
  • Verify insert and delete change types and version/time columns
plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java
Add smoke tests for table_changes on Lakehouse DeltaLake
  • Extend TestLakehouseDeltaConnectorSmokeTest with CDF-enabled table tests
  • Use assertTableChangesQuery helper to validate inserts, updates, deletes
plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java

Possibly linked issues


Tips and commands

Interacting with Sourcery

  • Trigger a new review: Comment @sourcery-ai review on the pull request.
  • Continue discussions: Reply directly to Sourcery's review comments.
  • Generate a GitHub issue from a review comment: Ask Sourcery to create an
    issue from a review comment by replying to it. You can also reply to a
    review comment with @sourcery-ai issue to create an issue from it.
  • Generate a pull request title: Write @sourcery-ai anywhere in the pull
    request title to generate a title at any time. You can also comment
    @sourcery-ai title on the pull request to (re-)generate the title at any time.
  • Generate a pull request summary: Write @sourcery-ai summary anywhere in
    the pull request body to generate a PR summary at any time exactly where you
    want it. You can also comment @sourcery-ai summary on the pull request to
    (re-)generate the summary at any time.
  • Generate reviewer's guide: Comment @sourcery-ai guide on the pull
    request to (re-)generate the reviewer's guide at any time.
  • Resolve all Sourcery comments: Comment @sourcery-ai resolve on the
    pull request to resolve all Sourcery comments. Useful if you've already
    addressed all the comments and don't want to see them anymore.
  • Dismiss all Sourcery reviews: Comment @sourcery-ai dismiss on the pull
    request to dismiss all existing Sourcery reviews. Especially useful if you
    want to start fresh with a new review - don't forget to comment
    @sourcery-ai review to trigger a new review!

Customizing Your Experience

Access your dashboard to:

  • Enable or disable review features such as the Sourcery-generated pull request
    summary, the reviewer's guide, and others.
  • Change the review language.
  • Add, remove or edit custom review instructions.
  • Adjust other review settings.

Getting Help

Copy link

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey there - I've reviewed your changes and they look great!

Prompt for AI Agents
Please address the comments from this code review:

## Individual Comments

### Comment 1
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseIcebergConnectorSmokeTest.java:55-56` </location>
<code_context>
                 )\\E""");
     }
+
+    @Test
+    public void testTableChangesFunction()
+    {
+        String tableName = "test_table_changes_function_" + randomNameSuffix();
</code_context>

<issue_to_address>
**suggestion (testing):** Missing test cases for error scenarios and edge cases in table_changes function.

Please add tests for querying non-existent tables, invalid snapshot IDs, and cases with no changes between snapshots to improve coverage of error handling and edge cases.
</issue_to_address>

### Comment 2
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java:65-66` </location>
<code_context>
                 )\\E""");
     }
+
+    @Test
+    public void testTableChangesFunction()
+    {
+        String tableName = "test_table_changes_function_" + randomNameSuffix();
</code_context>

<issue_to_address>
**suggestion (testing):** Missing test for table_changes on non-Delta tables and invalid arguments.

Please include tests for non-Delta tables and invalid arguments to ensure error handling is properly verified.
</issue_to_address>

### Comment 3
<location> `plugin/trino-lakehouse/src/test/java/io/trino/plugin/lakehouse/TestLakehouseDeltaConnectorSmokeTest.java:116-118` </location>
<code_context>
+                        """);
+    }
+
+    private void assertTableChangesQuery(@Language("SQL") String sql, @Language("SQL") String expectedResult)
+    {
+        assertThat(query(sql))
+                .result()
+                .exceptColumns("_commit_timestamp")
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding assertions for error messages and empty results.

Adding assertions for empty results and expected errors will improve test coverage and ensure the function behaves correctly in all scenarios.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines 55 to 56
@Test
public void testTableChangesFunction()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Missing test cases for error scenarios and edge cases in table_changes function.

Please add tests for querying non-existent tables, invalid snapshot IDs, and cases with no changes between snapshots to improve coverage of error handling and edge cases.

Comment on lines 65 to 66
@Test
public void testTableChangesFunction()
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Missing test for table_changes on non-Delta tables and invalid arguments.

Please include tests for non-Delta tables and invalid arguments to ensure error handling is properly verified.

Comment on lines 116 to 118
private void assertTableChangesQuery(@Language("SQL") String sql, @Language("SQL") String expectedResult)
{
assertThat(query(sql))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (testing): Consider adding assertions for error messages and empty results.

Adding assertions for empty results and expected errors will improve test coverage and ensure the function behaves correctly in all scenarios.

@codluca codluca force-pushed the 26756-lakehouse-table-changes-function branch 10 times, most recently from 9f5c6a1 to 7bbeb9b Compare October 10, 2025 15:38
@codluca codluca requested a review from ebyhr October 12, 2025 11:04
The table_changes function is supported by DeltaLake and Iceberg modules.
The TableChangesFunction does the initial analisys. The Lakehouse TableChangesFunction will delegate to the DeltaLake TableChangesFunction, or, if the table is not found, to the Iceberg TableChangesFunction.
The LakehouseFunctionProvider returns the TableFunctionProcessorProviderFactory from DeltaLak or Iceberg, depending on the previously functionHandle generated by the TableChangesFunction.
@codluca codluca force-pushed the 26756-lakehouse-table-changes-function branch from 7bbeb9b to 3c4a62f Compare October 12, 2025 11:09
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Development

Successfully merging this pull request may close these issues.

Lakehouse connector doesn't support table functions

1 participant