Skip to content

Commit 3faa663

Browse files
authored
[lake/paimon] Paimon lake table support non-string partition keys (#1817)
1 parent c9ecc1f commit 3faa663

File tree

6 files changed

+189
-34
lines changed

6 files changed

+189
-34
lines changed

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/PaimonLakeCatalog.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,9 @@ private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
157157
Schema.Builder schemaBuilder = Schema.newBuilder();
158158
Options options = new Options();
159159

160+
// set default properties
161+
setPaimonDefaultProperties(options);
162+
160163
// When bucket key is undefined, it should use dynamic bucket (bucket = -1) mode.
161164
List<String> bucketKeys = tableDescriptor.getBucketKeys();
162165
if (!bucketKeys.isEmpty()) {
@@ -215,6 +218,12 @@ private Schema toPaimonSchema(TableDescriptor tableDescriptor) {
215218
return schemaBuilder.build();
216219
}
217220

221+
private void setPaimonDefaultProperties(Options options) {
222+
// set partition.legacy-name to false, otherwise paimon will use toString for all types,
223+
// which would cause inconsistent partition values for the same binary value
224+
options.set(CoreOptions.PARTITION_GENERATE_LEGCY_NAME, false);
225+
}
226+
218227
private void setFlussPropertyToPaimon(String key, String value, Options options) {
219228
if (key.startsWith(PAIMON_CONF_PREFIX)) {
220229
options.set(key.substring(PAIMON_CONF_PREFIX.length()), value);

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/RecordWriter.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
import java.util.List;
3131

32-
import static org.apache.fluss.lake.paimon.utils.PaimonConversions.toPaimonPartitionBinaryRow;
3332
import static org.apache.fluss.utils.Preconditions.checkState;
3433

3534
/** A base interface to write {@link LogRecord} to Paimon. */
@@ -38,7 +37,8 @@ public abstract class RecordWriter<T> implements AutoCloseable {
3837
protected final TableWriteImpl<T> tableWrite;
3938
protected final RowType tableRowType;
4039
protected final int bucket;
41-
@Nullable protected final BinaryRow partition;
40+
protected final List<String> partitionKeys;
41+
@Nullable protected BinaryRow partition;
4242
protected final FlussRecordAsPaimonRow flussRecordAsPaimonRow;
4343

4444
public RecordWriter(
@@ -50,7 +50,11 @@ public RecordWriter(
5050
this.tableWrite = tableWrite;
5151
this.tableRowType = tableRowType;
5252
this.bucket = tableBucket.getBucket();
53-
this.partition = toPaimonPartitionBinaryRow(partitionKeys, partition);
53+
this.partitionKeys = partitionKeys;
54+
// set partition to EMPTY_ROW in advance for non-partitioned tables
55+
if (partition == null || partitionKeys.isEmpty()) {
56+
this.partition = BinaryRow.EMPTY_ROW;
57+
}
5458
this.flussRecordAsPaimonRow =
5559
new FlussRecordAsPaimonRow(tableBucket.getBucket(), tableRowType);
5660
}

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/append/AppendOnlyWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,12 @@ public AppendOnlyWriter(
5757
@Override
5858
public void write(LogRecord record) throws Exception {
5959
flussRecordAsPaimonRow.setFlussRecord(record);
60+
61+
// get partition once
62+
if (partition == null) {
63+
partition = tableWrite.getPartition(flussRecordAsPaimonRow);
64+
}
65+
6066
// hacky, call internal method tableWrite.getWrite() to support
6167
// to write to given partition, otherwise, it'll always extract a partition from Paimon row
6268
// which may be costly

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/tiering/mergetree/MergeTreeWriter.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ private static TableWriteImpl<KeyValue> createTableWrite(FileStoreTable fileStor
7777
@Override
7878
public void write(LogRecord record) throws Exception {
7979
flussRecordAsPaimonRow.setFlussRecord(record);
80+
81+
// get partition once
82+
if (partition == null) {
83+
partition = tableWrite.getPartition(flussRecordAsPaimonRow);
84+
}
85+
8086
rowKeyExtractor.setRecord(flussRecordAsPaimonRow);
8187
keyValue.replace(
8288
rowKeyExtractor.trimmedPrimaryKey(),

fluss-lake/fluss-lake-paimon/src/main/java/org/apache/fluss/lake/paimon/utils/PaimonConversions.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -18,24 +18,18 @@
1818
package org.apache.fluss.lake.paimon.utils;
1919

2020
import org.apache.fluss.lake.paimon.source.FlussRowAsPaimonRow;
21-
import org.apache.fluss.metadata.ResolvedPartitionSpec;
2221
import org.apache.fluss.metadata.TableChange;
2322
import org.apache.fluss.metadata.TablePath;
2423
import org.apache.fluss.record.ChangeType;
2524
import org.apache.fluss.row.GenericRow;
2625
import org.apache.fluss.row.InternalRow;
2726

2827
import org.apache.paimon.catalog.Identifier;
29-
import org.apache.paimon.data.BinaryRow;
30-
import org.apache.paimon.data.BinaryRowWriter;
31-
import org.apache.paimon.data.BinaryString;
3228
import org.apache.paimon.schema.SchemaChange;
3329
import org.apache.paimon.types.DataType;
3430
import org.apache.paimon.types.RowKind;
3531
import org.apache.paimon.types.RowType;
3632

37-
import javax.annotation.Nullable;
38-
3933
import java.util.ArrayList;
4034
import java.util.List;
4135
import java.util.function.Function;
@@ -78,31 +72,6 @@ public static Identifier toPaimon(TablePath tablePath) {
7872
return Identifier.create(tablePath.getDatabaseName(), tablePath.getTableName());
7973
}
8074

81-
public static BinaryRow toPaimonPartitionBinaryRow(
82-
List<String> partitionKeys, @Nullable String partitionName) {
83-
if (partitionName == null || partitionKeys.isEmpty()) {
84-
return BinaryRow.EMPTY_ROW;
85-
}
86-
87-
// Fluss's existing utility
88-
ResolvedPartitionSpec resolvedPartitionSpec =
89-
ResolvedPartitionSpec.fromPartitionName(partitionKeys, partitionName);
90-
91-
BinaryRow partitionBinaryRow = new BinaryRow(partitionKeys.size());
92-
BinaryRowWriter writer = new BinaryRowWriter(partitionBinaryRow);
93-
94-
List<String> partitionValues = resolvedPartitionSpec.getPartitionValues();
95-
for (int i = 0; i < partitionKeys.size(); i++) {
96-
// Todo Currently, partition column must be String datatype, so we can always use
97-
// `BinaryString.fromString` to convert to Paimon's data structure. Revisit here when
98-
// #489 is finished.
99-
writer.writeString(i, BinaryString.fromString(partitionValues.get(i)));
100-
}
101-
102-
writer.complete();
103-
return partitionBinaryRow;
104-
}
105-
10675
public static Object toPaimonLiteral(DataType dataType, Object flussLiteral) {
10776
RowType rowType = RowType.of(dataType);
10877
InternalRow flussRow = GenericRow.of(flussLiteral);

fluss-lake/fluss-lake-paimon/src/test/java/org/apache/fluss/lake/paimon/tiering/PaimonTieringITCase.java

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,22 @@
1717

1818
package org.apache.fluss.lake.paimon.tiering;
1919

20+
import org.apache.fluss.client.table.getter.PartitionGetter;
2021
import org.apache.fluss.config.AutoPartitionTimeUnit;
2122
import org.apache.fluss.config.ConfigOptions;
2223
import org.apache.fluss.lake.paimon.testutils.FlinkPaimonTieringTestBase;
24+
import org.apache.fluss.metadata.PartitionInfo;
2325
import org.apache.fluss.metadata.Schema;
2426
import org.apache.fluss.metadata.TableBucket;
2527
import org.apache.fluss.metadata.TableChange;
2628
import org.apache.fluss.metadata.TableDescriptor;
29+
import org.apache.fluss.metadata.TableInfo;
2730
import org.apache.fluss.metadata.TablePath;
31+
import org.apache.fluss.row.BinaryString;
32+
import org.apache.fluss.row.Decimal;
2833
import org.apache.fluss.row.InternalRow;
34+
import org.apache.fluss.row.TimestampLtz;
35+
import org.apache.fluss.row.TimestampNtz;
2936
import org.apache.fluss.server.testutils.FlussClusterExtension;
3037
import org.apache.fluss.types.DataTypes;
3138
import org.apache.fluss.utils.types.Tuple2;
@@ -40,15 +47,25 @@
4047
import org.junit.jupiter.api.BeforeAll;
4148
import org.junit.jupiter.api.Test;
4249
import org.junit.jupiter.api.extension.RegisterExtension;
50+
import org.junit.jupiter.params.ParameterizedTest;
51+
import org.junit.jupiter.params.provider.Arguments;
52+
import org.junit.jupiter.params.provider.MethodSource;
4353

54+
import java.math.BigDecimal;
55+
import java.nio.charset.StandardCharsets;
4456
import java.time.Duration;
57+
import java.time.Instant;
58+
import java.time.LocalDate;
59+
import java.time.LocalDateTime;
60+
import java.time.LocalTime;
4561
import java.util.ArrayList;
4662
import java.util.Arrays;
4763
import java.util.Collections;
4864
import java.util.HashMap;
4965
import java.util.Iterator;
5066
import java.util.List;
5167
import java.util.Map;
68+
import java.util.stream.Stream;
5269

5370
import static org.apache.fluss.lake.committer.BucketOffset.FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY;
5471
import static org.apache.fluss.testutils.DataTestUtils.row;
@@ -185,6 +202,150 @@ void testTiering() throws Exception {
185202
}
186203
}
187204

205+
private static Stream<Arguments> tieringAllTypesWriteArgs() {
206+
return Stream.of(Arguments.of(true), Arguments.of(false));
207+
}
208+
209+
@ParameterizedTest
210+
@MethodSource("tieringAllTypesWriteArgs")
211+
void testTieringForAllTypes(boolean isPrimaryKeyTable) throws Exception {
212+
// create a table, write some records and wait until snapshot finished
213+
TablePath t1 =
214+
TablePath.of(
215+
DEFAULT_DB,
216+
isPrimaryKeyTable ? "pkTableForAllTypes" : "logTableForAllTypes");
217+
Schema.Builder builder =
218+
Schema.newBuilder()
219+
.column("c0", DataTypes.STRING())
220+
.column("c1", DataTypes.BOOLEAN())
221+
.column("c2", DataTypes.TINYINT())
222+
.column("c3", DataTypes.SMALLINT())
223+
.column("c4", DataTypes.INT())
224+
.column("c5", DataTypes.BIGINT())
225+
.column("c6", DataTypes.FLOAT())
226+
.column("c7", DataTypes.DOUBLE())
227+
// decimal is not supported as a partition key
228+
.column("c8", DataTypes.DECIMAL(10, 2))
229+
.column("c9", DataTypes.CHAR(10))
230+
.column("c10", DataTypes.STRING())
231+
.column("c11", DataTypes.BYTES())
232+
.column("c12", DataTypes.BINARY(5))
233+
.column("c13", DataTypes.DATE())
234+
.column("c14", DataTypes.TIME(6))
235+
.column("c15", DataTypes.TIMESTAMP(6))
236+
.column("c16", DataTypes.TIMESTAMP_LTZ(6));
237+
if (isPrimaryKeyTable) {
238+
builder.primaryKey(
239+
"c0", "c1", "c2", "c3", "c4", "c5", "c6", "c7", "c9", "c10", "c11", "c12",
240+
"c13", "c14", "c15", "c16");
241+
}
242+
List<String> partitionKeys =
243+
Arrays.asList(
244+
"c1", "c2", "c3", "c4", "c5", "c6", "c7", "c9", "c10", "c11", "c12", "c13",
245+
"c14", "c15", "c16");
246+
TableDescriptor.Builder tableDescriptor =
247+
TableDescriptor.builder()
248+
.schema(builder.build())
249+
.distributedBy(1, "c0")
250+
.property(ConfigOptions.TABLE_DATALAKE_ENABLED.key(), "true")
251+
.property(ConfigOptions.TABLE_DATALAKE_FRESHNESS, Duration.ofMillis(500));
252+
tableDescriptor.partitionedBy(partitionKeys);
253+
tableDescriptor.customProperties(Collections.emptyMap());
254+
tableDescriptor.properties(Collections.emptyMap());
255+
long t1Id = createTable(t1, tableDescriptor.build());
256+
257+
// write records
258+
List<InternalRow> rows =
259+
Collections.singletonList(
260+
row(
261+
BinaryString.fromString("v0"),
262+
true,
263+
(byte) 1,
264+
(short) 2,
265+
3,
266+
4L,
267+
5.0f,
268+
6.0,
269+
Decimal.fromBigDecimal(new BigDecimal("0.09"), 10, 2),
270+
BinaryString.fromString("v1"),
271+
BinaryString.fromString("v2"),
272+
"v3".getBytes(StandardCharsets.UTF_8),
273+
new byte[] {1, 2, 3, 4, 5},
274+
(int) LocalDate.of(2025, 10, 16).toEpochDay(),
275+
(int)
276+
(LocalTime.of(10, 10, 10, 123000000).toNanoOfDay()
277+
/ 1_000_000),
278+
TimestampNtz.fromLocalDateTime(
279+
LocalDateTime.of(2025, 10, 16, 10, 10, 10, 123000000)),
280+
TimestampLtz.fromInstant(
281+
Instant.parse("2025-10-16T10:10:10.123Z"))));
282+
writeRows(t1, rows, !isPrimaryKeyTable);
283+
284+
TableInfo tableInfo = admin.getTableInfo(t1).get();
285+
List<PartitionInfo> partitionInfos = admin.listPartitionInfos(t1).get();
286+
assertThat(partitionInfos.size()).isEqualTo(1);
287+
PartitionGetter partitionGetter =
288+
new PartitionGetter(tableInfo.getRowType(), partitionKeys);
289+
String partition = partitionGetter.getPartition(rows.get(0));
290+
assertThat(partitionInfos.get(0).getPartitionName()).isEqualTo(partition);
291+
292+
long partitionId = partitionInfos.get(0).getPartitionId();
293+
TableBucket t1Bucket = new TableBucket(t1Id, partitionId, 0);
294+
295+
// then start tiering job
296+
JobClient jobClient = buildTieringJob(execEnv);
297+
298+
try {
299+
// check the status of replica after synced
300+
assertReplicaStatus(t1Bucket, 1);
301+
302+
// check data in paimon
303+
Iterator<org.apache.paimon.data.InternalRow> paimonRowIterator =
304+
getPaimonRowCloseableIterator(t1);
305+
for (InternalRow expectedRow : rows) {
306+
org.apache.paimon.data.InternalRow row = paimonRowIterator.next();
307+
assertThat(row.getString(0).toString())
308+
.isEqualTo(expectedRow.getString(0).toString());
309+
assertThat(row.getBoolean(1)).isEqualTo(expectedRow.getBoolean(1));
310+
assertThat(row.getByte(2)).isEqualTo(expectedRow.getByte(2));
311+
assertThat(row.getShort(3)).isEqualTo(expectedRow.getShort(3));
312+
assertThat(row.getInt(4)).isEqualTo(expectedRow.getInt(4));
313+
assertThat(row.getLong(5)).isEqualTo(expectedRow.getLong(5));
314+
assertThat(row.getFloat(6)).isEqualTo(expectedRow.getFloat(6));
315+
assertThat(row.getDouble(7)).isEqualTo(expectedRow.getDouble(7));
316+
assertThat(row.getDecimal(8, 10, 2).toBigDecimal())
317+
.isEqualTo(expectedRow.getDecimal(8, 10, 2).toBigDecimal());
318+
assertThat(row.getString(9).toString())
319+
.isEqualTo(expectedRow.getString(9).toString());
320+
assertThat(row.getString(10).toString())
321+
.isEqualTo(expectedRow.getString(10).toString());
322+
assertThat(row.getBinary(11)).isEqualTo(expectedRow.getBytes(11));
323+
assertThat(row.getBinary(12)).isEqualTo(expectedRow.getBinary(12, 5));
324+
assertThat(row.getInt(13)).isEqualTo(expectedRow.getInt(13));
325+
assertThat(row.getInt(14)).isEqualTo(expectedRow.getInt(14));
326+
assertThat(row.getTimestamp(15, 6).getMillisecond())
327+
.isEqualTo(expectedRow.getTimestampNtz(15, 6).getMillisecond());
328+
assertThat(row.getTimestamp(16, 6).getMillisecond())
329+
.isEqualTo(expectedRow.getTimestampLtz(16, 6).getEpochMillisecond());
330+
331+
// check snapshot in paimon
332+
Map<String, String> properties =
333+
new HashMap<String, String>() {
334+
{
335+
put(
336+
FLUSS_LAKE_SNAP_BUCKET_OFFSET_PROPERTY,
337+
String.format(
338+
"[{\"partition_id\":%d,\"bucket\":0,\"partition_name\":\"c1=true/c2=1/c3=2/c4=3/c5=4/c6=5_0/c7=6_0/c9=v1/c10=v2/c11=7633/c12=0102030405/c13=2025-10-16/c14=10-10-10_123/c15=2025-10-16-10-10-10_123/c16=2025-10-16-10-10-10_123\",\"offset\":1}]",
339+
partitionId));
340+
}
341+
};
342+
checkSnapshotPropertyInPaimon(t1, properties);
343+
}
344+
} finally {
345+
jobClient.cancel().get();
346+
}
347+
}
348+
188349
@Test
189350
void testTieringForAlterTable() throws Exception {
190351
TablePath t1 = TablePath.of(DEFAULT_DB, "pkTableAlter");

0 commit comments

Comments
 (0)