Commit fe804bf

Add iceberg v2 consumer for flattened data consuming, schema will be inferred by arrow
1 parent 4142c3a commit fe804bf

2 files changed: +4 −4 lines

README.md

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@ The Iceberg handlers are designed to stream CDC events directly into Apache Iceb
 * `IcebergChangeHandler`: A straightforward handler that appends change data to a source-equivalent Iceberg table using a predefined schema.
   * **Use Case**: Best for creating a "bronze" layer where you want to capture the raw Debezium event. The `before` and `after` payloads are stored as complete JSON strings.
   * **Schema**: Uses a fixed schema where complex nested fields (`source`, `before`, `after`) are stored as `StringType`. It also includes helpful metadata columns (`_consumed_at`, `_dbz_event_key`, `_dbz_event_key_hash`) for traceability.
+  * Because the data is consumed as JSON, all source-system schema changes are absorbed automatically.
 
 * `IcebergChangeHandlerV2`: A more advanced handler that automatically infers the schema from the Debezium events and creates a well-structured Iceberg table accordingly.
   * **Use Case**: Ideal for scenarios where you want the pipeline to automatically create tables with native data types that mirror the source. This allows for direct querying of the data without needing to parse JSON.

tests/test_iceberg_handlerv2.py

Lines changed: 3 additions & 4 deletions
@@ -110,10 +110,9 @@ def test_iceberg_handler(self):
         self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;")
         self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;")
         self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;")
-        with self.assertLogs(IcebergChangeHandlerV2.LOGGER_NAME, level='INFO') as cm:
-            # run async then interrupt after timeout time to test the result!
-            Utils.run_engine_async(engine=engine, timeout_sec=44)
-
+        # run
+        Utils.run_engine_async(engine=engine, timeout_sec=44)
+        # test
         data = tbl.scan().to_arrow()
         self.pprint_table(data=data)
         self.assertEqual(data.num_rows, 4)
