diff --git a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala
index 9bc8d8044..ed9e2c3f7 100644
--- a/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala
+++ b/client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala
@@ -183,6 +183,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
         Table(name = "deletion_vectors_with_dvs_dv_property_on", schema = "default", share = "share8"),
         Table(name = "dv_and_cm_table", schema = "default", share = "share8"),
-        Table(name = "timestampntz_cdf_table", schema = "default", share = "share8")
+        Table(name = "timestampntz_cdf_table", schema = "default", share = "share8"),
+        Table(name = "add_columns_non_partitioned_cdf", schema = "default", share = "share8"),
+        Table(name = "add_columns_partitioned_cdf", schema = "default", share = "share8")
       )
       assert(expected == client.listAllTables().toSet)
     } finally {
diff --git a/python/delta_sharing/reader.py b/python/delta_sharing/reader.py
index fd3430bae..309b6ff9f 100644
--- a/python/delta_sharing/reader.py
+++ b/python/delta_sharing/reader.py
@@ -382,17 +382,24 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame:
         response = self._rest_client.list_table_changes(self._table, cdfOptions)
 
         schema_json = loads(response.metadata.schema_string)
+        converters = to_converters(schema_json)
+        schema_with_cdf = self._add_special_cdf_schema(schema_json)
 
         if len(response.actions) == 0:
-            return get_empty_table(self._add_special_cdf_schema(schema_json))
+            return get_empty_table(schema_with_cdf)
 
-        converters = to_converters(schema_json)
         pdfs = []
         for action in response.actions:
             pdf = DeltaSharingReader._to_pandas(action, converters, True, None)
             pdfs.append(pdf)
-        return pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
+        merged = pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
+
+        # Reorder columns to the schema order (with the special CDF columns
+        # appended), matching names case-insensitively across the per-action frames.
+        col_map = {}
+        for col in merged.columns:
+            col_map[col.lower()] = col
+
+        return merged[[col_map[field["name"].lower()] for field in schema_with_cdf["fields"]]]
 
     def _copy(
         self,
diff --git a/python/delta_sharing/tests/test_delta_sharing.py b/python/delta_sharing/tests/test_delta_sharing.py
index 2f6ba708d..91c841989 100644
--- a/python/delta_sharing/tests/test_delta_sharing.py
+++ b/python/delta_sharing/tests/test_delta_sharing.py
@@ -96,6 +96,8 @@ def _verify_all_tables_result(tables: Sequence[Table]):
         Table(name="table_wasb", share="share_azure", schema="default"),
         Table(name="table_abfs", share="share_azure", schema="default"),
         Table(name="table_gcs", share="share_gcp", schema="default"),
+        Table(name="add_columns_partitioned_cdf", share="share8", schema="default"),
+        Table(name="add_columns_non_partitioned_cdf", share="share8", schema="default"),
         Table(name="timestampntz_cdf_table", share="share8", schema="default"),
         Table(name="cdf_table_cdf_enabled", share="share8", schema="default"),
         Table(name="cdf_table_with_partition", share="share8", schema="default"),
@@ -1687,3 +1689,436 @@ def test_load_table_changes_as_spark(
             match="Unable to import pyspark. `load_table_changes_as_spark` requires"
             + " PySpark.",
         ):
             load_table_changes_as_spark("not-used")
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+@pytest.mark.parametrize(
+    "fragments,version,expected",
+    [
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            3,
+            # table initially contains c1 = [1, 2]
+            # then we add int column c2
+            # then we insert a row with c1 = 3, c2 = 4
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 2, 3], dtype="int32"),
+                    "c2": pd.Series([None, None, 4], dtype="object"),
+                }
+            ),
+            id="version 3",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            4,
+            # set c2 = 5 where c1 = 1
+            pd.DataFrame({"c1": pd.Series([1, 2, 3], dtype="int32"), "c2": [5.0, None, 4.0]}),
+            id="version 4",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            5,
+            # add float columns c3 and c4
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 2, 3], dtype="int32"),
+                    "c2": [5.0, None, 4.0],
+                    "c3": [None, None, None],
+                    "c4": [None, None, None],
+                }
+            ),
+            id="version 5",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            6,
+            # add rows, some with non-null values for c3 and c4
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 2, 3, 5, 6, 7], dtype="int32"),
+                    "c2": [5, None, 4, 6, None, 8],
+                    "c3": pd.Series([None, None, None, 0.1, 0.2, None], dtype="float32"),
+                    "c4": pd.Series([None, None, None, 0.1, 0.2, None], dtype="float32"),
+                }
+            ),
+            id="version 6",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            7,
+            # delete rows where c1 = 2 or c1 = 6
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 3, 5, 7], dtype="int32"),
+                    "c2": pd.Series([5, 4, 6, 8], dtype="int32"),
+                    "c3": pd.Series([None, None, 0.1, None], dtype="float32"),
+                    "c4": pd.Series([None, None, 0.1, None], dtype="float32"),
+                }
+            ),
+            id="version 7",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            8,
+            # update c3 to 0.0 where c1 = 3 or c2 = 8
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 3, 5, 7], dtype="int32"),
+                    "c2": pd.Series([5, 4, 6, 8], dtype="int32"),
+                    "c3": pd.Series([None, 0, 0.1, 0], dtype="float32"),
+                    "c4": pd.Series([None, None, 0.1, None], dtype="float32"),
+                }
+            ),
+            id="version 8",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            9,
+            # set c4 to NULL where c1 < 7
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 3, 5, 7], dtype="int32"),
+                    "c2": pd.Series([5, 4, 6, 8], dtype="int32"),
+                    "c3": pd.Series([None, 0, 0.1, 0], dtype="float32"),
+                    "c4": pd.Series([None, None, None, None], dtype="float32"),
+                }
+            ),
+            id="version 9",
+        ),
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            None,
+            pd.DataFrame(
+                {
+                    "c1": pd.Series([1, 3, 5, 7], dtype="int32"),
+                    "c2": pd.Series([5, 4, 6, 8], dtype="int32"),
+                    "c3": pd.Series([None, 0, 0.1, 0], dtype="float32"),
+                    "c4": pd.Series([None, None, None, None], dtype="float32"),
+                }
+            ),
+            id="latest",
+        ),
+    ],
+)
+def test_add_column_non_partitioned(
+    profile_path: str,
+    fragments: str,
+    version: Optional[int],
+    expected: pd.DataFrame,
+):
+    pdf = load_as_pandas(f"{profile_path}#{fragments}", version=version, use_delta_format=False)
+    # sort to eliminate row order inconsistencies
+    pdf = pdf.sort_values(by="c1").reset_index(drop=True)
+    pd.testing.assert_frame_equal(pdf, expected)
+    # TODO: enable once delta format + version in OSS server is fixed
+    # pdf_delta = load_as_pandas(f"{profile_path}#{fragments}", version=version, use_delta_format=True)
+    # pdf_delta = pdf_delta.sort_values(by="c1").reset_index(drop=True)
pdf_delta.sort_values(by="c1").reset_index(drop=True) + # pd.testing.assert_frame_equal(pdf_delta, expected) + + +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +@pytest.mark.parametrize( + "fragments,version,expected", + [ + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 1, + # initial data + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "a"], + "c1": [0, 1, 2, None, 0], + } + ), + id="version 1", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 2, + # add some rows containing NULL partition values + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, None, None, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "a", None, None], + "c1": [0, 1, 2, None, 0, 0, None], + } + ), + id="version 2", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 3, + # add int columns c2 and c3 + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, None, None, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "a", None, None], + "c1": [0, 1, 2, None, 0, 0, None], + "c2": [None, None, None, None, None, None, None], + "c3": [None, None, None, None, None, None, None], + } + ), + id="version 3", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 4, + # set c2 = 10 where c1 = 0 + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, None, None, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "a", None, None], + "c1": [0, 1, 2, None, 0, 0, None], + "c2": [10, None, None, None, 10, 10, None], + "c3": [None, None, None, None, None, None, None], + } + ), + id="version 4", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 5, + # add more rows, containing values in c2 and c3 + pd.DataFrame( + { + "p1": pd.Series( + [0.1, 0.1, 0.1, 0.1, 0.1, 0.2, 0.3, None, None, None], dtype="float32" + ), + "p2": ["a", "a", "a", "b", "b", "a", "a", "a", None, None], + "c1": [0, 1, 2, 3, None, 2, 2, 0, 0, None], + "c2": [10, None, None, 30, None, 20, 20, 10, 10, None], + "c3": pd.Series( + [None, None, None, 300, None, 200, 200, None, None, None], dtype="object" + ), + } + ), + id="version 5", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 6, + # delete partitions where p2 is NULL + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, 0.1, 0.2, 0.3, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "b", "a", "a", "a"], + "c1": [0, 1, 2, 3, None, 2, 2, 0], + "c2": [10, None, None, 30, None, 20, 20, 10], + "c3": pd.Series([None, None, None, 300, None, 200, 200, None], dtype="object"), + } + ), + id="version 6", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 7, + # update c1 = c3, c2 =-1 where p1 is NULL or p2 = "a" + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, 0.1, 0.2, 0.3, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "b", "a", "a", "a"], + "c1": [10, None, None, 3, None, 20, 20, 10], + "c2": [10, None, None, 30, None, 20, 20, 10], + "c3": pd.Series([-1, -1, -1, 300, None, -1, -1, -1], dtype="object"), + } + ), + id="version 7", + ), + pytest.param( + "share8.default.add_columns_partitioned_cdf", + None, + pd.DataFrame( + { + "p1": pd.Series([0.1, 0.1, 0.1, 0.1, 0.1, 0.2, 0.3, None], dtype="float32"), + "p2": ["a", "a", "a", "b", "b", "a", "a", "a"], + "c1": [10, None, None, 3, None, 20, 20, 10], + "c2": [10, None, None, 30, None, 20, 20, 10], + "c3": pd.Series([-1, -1, -1, 300, None, -1, -1, -1], dtype="object"), + } + ), + id="latest", + ), + ], +) +def 
+    profile_path: str,
+    fragments: str,
+    version: Optional[int],
+    expected: pd.DataFrame,
+):
+    pdf = load_as_pandas(f"{profile_path}#{fragments}", version=version, use_delta_format=False)
+    # sort to eliminate row order inconsistencies
+    pdf = pdf.sort_values(by=["p1", "p2", "c1"]).reset_index(drop=True)
+    pd.testing.assert_frame_equal(pdf, expected)
+    # TODO: enable once delta format + version in OSS server is fixed
+    # pdf_delta = load_as_pandas(f"{profile_path}#{fragments}", version=version, use_delta_format=True)
+    # pdf_delta = pdf_delta.sort_values(by=["p1", "p2", "c1"]).reset_index(drop=True)
+    # pd.testing.assert_frame_equal(pdf_delta, expected)
+
+
+@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE)
+@pytest.mark.parametrize(
+    "fragments,starting_version,ending_version,expected",
+    [
+        pytest.param(
+            "share8.default.add_columns_non_partitioned_cdf",
+            0,
+            None,
+            pd.DataFrame(
+                [
+                    [1, None, None, None, "insert", 1, "2025-05-22T19:06:23Z"],
+                    [2, None, None, None, "insert", 1, "2025-05-22T19:06:23Z"],
+                    [3, 4.0, None, None, "insert", 3, "2025-05-22T19:07:32Z"],
+                    [1, 5.0, None, None, "update_postimage", 4, "2025-05-22T19:08:34Z"],
+                    [1, None, None, None, "update_preimage", 4, "2025-05-22T19:08:34Z"],
+                    [5, 6.0, 0.1, 0.1, "insert", 6, "2025-05-22T21:23:40Z"],
+                    [6, None, 0.2, 0.2, "insert", 6, "2025-05-22T21:23:40Z"],
+                    [7, 8.0, None, None, "insert", 6, "2025-05-22T21:23:40Z"],
+                    [2, None, None, None, "delete", 7, "2025-05-22T21:26:46Z"],
+                    [6, None, 0.2, 0.2, "delete", 7, "2025-05-22T21:26:46Z"],
+                    [3, 4.0, 0.0, None, "update_postimage", 8, "2025-05-22T21:28:02Z"],
+                    [7, 8.0, 0.0, None, "update_postimage", 8, "2025-05-22T21:28:02Z"],
+                    [3, 4.0, None, None, "update_preimage", 8, "2025-05-22T21:28:02Z"],
+                    [7, 8.0, None, None, "update_preimage", 8, "2025-05-22T21:28:02Z"],
+                    [1, 5.0, None, None, "update_postimage", 9, "2025-05-22T21:28:53Z"],
+                    [3, 4.0, 0.0, None, "update_postimage", 9, "2025-05-22T21:28:53Z"],
+                    [5, 6.0, 0.1, None, "update_postimage", 9, "2025-05-22T21:28:53Z"],
+                    [1, 5.0, None, None, "update_preimage", 9, "2025-05-22T21:28:53Z"],
+                    [3, 4.0, 0.0, None, "update_preimage", 9, "2025-05-22T21:28:53Z"],
+                    [5, 6.0, 0.1, 0.1, "update_preimage", 9, "2025-05-22T21:28:53Z"],
+                ],
+                columns=[
+                    "c1",
+                    "c2",
+                    "c3",
+                    "c4",
+                    "_change_type",
+                    "_commit_version",
+                    "_commit_timestamp",
+                ],
+            ),
+            id="Full CDF with columns added",
+        ),
+    ],
+)
+def test_add_column_non_partitioned_cdf(
+    profile_path: str,
+    fragments: str,
+    starting_version: Optional[int],
+    ending_version: Optional[int],
+    expected: pd.DataFrame,
+):
+    expected["c1"] = expected["c1"].astype("int32")
+    expected["c3"] = expected["c3"].astype("float32")
+    expected["c4"] = expected["c4"].astype("float32")
+    # convert _commit_timestamp from ISO date string to epoch milliseconds
+    expected["_commit_timestamp"] = (
+        pd.to_datetime(expected["_commit_timestamp"]).map(pd.Timestamp.timestamp).astype("int64")
+        * 1000
+    )
+    pdf = load_table_changes_as_pandas(
+        f"{profile_path}#{fragments}", starting_version, ending_version, use_delta_format=False
+    )
+    pdf = pdf.sort_values(by=["_commit_timestamp", "_change_type", "c1"]).reset_index(drop=True)
+    pd.testing.assert_frame_equal(pdf, expected)
+    # TODO: enable once OSS server includes writerFeatures in response for CDF queries
+    # and delta-kernel-rs supports schema changes during version range
+    # pdf_delta = load_table_changes_as_pandas(
+    #     f"{profile_path}#{fragments}", starting_version, ending_version, use_delta_format=True
+    # )
+    # pdf_delta = pdf_delta.sort_values(by=["_commit_timestamp", "_change_type", "c1"]).reset_index(
pdf_delta.sort_values(by=["_commit_timestamp", "_change_type", "c1"]).reset_index( + # drop=True + # ) + # pd.testing.assert_frame_equal(pdf_delta, expected) + + +@pytest.mark.skipif(not ENABLE_INTEGRATION, reason=SKIP_MESSAGE) +@pytest.mark.parametrize( + "fragments,starting_version,ending_version,expected", + [ + pytest.param( + "share8.default.add_columns_partitioned_cdf", + 0, + None, + pd.DataFrame( + [ + [0.1, "a", 0.0, None, None, "insert", 1, "2025-05-22T21:49:38Z"], + [0.1, "a", 1.0, None, None, "insert", 1, "2025-05-22T21:49:38Z"], + [0.1, "a", 2.0, None, None, "insert", 1, "2025-05-22T21:49:38Z"], + [0.1, "b", None, None, None, "insert", 1, "2025-05-22T21:49:38Z"], + [None, "a", 0.0, None, None, "insert", 1, "2025-05-22T21:49:38Z"], + [None, None, 0.0, None, None, "insert", 2, "2025-05-22T21:53:14Z"], + [None, None, None, None, None, "insert", 2, "2025-05-22T21:53:14Z"], + [0.1, "a", 0.0, 10.0, None, "update_postimage", 4, "2025-05-22T23:46:00Z"], + [None, "a", 0.0, 10.0, None, "update_postimage", 4, "2025-05-22T23:46:00Z"], + [None, None, 0.0, 10.0, None, "update_postimage", 4, "2025-05-22T23:46:00Z"], + [0.1, "a", 0.0, None, None, "update_preimage", 4, "2025-05-22T23:46:00Z"], + [None, "a", 0.0, None, None, "update_preimage", 4, "2025-05-22T23:46:00Z"], + [None, None, 0.0, None, None, "update_preimage", 4, "2025-05-22T23:46:00Z"], + [0.1, "b", 3.0, 30.0, 300.0, "insert", 5, "2025-05-22T23:48:04Z"], + [0.2, "a", 2.0, 20.0, 200.0, "insert", 5, "2025-05-22T23:48:04Z"], + [0.3, "a", 2.0, 20.0, 200.0, "insert", 5, "2025-05-22T23:48:04Z"], + [None, None, 0.0, 10.0, None, "delete", 6, "2025-05-22T23:49:35Z"], + [None, None, None, None, None, "delete", 6, "2025-05-22T23:49:35Z"], + [0.1, "a", 10.0, 10.0, -1.0, "update_postimage", 7, "2025-05-22T23:50:54Z"], + [0.1, "a", None, None, -1.0, "update_postimage", 7, "2025-05-22T23:50:54Z"], + [0.1, "a", None, None, -1.0, "update_postimage", 7, "2025-05-22T23:50:54Z"], + [0.2, "a", 20.0, 20.0, -1.0, "update_postimage", 7, "2025-05-22T23:50:54Z"], + [0.3, "a", 20.0, 20.0, -1.0, "update_postimage", 7, "2025-05-22T23:50:54Z"], + [None, "a", 10.0, 10.0, -1.0, "update_postimage", 7, "2025-05-22T23:50:54Z"], + [0.1, "a", 0.0, 10.0, None, "update_preimage", 7, "2025-05-22T23:50:54Z"], + [0.1, "a", 1.0, None, None, "update_preimage", 7, "2025-05-22T23:50:54Z"], + [0.1, "a", 2.0, None, None, "update_preimage", 7, "2025-05-22T23:50:54Z"], + [0.2, "a", 2.0, 20.0, 200.0, "update_preimage", 7, "2025-05-22T23:50:54Z"], + [0.3, "a", 2.0, 20.0, 200.0, "update_preimage", 7, "2025-05-22T23:50:54Z"], + [None, "a", 0.0, 10.0, None, "update_preimage", 7, "2025-05-22T23:50:54Z"], + ], + columns=[ + "p1", + "p2", + "c1", + "c2", + "c3", + "_change_type", + "_commit_version", + "_commit_timestamp", + ], + ), + id="Full CDF with columns added", + ), + ], +) +def test_add_column_partitioned_cdf( + profile_path: str, + fragments: str, + starting_version: Optional[int], + ending_version: Optional[int], + expected: pd.DataFrame, +): + # expected["c1"] = expected["c1"].astype("int32") + expected["p1"] = expected["p1"].astype("float32") + # expected["c4"] = expected["c4"].astype("float32") + # convert _commit_timestamp from date string to unix timestamp + expected["_commit_timestamp"] = ( + pd.to_datetime(expected["_commit_timestamp"]).map(pd.Timestamp.timestamp).astype("int64") + * 1000 + ) + pdf = load_table_changes_as_pandas( + f"{profile_path}#{fragments}", starting_version, ending_version, use_delta_format=False + ) + pdf = pdf.sort_values(by=["_commit_timestamp", 
"_change_type", "p1", "p2", "c1"]).reset_index( + drop=True + ) + print(pdf) + pd.testing.assert_frame_equal(pdf, expected) + # TODO: enable once OSS server includes writerFeatures in response for CDF queries + # and delta-kernel-rs supports schema changes during version range + # pdf_delta = load_table_changes_as_pandas( + # f"{profile_path}#{fragments}", starting_version, ending_version, use_delta_format=True + # ) + # pdf_delta = pdf_delta.sort_values(by=["_commit_timestamp", "_change_type", "c1"]).reset_index( + # drop=True + # ) + # pd.testing.assert_frame_equal(pdf_delta, expected) diff --git a/python/delta_sharing/tests/test_reader.py b/python/delta_sharing/tests/test_reader.py index ed9f5f2f5..29e73ba71 100644 --- a/python/delta_sharing/tests/test_reader.py +++ b/python/delta_sharing/tests/test_reader.py @@ -480,6 +480,15 @@ def list_table_changes( pdf = reader.table_changes_to_pandas(CdfOptions()) expected = pd.concat([pdf1, pdf2]).reset_index(drop=True) + expected = expected[ + [ + "a", + "b", + DeltaSharingReader._change_type_col_name(), + DeltaSharingReader._commit_version_col_name(), + DeltaSharingReader._commit_timestamp_col_name(), + ] + ] pd.testing.assert_frame_equal(pdf, expected) diff --git a/server/src/test/scala/io/delta/sharing/server/TestResource.scala b/server/src/test/scala/io/delta/sharing/server/TestResource.scala index 4e04c0902..8bf4d8327 100644 --- a/server/src/test/scala/io/delta/sharing/server/TestResource.scala +++ b/server/src/test/scala/io/delta/sharing/server/TestResource.scala @@ -206,6 +206,18 @@ object TestResource { SchemaConfig( "default", java.util.Arrays.asList( + TableConfig( + "add_columns_partitioned_cdf", + s"s3a://${AWS.bucket}/delta-exchange-test/add_columns_partitioned_cdf", + "00000000-0000-0000-0000-000000000098", + historyShared = true + ), + TableConfig( + "add_columns_non_partitioned_cdf", + s"s3a://${AWS.bucket}/delta-exchange-test/add_columns_non_partitioned_cdf", + "00000000-0000-0000-0000-000000000097", + historyShared = true + ), TableConfig( "timestampntz_cdf_table", s"s3a://${AWS.bucket}/delta-exchange-test/timestampntz_cdf_table",