
Commit 8a6c2db

Add Iceberg v2 consumer for consuming flattened data; the schema is inferred by Arrow
1 parent cda126c commit 8a6c2db

File tree

3 files changed: +332, -0 lines changed

README.md

Lines changed: 7 additions & 0 deletions
@@ -34,6 +34,13 @@ The Iceberg handlers are designed to stream CDC events directly into Apache Iceb
    * **Automatic Table Creation & Partitioning**: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries.
    * **Enriched Metadata**: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability.

* `IcebergChangeHandlerV2`: A more advanced handler that automatically infers the schema from the Debezium events and creates a well-structured Iceberg table accordingly (a usage sketch follows this list).
  * **Use Case**: Ideal for scenarios where you want the pipeline to automatically create tables with native data types that mirror the source. This allows for direct querying of the data without needing to parse JSON.
  * **Schema and Features**:
    * **Automatic Schema Inference**: It inspects the first batch of records for a given table and infers the schema using PyArrow, preserving native data types (e.g., `LongType`, `TimestampType`).
    * **Robust Type Handling**: If a field's type cannot be inferred from the initial batch (e.g., it is always `null`), it safely falls back to `StringType` to prevent errors.
    * **Automatic Table Creation & Partitioning**: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries.
    * **Enriched Metadata**: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability.
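
A minimal usage sketch for the handler described above, modeled on the test added in this commit. The catalog endpoint, credentials, namespace names, and Debezium properties are placeholders, and `engine.run()` follows the project's existing examples rather than this diff.

from pyiceberg.catalog import load_catalog

from pydbzengine import DebeziumJsonEngine
from pydbzengine.handlers.iceberg import IcebergChangeHandlerV2

# Placeholder REST catalog / object-store configuration.
catalog = load_catalog(
    name="rest",
    **{
        "uri": "http://localhost:8181",
        "warehouse": "warehouse",
        "s3.endpoint": "http://localhost:9000",
        "s3.access-key-id": "admin",
        "s3.secret-access-key": "password",
    },
)

# Tables are created under <database>.<schema>; event_flattening_enabled assumes
# the Debezium unwrap SMT is configured in the connector properties.
handler = IcebergChangeHandlerV2(
    catalog=catalog,
    destination_namespace=("my_warehouse", "dbz_cdc_data"),
    event_flattening_enabled=True,
)

dbz_props = ...  # Debezium engine properties (source connector + unwrap SMT), not shown here
engine = DebeziumJsonEngine(properties=dbz_props, handler=handler)
engine.run()

# The resulting tables can then be queried directly, e.g.:
# catalog.load_table(("my_warehouse", "dbz_cdc_data", "testc_inventory_customers")).scan().to_arrow()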

### dlt (data load tool) Handler (`pydbzengine[dlt]`)

pydbzengine/handlers/iceberg.py

Lines changed: 201 additions & 0 deletions
@@ -1,11 +1,13 @@
import datetime
import io
import json
import logging
import uuid
from abc import abstractmethod
from typing import List, Dict

import pyarrow as pa
from pyarrow import json as pa_json
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.partitioning import PartitionSpec, PartitionField
@@ -200,3 +202,202 @@ def _target_schema(self) -> Schema:
            ),
        )

class IcebergChangeHandlerV2(BaseIcebergChangeHandler):
    """
    A change handler that uses Apache Iceberg to process Debezium change events.
    This class receives batches of Debezium ChangeEvent objects and applies the changes
    to the corresponding Iceberg tables.
    """

    def __init__(self, catalog: "Catalog", destination_namespace: tuple, supports_variant: bool = False,
                 event_flattening_enabled=False):
        super().__init__(catalog, destination_namespace, supports_variant)
        self.event_flattening_enabled = event_flattening_enabled

    def _handle_table_changes(self, destination: str, records: List[ChangeEvent]):
        """
        Handles changes for a specific table.
        Args:
            destination: The name of the table to apply the changes to.
            records: A list of ChangeEvent objects for the specified table.
        """

        table = self.get_table(destination)
        if table is None:
            table_identifier: tuple = self.destination_to_table_identifier(destination=destination)
            table = self._infer_and_create_table(records=records, table_identifier=table_identifier)

        arrow_data = self._read_to_arrow_table(records=records, schema=table.schema())

        # Populate all metadata columns (_consumed_at, _dbz_event_key, etc.)
        enriched_arrow_data = self._enrich_arrow_table_with_metadata(
            arrow_table=arrow_data,
            records=records
        )

        table.append(enriched_arrow_data)
        self.log.info(f"Appended {len(enriched_arrow_data)} records to table {'.'.join(table.name())}")

    def _enrich_arrow_table_with_metadata(self, arrow_table: pa.Table, records: List[ChangeEvent]) -> pa.Table:
        num_records = len(arrow_table)
        dbz_event_keys = []
        dbz_event_key_hashes = []

        for record in records:
            key = record.key()
            dbz_event_keys.append(key)
            key_hash = str(uuid.uuid5(uuid.NAMESPACE_DNS, key)) if key else None
            dbz_event_key_hashes.append(key_hash)

        # Create PyArrow arrays for each metadata column
        consumed_at_array = pa.array([datetime.datetime.now(datetime.timezone.utc)] * num_records,
                                     type=pa.timestamp('us', tz='UTC'))
        dbz_event_key_array = pa.array(dbz_event_keys, type=pa.string())
        dbz_event_key_hash_array = pa.array(dbz_event_key_hashes, type=pa.string())

        # Replace the null columns in the Arrow table with the populated arrays.
        # This uses set_column, which is efficient for replacing entire columns.
        enriched_table = arrow_table.set_column(
            arrow_table.schema.get_field_index("_consumed_at"),
            "_consumed_at",
            consumed_at_array
        )
        enriched_table = enriched_table.set_column(
            enriched_table.schema.get_field_index("_dbz_event_key"),
            "_dbz_event_key",
            dbz_event_key_array
        )
        enriched_table = enriched_table.set_column(
            enriched_table.schema.get_field_index("_dbz_event_key_hash"),
            "_dbz_event_key_hash",
            dbz_event_key_hash_array
        )

        return enriched_table

    def _read_to_arrow_table(self, records, schema=None):
        json_lines_buffer = io.BytesIO()
        for record in records:
            json_lines_buffer.write((record.value() + '\n').encode('utf-8'))
        json_lines_buffer.seek(0)

        parse_options = None
        if schema:
            # If an Iceberg schema is provided, convert it to a PyArrow schema and use it for parsing.
            parse_options = pa_json.ParseOptions(explicit_schema=schema.as_arrow())
        return pa_json.read_json(json_lines_buffer, parse_options=parse_options)

    def get_table(self, destination: str) -> "Table":
        table_identifier: tuple = self.destination_to_table_identifier(destination=destination)
        return self.load_table(table_identifier=table_identifier)

    def load_table(self, table_identifier):
        try:
            return self.catalog.load_table(identifier=table_identifier)
        except NoSuchTableError:
            return None

    def _infer_and_create_table(self, records: List[ChangeEvent], table_identifier: tuple) -> Table:
        """
        Infers a schema from a batch of records, creates a new Iceberg table with that schema,
        and sets up daily partitioning on the _consumed_at field.
        """
        arrow_table = self._read_to_arrow_table(records)
        sanitized_fields = self._sanitize_schema_fields(data_schema=arrow_table.schema)

        # Add metadata fields to the list of pyarrow fields
        sanitized_fields.extend([
            pa.field("_consumed_at", pa.timestamp('us', tz='UTC')),
            pa.field("_dbz_event_key", pa.string()),
            pa.field("_dbz_event_key_hash", pa.string())  # For UUIDType
        ])

        # Create a pyarrow schema first
        sanitized_arrow_schema = pa.schema(sanitized_fields)

        # Create the table
        table = self.catalog.create_table(
            identifier=table_identifier,
            schema=sanitized_arrow_schema
        )
        # add partitioning
        with table.update_spec() as update_spec:
            update_spec.add_field(source_column_name="_consumed_at", transform=DayTransform(),
                                  partition_field_name="_consumed_at_day")

        if self.event_flattening_enabled and False:
            # @TODO fix future. https://github.com/apache/iceberg-python/issues/1728
            identifier_fields = self._get_identifier_fields(sample_event=records[0],
                                                            table_identifier=table_identifier
                                                            )
            if identifier_fields:
                with table.update_schema(allow_incompatible_changes=True) as update_schema:
                    for field in identifier_fields:
                        update_schema._set_column_requirement(path=field, required=True)
                    update_schema.set_identifier_fields(*identifier_fields)
        self.log.info(f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.")
        return table

    def _sanitize_schema_fields(self, data_schema: pa.Schema) -> list:
        """
        Recursively traverses a PyArrow schema and replaces null types with a string fallback.
        This is useful when a schema is inferred from JSON where some fields are always null.
        """
        new_fields = []
        for field in data_schema:
            field_type = field.type
            if pa.types.is_null(field_type):
                # Found a null type, replace it with a string as a fallback.
                new_fields.append(field.with_type(pa.string()))
            elif pa.types.is_struct(field_type):
                # Found a struct, so we recurse on its fields to sanitize them.
                # We can treat the struct's fields as a schema for the recursive call.
                nested_schema = pa.schema(field_type)
                sanitized_nested_schema = self._sanitize_schema_fields(nested_schema)
                # Recreate the field with the new, sanitized struct type.
                new_fields.append(field.with_type(pa.struct(sanitized_nested_schema)))
            else:
                # Not a null or struct, so we keep the field as is.
                new_fields.append(field)
        return new_fields

    def _get_identifier_fields(self, sample_event: ChangeEvent, table_identifier: tuple) -> list:
        """
        Parses the Debezium event key to extract primary key field names.

        This method uses a series of guard clauses to validate the key and returns
        an empty list if any validation step fails.

        Args:
            sample_event: A sample change event to inspect for the key.
            table_identifier: The identifier of the table, used for logging.

        Returns:
            A list of key field names, or an empty list if the key cannot be determined.
        """
        key_json_str = sample_event.key()
        table_name_str = '.'.join(table_identifier)

        if not key_json_str:
            self.log.warning(f"Cannot determine identifier fields for {table_name_str}: event key is empty.")
            return []

        try:
            key_data = json.loads(key_json_str)
        except json.JSONDecodeError:
            self.log.error(f"Failed to parse Debezium event key as JSON for table {table_name_str}: {key_json_str}")
            return []

        if not isinstance(key_data, dict):
            self.log.warning(
                f"Event key for {table_name_str} is not a JSON object, cannot infer primary key. Key: {key_json_str}")
            return []

        key_field_names = list(key_data.keys())
        if not key_field_names:
            self.log.warning(f"Event key for {table_name_str} is an empty JSON object, cannot infer primary key.")
            return []

        self.log.info(f"Found potential primary key fields {key_field_names} for table {table_name_str}")
        return key_field_names
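
For reference, a self-contained sketch (not part of the module) of the inference-and-fallback behavior that `_read_to_arrow_table` and `_sanitize_schema_fields` implement: a field that is `null` in every record of the first batch is inferred by PyArrow as the null type and is then downgraded to string before table creation. The sample records are illustrative.

import io

import pyarrow as pa
from pyarrow import json as pa_json

# Two flattened change events where "nickname" is always null in the first batch.
ndjson = io.BytesIO(
    b'{"id": 1, "name": "sally", "nickname": null}\n'
    b'{"id": 2, "name": "george", "nickname": null}\n'
)

inferred = pa_json.read_json(ndjson)
print(inferred.schema)  # id: int64, name: string, nickname: null

# Same fallback as _sanitize_schema_fields (top level only): null -> string.
sanitized = pa.schema(
    [f.with_type(pa.string()) if pa.types.is_null(f.type) else f for f in inferred.schema]
)
print(sanitized)  # nickname is now string, so the Iceberg table can be created safely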

tests/test_iceberg_handlerv2.py

Lines changed: 124 additions & 0 deletions
@@ -0,0 +1,124 @@
import unittest

import pandas as pd
from pyiceberg.catalog import load_catalog
from pyiceberg.schema import Schema
from pyiceberg.types import LongType, NestedField, StringType

from base_postgresql_test import BasePostgresqlTest
from catalog_rest import CatalogRestContainer
from pydbzengine import DebeziumJsonEngine
from pydbzengine.handlers.iceberg import IcebergChangeHandlerV2
from pydbzengine.helper import Utils
from s3_minio import S3Minio


class TestIcebergChangeHandler(BasePostgresqlTest):
    S3MiNIO = S3Minio()
    RESTCATALOG = CatalogRestContainer()

    def setUp(self):
        print("setUp")
        self.clean_offset_file()
        self.SOURCEPGDB.start()
        self.S3MiNIO.start()
        self.RESTCATALOG.start(s3_endpoint=self.S3MiNIO.endpoint())
        # Set pandas options to display all rows and columns, and prevent truncation of cell content
        pd.set_option('display.max_rows', None)  # Show all rows
        pd.set_option('display.max_columns', None)  # Show all columns
        pd.set_option('display.width', None)  # Auto-detect terminal width
        pd.set_option('display.max_colwidth', None)  # Do not truncate cell contents

    def tearDown(self):
        self.SOURCEPGDB.stop()
        self.S3MiNIO.stop()
        self.clean_offset_file()

    @unittest.skip
    def test_iceberg_catalog(self):
        conf = {
            "uri": self.RESTCATALOG.get_uri(),
            # "s3.path-style.access": "true",
            "warehouse": "warehouse",
            "s3.endpoint": self.S3MiNIO.endpoint(),
            "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID,
            "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY,
        }
        print(conf)
        catalog = load_catalog(
            name="rest",
            **conf
        )
        catalog.create_namespace('my_warehouse')
        debezium_event_schema = Schema(
            NestedField(field_id=1, name="id", field_type=LongType(), required=True),
            NestedField(field_id=2, name="data", field_type=StringType(), required=False),
        )
        table = catalog.create_table(identifier=("my_warehouse", "test_table",), schema=debezium_event_schema)
        print(f"Created iceberg table {table.refs()}")

    def test_iceberg_handler(self):
        dest_ns1_database = "my_warehouse"
        dest_ns2_schema = "dbz_cdc_data"
        conf = {
            "uri": self.RESTCATALOG.get_uri(),
            # "s3.path-style.access": "true",
            "warehouse": "warehouse",
            "s3.endpoint": self.S3MiNIO.endpoint(),
            "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID,
            "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY,
        }
        catalog = load_catalog(name="rest", **conf)

        handler = IcebergChangeHandlerV2(catalog=catalog,
                                         destination_namespace=(dest_ns1_database, dest_ns2_schema,),
                                         event_flattening_enabled=True
                                         )

        dbz_props = self.debezium_engine_props(unwrap_messages=True)
        engine = DebeziumJsonEngine(properties=dbz_props, handler=handler)
        with self.assertLogs(IcebergChangeHandlerV2.LOGGER_NAME, level='INFO') as cm:
            # run async then interrupt after timeout time to test the result!
            Utils.run_engine_async(engine=engine, timeout_sec=44)

        # for t in cm.output:
        #     print(t)
        self.assertRegex(text=str(cm.output), expected_regex='.*Created iceberg table.*')
        self.assertRegex(text=str(cm.output), expected_regex='.*Appended.*records to table.*')

        # catalog.create_namespace(dest_ns1_database)
        namespaces = catalog.list_namespaces()
        self.assertIn((dest_ns1_database,), namespaces, msg="Namespace not found in catalog")

        tables = catalog.list_tables((dest_ns1_database, dest_ns2_schema,))
        print(tables)
        self.assertIn(('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers'), tables, msg="Table not found in catalog")

        tbl = catalog.load_table(identifier=('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers'))
        data = tbl.scan().to_arrow()
        self.assertIn("[email protected]", str(data))
        self.assertIn("[email protected]", str(data))
        self.assertEqual(data.num_rows, 4)
        self.pprint_table(data=data)
        # =================================================================
        # ==== PART 2 CONSUME CHANGES FROM BINLOG =========================
        # =================================================================
        self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;")
        # self.execute_on_source_db("ALTER TABLE inventory.customers DROP COLUMN email;")
        self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE2' WHERE ID = 1002 ;")
        self.execute_on_source_db("DELETE FROM inventory.orders WHERE purchaser = 1002 ;")
        self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;")
        self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;")
        self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;")
        # run
        Utils.run_engine_async(engine=engine, timeout_sec=44)
        # test
        # @TODO test that new field is received and added to iceberg!
        data = tbl.scan().to_arrow()
        self.pprint_table(data=data)
        self.assertEqual(data.num_rows, 4)

    def pprint_table(self, data):
        print("--- Iceberg Table Content ---")
        print(data.to_pandas())
        print("---------------------------\n")
