
Add TableSink operator with Java/Spark implementations #665

Draft

harrygav wants to merge 1 commit into apache:main from harrygav:introduce_postgresql_sink

Conversation

@harrygav

Summary

This PR introduces a new TableSink operator for writing Record data into a database table via JDBC, with implementations for the Java and Spark platforms.

Opening as Draft to start discussion on the operator design and expected behavior.

Changes

  • New operator: TableSink (in wayang-basic)

    • A UnarySink<Record> that targets a table name and accepts JDBC connection Properties
    • Supports a write mode (e.g. overwrite) and optional column names (a usage sketch follows after this list)
  • Java platform: JavaTableSink (in wayang-java)

    • JDBC-based implementation that can create the target table (if missing) and batch-insert records
    • Supports overwrite by dropping the target table first
  • Spark platform: SparkTableSink (in wayang-spark)

    • Spark-side implementation of the same TableSink operator
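For orientation, here is a minimal usage sketch at the operator level. The `TableSink` constructor arguments and its package are assumptions based on the description above (table name, JDBC connection `Properties`, optional column names), so treat this as a sketch of the wiring rather than the exact API:

```java
import java.util.Arrays;
import java.util.Properties;

import org.apache.wayang.basic.data.Record;
import org.apache.wayang.basic.operators.CollectionSource;
import org.apache.wayang.basic.operators.TableSink; // package assumed; the PR only says "wayang-basic"
import org.apache.wayang.core.api.WayangContext;
import org.apache.wayang.core.plan.wayangplan.WayangPlan;
import org.apache.wayang.core.types.DataSetType;
import org.apache.wayang.java.Java;

public class TableSinkSketch {
    public static void main(String[] args) {
        // JDBC connection details for the target database; the key names are placeholders.
        Properties jdbcProps = new Properties();
        jdbcProps.setProperty("url", "jdbc:postgresql://localhost:5432/wayang");
        jdbcProps.setProperty("user", "postgres");
        jdbcProps.setProperty("password", "postgres");

        // A tiny in-memory dataset of Records to write.
        CollectionSource<Record> source = new CollectionSource<>(
                Arrays.asList(new Record(1, "alice"), new Record(2, "bob")),
                DataSetType.createDefault(Record.class));

        // Hypothetical constructor: table name, connection properties, column names.
        TableSink sink = new TableSink("people", jdbcProps, "id", "name");
        source.connectTo(0, sink, 0);

        // Execute on the Java platform; registering Spark.basicPlugin() instead would
        // map the sink to SparkTableSink.
        WayangContext wayangContext = new WayangContext().with(Java.basicPlugin());
        wayangContext.execute(new WayangPlan(sink));
    }
}
```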

Notes / open questions

  • This started as a PostgreSQL sink, but the intention should likely be a generic JDBC sink that works across multiple databases.
  • DDL generation is currently basic (e.g., columns are auto-created as VARCHARs).
  • mode behavior (overwrite vs. append, etc.) should be agreed on and formalized; one possible shape is sketched below.
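To make that last point concrete, here is one hedged possibility for what the write mode could become; the enum name and semantics are a suggestion for discussion, not code from this PR:

```java
/** Hypothetical formalization of the sink's write mode (not part of this PR). */
enum WriteMode {
    /** Drop the target table if it exists, recreate it, then insert (the current overwrite behavior). */
    OVERWRITE,
    /** Insert into the existing table, creating it only if it does not exist. */
    APPEND,
    /** Fail fast if the target table already exists. */
    ERROR_IF_EXISTS
}
```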

How to use / test

To run end-to-end locally, you currently need an external PostgreSQL instance and must provide the JDBC connection details (driver/url/user/password) in the test setup/environment.
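One possible way to keep those details out of the code is to read them from the environment in the test setup. The variable names and property keys below are placeholders, since the PR does not pin down how TableSink consumes its configuration:

```java
import java.util.Properties;

/**
 * Hypothetical test helper: reads JDBC connection details from environment
 * variables so an end-to-end test can point at whatever PostgreSQL instance
 * is available locally. All names here are placeholders.
 */
final class TestDbConfig {

    static Properties fromEnvironment() {
        Properties props = new Properties();
        props.setProperty("driver", env("WAYANG_TEST_JDBC_DRIVER", "org.postgresql.Driver"));
        props.setProperty("url", env("WAYANG_TEST_JDBC_URL", "jdbc:postgresql://localhost:5432/wayang_test"));
        props.setProperty("user", env("WAYANG_TEST_JDBC_USER", "postgres"));
        props.setProperty("password", env("WAYANG_TEST_JDBC_PASSWORD", "postgres"));
        return props;
    }

    private static String env(String name, String fallback) {
        String value = System.getenv(name);
        return value != null ? value : fallback;
    }
}
```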

@juripetersen
Contributor

Thanks @harrygav, this is great!

Could we make TableSink generic over its input type and thus make DDL generation easier via reflection on the given type?
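As a rough illustration of that idea, the DDL could be derived from the fields of a (hypothetical) POJO input type. The type mapping below is a sketch for discussion, not code from this PR, and a real version would have to respect the SQL dialect of the target database:

```java
import java.lang.reflect.Field;
import java.util.StringJoiner;

final class DdlFromType {

    /** Builds a CREATE TABLE statement from the declared fields of the input type. */
    static String createTableDdl(String table, Class<?> type) {
        StringJoiner columns = new StringJoiner(", ");
        for (Field field : type.getDeclaredFields()) {
            columns.add(field.getName() + " " + sqlTypeFor(field.getType()));
        }
        return "CREATE TABLE IF NOT EXISTS " + table + " (" + columns + ")";
    }

    /** Minimal Java-to-SQL type mapping; VARCHAR is the fallback described in the PR notes. */
    private static String sqlTypeFor(Class<?> javaType) {
        if (javaType == int.class || javaType == Integer.class) return "INTEGER";
        if (javaType == long.class || javaType == Long.class) return "BIGINT";
        if (javaType == double.class || javaType == Double.class) return "DOUBLE PRECISION";
        if (javaType == boolean.class || javaType == Boolean.class) return "BOOLEAN";
        return "VARCHAR";
    }
}
```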

@novatechflow
Member

Thank you - just to get the tests running, how about mocking the JDBC layer?

Wrap DriverManager.getConnection/Connection in a small interface (e.g., JdbcClient) and inject a fake in tests. Then assert SQL statements and batch parameters without a real DB.
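A minimal sketch of that seam, assuming Mockito is available for the fake Connection; JdbcClient is the name proposed above, but its exact shape here is invented for illustration:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Small seam around DriverManager so the JDBC layer can be swapped out in tests. */
interface JdbcClient {
    Connection connect(String url, Properties props) throws SQLException;
}

/** Production implementation: delegates straight to DriverManager. */
class DriverManagerJdbcClient implements JdbcClient {
    @Override
    public Connection connect(String url, Properties props) throws SQLException {
        return DriverManager.getConnection(url, props);
    }
}

/** Test double: records the requested URLs and hands out a Mockito-mocked Connection. */
class FakeJdbcClient implements JdbcClient {
    final Connection connection = org.mockito.Mockito.mock(Connection.class);
    final List<String> requestedUrls = new ArrayList<>();

    @Override
    public Connection connect(String url, Properties props) {
        requestedUrls.add(url);
        return connection;
    }
}
```

The sink would receive a DriverManagerJdbcClient in production, while a test injects FakeJdbcClient and verifies the SQL statements and batch parameters prepared on the mocked Connection.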

@harrygav
Author

> Thanks @harrygav, this is great!
>
> Could we make TableSink generic over its input type and thus make DDL generation easier via reflection on the given type?

I will take a look and update the PR to continue the discussion!

@zkaoudi
Contributor

zkaoudi commented Feb 11, 2026

Hey @harrygav, any news on this? A table sink is apparently crucial for many use cases and we would like to start using it already.

@zkaoudi
Contributor

zkaoudi commented Feb 11, 2026

Also, another question: wouldn't it make sense to also have a sink for a database platform? Right now you have implemented one execution operator for Java and one for Spark, but why not one for a database?
One case where that would be desirable is when you have data in one database and would like to extract some of it and import it into another one (think of an ETL pipeline).

@harrygav
Author

Hi @zkaoudi, nice to hear that the PR will be useful; I will follow up on this by the end of the week!

Thanks for your input. I think there are many things to clarify for the sink operator, but I guess we will figure them out once we know more about the use cases we want to cover. With the current implementation, you could build the ETL pipeline you mention through the Java or Spark platforms, e.g., Source(Java/Spark from DBMS1)->ETL(Java/Spark)->Sink(Java/Spark to DBMS2). Or were you thinking of writing from DBMS1 into DBMS2 directly, without any intermediate Java/Spark platform step? That would also be interesting for some use cases (improved performance) but becomes cumbersome to maintain in terms of interoperability.

Let me know what you think!

@zkaoudi
Contributor

zkaoudi commented Feb 12, 2026

Yes, I was thinking about directly writing from DBMS1 to DBMS2. For example, if you have two tables in two DBMSs and you want to join them and write the result into DBMS2 without doing the join in Spark or Java. What do you mean by issues of interoperability?

@zkaoudi
Contributor

zkaoudi commented Feb 12, 2026

But on second thought, the scenario I described above is more like a conversion operator in addition to a sink. You would ideally want to create a temp table to do the join and then persist the result.

