From 46e652f78d61d26c9b7654f12dbdde618d5e5983 Mon Sep 17 00:00:00 2001 From: qsliu Date: Thu, 5 Feb 2026 04:26:44 +0800 Subject: [PATCH] list agg --- .../storage/ducklake_metadata_manager.hpp | 11 +++ src/storage/ducklake_metadata_manager.cpp | 81 ++++++++++++++----- 2 files changed, 70 insertions(+), 22 deletions(-) diff --git a/src/include/storage/ducklake_metadata_manager.hpp b/src/include/storage/ducklake_metadata_manager.hpp index 2f26ec92fb..6a9b3c097b 100644 --- a/src/include/storage/ducklake_metadata_manager.hpp +++ b/src/include/storage/ducklake_metadata_manager.hpp @@ -232,6 +232,17 @@ class DuckLakeMetadataManager { protected: virtual string GetLatestSnapshotQuery() const; + //! Build the SQL expression that aggregates (key, value-expression) field pairs into a list of struct objects; virtual because the syntax is DBMS-specific + //! For DuckDB (the implementation in this class): LIST({'key1': val1, 'key2': val2, ...}) + //! For Postgres (not implemented in this patch): jsonb_agg(jsonb_build_object('key1', val1, 'key2', val2, ...)) + virtual string ListAggregation(const vector> &fields) const; + //! Convert a list-of-structs value, as produced by ListAggregation, into tags (each list element is read as a struct) + virtual vector LoadTags(const Value &tag_map) const; + //! Convert a list-of-structs value, as produced by ListAggregation, into inlined data table entries (each list element is read as a struct) + virtual vector LoadInlinedDataTables(const Value &list) const; + //! 
Parse macro implementations list from ListAggregation value + virtual vector LoadMacroImplementations(const Value &list) const; + protected: string GetInlinedTableQuery(const DuckLakeTableInfo &table, const string &table_name); string GetColumnType(const DuckLakeColumnInfo &col); diff --git a/src/storage/ducklake_metadata_manager.cpp b/src/storage/ducklake_metadata_manager.cpp index 0c9928bd46..00b00d9141 100644 --- a/src/storage/ducklake_metadata_manager.cpp +++ b/src/storage/ducklake_metadata_manager.cpp @@ -67,6 +67,18 @@ FileSystem &DuckLakeMetadataManager::GetFileSystem() { return FileSystem::GetFileSystem(transaction.GetCatalog().GetDatabase()); } +string DuckLakeMetadataManager::ListAggregation(const vector> &fields) const { + // DuckDB syntax: LIST({'key1': val1, 'key2': val2, ...}) - each key is emitted as a single-quoted literal (not escaped here) and each value is spliced in verbatim as a SQL expression + string fields_part; + for (auto const &entry : fields) { + if (!fields_part.empty()) { + fields_part += ", "; + } + fields_part += "'" + entry.first + "': " + entry.second; + } + return "LIST({" + fields_part + "})"; +} + void DuckLakeMetadataManager::InitializeDuckLake(bool has_explicit_schema, DuckLakeEncryption encryption) { string initialize_query; if (has_explicit_schema) { @@ -283,7 +295,7 @@ static bool AddChildColumn(vector &columns, FieldIndex paren return false; } -static vector LoadTags(const Value &tag_map) { +vector DuckLakeMetadataManager::LoadTags(const Value &tag_map) const { vector result; for (auto &tag : ListValue::GetChildren(tag_map)) { auto &struct_children = StructValue::GetChildren(tag); @@ -298,7 +310,7 @@ static vector LoadTags(const Value &tag_map) { return result; } -static vector LoadInlinedDataTables(const Value &list) { +vector DuckLakeMetadataManager::LoadInlinedDataTables(const Value &list) const { vector result; for (auto &val : ListValue::GetChildren(list)) { auto &struct_children = StructValue::GetChildren(val); @@ -310,7 +322,7 @@ static vector LoadInlinedDataTables(const Value &list) return result; } -static vector 
LoadMacroImplementations(const Value &list) { +vector DuckLakeMetadataManager::LoadMacroImplementations(const Value &list) const { vector result; for (auto &val : ListValue::GetChildren(list)) { auto &struct_children = StructValue::GetChildren(val); @@ -443,24 +455,33 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < end_snapshot OR end_s catalog.schemas.push_back(std::move(schema)); } + static const vector> TAG_FIELDS = { + {"key", "key"}, + {"value", "value"}, + }; + static const vector> INLINED_DATA_TABLES_FIELDS = { + {"name", "table_name"}, + {"schema_version", "schema_version"}, + }; + // load the table information - result = transaction.Query(snapshot, R"( + result = transaction.Query(snapshot, StringUtil::Format(R"( SELECT schema_id, tbl.table_id, table_uuid::VARCHAR, table_name, ( - SELECT LIST({'key': key, 'value': value}) + SELECT %s FROM {METADATA_CATALOG}.ducklake_tag tag WHERE object_id=table_id AND {SNAPSHOT_ID} >= tag.begin_snapshot AND ({SNAPSHOT_ID} < tag.end_snapshot OR tag.end_snapshot IS NULL) ) AS tag, ( - SELECT LIST({'name': table_name, 'schema_version': schema_version}) + SELECT %s FROM {METADATA_CATALOG}.ducklake_inlined_data_tables inlined_data_tables WHERE inlined_data_tables.table_id = tbl.table_id ) AS inlined_data_tables, path, path_is_relative, col.column_id, column_name, column_type, initial_default, default_value, nulls_allowed, parent_column, ( - SELECT LIST({'key': key, 'value': value}) + SELECT %s FROM {METADATA_CATALOG}.ducklake_column_tag col_tag WHERE col_tag.table_id=tbl.table_id AND col_tag.column_id=col.column_id AND {SNAPSHOT_ID} >= col_tag.begin_snapshot AND ({SNAPSHOT_ID} < col_tag.end_snapshot OR col_tag.end_snapshot IS NULL) @@ -470,7 +491,10 @@ LEFT JOIN {METADATA_CATALOG}.ducklake_column col USING (table_id) WHERE {SNAPSHOT_ID} >= tbl.begin_snapshot AND ({SNAPSHOT_ID} < tbl.end_snapshot OR tbl.end_snapshot IS NULL) AND (({SNAPSHOT_ID} >= col.begin_snapshot AND ({SNAPSHOT_ID} < col.end_snapshot OR 
col.end_snapshot IS NULL)) OR column_id IS NULL) ORDER BY table_id, parent_column NULLS FIRST, column_order -)"); +)", + ListAggregation(TAG_FIELDS), + ListAggregation(INLINED_DATA_TABLES_FIELDS), + ListAggregation(TAG_FIELDS))); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get table information from DuckLake: "); } @@ -557,17 +581,18 @@ ORDER BY table_id, parent_column NULLS FIRST, column_order } } // load view information - result = transaction.Query(snapshot, R"( + result = transaction.Query(snapshot, StringUtil::Format(R"( SELECT view_id, view_uuid, schema_id, view_name, dialect, sql, column_aliases, ( - SELECT LIST({'key': key, 'value': value}) + SELECT %s FROM {METADATA_CATALOG}.ducklake_tag tag WHERE object_id=view_id AND {SNAPSHOT_ID} >= tag.begin_snapshot AND ({SNAPSHOT_ID} < tag.end_snapshot OR tag.end_snapshot IS NULL) ) AS tag FROM {METADATA_CATALOG}.ducklake_view view WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < view.end_snapshot OR view.end_snapshot IS NULL) -)"); +)", + ListAggregation(TAG_FIELDS))); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get partition information from DuckLake: "); } @@ -588,21 +613,33 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < view.end_snapshot OR views.push_back(std::move(view_info)); } + static const vector> MACRO_PARAM_FIELDS = {{"parameter_name", "parameter_name"}, + {"parameter_type", "parameter_type"}, + {"default_value", "default_value"}, + {"default_value_type", "default_value_type"}}; + auto macro_param_query = StringUtil::Format(R"( + ( + SELECT %s + FROM {METADATA_CATALOG}.ducklake_macro_parameters + WHERE ducklake_macro_impl.macro_id = ducklake_macro_parameters.macro_id + AND ducklake_macro_impl.impl_id = ducklake_macro_parameters.impl_id + ) + )", + ListAggregation(MACRO_PARAM_FIELDS)); + const vector> MACRO_IMPL_FIELDS = { + {"dialect", "dialect"}, {"sql", "sql"}, {"type", "type"}, {"params", macro_param_query}}; + // load macro 
information - result = transaction.Query(snapshot, R"( + result = transaction.Query(snapshot, StringUtil::Format(R"( SELECT schema_id, ducklake_macro.macro_id, macro_name, ( - SELECT LIST({'dialect': dialect, 'sql':sql, 'type':type, 'params': ( - SELECT LIST({'parameter_name': parameter_name, 'parameter_type': parameter_type, 'default_value': default_value, 'default_value_type': default_value_type}) - FROM {METADATA_CATALOG}.ducklake_macro_parameters - WHERE ducklake_macro_impl.macro_id = ducklake_macro_parameters.macro_id - AND ducklake_macro_impl.impl_id = ducklake_macro_parameters.impl_id - )}) + SELECT %s FROM {METADATA_CATALOG}.ducklake_macro_impl WHERE ducklake_macro.macro_id = ducklake_macro_impl.macro_id ) AS impl FROM {METADATA_CATALOG}.ducklake_macro WHERE {SNAPSHOT_ID} >= ducklake_macro.begin_snapshot AND ({SNAPSHOT_ID} < ducklake_macro.end_snapshot OR ducklake_macro.end_snapshot IS NULL) -)"); +)", + ListAggregation(MACRO_IMPL_FIELDS))); if (result->HasError()) { result->GetErrorObject().Throw("Failed to get macro information from DuckLake: "); } @@ -1667,7 +1704,7 @@ WITH snapshot_ranges AS ( WHERE table_id=%d ORDER BY begin_snapshot ) -SELECT %s, +SELECT %s FROM {METADATA_CATALOG}.ducklake_data_file data LEFT JOIN snapshot_ranges sr ON data.begin_snapshot >= sr.begin_snapshot AND data.begin_snapshot < sr.end_snapshot @@ -1677,7 +1714,7 @@ LEFT JOIN ( WHERE table_id=%d ) del USING (data_file_id) LEFT JOIN ( - SELECT data_file_id, LIST(partition_value ORDER BY partition_key_index) keys + SELECT data_file_id, ARRAY_AGG(partition_value ORDER BY partition_key_index) keys FROM {METADATA_CATALOG}.ducklake_file_partition_value GROUP BY data_file_id ) partition_info USING (data_file_id) @@ -3271,7 +3308,7 @@ string DuckLakeMetadataManager::WriteNewSortKeys(DuckLakeSnapshot commit_snapsho auto update_sort_query = StringUtil::Format(R"( UPDATE {METADATA_CATALOG}.ducklake_sort_info SET end_snapshot = {SNAPSHOT_ID} -WHERE table_id IN (%s) AND end_snapshot IS NULL 
+WHERE table_id IN (%s) AND end_snapshot IS NULL ;)", old_sort_table_ids); string batch_query = update_sort_query;