diff --git a/fluss-lake/fluss-lake-ut/pom.xml b/fluss-lake/fluss-lake-ut/pom.xml
new file mode 100644
index 0000000000..0e398b7ae1
--- /dev/null
+++ b/fluss-lake/fluss-lake-ut/pom.xml
@@ -0,0 +1,190 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements.  See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership.  The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.fluss</groupId>
+        <artifactId>fluss-lake</artifactId>
+        <version>0.8-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>fluss-lake-ut</artifactId>
+    <name>Fluss : Lake : UT</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-client</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-test-utils</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.curator</groupId>
+            <artifactId>curator-test</artifactId>
+            <version>${curator.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-mapreduce-client-core</artifactId>
+            <version>2.8.5</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${fluss.hadoop.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>avro</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>log4j</artifactId>
+                    <groupId>log4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>ch.qos.reload4j</groupId>
+                    <artifactId>reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>jdk.tools</groupId>
+                    <artifactId>jdk.tools</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>protobuf-java</artifactId>
+                    <groupId>com.google.protobuf</groupId>
+                </exclusion>
+                <exclusion>
+                    <groupId>commons-io</groupId>
+                    <artifactId>commons-io</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-server</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-flink-${flink.major.version}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-common</artifactId>
+            <version>${project.version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.fluss</groupId>
+            <artifactId>fluss-flink-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+            <type>test-jar</type>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-common</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-runtime</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-base</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-files</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-table-test-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git a/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadLogTableITCase.java b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadLogTableITCase.java
new file mode 100644
index 0000000000..1e3f55debf
--- /dev/null
+++ b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadLogTableITCase.java
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.core.execution.SavepointFormatType;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsIgnoreOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for union reading log tables with the full set of data types. */
+public class FlinkUnionReadLogTableITCase extends FlinkUnionReadTestBase {
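+ // temporary directory for the savepoint taken in the failover test (stop-with-savepoint / restore)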
+ @TempDir public static File savepointDir;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkUnionReadTestBase.beforeAll();
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadLogTableFullType(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new ArrayList<>();
+ long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+ // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // now, start to read the log table, which will read the Iceberg snapshot and
+ // may also read from Fluss, depending on the log offset of the Iceberg snapshot
+ List<Row> actual =
+ CollectionUtil.iteratorToList(
+ batchTEnv.executeSql("select * from " + tableName).collect());
+
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+
+ // write some log data again
+ writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+ // query the log table again and check the data
+ // it should read both iceberg snapshot and fluss log
+ actual =
+ CollectionUtil.iteratorToList(
+ batchTEnv.executeSql("select * from " + tableName).collect());
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(writtenRows);
+
+ // test project push down
+ actual =
+ CollectionUtil.iteratorToList(
+ batchTEnv.executeSql("select f_byte from " + tableName).collect());
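+ // f_byte is field index 1 in the written rows (f_boolean is index 0)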
+ List<Row> expected =
+ writtenRows.stream()
+ .map(row -> Row.of(row.getField(1)))
+ .collect(Collectors.toList());
+ assertThat(actual).containsExactlyInAnyOrderElementsOf(expected);
+
+ if (isPartitioned) {
+ // get first partition
+ String partition = waitUntilPartitions(t1).values().iterator().next();
+ String sqlWithPartitionFilter =
+ "select * FROM " + tableName + " WHERE p = '" + partition + "'";
+
+ String plan = batchTEnv.explainSql(sqlWithPartitionFilter);
+
+ // check that the plan contains the partition filter, i.e. filter push down took effect
+ assertThat(plan)
+ .contains("TableSourceScan(")
+ .contains("LogicalFilter(condition=[=($15, _UTF-16LE'" + partition + "'")
+ .contains("filter=[=(p, _UTF-16LE'" + partition + "'");
+
+ List<Row> expectedFiltered =
+ writtenRows.stream()
+ .filter(r -> partition.equals(r.getField(15)))
+ .collect(Collectors.toList());
+
+ List<Row> actualFiltered =
+ CollectionUtil.iteratorToList(
+ batchTEnv.executeSql(sqlWithPartitionFilter).collect());
+
+ assertThat(actualFiltered).containsExactlyInAnyOrderElementsOf(expectedFiltered);
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testReadLogTableInStreamMode(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName = "stream_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(t1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+ // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // now, start to read the log table, which will read the Iceberg snapshot and
+ // may also read from Fluss, depending on the log offset of the Iceberg snapshot
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " + tableName).collect();
+ assertResultsIgnoreOrder(
+ actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+
+ // cancel the tiering job
+ jobClient.cancel().get();
+
+ // write some log data again
+ writtenRows.addAll(writeRows(t1, 3, isPartitioned));
+
+ // query the log table again and check the data
+ // it should read both iceberg snapshot and fluss log
+ actual =
+ streamTEnv
+ .executeSql(
+ "select * from "
+ + tableName
+ + " /*+ OPTIONS('scan.partition.discovery.interval'='100ms') */")
+ .collect();
+ if (isPartitioned) {
+ // we write to a new partition to verify partition discovery
+ writtenRows.addAll(writeFullTypeRows(t1, 10, "3027"));
+ }
+ assertResultsIgnoreOrder(
+ actual, writtenRows.stream().map(Row::toString).collect(Collectors.toList()), true);
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadLogTableFailover(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName1 =
+ "restore_logTable_" + (isPartitioned ? "partitioned" : "non_partitioned");
+ String resultTableName =
+ "result_table" + (isPartitioned ? "partitioned" : "non_partitioned");
+
+ TablePath table1 = TablePath.of(DEFAULT_DB, tableName1);
+ TablePath resultTable = TablePath.of(DEFAULT_DB, resultTableName);
+ List<Row> writtenRows = new LinkedList<>();
+ long tableId = prepareLogTable(table1, DEFAULT_BUCKET_NUM, isPartitioned, writtenRows);
+ // wait until records have been synced
+ waitUntilBucketSynced(table1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ StreamTableEnvironment streamTEnv = buildStreamTEnv(null);
+ // now, start to read the log table and write the result into a Fluss result table;
+ // may also read from Fluss, depending on the log offset of the Iceberg snapshot
+ createFullTypeLogTable(resultTable, DEFAULT_BUCKET_NUM, isPartitioned, false);
+ TableResult insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " + tableName1);
+
+ CloseableIterator<Row> actual =
+ streamTEnv.executeSql("select * from " + resultTableName).collect();
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, writtenRows, false);
+ } else {
+ assertResultsExactOrder(actual, writtenRows, false);
+ }
+
+ // now, stop the job with save point
+ String savepointPath =
+ insertResult
+ .getJobClient()
+ .get()
+ .stopWithSavepoint(
+ false,
+ savepointDir.getAbsolutePath(),
+ SavepointFormatType.CANONICAL)
+ .get();
+
+ // rebuild the stream table environment so the new job restores from the savepoint
+ streamTEnv = buildStreamTEnv(savepointPath);
+ insertResult =
+ streamTEnv.executeSql(
+ "insert into " + resultTableName + " select * from " + tableName1);
+
+ // write some log data again
+ List<Row> rows = writeRows(table1, 3, isPartitioned);
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, rows, true);
+ } else {
+ assertResultsExactOrder(actual, rows, true);
+ }
+
+ // cancel jobs
+ insertResult.getJobClient().get().cancel().get();
+ jobClient.cancel().get();
+ }
+
+ private long prepareLogTable(
+ TablePath tablePath, int bucketNum, boolean isPartitioned, List<Row> flinkRows)
+ throws Exception {
+ // createFullTypeLogTable creates a datalake-enabled table, partitioned by "p" when isPartitioned is true
+ long t1Id = createFullTypeLogTable(tablePath, bucketNum, isPartitioned);
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
+ for (String partition :
+ partitionNameById.values().stream().sorted().collect(Collectors.toList())) {
+ for (int i = 0; i < 3; i++) {
+ flinkRows.addAll(writeFullTypeRows(tablePath, 10, partition));
+ }
+ }
+ } else {
+ for (int i = 0; i < 3; i++) {
+ flinkRows.addAll(writeFullTypeRows(tablePath, 10, null));
+ }
+ }
+ return t1Id;
+ }
+
+ protected long createFullTypeLogTable(TablePath tablePath, int bucketNum, boolean isPartitioned)
+ throws Exception {
+ return createFullTypeLogTable(tablePath, bucketNum, isPartitioned, true);
+ }
+
+ protected long createFullTypeLogTable(
+ TablePath tablePath, int bucketNum, boolean isPartitioned, boolean lakeEnabled)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("f_boolean", DataTypes.BOOLEAN())
+ .column("f_byte", DataTypes.TINYINT())
+ .column("f_short", DataTypes.SMALLINT())
+ .column("f_int", DataTypes.INT())
+ .column("f_long", DataTypes.BIGINT())
+ .column("f_float", DataTypes.FLOAT())
+ .column("f_double", DataTypes.DOUBLE())
+ .column("f_string", DataTypes.STRING())
+ .column("f_decimal1", DataTypes.DECIMAL(5, 2))
+ .column("f_decimal2", DataTypes.DECIMAL(20, 0))
+ .column("f_timestamp_ltz1", DataTypes.TIMESTAMP_LTZ(3))
+ .column("f_timestamp_ltz2", DataTypes.TIMESTAMP_LTZ(6))
+ .column("f_timestamp_ntz1", DataTypes.TIMESTAMP(3))
+ .column("f_timestamp_ntz2", DataTypes.TIMESTAMP(6))
+ .column("f_binary", DataTypes.BINARY(4));
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder().distributedBy(bucketNum, "f_int");
+
+ // only set the datalake properties when requested, so this helper can also create
+ // plain (non lake-enabled) tables, e.g. the result table of the failover test
+ if (lakeEnabled) {
+ tableBuilder
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+ }
+
+ if (isPartitioned) {
+ schemaBuilder.column("p", DataTypes.STRING());
+ tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
+ tableBuilder.partitionedBy("p");
+ tableBuilder.property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
+ }
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
+ private List<Row> writeFullTypeRows(
+ TablePath tablePath, int rowCount, @Nullable String partition) throws Exception {
+ List<InternalRow> rows = new ArrayList<>();
+ List<Row> flinkRows = new ArrayList<>();
+ for (int i = 0; i < rowCount; i++) {
+ if (partition == null) {
+ rows.add(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_" + i,
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8}));
+
+ flinkRows.add(
+ Row.of(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_" + i,
+ new java.math.BigDecimal("9.00"),
+ new java.math.BigDecimal("1000"),
+ Instant.ofEpochMilli(1698235273400L),
+ Instant.ofEpochMilli(1698235273400L).plusNanos(7000),
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")),
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(1698235273501L),
+ ZoneId.of("UTC"))
+ .plusNanos(8000),
+ new byte[] {5, 6, 7, 8}));
+ } else {
+ rows.add(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_" + i,
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+
+ flinkRows.add(
+ Row.of(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_" + i,
+ new java.math.BigDecimal("9.00"),
+ new java.math.BigDecimal("1000"),
+ Instant.ofEpochMilli(1698235273400L),
+ Instant.ofEpochMilli(1698235273400L).plusNanos(7000),
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(1698235273501L), ZoneId.of("UTC")),
+ LocalDateTime.ofInstant(
+ Instant.ofEpochMilli(1698235273501L),
+ ZoneId.of("UTC"))
+ .plusNanos(8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+ }
+ }
+ writeRows(tablePath, rows, true);
+ return flinkRows;
+ }
+
+ private List<Row> writeRows(TablePath tablePath, int rowCount, boolean isPartitioned)
+ throws Exception {
+ if (isPartitioned) {
+ List<Row> rows = new ArrayList<>();
+ for (String partition : waitUntilPartitions(tablePath).values()) {
+ rows.addAll(writeFullTypeRows(tablePath, rowCount, partition));
+ }
+ return rows;
+ } else {
+ return writeFullTypeRows(tablePath, rowCount, null);
+ }
+ }
+}
diff --git a/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadPrimaryKeyTableITCase.java b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadPrimaryKeyTableITCase.java
new file mode 100644
index 0000000000..f366d55f5f
--- /dev/null
+++ b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadPrimaryKeyTableITCase.java
@@ -0,0 +1,417 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.flink;
+
+import org.apache.fluss.config.AutoPartitionTimeUnit;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.metadata.Schema;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.Decimal;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.row.TimestampLtz;
+import org.apache.fluss.row.TimestampNtz;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.types.DataTypes;
+
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertResultsExactOrder;
+import static org.apache.fluss.flink.source.testutils.FlinkRowAssertionsUtils.assertRowResultsIgnoreOrder;
+import static org.apache.fluss.testutils.DataTestUtils.row;
+
+/** Test case for union reading a primary key table. */
+public class FlinkUnionReadPrimaryKeyTableITCase extends FlinkUnionReadTestBase {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ void testUnionReadInStreamMode(boolean isPartitioned) throws Exception {
+ // first of all, start tiering
+ JobClient jobClient = buildTieringJob(execEnv);
+
+ String tableName =
+ "stream_pk_table_full" + (isPartitioned ? "_partitioned" : "_non_partitioned");
+ TablePath t1 = TablePath.of(DEFAULT_DB, tableName);
+ Map<TableBucket, Long> bucketLogEndOffset = new HashMap<>();
+ // create table & write initial data
+ long tableId =
+ preparePKTableFullType(t1, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+ // wait until records have been synced
+ waitUntilBucketSynced(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned);
+
+ // check the status of replica after synced
+ assertReplicaStatus(t1, tableId, DEFAULT_BUCKET_NUM, isPartitioned, bucketLogEndOffset);
+
+ // will read the Iceberg snapshot, so the result should contain only +I rows since there is no changelog
+ List<Row> expectedRows = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(t1).values()) {
+ expectedRows.add(
+ Row.of(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(10), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273182L),
+ TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ expectedRows.add(
+ Row.of(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ }
+ } else {
+ expectedRows =
+ Arrays.asList(
+ Row.of(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(10), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273182L),
+ TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L, 6000),
+ new byte[] {1, 2, 3, 4},
+ null),
+ Row.of(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ null));
+ }
+
+ String query = "select * from " + tableName;
+ CloseableIterator<Row> actual = streamTEnv.executeSql(query).collect();
+ assertRowResultsIgnoreOrder(actual, expectedRows, false);
+
+ // stop lake tiering service
+ jobClient.cancel().get();
+
+ // write a row again
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById = waitUntilPartitions(t1);
+ for (String partition : partitionNameById.values()) {
+ writeFullTypeRow(t1, partition);
+ }
+ } else {
+ writeFullTypeRow(t1, null);
+ }
+
+ // should generate -U & +U
+ List<Row> expectedRows2 = new ArrayList<>();
+ if (isPartitioned) {
+ for (String partition : waitUntilPartitions(t1).values()) {
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+ }
+ } else {
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_BEFORE,
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ null));
+ expectedRows2.add(
+ Row.ofKind(
+ RowKind.UPDATE_AFTER,
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ null));
+ }
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, expectedRows2, true);
+ } else {
+ assertResultsExactOrder(actual, expectedRows2, true);
+ }
+
+ // query again
+ actual = streamTEnv.executeSql(query).collect();
+ List<Row> totalExpectedRows = new ArrayList<>(expectedRows);
+ totalExpectedRows.addAll(expectedRows2);
+
+ if (isPartitioned) {
+ assertRowResultsIgnoreOrder(actual, totalExpectedRows, true);
+ } else {
+ assertResultsExactOrder(actual, totalExpectedRows, true);
+ }
+ }
+
+ private void writeFullTypeRow(TablePath tablePath, @Nullable String partition) throws Exception {
+ List<InternalRow> rows =
+ Collections.singletonList(
+ row(
+ true,
+ (byte) 100,
+ (short) 200,
+ 30,
+ 400L,
+ 500.1f,
+ 600.0d,
+ "another_string_2",
+ Decimal.fromUnscaledLong(900, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(1000), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273400L),
+ TimestampLtz.fromEpochMillis(1698235273400L, 7000),
+ TimestampNtz.fromMillis(1698235273501L),
+ TimestampNtz.fromMillis(1698235273501L, 8000),
+ new byte[] {5, 6, 7, 8},
+ partition));
+ writeRows(tablePath, rows, false);
+ }
+
+ private long preparePKTableFullType(
+ TablePath tablePath,
+ int bucketNum,
+ boolean isPartitioned,
+ Map<TableBucket, Long> bucketLogEndOffset)
+ throws Exception {
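+ // create the table, write two rounds of kv rows, and record each bucket's log end offset
+ // so tests can later assert that the lake end offsets have caught up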
+ long tableId = createPkTableFullType(tablePath, bucketNum, isPartitioned);
+ if (isPartitioned) {
+ Map<Long, String> partitionNameById = waitUntilPartitions(tablePath);
+ for (String partition : partitionNameById.values()) {
+ for (int i = 0; i < 2; i++) {
+ List<InternalRow> rows = generateKvRowsFullType(partition);
+ // write records
+ writeRows(tablePath, rows, false);
+ }
+ }
+ for (Long partitionId : partitionNameById.keySet()) {
+ bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, bucketNum, partitionId));
+ }
+ } else {
+ for (int i = 0; i < 2; i++) {
+ List<InternalRow> rows = generateKvRowsFullType(null);
+ // write records
+ writeRows(tablePath, rows, false);
+ }
+ bucketLogEndOffset.putAll(getBucketLogEndOffset(tableId, bucketNum, null));
+ }
+ return tableId;
+ }
+
+ protected long createPkTableFullType(TablePath tablePath, int bucketNum, boolean isPartitioned)
+ throws Exception {
+ Schema.Builder schemaBuilder =
+ Schema.newBuilder()
+ .column("c1", DataTypes.BOOLEAN())
+ .column("c2", DataTypes.TINYINT())
+ .column("c3", DataTypes.SMALLINT())
+ .column("c4", DataTypes.INT())
+ .column("c5", DataTypes.BIGINT())
+ .column("c6", DataTypes.FLOAT())
+ .column("c7", DataTypes.DOUBLE())
+ .column("c8", DataTypes.STRING())
+ .column("c9", DataTypes.DECIMAL(5, 2))
+ .column("c10", DataTypes.DECIMAL(20, 0))
+ .column("c11", DataTypes.TIMESTAMP_LTZ(3))
+ .column("c12", DataTypes.TIMESTAMP_LTZ(6))
+ .column("c13", DataTypes.TIMESTAMP(3))
+ .column("c14", DataTypes.TIMESTAMP(6))
+ .column("c15", DataTypes.BINARY(4))
+ .column("c16", DataTypes.STRING());
+
+ TableDescriptor.Builder tableBuilder =
+ TableDescriptor.builder()
+ .distributedBy(bucketNum)
+ .property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
+ .property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
+
+ if (isPartitioned) {
+ tableBuilder.property(ConfigOptions.TABLE_AUTO_PARTITION_ENABLED, true);
+ tableBuilder.partitionedBy("c16");
+ schemaBuilder.primaryKey("c4", "c16");
+ tableBuilder.property(
+ ConfigOptions.TABLE_AUTO_PARTITION_TIME_UNIT, AutoPartitionTimeUnit.YEAR);
+ } else {
+ schemaBuilder.primaryKey("c4");
+ }
+ tableBuilder.schema(schemaBuilder.build());
+ return createTable(tablePath, tableBuilder.build());
+ }
+
+ private List<InternalRow> generateKvRowsFullType(@Nullable String partition) {
+ return Arrays.asList(
+ row(
+ false,
+ (byte) 1,
+ (short) 2,
+ 3,
+ 4L,
+ 5.1f,
+ 6.0d,
+ "string",
+ Decimal.fromUnscaledLong(9, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(10), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273182L),
+ TimestampLtz.fromEpochMillis(1698235273182L, 5000),
+ TimestampNtz.fromMillis(1698235273183L),
+ TimestampNtz.fromMillis(1698235273183L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition),
+ row(
+ true,
+ (byte) 10,
+ (short) 20,
+ 30,
+ 40L,
+ 50.1f,
+ 60.0d,
+ "another_string",
+ Decimal.fromUnscaledLong(90, 5, 2),
+ Decimal.fromBigDecimal(new java.math.BigDecimal(100), 20, 0),
+ TimestampLtz.fromEpochMillis(1698235273200L),
+ TimestampLtz.fromEpochMillis(1698235273200L, 5000),
+ TimestampNtz.fromMillis(1698235273201L),
+ TimestampNtz.fromMillis(1698235273201L, 6000),
+ new byte[] {1, 2, 3, 4},
+ partition));
+ }
+
+ private Map<TableBucket, Long> getBucketLogEndOffset(
+ long tableId, int bucketNum, @Nullable Long partitionId) {
+ Map<TableBucket, Long> bucketLogEndOffsets = new HashMap<>();
+ for (int i = 0; i < bucketNum; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
+ Replica replica = getLeaderReplica(tableBucket);
+ bucketLogEndOffsets.put(tableBucket, replica.getLocalLogEndOffset());
+ }
+ return bucketLogEndOffsets;
+ }
+}
diff --git a/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadTestBase.java b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadTestBase.java
new file mode 100644
index 0000000000..24d23e50a8
--- /dev/null
+++ b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/flink/FlinkUnionReadTestBase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.flink;
+
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.lake.testutils.FlinkLakeTieringTestBase;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import javax.annotation.Nullable;
+
+import static org.apache.fluss.flink.FlinkConnectorOptions.BOOTSTRAP_SERVERS;
+
+/** Base class for Iceberg union read tests. */
+class FlinkUnionReadTestBase extends FlinkLakeTieringTestBase {
+ protected static final String DEFAULT_DB = "fluss";
+
+ protected static final String CATALOG_NAME = "test_iceberg_lake";
+ protected static final int DEFAULT_BUCKET_NUM = 1;
+ StreamTableEnvironment batchTEnv;
+ StreamTableEnvironment streamTEnv;
+
+ @BeforeAll
+ protected static void beforeAll() {
+ FlinkLakeTieringTestBase.beforeAll();
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ super.beforeEach();
+ String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ // create table environment
+ batchTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inBatchMode());
+ // create catalog using SQL
+ batchTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+ batchTEnv.executeSql("use catalog " + CATALOG_NAME);
+ batchTEnv.executeSql("use " + DEFAULT_DB);
+ buildStreamTEnv(null);
+ }
+
+ protected StreamTableEnvironment buildStreamTEnv(@Nullable String savepointPath) {
+ Configuration conf = new Configuration();
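+ // when a savepoint path is given, configure the shared execution environment so the
+ // next job submitted through this table environment restores from that savepoint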
+ if (savepointPath != null) {
+ conf.setString("execution.savepoint.path", savepointPath);
+ execEnv.configure(conf);
+ }
+ String bootstrapServers = String.join(",", clientConf.get(ConfigOptions.BOOTSTRAP_SERVERS));
+ // create table environment
+ streamTEnv = StreamTableEnvironment.create(execEnv, EnvironmentSettings.inStreamingMode());
+ // create catalog using SQL
+ streamTEnv.executeSql(
+ String.format(
+ "create catalog %s with ('type' = 'fluss', '%s' = '%s')",
+ CATALOG_NAME, BOOTSTRAP_SERVERS.key(), bootstrapServers));
+ streamTEnv.executeSql("use catalog " + CATALOG_NAME);
+ streamTEnv.executeSql("use " + DEFAULT_DB);
+ return streamTEnv;
+ }
+}
diff --git a/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/testutils/FlinkLakeTieringTestBase.java b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/testutils/FlinkLakeTieringTestBase.java
new file mode 100644
index 0000000000..2d004f0a5d
--- /dev/null
+++ b/fluss-lake/fluss-lake-ut/src/test/java/org/apache/fluss/lake/testutils/FlinkLakeTieringTestBase.java
@@ -0,0 +1,271 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.testutils;
+
+import org.apache.fluss.client.Connection;
+import org.apache.fluss.client.ConnectionFactory;
+import org.apache.fluss.client.admin.Admin;
+import org.apache.fluss.client.table.Table;
+import org.apache.fluss.client.table.writer.AppendWriter;
+import org.apache.fluss.client.table.writer.TableWriter;
+import org.apache.fluss.client.table.writer.UpsertWriter;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.FlussRuntimeException;
+import org.apache.fluss.flink.tiering.LakeTieringJobBuilder;
+import org.apache.fluss.metadata.DataLakeFormat;
+import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TablePath;
+import org.apache.fluss.row.InternalRow;
+import org.apache.fluss.server.replica.Replica;
+import org.apache.fluss.server.testutils.FlussClusterExtension;
+import org.apache.fluss.server.zk.ZooKeeperClient;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.extension.RegisterExtension;
+
+import java.nio.file.Files;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.fluss.flink.tiering.source.TieringSourceOptions.POLL_TIERING_TABLE_INTERVAL;
+import static org.apache.fluss.testutils.common.CommonTestUtils.retry;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitUntil;
+import static org.apache.fluss.testutils.common.CommonTestUtils.waitValue;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test base for tiering to Iceberg by Flink. */
+public class FlinkLakeTieringTestBase {
+
+ @RegisterExtension
+ public static final FlussClusterExtension FLUSS_CLUSTER_EXTENSION =
+ FlussClusterExtension.builder()
+ .setClusterConf(initConfig())
+ .setNumOfTabletServers(3)
+ .build();
+
+ protected StreamExecutionEnvironment execEnv;
+
+ protected static Connection conn;
+ protected static Admin admin;
+ protected static Configuration clientConf;
+ protected static String warehousePath;
+
+ private static Configuration initConfig() {
+ Configuration conf = new Configuration();
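+ // take KV snapshots every second and retain all of them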
+ conf.set(ConfigOptions.KV_SNAPSHOT_INTERVAL, Duration.ofSeconds(1))
+ .set(ConfigOptions.KV_MAX_RETAINED_SNAPSHOTS, Integer.MAX_VALUE);
+
+ // Configure the tiering sink to be Iceberg
+ conf.set(ConfigOptions.DATALAKE_FORMAT, DataLakeFormat.ICEBERG);
+ conf.setString("datalake.iceberg.type", "hadoop");
+ try {
+ warehousePath =
+ Files.createTempDirectory("fluss-testing-iceberg-tiered")
+ .resolve("warehouse")
+ .toString();
+ } catch (Exception e) {
+ throw new FlussRuntimeException("Failed to create Iceberg warehouse path", e);
+ }
+ conf.setString("datalake.iceberg.warehouse", warehousePath);
+ return conf;
+ }
+
+ @BeforeAll
+ protected static void beforeAll() {
+ clientConf = FLUSS_CLUSTER_EXTENSION.getClientConfig();
+ conn = ConnectionFactory.createConnection(clientConf);
+ admin = conn.getAdmin();
+ }
+
+ @AfterAll
+ static void afterAll() throws Exception {
+ if (admin != null) {
+ admin.close();
+ admin = null;
+ }
+ if (conn != null) {
+ conn.close();
+ conn = null;
+ }
+ }
+
+ @BeforeEach
+ public void beforeEach() {
+ execEnv = StreamExecutionEnvironment.getExecutionEnvironment();
+ execEnv.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+ execEnv.setParallelism(2);
+ }
+
+ protected JobClient buildTieringJob(StreamExecutionEnvironment execEnv) throws Exception {
+ Configuration flussConfig = new Configuration(clientConf);
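+ // poll for tables to tier every 500ms so newly created tables are picked up quickly in tests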
+ flussConfig.set(POLL_TIERING_TABLE_INTERVAL, Duration.ofMillis(500L));
+ return LakeTieringJobBuilder.newBuilder(
+ execEnv,
+ flussConfig,
+ Configuration.fromMap(getIcebergCatalogConf()),
+ DataLakeFormat.ICEBERG.toString())
+ .build();
+ }
+
+ protected static Map<String, String> getIcebergCatalogConf() {
+ Map<String, String> icebergConf = new HashMap<>();
+ icebergConf.put("type", "hadoop");
+ icebergConf.put("warehouse", warehousePath);
+ return icebergConf;
+ }
+
+ protected long createTable(TablePath tablePath, TableDescriptor tableDescriptor)
+ throws Exception {
+ admin.createTable(tablePath, tableDescriptor, true).get();
+ return admin.getTableInfo(tablePath).get().getTableId();
+ }
+
+ protected void assertReplicaStatus(
+ TablePath tablePath,
+ long tableId,
+ int bucketCount,
+ boolean isPartitioned,
+ Map<TableBucket, Long> expectedLogEndOffset) {
+ if (isPartitioned) {
+ Map<Long, String> partitionById =
+ waitUntilPartitions(FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(), tablePath);
+ for (Long partitionId : partitionById.keySet()) {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
+ assertReplicaStatus(tableBucket, expectedLogEndOffset.get(tableBucket));
+ }
+ }
+ } else {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ assertReplicaStatus(tableBucket, expectedLogEndOffset.get(tableBucket));
+ }
+ }
+ }
+
+ protected void assertReplicaStatus(TableBucket tb, long expectedLogEndOffset) {
+ retry(
+ Duration.ofMinutes(1),
+ () -> {
+ Replica replica = getLeaderReplica(tb);
+ // datalake snapshot id should be updated
+ assertThat(replica.getLogTablet().getLakeTableSnapshotId())
+ .isGreaterThanOrEqualTo(0);
+ assertThat(replica.getLakeLogEndOffset()).isEqualTo(expectedLogEndOffset);
+ });
+ }
+
+ /**
+ * Wait until the default number of partitions is created. Return the map from partition id to
+ * partition name.
+ */
+ public static Map<Long, String> waitUntilPartitions(
+ ZooKeeperClient zooKeeperClient, TablePath tablePath) {
+ return waitUntilPartitions(
+ zooKeeperClient,
+ tablePath,
+ ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+ }
+
+ public static Map<Long, String> waitUntilPartitions(TablePath tablePath) {
+ return waitUntilPartitions(
+ FLUSS_CLUSTER_EXTENSION.getZooKeeperClient(),
+ tablePath,
+ ConfigOptions.TABLE_AUTO_PARTITION_NUM_PRECREATE.defaultValue());
+ }
+
+ /**
+ * Wait until the given number of partitions is created. Return the map from partition id to
+ * partition name.
+ */
+ public static Map<Long, String> waitUntilPartitions(
+ ZooKeeperClient zooKeeperClient, TablePath tablePath, int expectPartitions) {
+ return waitValue(
+ () -> {
+ Map<Long, String> gotPartitions =
+ zooKeeperClient.getPartitionIdAndNames(tablePath);
+ return expectPartitions == gotPartitions.size()
+ ? Optional.of(gotPartitions)
+ : Optional.empty();
+ },
+ Duration.ofMinutes(1),
+ String.format("expect %d table partition has not been created", expectPartitions));
+ }
+
+ protected Replica getLeaderReplica(TableBucket tableBucket) {
+ return FLUSS_CLUSTER_EXTENSION.waitAndGetLeaderReplica(tableBucket);
+ }
+
+ protected void writeRows(TablePath tablePath, List<InternalRow> rows, boolean append)
+ throws Exception {
+ try (Table table = conn.getTable(tablePath)) {
+ TableWriter tableWriter;
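+ // log tables are written with an append writer, primary key tables with an upsert writer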
+ if (append) {
+ tableWriter = table.newAppend().createWriter();
+ } else {
+ tableWriter = table.newUpsert().createWriter();
+ }
+ for (InternalRow row : rows) {
+ if (tableWriter instanceof AppendWriter) {
+ ((AppendWriter) tableWriter).append(row);
+ } else {
+ ((UpsertWriter) tableWriter).upsert(row);
+ }
+ }
+ tableWriter.flush();
+ }
+ }
+
+ protected void waitUntilBucketSynced(
+ TablePath tablePath, long tableId, int bucketCount, boolean isPartitioned) {
+ if (isPartitioned) {
+ Map<Long, String> partitionById = waitUntilPartitions(tablePath);
+ for (Long partitionId : partitionById.keySet()) {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, partitionId, i);
+ waitUntilBucketSynced(tableBucket);
+ }
+ }
+ } else {
+ for (int i = 0; i < bucketCount; i++) {
+ TableBucket tableBucket = new TableBucket(tableId, i);
+ waitUntilBucketSynced(tableBucket);
+ }
+ }
+ }
+
+ protected void waitUntilBucketSynced(TableBucket tb) {
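+ // a bucket is considered synced once its lake snapshot id is non-negative,
+ // i.e. it has been tiered to the lake at least once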
+ waitUntil(
+ () -> {
+ Replica replica = getLeaderReplica(tb);
+ return replica.getLogTablet().getLakeTableSnapshotId() >= 0;
+ },
+ Duration.ofMinutes(2),
+ "bucket " + tb + " not synced");
+ }
+}
diff --git a/fluss-lake/pom.xml b/fluss-lake/pom.xml
index e13cc30261..27bdde4818 100644
--- a/fluss-lake/pom.xml
+++ b/fluss-lake/pom.xml
@@ -79,6 +79,7 @@
         <module>fluss-lake-paimon</module>
         <module>fluss-lake-iceberg</module>
         <module>fluss-lake-lance</module>
+        <module>fluss-lake-ut</module>
     </modules>
     <packaging>pom</packaging>
\ No newline at end of file