Skip to content

Commit 61e720c

Browse files
committed
Add tests with adding columns + updating rows, with and without partitions, for both snapshot and CDF queries
1 parent a903961 commit 61e720c

File tree

5 files changed

+468
-3
lines changed

client/src/test/scala/io/delta/sharing/client/DeltaSharingRestClientSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,8 @@ class DeltaSharingRestClientSuite extends DeltaSharingIntegrationTest {
183183
Table(name = "deletion_vectors_with_dvs_dv_property_on", schema = "default", share = "share8"),
184184
Table(name = "dv_and_cm_table", schema = "default", share = "share8"),
185185
Table(name = "timestampntz_cdf_table", schema = "default", share = "share8")
186+
Table(name = "add_columns_non_partitioned_cdf", schema = "default", share = "share8"),
187+
Table(name = "add_columns_partitioned_cdf", schema = "default", share = "share8")
186188
)
187189
assert(expected == client.listAllTables().toSet)
188190
} finally {

python/delta_sharing/reader.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -382,17 +382,24 @@ def table_changes_to_pandas(self, cdfOptions: CdfOptions) -> pd.DataFrame:
382382
response = self._rest_client.list_table_changes(self._table, cdfOptions)
383383

384384
schema_json = loads(response.metadata.schema_string)
385+
converters = to_converters(schema_json)
386+
schema_with_cdf = self._add_special_cdf_schema(schema_json)
385387

386388
if len(response.actions) == 0:
387-
return get_empty_table(self._add_special_cdf_schema(schema_json))
389+
return get_empty_table(schema_with_cdf)
388390

389-
converters = to_converters(schema_json)
390391
pdfs = []
391392
for action in response.actions:
392393
pdf = DeltaSharingReader._to_pandas(action, converters, True, None)
393394
pdfs.append(pdf)
394395

395-
return pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
396+
merged = pd.concat(pdfs, axis=0, ignore_index=True, copy=False)
397+
398+
col_map = {}
399+
for col in merged.columns:
400+
col_map[col.lower()] = col
401+
402+
return merged[[col_map[field["name"].lower()] for field in schema_with_cdf["fields"]]]
396403

397404
def _copy(
398405
self,

0 commit comments

Comments (0)