7 changes: 7 additions & 0 deletions README.md
@@ -34,6 +34,13 @@ The Iceberg handlers are designed to stream CDC events directly into Apache Iceb
* **Automatic Table Creation & Partitioning**: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries.
* **Enriched Metadata**: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability.

* `IcebergChangeHandlerV2`: A more advanced handler that automatically infers the schema from the Debezium events and creates a well-structured Iceberg table accordingly.
* **Use Case**: Ideal for scenarios where you want the pipeline to automatically create tables with native data types that mirror the source. This allows for direct querying of the data without needing to parse JSON.
* **Schema and Features**:
* **Automatic Schema Inference**: It inspects the first batch of records for a given table and infers the schema using PyArrow, preserving native data types (e.g., `LongType`, `TimestampType`).
* **Robust Type Handling**: If a field's type cannot be inferred from the initial batch (e.g., it is always `null`), it safely falls back to `StringType` to prevent errors.
* **Automatic Table Creation & Partitioning**: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries.
* **Enriched Metadata**: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability.
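
A minimal usage sketch is shown below. The catalog settings, namespace, and the `DebeziumJsonEngine` wiring are illustrative assumptions and will differ per deployment:

```python
from pyiceberg.catalog import load_catalog
from pydbzengine import DebeziumJsonEngine, Properties
from pydbzengine.handlers.iceberg import IcebergChangeHandlerV2

# Hypothetical REST catalog; point this at your own catalog and warehouse.
catalog = load_catalog("default", uri="http://localhost:8181", warehouse="s3://warehouse/")

handler = IcebergChangeHandlerV2(
    catalog=catalog,
    destination_namespace=("debezium", "cdc"),  # target Iceberg namespace
)

props = Properties()
# ... set the usual Debezium connector properties here ...
engine = DebeziumJsonEngine(properties=props, handler=handler)
engine.run()
```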

### dlt (data load tool) Handler (`pydbzengine[dlt]`)

210 changes: 209 additions & 1 deletion pydbzengine/handlers/iceberg.py
@@ -1,11 +1,13 @@
import datetime
import io
import json
import logging
import uuid
from abc import abstractmethod
from typing import List, Dict

import pyarrow as pa
from pyarrow import json as pa_json
from pyiceberg.catalog import Catalog
from pyiceberg.exceptions import NoSuchTableError
from pyiceberg.partitioning import PartitionSpec, PartitionField
@@ -141,7 +143,8 @@ def load_table(self, table_identifier):
table = self.catalog.create_table(identifier=table_identifier,
schema=self._target_schema,
partition_spec=self.DEBEZIUM_TABLE_PARTITION_SPEC)
self.log.info(f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.")
self.log.info(
f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.")
return table

@property
@@ -200,3 +203,208 @@ def _target_schema(self) -> Schema:
),
)


class IcebergChangeHandlerV2(BaseIcebergChangeHandler):
"""
A change handler that uses Apache Iceberg to process Debezium change events.
This class receives batches of Debezium ChangeEvent objects and applies the changes
to the corresponding Iceberg tables.
"""

def __init__(self, catalog: "Catalog", destination_namespace: tuple, supports_variant: bool = False,
event_flattening_enabled=False):
super().__init__(catalog, destination_namespace, supports_variant)
self.event_flattening_enabled = event_flattening_enabled

def _handle_table_changes(self, destination: str, records: List[ChangeEvent]):
"""
Handles changes for a specific table.
Args:
destination: The name of the table to apply the changes to.
records: A list of ChangeEvent objects for the specified table.
"""

table = self.get_table(destination)
if table is None:
table_identifier: tuple = self.destination_to_table_identifier(destination=destination)
table = self._infer_and_create_table(records=records, table_identifier=table_identifier)
arrow_data = self._read_to_arrow_table(records=records, schema=table.schema())

# Populate all metadata columns (_consumed_at, _dbz_event_key, etc.)
enriched_arrow_data = self._enrich_arrow_table_with_metadata(
arrow_table=arrow_data,
records=records
)

self._handle_schema_changes(table=table, arrow_schema=enriched_arrow_data.schema)
table.append(enriched_arrow_data)
self.log.info(f"Appended {len(enriched_arrow_data)} records to table {'.'.join(table.name())}")

def _enrich_arrow_table_with_metadata(self, arrow_table: pa.Table, records: List[ChangeEvent]) -> pa.Table:
num_records = len(arrow_table)
dbz_event_keys = []
dbz_event_key_hashes = []

for record in records:
key = record.key()
dbz_event_keys.append(key)
key_hash = str(uuid.uuid5(uuid.NAMESPACE_DNS, key)) if key else None
dbz_event_key_hashes.append(key_hash)

# Create PyArrow arrays for each metadata column
consumed_at_array = pa.array([datetime.datetime.now(datetime.timezone.utc)] * num_records,
type=pa.timestamp('us', tz='UTC'))
dbz_event_key_array = pa.array(dbz_event_keys, type=pa.string())
dbz_event_key_hash_array = pa.array(dbz_event_key_hashes, type=pa.string())

# Replace the null columns in the Arrow table with the populated arrays.
# This uses set_column, which is efficient for replacing entire columns.
enriched_table = arrow_table.set_column(
arrow_table.schema.get_field_index("_consumed_at"),
"_consumed_at",
consumed_at_array
)
enriched_table = enriched_table.set_column(
enriched_table.schema.get_field_index("_dbz_event_key"),
"_dbz_event_key",
dbz_event_key_array
)
enriched_table = enriched_table.set_column(
enriched_table.schema.get_field_index("_dbz_event_key_hash"),
"_dbz_event_key_hash",
dbz_event_key_hash_array
)

return enriched_table

def _read_to_arrow_table(self, records, schema=None):
json_lines_buffer = io.BytesIO()
for record in records:
json_lines_buffer.write((record.value() + '\n').encode('utf-8'))
json_lines_buffer.seek(0)

parse_options = None
if schema:
# If an Iceberg schema is provided, convert it to a PyArrow schema and use it for parsing.
parse_options = pa_json.ParseOptions(explicit_schema=schema.as_arrow())
return pa_json.read_json(json_lines_buffer, parse_options=parse_options)

def get_table(self, destination: str) -> "Table":
table_identifier: tuple = self.destination_to_table_identifier(destination=destination)
return self.load_table(table_identifier=table_identifier)

def load_table(self, table_identifier):
try:
return self.catalog.load_table(identifier=table_identifier)
except NoSuchTableError:
return None

def _infer_and_create_table(self, records: List[ChangeEvent], table_identifier: tuple) -> Table:
"""
Infers a schema from a batch of records, creates a new Iceberg table with that schema,
and sets up daily partitioning on the _consumed_at field.
"""
arrow_table = self._read_to_arrow_table(records)
sanitized_fields = self._sanitize_schema_fields(data_schema=arrow_table.schema)

# Add metadata fields to the list of pyarrow fields
sanitized_fields.extend([
pa.field("_consumed_at", pa.timestamp('us', tz='UTC')),
pa.field("_dbz_event_key", pa.string()),
pa.field("_dbz_event_key_hash", pa.string()) # For UUIDType
])

# Create a pyarrow schema first
sanitized_arrow_schema = pa.schema(sanitized_fields)

# Create the table
table = self.catalog.create_table(
identifier=table_identifier,
schema=sanitized_arrow_schema
)
# add partitioning
with table.update_spec() as update_spec:
update_spec.add_field(source_column_name="_consumed_at", transform=DayTransform(),
partition_field_name="_consumed_at_day")

if self.event_flattening_enabled and False:
# TODO: intentionally disabled ("and False") until https://github.com/apache/iceberg-python/issues/1728 is resolved.
identifier_fields = self._get_identifier_fields(sample_event=records[0],
table_identifier=table_identifier
)
if identifier_fields:
with table.update_schema(allow_incompatible_changes=True) as update_schema:
for field in identifier_fields:
update_schema._set_column_requirement(path=field, required=True)
update_schema.set_identifier_fields(*identifier_fields)
self.log.info(f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.")
return table

def _sanitize_schema_fields(self, data_schema: pa.Schema) -> list:
"""
Recursively traverses a PyArrow schema and replaces null types with a string fallback.
This is useful when a schema is inferred from JSON where some fields are always null.
"""
new_fields = []
for field in data_schema:
field_type = field.type
if pa.types.is_null(field_type):
# Found a null type, replace it with a string as a fallback.
new_fields.append(field.with_type(pa.string()))
elif pa.types.is_struct(field_type):
# Found a struct, so we recurse on its fields to sanitize them.
# We can treat the struct's fields as a schema for the recursive call.
nested_schema = pa.schema(field_type)
sanitized_nested_schema = self._sanitize_schema_fields(nested_schema)
# Recreate the field with the new, sanitized struct type.
new_fields.append(field.with_type(pa.struct(sanitized_nested_schema)))
else:
# Not a null or struct, so we keep the field as is.
new_fields.append(field)
return new_fields

def _get_identifier_fields(self, sample_event: ChangeEvent, table_identifier: tuple) -> list:
"""
Parses the Debezium event key to extract primary key field names.

This method uses a series of guard clauses to validate the key and returns
an empty list if any validation step fails.

Args:
sample_event: A sample change event to inspect for the key.
table_identifier: The identifier of the table, used for logging.

Returns:
A list of key field names, or an empty list if the key cannot be determined.
"""
key_json_str = sample_event.key()
table_name_str = '.'.join(table_identifier)

if not key_json_str:
self.log.warning(f"Cannot determine identifier fields for {table_name_str}: event key is empty.")
return []

try:
key_data = json.loads(key_json_str)
except json.JSONDecodeError:
self.log.error(f"Failed to parse Debezium event key as JSON for table {table_name_str}: {key_json_str}")
return []

if not isinstance(key_data, dict):
self.log.warning(
f"Event key for {table_name_str} is not a JSON object, cannot infer primary key. Key: {key_json_str}")
return []

key_field_names = list(key_data.keys())
if not key_field_names:
self.log.warning(f"Event key for {table_name_str} is an empty JSON object, cannot infer primary key.")
return []

self.log.info(f"Found potential primary key fields {key_field_names} for table {table_name_str}")
return key_field_names

def _handle_schema_changes(self, table: "Table", arrow_schema: "pa.Schema"):
with table.update_schema() as update:
update.union_by_name(new_schema=arrow_schema)
self.log.info(f"Schema for table {'.'.join(table.name())} has been updated.")
15 changes: 6 additions & 9 deletions pydbzengine/helper.py
@@ -20,7 +20,7 @@ def timeout_handler(signum, frame):
class Utils:

@staticmethod
def run_engine_async(engine, timeout_sec=22):
def run_engine_async(engine, timeout_sec=22, blocking=True):
"""
Runs an engine asynchronously with a timeout.

@@ -37,11 +37,12 @@ def run_engine_async(engine, timeout_sec=22):
signal.alarm(timeout_sec)

try:
thread = threading.Thread(target=engine.run)
thread = threading.Thread(target=engine.run, daemon=True)
thread.start()

# Wait for the thread to complete (or the timeout to occur).
thread.join() # This will block until the thread finishes or the signal is received.
if blocking:
# Wait for the thread to complete (or the timeout to occur).
thread.join() # This will block until the thread finishes or the signal is received.

except TimeoutError:
# Handle the timeout exception.
@@ -51,8 +52,4 @@ def run_engine_async(engine, timeout_sec=22):
finally:
# **Crucially important:** Cancel the alarm. This prevents the timeout
# from triggering again later if the main thread continues to run.
signal.alarm(0) # 0 means cancel the alarm.

# If the engine.run() finishes within the timeout, this point will be reached.
# No explicit return is needed as the function doesn't return anything.
print("Engine run completed successfully.") # Add a log message to signal success
signal.alarm(0) # 0 means cancel the alarm.
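
For context, a hedged sketch of how the new `blocking` flag might be used in a test; the engine setup is a placeholder defined elsewhere:

```python
from pydbzengine.helper import Utils

# Start the engine in a background daemon thread and return immediately.
# With blocking=False the alarm is cancelled right away, so the daemon thread
# simply stops when the test process exits.
Utils.run_engine_async(engine, timeout_sec=30, blocking=False)  # engine built elsewhere

# ... run assertions against the destination while the engine keeps consuming ...
```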
61 changes: 35 additions & 26 deletions tests/base_postgresql_test.py
@@ -11,35 +11,44 @@ class BasePostgresqlTest(unittest.TestCase):
OFFSET_FILE = CURRENT_DIR.joinpath('postgresql-offsets.dat')
SOURCEPGDB = DbPostgresql()

def debezium_engine_props(self, unwrap_messages=True):
def debezium_engine_props_dict(self, unwrap_messages=True) -> dict:
current_dir = Path(__file__).parent
offset_file_path = current_dir.joinpath('postgresql-offsets.dat')

props = Properties()
props.setProperty("name", "engine")
props.setProperty("snapshot.mode", "always")
props.setProperty("database.hostname", self.SOURCEPGDB.CONTAINER.get_container_host_ip())
props.setProperty("database.port", str(self.SOURCEPGDB.CONTAINER.get_exposed_port(self.SOURCEPGDB.POSTGRES_PORT_DEFAULT)))
props.setProperty("database.user", self.SOURCEPGDB.POSTGRES_USER)
props.setProperty("database.password", self.SOURCEPGDB.POSTGRES_PASSWORD)
props.setProperty("database.dbname", self.SOURCEPGDB.POSTGRES_DBNAME)
props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
props.setProperty("offset.storage.file.filename", offset_file_path.as_posix())
props.setProperty("poll.interval.ms", "10000")
props.setProperty("converter.schemas.enable", "false")
props.setProperty("offset.flush.interval.ms", "1000")
props.setProperty("topic.prefix", "testc")
props.setProperty("schema.whitelist", "inventory")
props.setProperty("database.whitelist", "inventory")
props.setProperty("table.whitelist", "inventory.products")
props.setProperty("replica.identity.autoset.values", "inventory.*:FULL")
conf: dict = {}
conf.setdefault("name", "engine")
conf.setdefault("snapshot.mode", "always")
conf.setdefault("database.hostname", self.SOURCEPGDB.CONTAINER.get_container_host_ip())
conf.setdefault("database.port",
str(self.SOURCEPGDB.CONTAINER.get_exposed_port(self.SOURCEPGDB.POSTGRES_PORT_DEFAULT)))
conf.setdefault("database.user", self.SOURCEPGDB.POSTGRES_USER)
conf.setdefault("database.password", self.SOURCEPGDB.POSTGRES_PASSWORD)
conf.setdefault("database.dbname", self.SOURCEPGDB.POSTGRES_DBNAME)
conf.setdefault("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
conf.setdefault("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
conf.setdefault("offset.storage.file.filename", offset_file_path.as_posix())
conf.setdefault("poll.interval.ms", "10000")
conf.setdefault("converter.schemas.enable", "false")
conf.setdefault("offset.flush.interval.ms", "1000")
conf.setdefault("topic.prefix", "testc")
conf.setdefault("schema.whitelist", "inventory")
conf.setdefault("database.whitelist", "inventory")
conf.setdefault("table.whitelist", "inventory.products")
conf.setdefault("replica.identity.autoset.values", "inventory.*:FULL")

if unwrap_messages:
props.setProperty("transforms", "unwrap")
props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
props.setProperty("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms")
props.setProperty("transforms.unwrap.delete.handling.mode", "rewrite")
conf.setdefault("transforms", "unwrap")
conf.setdefault("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")
conf.setdefault("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms")
conf.setdefault("transforms.unwrap.delete.handling.mode", "rewrite")

return conf

def debezium_engine_props(self, unwrap_messages=True):
props = Properties()
conf = self.debezium_engine_props_dict(unwrap_messages=unwrap_messages)
for k, v in conf.items():
props.setProperty(k, v)
return props

def clean_offset_file(self):
@@ -54,5 +63,5 @@ def tearDown(self):
self.SOURCEPGDB.stop()
self.clean_offset_file()

def execute_on_source_db(self, sql:str):
self.SOURCEPGDB.execute_sql(sql=sql)
def execute_on_source_db(self, sql: str):
self.SOURCEPGDB.execute_sql(sql=sql)
3 changes: 3 additions & 0 deletions tests/catalog_rest.py
@@ -46,3 +46,6 @@ def list_namespaces(self):
catalog = self.get_catalog()
namespaces = catalog.list_namespaces()
print("Namespaces:", namespaces)

def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
4 changes: 3 additions & 1 deletion tests/db_postgresql.py
@@ -39,7 +39,9 @@ def stop(self):
pass

def get_connection(self) -> Connection:
engine = sqlalchemy.create_engine(self.CONTAINER.get_connection_url())
url = self.CONTAINER.get_connection_url()
print(url)
engine = sqlalchemy.create_engine(url)
return engine.connect()

def __exit__(self, exc_type, exc_value, traceback):
1 change: 1 addition & 0 deletions tests/test_iceberg_handler.py
@@ -26,6 +26,7 @@ def setUp(self):
def tearDown(self):
self.SOURCEPGDB.stop()
self.S3MiNIO.stop()
self.RESTCATALOG.stop()
self.clean_offset_file()

@unittest.skip