Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@

-- Fixture for the publish_changes (Write-Audit-Publish) tests.
-- Builds two Iceberg tables in demo.wap_test:
--   * orders_wap     : WAP-enabled; its rows are written while spark.wap.id is set
--   * orders_non_wap : plain table with no WAP property set
-- NOTE(review): "demo" is presumably the Spark catalog name used by the test
-- environment -- confirm against the docker setup.
CREATE DATABASE IF NOT EXISTS demo.wap_test;


USE demo.wap_test;


-- Recreate the table so the script can be re-run from a clean state.
DROP TABLE IF EXISTS orders_wap;

-- WAP-enabled orders table
CREATE TABLE orders_wap (
order_id INT,
customer_id INT,
amount DECIMAL(10, 2),
order_date STRING
)
USING iceberg;
-- Turn on WAP for this table only; orders_non_wap below never gets this property.
ALTER TABLE wap_test.orders_wap SET TBLPROPERTIES ('write.wap.enabled'='true');

-- Session-level WAP id. The regression test later publishes the snapshot
-- tagged with this id ("test_wap_001") and expects orders_wap to show no rows
-- before that publish, i.e. the INSERT below is expected to be staged rather
-- than made current.
SET spark.wap.id = test_wap_001;



-- These two rows are the ones the test expects to become visible after publish.
INSERT INTO orders_wap VALUES
(1, 103, 150.00, '2025-12-03'),
(2, 104, 320.25, '2025-12-04');


DROP TABLE IF EXISTS orders_non_wap;
-- Non WAP-enabled orders table
CREATE TABLE orders_non_wap (
order_id INT,
customer_id INT,
amount DECIMAL(10, 2),
order_date STRING
)
USING iceberg;

-- NOTE(review): spark.wap.id is still set at this point. The regression test
-- expects that no snapshot of orders_non_wap carries wap.id = test_wap_001,
-- which presumably holds because write.wap.enabled is not set here -- confirm.
INSERT INTO orders_non_wap VALUES
(1, 201, 10.00, '2025-12-01');
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class IcebergExecuteActionFactory {
public static final String FAST_FORWARD = "fast_forward";
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
public static final String PUBLISH_CHANGES = "publish_changes";

/**
* Create an Iceberg-specific ExecuteAction instance.
Expand Down Expand Up @@ -80,6 +81,9 @@ public static ExecuteAction createAction(String actionType, Map<String, String>
case REWRITE_DATA_FILES:
return new IcebergRewriteDataFilesAction(properties, partitionNamesInfo,
whereCondition);
case PUBLISH_CHANGES:
return new IcebergPublishChangesAction(properties, partitionNamesInfo,
whereCondition);
default:
throw new DdlException("Unsupported Iceberg procedure: " + actionType
+ ". Supported procedures: " + String.join(", ", getSupportedActions()));
Expand All @@ -99,7 +103,8 @@ public static String[] getSupportedActions() {
CHERRYPICK_SNAPSHOT,
FAST_FORWARD,
EXPIRE_SNAPSHOTS,
REWRITE_DATA_FILES
REWRITE_DATA_FILES,
PUBLISH_CHANGES
};
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.apache.doris.datasource.iceberg.action;

import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.catalog.Type;
import org.apache.doris.common.ArgumentParsers;
import org.apache.doris.common.UserException;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.expressions.Expression;

import com.google.common.collect.Lists;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.Table;

import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
 * Implements Iceberg's {@code publish_changes} action, the "publish" step of the
 * Write-Audit-Publish (WAP) pattern.
 *
 * <p>The action looks up the snapshot whose summary carries the requested
 * {@code wap.id} and cherry-picks it onto the table's current state.
 * Corresponds to Spark syntax:
 * {@code CALL catalog.system.publish_changes('table', 'wap_id_123')}
 *
 * <p>The action takes a single required argument, {@link #WAP_ID}, and rejects
 * partition specifications and WHERE conditions.
 */
public class IcebergPublishChangesAction extends BaseIcebergAction {
    /** Name of the required action argument holding the WAP id to publish. */
    public static final String WAP_ID = "wap_id";
    /** Snapshot summary key under which the WAP id of a snapshot is looked up. */
    private static final String WAP_ID_PROP = "wap.id";

    /**
     * Creates the action.
     *
     * @param properties action arguments supplied by the user
     * @param partitionNamesInfo optional partition specification (rejected during validation)
     * @param whereCondition optional WHERE condition (rejected during validation)
     */
    public IcebergPublishChangesAction(Map<String, String> properties,
            Optional<PartitionNamesInfo> partitionNamesInfo, Optional<Expression> whereCondition) {
        super("publish_changes", properties, partitionNamesInfo, whereCondition);
    }

    /** Registers {@code wap_id} as a required, non-empty string argument. */
    @Override
    protected void registerIcebergArguments() {
        namedArguments.registerRequiredArgument(WAP_ID,
                "The WAP ID matching the snapshot to publish",
                ArgumentParsers.nonEmptyString(WAP_ID));
    }

    /**
     * Rejects partition specifications and WHERE conditions.
     *
     * @throws UserException if either was supplied
     */
    @Override
    protected void validateIcebergAction() throws UserException {
        validateNoPartitions();
        validateNoWhereCondition();
    }

    /**
     * Publishes the snapshot tagged with the requested WAP id by cherry-picking it.
     *
     * @param table the target table; cast to {@link IcebergExternalTable}
     * @return a two-element list: the current snapshot id before the cherry-pick and
     *         the current snapshot id after it, each rendered as a string
     *         ({@code "null"} when the table has no current snapshot)
     * @throws UserException if no snapshot carries the requested WAP id, or if the
     *         cherry-pick, commit or cache invalidation fails
     */
    @Override
    protected List<String> executeAction(TableIf table) throws UserException {
        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
        String targetWapId = namedArguments.getString(WAP_ID);

        Snapshot wapSnapshot = findWapSnapshot(icebergTable, targetWapId);
        if (wapSnapshot == null) {
            throw new UserException("Cannot find snapshot with " + WAP_ID_PROP + " = " + targetWapId);
        }

        long wapSnapshotId = wapSnapshot.snapshotId();

        try {
            // Capture the pre-publish state first so it can be reported back.
            String previousSnapshotIdString = snapshotIdAsString(icebergTable.currentSnapshot());

            // Execute Cherry-pick
            icebergTable.manageSnapshots().cherrypick(wapSnapshotId).commit();

            // Read the current snapshot again now that the commit has happened.
            String currentSnapshotIdString = snapshotIdAsString(icebergTable.currentSnapshot());

            // Invalidate iceberg catalog table cache
            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) table);

            return Lists.newArrayList(
                    previousSnapshotIdString,
                    currentSnapshotIdString
            );
        } catch (Exception e) {
            throw new UserException("Failed to publish changes for wap.id " + targetWapId + ": " + e.getMessage(), e);
        }
    }

    /**
     * Returns the first snapshot, in the iteration order of {@code table.snapshots()},
     * whose summary maps {@code wap.id} to {@code targetWapId}.
     *
     * <p>Snapshots whose summary map is {@code null} are skipped rather than
     * dereferenced, so a table containing such a snapshot no longer fails the
     * lookup with a {@link NullPointerException}.
     *
     * @return the matching snapshot, or {@code null} if none matches
     */
    private static Snapshot findWapSnapshot(Table icebergTable, String targetWapId) {
        for (Snapshot snapshot : icebergTable.snapshots()) {
            Map<String, String> summary = snapshot.summary();
            if (summary != null && targetWapId.equals(summary.get(WAP_ID_PROP))) {
                return snapshot;
            }
        }
        return null;
    }

    /** Renders a snapshot's id as a string, or {@code "null"} when there is no snapshot. */
    private static String snapshotIdAsString(Snapshot snapshot) {
        return snapshot != null ? String.valueOf(snapshot.snapshotId()) : "null";
    }

    /** Describes the two string columns returned by {@link #executeAction(TableIf)}. */
    @Override
    protected List<Column> getResultSchema() {
        return Lists.newArrayList(
                new Column("previous_snapshot_id", Type.STRING, false,
                        "ID of the snapshot before the publish operation"),
                new Column("current_snapshot_id", Type.STRING, false,
                        "ID of the new snapshot created as a result of the publish operation"));
    }

    @Override
    public String getDescription() {
        return "Publish a WAP snapshot by cherry-picking it to the current table state";
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,9 @@
2 record2 200
3 record3 300

-- !wap_before_publish --

-- !wap_after_publish --
1 103 150.00 2025-12-03
2 104 320.25 2025-12-04

Original file line number Diff line number Diff line change
Expand Up @@ -636,4 +636,98 @@ suite("test_iceberg_optimize_actions_ddl", "p0,external,doris,external_docker,ex
"""
exception "Action 'expire_snapshots' does not support partition specification"
}

// =====================================================================================
// Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern
// Simplified workflow:
//
// - Main branch is initially empty (0 rows)
// - A WAP snapshot exists with wap.id = "test_wap_001" and 2 rows
// - publish_changes should cherry-pick the WAP snapshot into the main branch
//
// NOTE(review): the tables wap_test.orders_wap and wap_test.orders_non_wap are
// not created here; they are presumably prepared in advance by the environment's
// Spark init script -- confirm before running this suite in a new environment.
// =====================================================================================

logger.info("Starting simplified WAP (Write-Audit-Publish) workflow verification test")

// WAP test database and table
String wap_db = "wap_test"
String wap_table = "orders_wap"

// Step 1: Verify no data is visible before publish_changes
// (the recorded output for wap_before_publish is expected to contain no rows)
logger.info("Step 1: Verifying table is empty before publish_changes")
qt_wap_before_publish """
SELECT order_id, customer_id, amount, order_date
FROM ${catalog_name}.${wap_db}.${wap_table}
ORDER BY order_id
"""

// Step 2: Publish the WAP changes with wap_id = "test_wap_001"
logger.info("Step 2: Publishing WAP changes with wap_id=test_wap_001")
sql """
ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
EXECUTE publish_changes("wap_id" = "test_wap_001")
"""
logger.info("Publish changes executed successfully")

// Step 3: Verify WAP data is visible after publish_changes
// (the recorded output for wap_after_publish is expected to contain the two staged rows)
logger.info("Step 3: Verifying WAP data is visible after publish_changes")
qt_wap_after_publish """
SELECT order_id, customer_id, amount, order_date
FROM ${catalog_name}.${wap_db}.${wap_table}
ORDER BY order_id
"""

logger.info("Simplified WAP (Write-Audit-Publish) workflow verification completed successfully")

// Negative tests for publish_changes
// Each case below asserts on the error message text returned by the action.

// publish_changes on table without write.wap.enabled = true (should fail)
// The expected failure is that no snapshot of this table carries the requested wap.id.
test {
String nonWapDb = "wap_test"
String nonWapTable = "orders_non_wap"

sql """
ALTER TABLE ${catalog_name}.${nonWapDb}.${nonWapTable}
EXECUTE publish_changes("wap_id" = "test_wap_001")
"""
exception "Cannot find snapshot with wap.id = test_wap_001"
}


// publish_changes with missing wap_id (should fail)
// wap_id is a required argument, so an empty argument list must be rejected.
test {
sql """
ALTER TABLE ${catalog_name}.${db_name}.${table_name}
EXECUTE publish_changes ()
"""
exception "Missing required argument: wap_id"
}

// publish_changes with invalid wap_id (should fail)
// Uses the WAP table itself with an id that no snapshot is expected to carry.
test {
sql """
ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
EXECUTE publish_changes("wap_id" = "non_existing_wap_id")
"""
exception "Cannot find snapshot with wap.id = non_existing_wap_id"
}

// publish_changes with partition specification (should fail)
test {
sql """
ALTER TABLE ${catalog_name}.${db_name}.${table_name}
EXECUTE publish_changes ("wap_id" = "test_wap_001") PARTITIONS (part1)
"""
exception "Action 'publish_changes' does not support partition specification"
}

// publish_changes with WHERE condition (should fail)
test {
sql """
ALTER TABLE ${catalog_name}.${db_name}.${table_name}
EXECUTE publish_changes ("wap_id" = "test_wap_001") WHERE id > 0
"""
exception "Action 'publish_changes' does not support WHERE condition"
}


}
Loading