11 changes: 11 additions & 0 deletions src/include/storage/ducklake_metadata_manager.hpp
@@ -232,6 +232,17 @@ class DuckLakeMetadataManager {
protected:
virtual string GetLatestSnapshotQuery() const;

//! Wrap field selections in a DBMS-specific list aggregation of struct objects
//! For DuckDB: LIST({'key1': val1, 'key2': val2, ...})
//! For Postgres: jsonb_agg(jsonb_build_object('key1', val1, 'key2', val2, ...))
virtual string ListAggregation(const vector<pair<string, string>> &fields) const;
//! Parse tag list from ListAggregation value
virtual vector<DuckLakeTag> LoadTags(const Value &tag_map) const;
//! Parse inlined data tables list from ListAggregation value
virtual vector<DuckLakeInlinedTableInfo> LoadInlinedDataTables(const Value &list) const;
//! Parse macro implementations list from ListAggregation value
virtual vector<DuckLakeMacroImplementation> LoadMacroImplementations(const Value &list) const;

protected:
string GetInlinedTableQuery(const DuckLakeTableInfo &table, const string &table_name);
string GetColumnType(const DuckLakeColumnInfo &col);
81 changes: 59 additions & 22 deletions src/storage/ducklake_metadata_manager.cpp
@@ -67,6 +67,18 @@ FileSystem &DuckLakeMetadataManager::GetFileSystem() {
return FileSystem::GetFileSystem(transaction.GetCatalog().GetDatabase());
}

string DuckLakeMetadataManager::ListAggregation(const vector<pair<string, string>> &fields) const {
// DuckDB syntax: LIST({'key1': val1, 'key2': val2, ...})
string fields_part;
for (auto const &entry : fields) {
if (!fields_part.empty()) {
fields_part += ", ";
}
fields_part += "'" + entry.first + "': " + entry.second;
}
return "LIST({" + fields_part + "})";
}

void DuckLakeMetadataManager::InitializeDuckLake(bool has_explicit_schema, DuckLakeEncryption encryption) {
string initialize_query;
if (has_explicit_schema) {
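For illustration, a Postgres-backed subclass could override ListAggregation to emit the jsonb_agg(jsonb_build_object(...)) form mentioned in the header comment. The sketch below is an assumption, not part of this diff: the free-standing name PostgresListAggregation is hypothetical, and it assumes keys need no extra escaping while values are spliced in as raw SQL expressions, mirroring the DuckDB implementation above.

#include <string>
#include <utility>
#include <vector>

// Hypothetical Postgres counterpart of DuckLakeMetadataManager::ListAggregation (sketch only).
// Builds: jsonb_agg(jsonb_build_object('key1', val1, 'key2', val2, ...))
std::string PostgresListAggregation(const std::vector<std::pair<std::string, std::string>> &fields) {
	std::string fields_part;
	for (const auto &entry : fields) {
		if (!fields_part.empty()) {
			fields_part += ", ";
		}
		// each key becomes a SQL string literal, each value stays a raw column/expression
		fields_part += "'" + entry.first + "', " + entry.second;
	}
	return "jsonb_agg(jsonb_build_object(" + fields_part + "))";
}

Called with the TAG_FIELDS constant defined further down, this would yield jsonb_agg(jsonb_build_object('key', key, 'value', value)), the Postgres analogue of the LIST({'key': key, 'value': value}) fragment the queries previously hard-coded.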
@@ -283,7 +295,7 @@ static bool AddChildColumn(vector<DuckLakeColumnInfo> &columns, FieldIndex paren
return false;
}

static vector<DuckLakeTag> LoadTags(const Value &tag_map) {
vector<DuckLakeTag> DuckLakeMetadataManager::LoadTags(const Value &tag_map) const {
vector<DuckLakeTag> result;
for (auto &tag : ListValue::GetChildren(tag_map)) {
auto &struct_children = StructValue::GetChildren(tag);
@@ -298,7 +310,7 @@ static vector<DuckLakeTag> LoadTags(const Value &tag_map) {
return result;
}

static vector<DuckLakeInlinedTableInfo> LoadInlinedDataTables(const Value &list) {
vector<DuckLakeInlinedTableInfo> DuckLakeMetadataManager::LoadInlinedDataTables(const Value &list) const {
vector<DuckLakeInlinedTableInfo> result;
for (auto &val : ListValue::GetChildren(list)) {
auto &struct_children = StructValue::GetChildren(val);
@@ -310,7 +322,7 @@ static vector<DuckLakeInlinedTableInfo> LoadInlinedDataTables(const Value &list)
return result;
}

static vector<DuckLakeMacroImplementation> LoadMacroImplementations(const Value &list) {
vector<DuckLakeMacroImplementation> DuckLakeMetadataManager::LoadMacroImplementations(const Value &list) const {
vector<DuckLakeMacroImplementation> result;
for (auto &val : ListValue::GetChildren(list)) {
auto &struct_children = StructValue::GetChildren(val);
@@ -443,24 +455,33 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < end_snapshot OR end_s
catalog.schemas.push_back(std::move(schema));
}

static const vector<pair<string, string>> TAG_FIELDS = {
{"key", "key"},
{"value", "value"},
};
static const vector<pair<string, string>> INLINED_DATA_TABLES_FIELDS = {
{"name", "table_name"},
{"schema_version", "schema_version"},
};

// load the table information
result = transaction.Query(snapshot, R"(
result = transaction.Query(snapshot, StringUtil::Format(R"(
SELECT schema_id, tbl.table_id, table_uuid::VARCHAR, table_name,
(
SELECT LIST({'key': key, 'value': value})
SELECT %s
FROM {METADATA_CATALOG}.ducklake_tag tag
WHERE object_id=table_id AND
{SNAPSHOT_ID} >= tag.begin_snapshot AND ({SNAPSHOT_ID} < tag.end_snapshot OR tag.end_snapshot IS NULL)
) AS tag,
(
SELECT LIST({'name': table_name, 'schema_version': schema_version})
SELECT %s
FROM {METADATA_CATALOG}.ducklake_inlined_data_tables inlined_data_tables
WHERE inlined_data_tables.table_id = tbl.table_id
) AS inlined_data_tables,
path, path_is_relative,
col.column_id, column_name, column_type, initial_default, default_value, nulls_allowed, parent_column,
(
SELECT LIST({'key': key, 'value': value})
SELECT %s
FROM {METADATA_CATALOG}.ducklake_column_tag col_tag
WHERE col_tag.table_id=tbl.table_id AND col_tag.column_id=col.column_id AND
{SNAPSHOT_ID} >= col_tag.begin_snapshot AND ({SNAPSHOT_ID} < col_tag.end_snapshot OR col_tag.end_snapshot IS NULL)
@@ -470,7 +491,10 @@ LEFT JOIN {METADATA_CATALOG}.ducklake_column col USING (table_id)
WHERE {SNAPSHOT_ID} >= tbl.begin_snapshot AND ({SNAPSHOT_ID} < tbl.end_snapshot OR tbl.end_snapshot IS NULL)
AND (({SNAPSHOT_ID} >= col.begin_snapshot AND ({SNAPSHOT_ID} < col.end_snapshot OR col.end_snapshot IS NULL)) OR column_id IS NULL)
ORDER BY table_id, parent_column NULLS FIRST, column_order
)");
)",
ListAggregation(TAG_FIELDS),
ListAggregation(INLINED_DATA_TABLES_FIELDS),
ListAggregation(TAG_FIELDS)));
if (result->HasError()) {
result->GetErrorObject().Throw("Failed to get table information from DuckLake: ");
}
@@ -557,17 +581,18 @@ ORDER BY table_id, parent_column NULLS FIRST, column_order
}
}
// load view information
result = transaction.Query(snapshot, R"(
result = transaction.Query(snapshot, StringUtil::Format(R"(
SELECT view_id, view_uuid, schema_id, view_name, dialect, sql, column_aliases,
(
SELECT LIST({'key': key, 'value': value})
SELECT %s
FROM {METADATA_CATALOG}.ducklake_tag tag
WHERE object_id=view_id AND
{SNAPSHOT_ID} >= tag.begin_snapshot AND ({SNAPSHOT_ID} < tag.end_snapshot OR tag.end_snapshot IS NULL)
) AS tag
FROM {METADATA_CATALOG}.ducklake_view view
WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < view.end_snapshot OR view.end_snapshot IS NULL)
)");
)",
ListAggregation(TAG_FIELDS)));
if (result->HasError()) {
result->GetErrorObject().Throw("Failed to get partition information from DuckLake: ");
}
@@ -588,21 +613,33 @@ WHERE {SNAPSHOT_ID} >= begin_snapshot AND ({SNAPSHOT_ID} < view.end_snapshot OR
views.push_back(std::move(view_info));
}

static const vector<pair<string, string>> MACRO_PARAM_FIELDS = {{"parameter_name", "parameter_name"},
{"parameter_type", "parameter_type"},
{"default_value", "default_value"},
{"default_value_type", "default_value_type"}};
auto macro_param_query = StringUtil::Format(R"(
(
SELECT %s
FROM {METADATA_CATALOG}.ducklake_macro_parameters
WHERE ducklake_macro_impl.macro_id = ducklake_macro_parameters.macro_id
AND ducklake_macro_impl.impl_id = ducklake_macro_parameters.impl_id
)
)",
ListAggregation(MACRO_PARAM_FIELDS));
const vector<pair<string, string>> MACRO_IMPL_FIELDS = {
{"dialect", "dialect"}, {"sql", "sql"}, {"type", "type"}, {"params", macro_param_query}};

// load macro information
result = transaction.Query(snapshot, R"(
result = transaction.Query(snapshot, StringUtil::Format(R"(
SELECT schema_id, ducklake_macro.macro_id, macro_name, (
SELECT LIST({'dialect': dialect, 'sql':sql, 'type':type, 'params': (
SELECT LIST({'parameter_name': parameter_name, 'parameter_type': parameter_type, 'default_value': default_value, 'default_value_type': default_value_type})
FROM {METADATA_CATALOG}.ducklake_macro_parameters
WHERE ducklake_macro_impl.macro_id = ducklake_macro_parameters.macro_id
AND ducklake_macro_impl.impl_id = ducklake_macro_parameters.impl_id
)})
SELECT %s
FROM {METADATA_CATALOG}.ducklake_macro_impl
WHERE ducklake_macro.macro_id = ducklake_macro_impl.macro_id
) AS impl
FROM {METADATA_CATALOG}.ducklake_macro
WHERE {SNAPSHOT_ID} >= ducklake_macro.begin_snapshot AND ({SNAPSHOT_ID} < ducklake_macro.end_snapshot OR ducklake_macro.end_snapshot IS NULL)
)");
)",
ListAggregation(MACRO_IMPL_FIELDS)));
if (result->HasError()) {
result->GetErrorObject().Throw("Failed to get macro information from DuckLake: ");
}
@@ -1667,7 +1704,7 @@ WITH snapshot_ranges AS (
WHERE table_id=%d
ORDER BY begin_snapshot
)
SELECT %s,
SELECT %s
FROM {METADATA_CATALOG}.ducklake_data_file data
LEFT JOIN snapshot_ranges sr
ON data.begin_snapshot >= sr.begin_snapshot AND data.begin_snapshot < sr.end_snapshot
@@ -1677,7 +1714,7 @@ LEFT JOIN (
WHERE table_id=%d
) del USING (data_file_id)
LEFT JOIN (
SELECT data_file_id, LIST(partition_value ORDER BY partition_key_index) keys
SELECT data_file_id, ARRAY_AGG(partition_value ORDER BY partition_key_index) keys
FROM {METADATA_CATALOG}.ducklake_file_partition_value
GROUP BY data_file_id
) partition_info USING (data_file_id)
@@ -3271,7 +3308,7 @@ string DuckLakeMetadataManager::WriteNewSortKeys(DuckLakeSnapshot commit_snapsho
auto update_sort_query = StringUtil::Format(R"(
UPDATE {METADATA_CATALOG}.ducklake_sort_info
SET end_snapshot = {SNAPSHOT_ID}
WHERE table_id IN (%s) AND end_snapshot IS NULL
WHERE table_id IN (%s) AND end_snapshot IS NULL
;)",
old_sort_table_ids);
string batch_query = update_sort_query;