diff --git a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql new file mode 100644 index 00000000000000..313766e7b591b7 --- /dev/null +++ b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql @@ -0,0 +1,40 @@ + +CREATE DATABASE IF NOT EXISTS demo.wap_test; + + +USE demo.wap_test; + + +DROP TABLE IF EXISTS orders_wap; + +-- WAP-enabled orders table: the rows inserted below are written while spark.wap.id = test_wap_001, and the regression suite expects them to stay invisible until publish_changes is executed +CREATE TABLE orders_wap ( + order_id INT, + customer_id INT, + amount DECIMAL(10, 2), + order_date STRING +) +USING iceberg; +ALTER TABLE wap_test.orders_wap SET TBLPROPERTIES ('write.wap.enabled'='true'); + +SET spark.wap.id = test_wap_001; + + + +INSERT INTO orders_wap VALUES + (1, 103, 150.00, '2025-12-03'), + (2, 104, 320.25, '2025-12-04'); + + +DROP TABLE IF EXISTS orders_non_wap; +-- Non WAP-enabled orders table: target of the negative publish_changes test, which expects to find no snapshot with wap.id = test_wap_001 here +CREATE TABLE orders_non_wap ( + order_id INT, + customer_id INT, + amount DECIMAL(10, 2), + order_date STRING +) +USING iceberg; + +INSERT INTO orders_non_wap VALUES +(1, 201, 10.00, '2025-12-01'); diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java index 7d7afc610f52e9..f7e1e425e39e8c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java @@ -39,6 +39,7 @@ public class IcebergExecuteActionFactory { public static final String FAST_FORWARD = "fast_forward"; public static final String EXPIRE_SNAPSHOTS = "expire_snapshots"; public static final String REWRITE_DATA_FILES = "rewrite_data_files"; + public static final String PUBLISH_CHANGES = "publish_changes"; /** * Create an Iceberg-specific 
ExecuteAction instance. @@ -80,6 +81,9 @@ public static ExecuteAction createAction(String actionType, Map case REWRITE_DATA_FILES: return new IcebergRewriteDataFilesAction(properties, partitionNamesInfo, whereCondition); + case PUBLISH_CHANGES: + return new IcebergPublishChangesAction(properties, partitionNamesInfo, + whereCondition); default: throw new DdlException("Unsupported Iceberg procedure: " + actionType + ". Supported procedures: " + String.join(", ", getSupportedActions())); @@ -99,7 +103,8 @@ public static String[] getSupportedActions() { CHERRYPICK_SNAPSHOT, FAST_FORWARD, EXPIRE_SNAPSHOTS, - REWRITE_DATA_FILES + REWRITE_DATA_FILES, + PUBLISH_CHANGES }; } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java new file mode 100644 index 00000000000000..e1bf8cbdad4472 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Implements Iceberg's publish_changes action (core of the WAP pattern).
+ * This action finds the first snapshot whose summary carries the requested 'wap.id'
+ * and cherry-picks it into the current table state.
+ * Corresponds to Spark syntax: CALL catalog.system.publish_changes('table', 'wap_id_123')
+ */
+public class IcebergPublishChangesAction extends BaseIcebergAction {
+    public static final String WAP_ID = "wap_id";
+    private static final String WAP_ID_PROP = "wap.id";
+
+    public IcebergPublishChangesAction(Map<String, String> properties,
+            Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition) {
+        super("publish_changes", properties, partitionNamesInfo, whereCondition);
+    }
+
+    @Override
+    protected void registerIcebergArguments() {
+        namedArguments.registerRequiredArgument(WAP_ID,
+                "The WAP ID matching the snapshot to publish",
+                ArgumentParsers.nonEmptyString(WAP_ID));
+    }
+
+    @Override
+    protected void validateIcebergAction() throws UserException {
+        validateNoPartitions();
+        validateNoWhereCondition();
+    }
+
+    /**
+     * Publishes the staged snapshot identified by the wap_id argument.
+     * @param table the table to publish on; it is cast to IcebergExternalTable
+     * @return two strings: the current snapshot id before and after the cherry-pick ("null" if absent)
+     * @throws UserException if no snapshot carries the wap.id, or the cherry-pick or cache refresh fails
+     */
+    @Override
+    protected List<String> executeAction(TableIf table) throws UserException {
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+        String targetWapId = namedArguments.getString(WAP_ID);
+
+        Snapshot wapSnapshot = findWapSnapshot(icebergTable, targetWapId);
+        if (wapSnapshot == null) {
+            throw new UserException("Cannot find snapshot with " + WAP_ID_PROP + " = " + targetWapId);
+        }
+
+        try {
+            // Capture the current snapshot before the commit so it can be reported as "previous".
+            String previousSnapshotId = snapshotIdString(icebergTable.currentSnapshot());
+            icebergTable.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit();
+            String currentSnapshotId = snapshotIdString(icebergTable.currentSnapshot());
+
+            // Invalidate iceberg catalog table cache
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);
+
+            return Lists.newArrayList(previousSnapshotId, currentSnapshotId);
+        } catch (Exception e) {
+            throw new UserException("Failed to publish changes for wap.id " + targetWapId + ": " + e.getMessage(), e);
+        }
+    }
+
+    /** Returns the first snapshot whose summary maps wap.id to the given id, or null if none does. */
+    private static Snapshot findWapSnapshot(Table icebergTable, String wapId) {
+        for (Snapshot snapshot : icebergTable.snapshots()) {
+            // Guard against a missing summary (as Iceberg's WapUtil.stagedWapId does) to avoid an NPE.
+            Map<String, String> summary = snapshot.summary();
+            if (summary != null && wapId.equals(summary.get(WAP_ID_PROP))) {
+                return snapshot;
+            }
+        }
+        return null;
+    }
+
+    /** Renders a snapshot id as a string, using the literal "null" when there is no snapshot. */
+    private static String snapshotIdString(Snapshot snapshot) {
+        return snapshot == null ? "null" : String.valueOf(snapshot.snapshotId());
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("previous_snapshot_id", Type.STRING, false,
+                        "ID of the snapshot before the publish operation"),
+                new Column("current_snapshot_id", Type.STRING, false,
+                        "ID of the new snapshot created as a result of the publish operation"));
+    }
+
+    @Override
+    public String getDescription() {
+        return "Publish a WAP snapshot by cherry-picking it to the current table state";
+    }
+}
diff --git a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out index a815666d92aa9b..bac2b4e6bf7f10 100644 --- a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out +++ b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out @@ -66,3 +66,9 @@ 2 record2 200 3 record3 300 +-- !wap_before_publish -- + +-- !wap_after_publish -- +1 103 150.00 2025-12-03 +2 104 320.25 2025-12-04 + diff --git a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy index f84e05de167321..00906456633e5f 100644 --- a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy +++ b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy @@ -636,4 +636,98 @@ suite("test_iceberg_optimize_actions_ddl", "p0,external,doris,external_docker,ex """ exception "Action 'expire_snapshots' does not support partition specification" } + + // 
===================================================================================== +// Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern +// Simplified workflow: +// +// - Main branch is initially empty (0 rows) +// - A WAP snapshot exists with wap.id = "test_wap_001" and 2 rows +// - publish_changes should cherry-pick the WAP snapshot into the main branch (NOTE(review): this mutates the preinstalled orders_wap table, so Step 1 presumably only passes against a freshly loaded docker environment - confirm) +// ===================================================================================== + +logger.info("Starting simplified WAP (Write-Audit-Publish) workflow verification test") + +// WAP test database and table +String wap_db = "wap_test" +String wap_table = "orders_wap" + +// Step 1: Verify no data is visible before publish_changes +logger.info("Step 1: Verifying table is empty before publish_changes") +qt_wap_before_publish """ + SELECT order_id, customer_id, amount, order_date + FROM ${catalog_name}.${wap_db}.${wap_table} + ORDER BY order_id +""" + +// Step 2: Publish the WAP changes with wap_id = "test_wap_001" +logger.info("Step 2: Publishing WAP changes with wap_id=test_wap_001") +sql """ + ALTER TABLE ${catalog_name}.${wap_db}.${wap_table} + EXECUTE publish_changes("wap_id" = "test_wap_001") +""" +logger.info("Publish changes executed successfully") + +// Step 3: Verify WAP data is visible after publish_changes +logger.info("Step 3: Verifying WAP data is visible after publish_changes") +qt_wap_after_publish """ + SELECT order_id, customer_id, amount, order_date + FROM ${catalog_name}.${wap_db}.${wap_table} + ORDER BY order_id +""" + +logger.info("Simplified WAP (Write-Audit-Publish) workflow verification completed successfully") + +// Negative tests for publish_changes + +// publish_changes on table without write.wap.enabled = true (should fail) +test { + String nonWapDb = "wap_test" + String nonWapTable = "orders_non_wap" + + sql """ + ALTER TABLE ${catalog_name}.${nonWapDb}.${nonWapTable} + EXECUTE publish_changes("wap_id" = "test_wap_001") + """ + exception "Cannot find 
snapshot with wap.id = test_wap_001" +} + + +// publish_changes with missing wap_id (should fail) +test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE publish_changes () + """ + exception "Missing required argument: wap_id" +} + +// publish_changes with invalid wap_id (should fail) +test { + sql """ + ALTER TABLE ${catalog_name}.${wap_db}.${wap_table} + EXECUTE publish_changes("wap_id" = "non_existing_wap_id") + """ + exception "Cannot find snapshot with wap.id = non_existing_wap_id" +} + +// publish_changes with partition specification (should fail) +test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE publish_changes ("wap_id" = "test_wap_001") PARTITIONS (part1) + """ + exception "Action 'publish_changes' does not support partition specification" +} + +// publish_changes with WHERE condition (should fail) +test { + sql """ + ALTER TABLE ${catalog_name}.${db_name}.${table_name} + EXECUTE publish_changes ("wap_id" = "test_wap_001") WHERE id > 0 + """ + exception "Action 'publish_changes' does not support WHERE condition" +} + + } \ No newline at end of file