From 8a6c2dbc4c9a4af229ff19320505709fc73144d1 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Mon, 21 Jul 2025 21:48:00 +0200 Subject: [PATCH 1/5] Add iceberg v2 consumer for flattened data consuming, schema will be inferred by arrow --- README.md | 7 ++ pydbzengine/handlers/iceberg.py | 201 ++++++++++++++++++++++++++++++++ tests/test_iceberg_handlerv2.py | 124 ++++++++++++++++++++ 3 files changed, 332 insertions(+) create mode 100644 tests/test_iceberg_handlerv2.py diff --git a/README.md b/README.md index 0cdef65..f4e7865 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,13 @@ The Iceberg handlers are designed to stream CDC events directly into Apache Iceb * **Automatic Table Creation & Partitioning**: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries. * **Enriched Metadata**: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability. +* `IcebergChangeHandlerV2`: A more advanced handler that automatically infers the schema from the Debezium events and creates a well-structured Iceberg table accordingly. + * **Use Case**: Ideal for scenarios where you want the pipeline to automatically create tables with native data types that mirror the source. This allows for direct querying of the data without needing to parse JSON. + * **Schema and Features**: + * **Automatic Schema Inference**: It inspects the first batch of records for a given table and infers the schema using PyArrow, preserving native data types (e.g., `LongType`, `TimestampType`). + * **Robust Type Handling**: If a field's type cannot be inferred from the initial batch (e.g., it is always `null`), it safely falls back to `StringType` to prevent errors. + * **Automatic Table Creation & Partitioning**: It automatically creates a new Iceberg table for each source table and partitions it by day on the `_consumed_at` timestamp for efficient time-series queries. + * **Enriched Metadata**: It also adds `_consumed_at`, `_dbz_event_key`, and `_dbz_event_key_hash` columns for enhanced traceability. ### dlt (data load tool) Handler (`pydbzengine[dlt]`) diff --git a/pydbzengine/handlers/iceberg.py b/pydbzengine/handlers/iceberg.py index 61be864..0bd781e 100644 --- a/pydbzengine/handlers/iceberg.py +++ b/pydbzengine/handlers/iceberg.py @@ -1,4 +1,5 @@ import datetime +import io import json import logging import uuid @@ -6,6 +7,7 @@ from typing import List, Dict import pyarrow as pa +from pyarrow import json as pa_json from pyiceberg.catalog import Catalog from pyiceberg.exceptions import NoSuchTableError from pyiceberg.partitioning import PartitionSpec, PartitionField @@ -200,3 +202,202 @@ def _target_schema(self) -> Schema: ), ) + +class IcebergChangeHandlerV2(BaseIcebergChangeHandler): + """ + A change handler that uses Apache Iceberg to process Debezium change events. + This class receives batches of Debezium ChangeEvent objects and applies the changes + to the corresponding Iceberg tables. + """ + + def __init__(self, catalog: "Catalog", destination_namespace: tuple, supports_variant: bool = False, + event_flattening_enabled=False): + super().__init__(catalog, destination_namespace, supports_variant) + self.event_flattening_enabled = event_flattening_enabled + + def _handle_table_changes(self, destination: str, records: List[ChangeEvent]): + """ + Handles changes for a specific table. 
+ Args: + destination: The name of the table to apply the changes to. + records: A list of ChangeEvent objects for the specified table. + """ + + table = self.get_table(destination) + if table is None: + table_identifier: tuple = self.destination_to_table_identifier(destination=destination) + table = self._infer_and_create_table(records=records, table_identifier=table_identifier) + # + arrow_data = self._read_to_arrow_table(records=records, schema=table.schema()) + + # Populate all metadata columns (_consumed_at, _dbz_event_key, etc.) + enriched_arrow_data = self._enrich_arrow_table_with_metadata( + arrow_table=arrow_data, + records=records + ) + + table.append(enriched_arrow_data) + self.log.info(f"Appended {len(enriched_arrow_data)} records to table {'.'.join(table.name())}") + + def _enrich_arrow_table_with_metadata(self, arrow_table: pa.Table, records: List[ChangeEvent]) -> pa.Table: + num_records = len(arrow_table) + dbz_event_keys = [] + dbz_event_key_hashes = [] + + for record in records: + key = record.key() + dbz_event_keys.append(key) + key_hash = str(uuid.uuid5(uuid.NAMESPACE_DNS, key)) if key else None + dbz_event_key_hashes.append(key_hash) + + # Create PyArrow arrays for each metadata column + consumed_at_array = pa.array([datetime.datetime.now(datetime.timezone.utc)] * num_records, + type=pa.timestamp('us', tz='UTC')) + dbz_event_key_array = pa.array(dbz_event_keys, type=pa.string()) + dbz_event_key_hash_array = pa.array(dbz_event_key_hashes, type=pa.string()) + + # Replace the null columns in the Arrow table with the populated arrays. + # This uses set_column, which is efficient for replacing entire columns. + enriched_table = arrow_table.set_column( + arrow_table.schema.get_field_index("_consumed_at"), + "_consumed_at", + consumed_at_array + ) + enriched_table = enriched_table.set_column( + enriched_table.schema.get_field_index("_dbz_event_key"), + "_dbz_event_key", + dbz_event_key_array + ) + enriched_table = enriched_table.set_column( + enriched_table.schema.get_field_index("_dbz_event_key_hash"), + "_dbz_event_key_hash", + dbz_event_key_hash_array + ) + + return enriched_table + + def _read_to_arrow_table(self, records, schema=None): + json_lines_buffer = io.BytesIO() + for record in records: + json_lines_buffer.write((record.value() + '\n').encode('utf-8')) + json_lines_buffer.seek(0) + + parse_options = None + if schema: + # If an Iceberg schema is provided, convert it to a PyArrow schema and use it for parsing. + parse_options = pa_json.ParseOptions(explicit_schema=schema.as_arrow()) + return pa_json.read_json(json_lines_buffer, parse_options=parse_options) + + def get_table(self, destination: str) -> "Table": + table_identifier: tuple = self.destination_to_table_identifier(destination=destination) + return self.load_table(table_identifier=table_identifier) + + def load_table(self, table_identifier): + try: + return self.catalog.load_table(identifier=table_identifier) + except NoSuchTableError: + return None + + def _infer_and_create_table(self, records: List[ChangeEvent], table_identifier: tuple) -> Table: + """ + Infers a schema from a batch of records, creates a new Iceberg table with that schema, + and sets up daily partitioning on the _consumed_at field. 
+ """ + arrow_table = self._read_to_arrow_table(records) + sanitized_fields = self._sanitize_schema_fields(data_schema=arrow_table.schema) + + # Add metadata fields to the list of pyarrow fields + sanitized_fields.extend([ + pa.field("_consumed_at", pa.timestamp('us', tz='UTC')), + pa.field("_dbz_event_key", pa.string()), + pa.field("_dbz_event_key_hash", pa.string()) # For UUIDType + ]) + + # Create a pyarrow schema first + sanitized_arrow_schema = pa.schema(sanitized_fields) + + # Create the table + table = self.catalog.create_table( + identifier=table_identifier, + schema=sanitized_arrow_schema + ) + # add partitioning + with table.update_spec() as update_spec: + update_spec.add_field(source_column_name="_consumed_at", transform=DayTransform(), + partition_field_name="_consumed_at_day") + + if self.event_flattening_enabled and False: + # @TODO fix future. https://github.com/apache/iceberg-python/issues/1728 + identifier_fields = self._get_identifier_fields(sample_event=records[0], + table_identifier=table_identifier + ) + if identifier_fields: + with table.update_schema(allow_incompatible_changes=True) as update_schema: + for field in identifier_fields: + update_schema._set_column_requirement(path=field, required=True) + update_schema.set_identifier_fields(*identifier_fields) + self.log.info(f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.") + return table + + def _sanitize_schema_fields(self, data_schema: pa.Schema) -> list: + """ + Recursively traverses a PyArrow schema and replaces null types with a string fallback. + This is useful when a schema is inferred from JSON where some fields are always null. + """ + new_fields = [] + for field in data_schema: + field_type = field.type + if pa.types.is_null(field_type): + # Found a null type, replace it with a string as a fallback. + new_fields.append(field.with_type(pa.string())) + elif pa.types.is_struct(field_type): + # Found a struct, so we recurse on its fields to sanitize them. + # We can treat the struct's fields as a schema for the recursive call. + nested_schema = pa.schema(field_type) + sanitized_nested_schema = self._sanitize_schema_fields(nested_schema) + # Recreate the field with the new, sanitized struct type. + new_fields.append(field.with_type(pa.struct(sanitized_nested_schema))) + else: + # Not a null or struct, so we keep the field as is. + new_fields.append(field) + return new_fields + + def _get_identifier_fields(self, sample_event: ChangeEvent, table_identifier: tuple) -> list: + """ + Parses the Debezium event key to extract primary key field names. + + This method uses a series of guard clauses to validate the key and returns + an empty list if any validation step fails. + + Args: + sample_event: A sample change event to inspect for the key. + table_identifier: The identifier of the table, used for logging. + + Returns: + A list of key field names, or an empty list if the key cannot be determined. + """ + key_json_str = sample_event.key() + table_name_str = '.'.join(table_identifier) + + if not key_json_str: + self.log.warning(f"Cannot determine identifier fields for {table_name_str}: event key is empty.") + return [] + + try: + key_data = json.loads(key_json_str) + except json.JSONDecodeError: + self.log.error(f"Failed to parse Debezium event key as JSON for table {table_name_str}: {key_json_str}") + return [] + + if not isinstance(key_data, dict): + self.log.warning( + f"Event key for {table_name_str} is not a JSON object, cannot infer primary key. 
Key: {key_json_str}") + return [] + + key_field_names = list(key_data.keys()) + if not key_field_names: + self.log.warning(f"Event key for {table_name_str} is an empty JSON object, cannot infer primary key.") + return [] + + self.log.info(f"Found potential primary key fields {key_field_names} for table {table_name_str}") + return key_field_names diff --git a/tests/test_iceberg_handlerv2.py b/tests/test_iceberg_handlerv2.py new file mode 100644 index 0000000..618713b --- /dev/null +++ b/tests/test_iceberg_handlerv2.py @@ -0,0 +1,124 @@ +import unittest + +import pandas as pd +from pyiceberg.catalog import load_catalog +from pyiceberg.schema import Schema +from pyiceberg.types import LongType, NestedField, StringType + +from base_postgresql_test import BasePostgresqlTest +from catalog_rest import CatalogRestContainer +from pydbzengine import DebeziumJsonEngine +from pydbzengine.handlers.iceberg import IcebergChangeHandlerV2 +from pydbzengine.helper import Utils +from s3_minio import S3Minio + + +class TestIcebergChangeHandler(BasePostgresqlTest): + S3MiNIO = S3Minio() + RESTCATALOG = CatalogRestContainer() + + def setUp(self): + print("setUp") + self.clean_offset_file() + self.SOURCEPGDB.start() + self.S3MiNIO.start() + self.RESTCATALOG.start(s3_endpoint=self.S3MiNIO.endpoint()) + # Set pandas options to display all rows and columns, and prevent truncation of cell content + pd.set_option('display.max_rows', None) # Show all rows + pd.set_option('display.max_columns', None) # Show all columns + pd.set_option('display.width', None) # Auto-detect terminal width + pd.set_option('display.max_colwidth', None) # Do not truncate cell contents + + def tearDown(self): + self.SOURCEPGDB.stop() + self.S3MiNIO.stop() + self.clean_offset_file() + + @unittest.skip + def test_iceberg_catalog(self): + conf = { + "uri": self.RESTCATALOG.get_uri(), + # "s3.path-style.access": "true", + "warehouse": "warehouse", + "s3.endpoint": self.S3MiNIO.endpoint(), + "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID, + "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY, + } + print(conf) + catalog = load_catalog( + name="rest", + **conf + ) + catalog.create_namespace('my_warehouse') + debezium_event_schema = Schema( + NestedField(field_id=1, name="id", field_type=LongType(), required=True), + NestedField(field_id=2, name="data", field_type=StringType(), required=False), + ) + table = catalog.create_table(identifier=("my_warehouse", "test_table",), schema=debezium_event_schema) + print(f"Created iceberg table {table.refs()}") + + def test_iceberg_handler(self): + dest_ns1_database="my_warehouse" + dest_ns2_schema="dbz_cdc_data" + conf = { + "uri": self.RESTCATALOG.get_uri(), + # "s3.path-style.access": "true", + "warehouse": "warehouse", + "s3.endpoint": self.S3MiNIO.endpoint(), + "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID, + "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY, + } + catalog = load_catalog(name="rest",**conf) + + handler = IcebergChangeHandlerV2(catalog=catalog, + destination_namespace=(dest_ns1_database, dest_ns2_schema,), + event_flattening_enabled=True + ) + + dbz_props = self.debezium_engine_props(unwrap_messages=True) + engine = DebeziumJsonEngine(properties=dbz_props, handler=handler) + with self.assertLogs(IcebergChangeHandlerV2.LOGGER_NAME, level='INFO') as cm: + # run async then interrupt after timeout time to test the result! 
+ Utils.run_engine_async(engine=engine, timeout_sec=44) + + # for t in cm.output: + # print(t) + self.assertRegex(text=str(cm.output), expected_regex='.*Created iceberg table.*') + self.assertRegex(text=str(cm.output), expected_regex='.*Appended.*records to table.*') + + # catalog.create_namespace(dest_ns1_database) + namespaces = catalog.list_namespaces() + self.assertIn((dest_ns1_database,) , namespaces, msg="Namespace not found in catalog") + + tables = catalog.list_tables((dest_ns1_database, dest_ns2_schema,)) + print(tables) + self.assertIn(('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers'), tables, msg="Namespace not found in catalog") + + tbl = catalog.load_table(identifier=('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers')) + data = tbl.scan().to_arrow() + self.assertIn("sally.thomas@acme.com", str(data)) + self.assertIn("annek@noanswer.org", str(data)) + self.assertEqual(data.num_rows, 4) + self.pprint_table(data=data) + #================================================================= + ## ==== PART 2 CONSUME CHANGES FROM BINLOG ======================= + #================================================================= + self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;") + # self.execute_on_source_db("ALTER TABLE inventory.customers DROP COLUMN email;") + self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE2' WHERE ID = 1002 ;") + self.execute_on_source_db("DELETE FROM inventory.orders WHERE purchaser = 1002 ;") + self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;") + self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;") + self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;") + # run + Utils.run_engine_async(engine=engine, timeout_sec=44) + # test + # @TODO test that new field is received and added to iceberg! 
+ data = tbl.scan().to_arrow() + self.pprint_table(data=data) + self.assertEqual(data.num_rows, 4) + + def pprint_table(self, data): + print("--- Iceberg Table Content ---") + print(data.to_pandas()) + print("---------------------------\n") From 0a881ff6c0ec913649a11fe4e130f57b9eff8cda Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Tue, 22 Jul 2025 22:03:03 +0200 Subject: [PATCH 2/5] Add iceberg v2 consumer for flattened data consuming, schema will be inferred by arrow --- pydbzengine/handlers/iceberg.py | 9 +- pydbzengine/helper.py | 9 +- tests/base_postgresql_test.py | 61 ++++++++------ tests/catalog_rest.py | 3 + tests/db_postgresql.py | 4 +- tests/test_iceberg_handler.py | 1 + tests/test_iceberg_handlerv2.py | 143 ++++++++++++++++---------------- 7 files changed, 128 insertions(+), 102 deletions(-) diff --git a/pydbzengine/handlers/iceberg.py b/pydbzengine/handlers/iceberg.py index 0bd781e..59d7ce4 100644 --- a/pydbzengine/handlers/iceberg.py +++ b/pydbzengine/handlers/iceberg.py @@ -143,7 +143,8 @@ def load_table(self, table_identifier): table = self.catalog.create_table(identifier=table_identifier, schema=self._target_schema, partition_spec=self.DEBEZIUM_TABLE_PARTITION_SPEC) - self.log.info(f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.") + self.log.info( + f"Created iceberg table {'.'.join(table_identifier)} with daily partitioning on _consumed_at.") return table @property @@ -236,6 +237,7 @@ def _handle_table_changes(self, destination: str, records: List[ChangeEvent]): records=records ) + self._handle_schema_changes(table=table, arrow_schema=enriched_arrow_data.schema) table.append(enriched_arrow_data) self.log.info(f"Appended {len(enriched_arrow_data)} records to table {'.'.join(table.name())}") @@ -401,3 +403,8 @@ def _get_identifier_fields(self, sample_event: ChangeEvent, table_identifier: tu self.log.info(f"Found potential primary key fields {key_field_names} for table {table_name_str}") return key_field_names + + def _handle_schema_changes(self, table: "Table", arrow_schema: "pa.Schema"): + with table.update_schema() as update: + update.union_by_name(new_schema=arrow_schema) + self.log.info(f"Schema for table {'.'.join(table.name())} has been updated.") diff --git a/pydbzengine/helper.py b/pydbzengine/helper.py index 40fc218..55cb89e 100644 --- a/pydbzengine/helper.py +++ b/pydbzengine/helper.py @@ -20,7 +20,7 @@ def timeout_handler(signum, frame): class Utils: @staticmethod - def run_engine_async(engine, timeout_sec=22): + def run_engine_async(engine, timeout_sec=22, blocking=True): """ Runs an engine asynchronously with a timeout. @@ -37,11 +37,12 @@ def run_engine_async(engine, timeout_sec=22): signal.alarm(timeout_sec) try: - thread = threading.Thread(target=engine.run) + thread = threading.Thread(target=engine.run, daemon=True) thread.start() - # Wait for the thread to complete (or the timeout to occur). - thread.join() # This will block until the thread finishes or the signal is received. + if blocking: + # Wait for the thread to complete (or the timeout to occur). + thread.join() # This will block until the thread finishes or the signal is received. except TimeoutError: # Handle the timeout exception. 
diff --git a/tests/base_postgresql_test.py b/tests/base_postgresql_test.py index 791a7da..7622f07 100644 --- a/tests/base_postgresql_test.py +++ b/tests/base_postgresql_test.py @@ -11,35 +11,44 @@ class BasePostgresqlTest(unittest.TestCase): OFFSET_FILE = CURRENT_DIR.joinpath('postgresql-offsets.dat') SOURCEPGDB = DbPostgresql() - def debezium_engine_props(self, unwrap_messages=True): + def debezium_engine_props_dict(self, unwrap_messages=True) -> dict: current_dir = Path(__file__).parent offset_file_path = current_dir.joinpath('postgresql-offsets.dat') - props = Properties() - props.setProperty("name", "engine") - props.setProperty("snapshot.mode", "always") - props.setProperty("database.hostname", self.SOURCEPGDB.CONTAINER.get_container_host_ip()) - props.setProperty("database.port", str(self.SOURCEPGDB.CONTAINER.get_exposed_port(self.SOURCEPGDB.POSTGRES_PORT_DEFAULT))) - props.setProperty("database.user", self.SOURCEPGDB.POSTGRES_USER) - props.setProperty("database.password", self.SOURCEPGDB.POSTGRES_PASSWORD) - props.setProperty("database.dbname", self.SOURCEPGDB.POSTGRES_DBNAME) - props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector") - props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") - props.setProperty("offset.storage.file.filename", offset_file_path.as_posix()) - props.setProperty("poll.interval.ms", "10000") - props.setProperty("converter.schemas.enable", "false") - props.setProperty("offset.flush.interval.ms", "1000") - props.setProperty("topic.prefix", "testc") - props.setProperty("schema.whitelist", "inventory") - props.setProperty("database.whitelist", "inventory") - props.setProperty("table.whitelist", "inventory.products") - props.setProperty("replica.identity.autoset.values", "inventory.*:FULL") + conf: dict = {} + conf.setdefault("name", "engine") + conf.setdefault("snapshot.mode", "always") + conf.setdefault("database.hostname", self.SOURCEPGDB.CONTAINER.get_container_host_ip()) + conf.setdefault("database.port", + str(self.SOURCEPGDB.CONTAINER.get_exposed_port(self.SOURCEPGDB.POSTGRES_PORT_DEFAULT))) + conf.setdefault("database.user", self.SOURCEPGDB.POSTGRES_USER) + conf.setdefault("database.password", self.SOURCEPGDB.POSTGRES_PASSWORD) + conf.setdefault("database.dbname", self.SOURCEPGDB.POSTGRES_DBNAME) + conf.setdefault("connector.class", "io.debezium.connector.postgresql.PostgresConnector") + conf.setdefault("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore") + conf.setdefault("offset.storage.file.filename", offset_file_path.as_posix()) + conf.setdefault("poll.interval.ms", "10000") + conf.setdefault("converter.schemas.enable", "false") + conf.setdefault("offset.flush.interval.ms", "1000") + conf.setdefault("topic.prefix", "testc") + conf.setdefault("schema.whitelist", "inventory") + conf.setdefault("database.whitelist", "inventory") + conf.setdefault("table.whitelist", "inventory.products") + conf.setdefault("replica.identity.autoset.values", "inventory.*:FULL") if unwrap_messages: - props.setProperty("transforms", "unwrap") - props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState") - props.setProperty("transforms.unwrap.add.fields", "op,table,source.ts_ms,sourcedb,ts_ms") - props.setProperty("transforms.unwrap.delete.handling.mode", "rewrite") + conf.setdefault("transforms", "unwrap") + conf.setdefault("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState") + conf.setdefault("transforms.unwrap.add.fields", 
"op,table,source.ts_ms,sourcedb,ts_ms") + conf.setdefault("transforms.unwrap.delete.handling.mode", "rewrite") + + return conf + + def debezium_engine_props(self, unwrap_messages=True): + props = Properties() + conf = self.debezium_engine_props_dict(unwrap_messages=unwrap_messages) + for k, v in conf.items(): + props.setProperty(k, v) return props def clean_offset_file(self): @@ -54,5 +63,5 @@ def tearDown(self): self.SOURCEPGDB.stop() self.clean_offset_file() - def execute_on_source_db(self, sql:str): - self.SOURCEPGDB.execute_sql(sql=sql) \ No newline at end of file + def execute_on_source_db(self, sql: str): + self.SOURCEPGDB.execute_sql(sql=sql) diff --git a/tests/catalog_rest.py b/tests/catalog_rest.py index 1289e6a..4bf7a76 100644 --- a/tests/catalog_rest.py +++ b/tests/catalog_rest.py @@ -46,3 +46,6 @@ def list_namespaces(self): catalog = self.get_catalog() namespaces = catalog.list_namespaces() print("Namespaces:", namespaces) + + def __exit__(self, exc_type, exc_val, exc_tb): + self.stop() diff --git a/tests/db_postgresql.py b/tests/db_postgresql.py index d617be6..f1cc177 100644 --- a/tests/db_postgresql.py +++ b/tests/db_postgresql.py @@ -39,7 +39,9 @@ def stop(self): pass def get_connection(self) -> Connection: - engine = sqlalchemy.create_engine(self.CONTAINER.get_connection_url()) + url = self.CONTAINER.get_connection_url() + print(url) + engine = sqlalchemy.create_engine(url) return engine.connect() def __exit__(self, exc_type, exc_value, traceback): diff --git a/tests/test_iceberg_handler.py b/tests/test_iceberg_handler.py index 67f659d..f9700ae 100644 --- a/tests/test_iceberg_handler.py +++ b/tests/test_iceberg_handler.py @@ -26,6 +26,7 @@ def setUp(self): def tearDown(self): self.SOURCEPGDB.stop() self.S3MiNIO.stop() + self.RESTCATALOG.stop() self.clean_offset_file() @unittest.skip diff --git a/tests/test_iceberg_handlerv2.py b/tests/test_iceberg_handlerv2.py index 618713b..31346c9 100644 --- a/tests/test_iceberg_handlerv2.py +++ b/tests/test_iceberg_handlerv2.py @@ -1,9 +1,12 @@ -import unittest +import io +import threading +import time import pandas as pd +import pyarrow as pa +import pyarrow.json as pj +import waiting from pyiceberg.catalog import load_catalog -from pyiceberg.schema import Schema -from pyiceberg.types import LongType, NestedField, StringType from base_postgresql_test import BasePostgresqlTest from catalog_rest import CatalogRestContainer @@ -24,99 +27,99 @@ def setUp(self): self.S3MiNIO.start() self.RESTCATALOG.start(s3_endpoint=self.S3MiNIO.endpoint()) # Set pandas options to display all rows and columns, and prevent truncation of cell content - pd.set_option('display.max_rows', None) # Show all rows - pd.set_option('display.max_columns', None) # Show all columns - pd.set_option('display.width', None) # Auto-detect terminal width - pd.set_option('display.max_colwidth', None) # Do not truncate cell contents + pd.set_option('display.max_rows', None) # Show all rows + pd.set_option('display.max_columns', None) # Show all columns + pd.set_option('display.width', None) # Auto-detect terminal width + pd.set_option('display.max_colwidth', None) # Do not truncate cell contents def tearDown(self): self.SOURCEPGDB.stop() self.S3MiNIO.stop() + self.RESTCATALOG.stop() self.clean_offset_file() - @unittest.skip - def test_iceberg_catalog(self): - conf = { - "uri": self.RESTCATALOG.get_uri(), - # "s3.path-style.access": "true", - "warehouse": "warehouse", - "s3.endpoint": self.S3MiNIO.endpoint(), - "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID, - 
"s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY, - } - print(conf) - catalog = load_catalog( - name="rest", - **conf - ) - catalog.create_namespace('my_warehouse') - debezium_event_schema = Schema( - NestedField(field_id=1, name="id", field_type=LongType(), required=True), - NestedField(field_id=2, name="data", field_type=StringType(), required=False), - ) - table = catalog.create_table(identifier=("my_warehouse", "test_table",), schema=debezium_event_schema) - print(f"Created iceberg table {table.refs()}") + def test_read_json_lines_example(self): + json_data = """ +{"id": 1, "name": "Alice", "age": 30} +{"id": 2, "name": "Bob", "age": 24} +{"id": 3, "name": "Charlie", "age": 35} + """.strip() # .strip() removes leading/trailing whitespace/newlines + json_buffer = io.BytesIO(json_data.encode('utf-8')) + json_buffer.seek(0) + # ============================= + table_inferred = pj.read_json(json_buffer) + print("\nInferred Schema:") + print(table_inferred.schema) + # ============================= + explicit_schema = pa.schema([ + pa.field('id', pa.int64()), # Integer type for 'id' + pa.field('name', pa.string()), # String type for 'name' + ]) + json_buffer.seek(0) + po = pj.ParseOptions(explicit_schema=explicit_schema) + table_explicit = pj.read_json(json_buffer, parse_options=po) + print("\nExplicit Schema:") + print(table_explicit.schema) + + def _apply_source_db_changes(self): + time.sleep(12) + self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;") + # self.execute_on_source_db("ALTER TABLE inventory.customers DROP COLUMN email;") + self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE2' WHERE ID = 1002 ;") + self.execute_on_source_db("DELETE FROM inventory.orders WHERE purchaser = 1002 ;") + self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;") + self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;") + self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;") def test_iceberg_handler(self): - dest_ns1_database="my_warehouse" - dest_ns2_schema="dbz_cdc_data" - conf = { + dest_ns1_database = "my_warehouse" + dest_ns2_schema = "dbz_cdc_data" + catalog_conf = { "uri": self.RESTCATALOG.get_uri(), - # "s3.path-style.access": "true", "warehouse": "warehouse", "s3.endpoint": self.S3MiNIO.endpoint(), "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID, "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY, } - catalog = load_catalog(name="rest",**conf) - + catalog = load_catalog(name="rest", **catalog_conf) handler = IcebergChangeHandlerV2(catalog=catalog, destination_namespace=(dest_ns1_database, dest_ns2_schema,), event_flattening_enabled=True ) - dbz_props = self.debezium_engine_props(unwrap_messages=True) engine = DebeziumJsonEngine(properties=dbz_props, handler=handler) - with self.assertLogs(IcebergChangeHandlerV2.LOGGER_NAME, level='INFO') as cm: - # run async then interrupt after timeout time to test the result! 
- Utils.run_engine_async(engine=engine, timeout_sec=44) - # for t in cm.output: - # print(t) - self.assertRegex(text=str(cm.output), expected_regex='.*Created iceberg table.*') - self.assertRegex(text=str(cm.output), expected_regex='.*Appended.*records to table.*') + t = threading.Thread(target=self._apply_source_db_changes) + t.start() + Utils.run_engine_async(engine=engine, timeout_sec=77, blocking=False) - # catalog.create_namespace(dest_ns1_database) - namespaces = catalog.list_namespaces() - self.assertIn((dest_ns1_database,) , namespaces, msg="Namespace not found in catalog") + test_ns = (dest_ns1_database,) + print(catalog.list_namespaces()) + waiting.wait(predicate=lambda: test_ns in catalog.list_namespaces(), timeout_seconds=7.5) - tables = catalog.list_tables((dest_ns1_database, dest_ns2_schema,)) - print(tables) - self.assertIn(('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers'), tables, msg="Namespace not found in catalog") + test_tbl = ('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers') + test_tbl_ns = (dest_ns1_database, dest_ns2_schema,) + waiting.wait(predicate=lambda: test_tbl in catalog.list_tables(test_tbl_ns), timeout_seconds=10.5) - tbl = catalog.load_table(identifier=('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers')) - data = tbl.scan().to_arrow() - self.assertIn("sally.thomas@acme.com", str(data)) - self.assertIn("annek@noanswer.org", str(data)) - self.assertEqual(data.num_rows, 4) + test_tbl_data = ('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers') + waiting.wait(predicate=lambda: "sally.thomas@acme.com" in str(self.red_table(catalog, test_tbl_data)), + timeout_seconds=10.5) + waiting.wait(predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 4, timeout_seconds=10.5) + + data = self.red_table(catalog, test_tbl_data) self.pprint_table(data=data) - #================================================================= - ## ==== PART 2 CONSUME CHANGES FROM BINLOG ======================= - #================================================================= - self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE1' WHERE ID = 1002 ;") - # self.execute_on_source_db("ALTER TABLE inventory.customers DROP COLUMN email;") - self.execute_on_source_db("UPDATE inventory.customers SET first_name='George__UPDATE2' WHERE ID = 1002 ;") - self.execute_on_source_db("DELETE FROM inventory.orders WHERE purchaser = 1002 ;") - self.execute_on_source_db("DELETE FROM inventory.customers WHERE id = 1002 ;") - self.execute_on_source_db("ALTER TABLE inventory.customers ADD birth_date date;") - self.execute_on_source_db("UPDATE inventory.customers SET birth_date = '2020-01-01' WHERE id = 1001 ;") - # run - Utils.run_engine_async(engine=engine, timeout_sec=44) - # test - # @TODO test that new field is received and added to iceberg! 
- data = tbl.scan().to_arrow() + # ================================================================= + ## ==== PART 2 CONSUME CHANGES FROM BINLOG ======================== + # ================================================================= + waiting.wait(predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 7, timeout_seconds=77) + data = self.red_table(catalog, test_tbl_data) self.pprint_table(data=data) - self.assertEqual(data.num_rows, 4) + + def red_table(self, catalog, table_identifier) -> "pa.Table": + tbl = catalog.load_table(identifier=table_identifier) + data = tbl.scan().to_arrow() + self.pprint_table(data) + return data def pprint_table(self, data): print("--- Iceberg Table Content ---") From 8a44ec6b9513b2f626d98813bbc486651836cad0 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 26 Jul 2025 19:25:53 +0200 Subject: [PATCH 3/5] document --- pydbzengine/helper.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/pydbzengine/helper.py b/pydbzengine/helper.py index 55cb89e..cf47ddf 100644 --- a/pydbzengine/helper.py +++ b/pydbzengine/helper.py @@ -52,8 +52,4 @@ def run_engine_async(engine, timeout_sec=22, blocking=True): finally: # **Crucially important:** Cancel the alarm. This prevents the timeout # from triggering again later if the main thread continues to run. - signal.alarm(0) # 0 means cancel the alarm. - - # If the engine.run() finishes within the timeout, this point will be reached. - # No explicit return is needed as the function doesn't return anything. - print("Engine run completed successfully.") # Add a log message to signal success \ No newline at end of file + signal.alarm(0) # 0 means cancel the alarm. \ No newline at end of file From 620c47772c433c1bdc56b191a99331af02519188 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 26 Jul 2025 19:54:33 +0200 Subject: [PATCH 4/5] document --- tests/test_iceberg_handlerv2.py | 45 ++++++++++++++++++++++++++------- 1 file changed, 36 insertions(+), 9 deletions(-) diff --git a/tests/test_iceberg_handlerv2.py b/tests/test_iceberg_handlerv2.py index 31346c9..78aac28 100644 --- a/tests/test_iceberg_handlerv2.py +++ b/tests/test_iceberg_handlerv2.py @@ -5,7 +5,6 @@ import pandas as pd import pyarrow as pa import pyarrow.json as pj -import waiting from pyiceberg.catalog import load_catalog from base_postgresql_test import BasePostgresqlTest @@ -89,29 +88,42 @@ def test_iceberg_handler(self): dbz_props = self.debezium_engine_props(unwrap_messages=True) engine = DebeziumJsonEngine(properties=dbz_props, handler=handler) - t = threading.Thread(target=self._apply_source_db_changes) - t.start() Utils.run_engine_async(engine=engine, timeout_sec=77, blocking=False) test_ns = (dest_ns1_database,) print(catalog.list_namespaces()) - waiting.wait(predicate=lambda: test_ns in catalog.list_namespaces(), timeout_seconds=7.5) + self._wait_for_condition( + predicate=lambda: test_ns in catalog.list_namespaces(), + failure_message=f"Namespace {test_ns} did not appear in the catalog" + ) test_tbl = ('my_warehouse', 'dbz_cdc_data', 'testc_inventory_customers') test_tbl_ns = (dest_ns1_database, dest_ns2_schema,) - waiting.wait(predicate=lambda: test_tbl in catalog.list_tables(test_tbl_ns), timeout_seconds=10.5) + self._wait_for_condition( + predicate=lambda: test_tbl in catalog.list_tables(test_tbl_ns), + failure_message=f"Table {test_tbl} did not appear in the tables" + ) test_tbl_data = ('my_warehouse', 
'dbz_cdc_data', 'testc_inventory_customers') - waiting.wait(predicate=lambda: "sally.thomas@acme.com" in str(self.red_table(catalog, test_tbl_data)), - timeout_seconds=10.5) - waiting.wait(predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 4, timeout_seconds=10.5) + self._wait_for_condition( + predicate=lambda: "sally.thomas@acme.com" in str(self.red_table(catalog, test_tbl_data)), + failure_message=f"Expected row not consumed!" + ) + self._wait_for_condition( + predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 4, + failure_message=f"Rows {4} did not consumed" + ) data = self.red_table(catalog, test_tbl_data) self.pprint_table(data=data) # ================================================================= ## ==== PART 2 CONSUME CHANGES FROM BINLOG ======================== # ================================================================= - waiting.wait(predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 7, timeout_seconds=77) + self._apply_source_db_changes() + self._wait_for_condition( + predicate=lambda: self.red_table(catalog, test_tbl_data).num_rows >= 7, + failure_message=f"Rows {7} did not consumed" + ) data = self.red_table(catalog, test_tbl_data) self.pprint_table(data=data) @@ -125,3 +137,18 @@ def pprint_table(self, data): print("--- Iceberg Table Content ---") print(data.to_pandas()) print("---------------------------\n") + + def _wait_for_condition(self, predicate, failure_message: str, retries: int = 10, delay_seconds: int = 2): + attempts = 0 + while attempts < retries: + print(f"Attempt {attempts + 1}/{retries}: Checking condition...") + if predicate(): + print("Condition met.") + return + + attempts += 1 + # Avoid sleeping after the last attempt + if attempts < retries: + time.sleep(delay_seconds) + + raise TimeoutError(f"{failure_message} after {retries} attempts ({retries * delay_seconds} seconds).") From 99995063617a3c2260e9470b90d1341c282691c6 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Sat, 26 Jul 2025 20:01:26 +0200 Subject: [PATCH 5/5] document --- tests/test_iceberg_handlerv2.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/test_iceberg_handlerv2.py b/tests/test_iceberg_handlerv2.py index 78aac28..801e630 100644 --- a/tests/test_iceberg_handlerv2.py +++ b/tests/test_iceberg_handlerv2.py @@ -80,8 +80,8 @@ def test_iceberg_handler(self): "s3.access-key-id": S3Minio.AWS_ACCESS_KEY_ID, "s3.secret-access-key": S3Minio.AWS_SECRET_ACCESS_KEY, } - catalog = load_catalog(name="rest", **catalog_conf) - handler = IcebergChangeHandlerV2(catalog=catalog, + dbz_catalog = load_catalog(name="rest", **catalog_conf) + handler = IcebergChangeHandlerV2(catalog=dbz_catalog, destination_namespace=(dest_ns1_database, dest_ns2_schema,), event_flattening_enabled=True ) @@ -90,6 +90,8 @@ def test_iceberg_handler(self): Utils.run_engine_async(engine=engine, timeout_sec=77, blocking=False) + # TEST + catalog = load_catalog(name="rest", **catalog_conf) test_ns = (dest_ns1_database,) print(catalog.list_namespaces()) self._wait_for_condition(
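
A minimal end-to-end sketch of wiring up the new IcebergChangeHandlerV2, mirroring the catalog configuration and constructor arguments used in tests/test_iceberg_handlerv2.py. The REST/S3 endpoints, credentials, offset file path, and Debezium connector properties below are illustrative placeholders rather than a definitive setup, and the Properties import is assumed to be exported by pydbzengine as it is used in the project's tests.

    from pyiceberg.catalog import load_catalog

    from pydbzengine import DebeziumJsonEngine, Properties  # Properties export assumed, as used in the tests
    from pydbzengine.handlers.iceberg import IcebergChangeHandlerV2

    # Iceberg REST catalog backed by S3/MinIO (placeholder endpoints and credentials).
    catalog = load_catalog(
        name="rest",
        **{
            "uri": "http://localhost:8181",
            "warehouse": "warehouse",
            "s3.endpoint": "http://localhost:9000",
            "s3.access-key-id": "admin",
            "s3.secret-access-key": "password",
        },
    )

    # The handler infers each destination table's schema from the first batch of
    # events (falling back to string for all-null fields) and appends later batches.
    handler = IcebergChangeHandlerV2(
        catalog=catalog,
        destination_namespace=("my_warehouse", "dbz_cdc_data"),
        event_flattening_enabled=True,
    )

    # Debezium engine properties, abbreviated; see BasePostgresqlTest.debezium_engine_props()
    # in the series for the full set (whitelists, flush intervals, rewrite mode, etc.).
    props = Properties()
    props.setProperty("name", "engine")
    props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector")
    props.setProperty("database.hostname", "localhost")   # placeholder
    props.setProperty("database.port", "5432")            # placeholder
    props.setProperty("database.user", "postgres")        # placeholder
    props.setProperty("database.password", "postgres")    # placeholder
    props.setProperty("database.dbname", "postgres")      # placeholder
    props.setProperty("topic.prefix", "testc")
    props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
    props.setProperty("offset.storage.file.filename", "/tmp/postgresql-offsets.dat")  # placeholder path
    props.setProperty("transforms", "unwrap")
    props.setProperty("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState")

    engine = DebeziumJsonEngine(properties=props, handler=handler)
    engine.run()  # blocks; the tests run this via Utils.run_engine_async(engine, timeout_sec=...) instead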