
Commit 7a841a3

xx789polyzos authored and committed
[lake] Introduce table.datalake.auto-compaction options (#1612)
1 parent 2615ba6 commit 7a841a3

14 files changed: +161 additions, -80 deletions

fluss-common/src/main/java/org/apache/fluss/config/ConfigOptions.java

Lines changed: 7 additions & 0 deletions
@@ -1290,6 +1290,13 @@ public class ConfigOptions {
                                     + "Based on this target freshness, the Fluss service automatically moves data from the Fluss table and updates to the datalake table, so that the data in the datalake table is kept up to date within this target. "
                                     + "If the data does not need to be as fresh, you can specify a longer target freshness time to reduce costs.");
 
+    public static final ConfigOption<Boolean> TABLE_DATALAKE_AUTO_COMPACTION =
+            key("table.datalake.auto-compaction")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "If true, compaction will be triggered automatically when tiering service writes to the datalake. It is disabled by default.");
+
     public static final ConfigOption<MergeEngineType> TABLE_MERGE_ENGINE =
             key("table.merge-engine")
                     .enumType(MergeEngineType.class)
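
For context, a minimal sketch of how a table could opt in to the new option when it is declared. The builder and .property(...) calls mirror those used in the tests further down this commit; the schema, class name, and the DataTypes import path are illustrative assumptions.

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.types.DataTypes; // import path assumed

class AutoCompactionExample {
    // Sketch: declare a datalake-enabled table with auto-compaction turned on.
    // The schema is illustrative; the two .property(...) calls carry the point.
    static TableDescriptor descriptor() {
        return TableDescriptor.builder()
                .schema(
                        Schema.newBuilder()
                                .column("c1", DataTypes.INT())
                                .column("c2", DataTypes.STRING())
                                .build())
                .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                .property(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION, true)
                .build();
    }
}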

fluss-common/src/main/java/org/apache/fluss/config/TableConfig.java

Lines changed: 5 additions & 0 deletions
@@ -94,6 +94,11 @@ public Duration getDataLakeFreshness() {
         return config.get(ConfigOptions.TABLE_DATALAKE_FRESHNESS);
     }
 
+    /** Whether auto compaction is enabled. */
+    public boolean isDataLakeAutoCompaction() {
+        return config.get(ConfigOptions.TABLE_DATALAKE_AUTO_COMPACTION);
+    }
+
     /** Gets the optional merge engine type of the table. */
     public Optional<MergeEngineType> getMergeEngineType() {
         return config.getOptional(ConfigOptions.TABLE_MERGE_ENGINE);
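
The accessor gives tiering code a single per-table switch to consult. A minimal sketch of how a caller might branch on it; only isDataLakeAutoCompaction() comes from this commit, the surrounding helper is hypothetical.

import org.apache.fluss.config.TableConfig;

class CompactionGate {
    // Hypothetical helper: TableConfig#isDataLakeAutoCompaction() is confirmed
    // by this commit; how and where the tiering service consults it is an
    // assumption of this sketch.
    static boolean shouldCompactAfterTiering(TableConfig tableConfig) {
        return tableConfig.isDataLakeAutoCompaction();
    }
}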

fluss-common/src/main/java/org/apache/fluss/lake/writer/WriterInitContext.java

Lines changed: 6 additions & 14 deletions
@@ -18,17 +18,16 @@
 package org.apache.fluss.lake.writer;
 
 import org.apache.fluss.annotation.PublicEvolving;
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
 
-import java.util.Map;
-
 /**
  * The WriterInitContext interface provides the context needed to create a LakeWriter. It includes
- * methods to obtain the table path, table bucket, and an optional partition.
+ * methods to obtain the table path, table bucket, an optional partition, and table metadata
+ * information.
  *
  * @since 0.7
  */
@@ -58,16 +57,9 @@ public interface WriterInitContext {
     String partition();
 
     /**
-     * Returns the table schema.
-     *
-     * @return the table schema
-     */
-    Schema schema();
-
-    /**
-     * Returns the table custom properties.
+     * Returns the Fluss table info.
      *
-     * @return the table custom properties
+     * @return the Fluss table info
      */
-    Map<String, String> customProperties();
+    TableInfo tableInfo();
 }
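
Call sites migrate mechanically: what used to be two separate context methods is now reached through tableInfo(). A minimal sketch of the before/after, using only getter chains that appear verbatim elsewhere in this commit; the helper class is illustrative.

import java.util.Map;
import org.apache.fluss.lake.writer.WriterInitContext;

class MigrationSketch {
    // Before: ctx.customProperties(). After: one hop through tableInfo().
    static Map<String, String> customProperties(WriterInitContext ctx) {
        return ctx.tableInfo().getCustomProperties().toMap();
    }
    // The row type moves the same way:
    //     before: ctx.schema().getRowType()
    //     after:  ctx.tableInfo().getRowType()
}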

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringSplitReader.java

Lines changed: 1 addition & 2 deletions
@@ -309,8 +309,7 @@ private LakeWriter<WriteResult> getOrCreateLakeWriter(
                                 currentTablePath,
                                 bucket,
                                 partitionName,
-                                currentTable.getTableInfo().getSchema(),
-                                currentTable.getTableInfo().getCustomProperties().toMap()));
+                                currentTable.getTableInfo()));
             lakeWriters.put(bucket, lakeWriter);
         }
         return lakeWriter;

fluss-flink/fluss-flink-common/src/main/java/org/apache/fluss/flink/tiering/source/TieringWriterInitContext.java

Lines changed: 6 additions & 16 deletions
@@ -18,34 +18,29 @@
 package org.apache.fluss.flink.tiering.source;
 
 import org.apache.fluss.lake.writer.WriterInitContext;
-import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 
 import javax.annotation.Nullable;
 
-import java.util.Map;
-
 /** The implementation of {@link WriterInitContext}. */
 public class TieringWriterInitContext implements WriterInitContext {
 
     private final TablePath tablePath;
     private final TableBucket tableBucket;
-    private final Schema schema;
     @Nullable private final String partition;
-    private final Map<String, String> customProperties;
+    private final TableInfo tableInfo;
 
     public TieringWriterInitContext(
             TablePath tablePath,
             TableBucket tableBucket,
             @Nullable String partition,
-            Schema schema,
-            Map<String, String> customProperties) {
+            TableInfo tableInfo) {
         this.tablePath = tablePath;
         this.tableBucket = tableBucket;
         this.partition = partition;
-        this.schema = schema;
-        this.customProperties = customProperties;
+        this.tableInfo = tableInfo;
     }
 
     @Override
@@ -65,12 +60,7 @@ public String partition() {
     }
 
     @Override
-    public Schema schema() {
-        return schema;
-    }
-
-    @Override
-    public Map<String, String> customProperties() {
-        return customProperties;
+    public TableInfo tableInfo() {
+        return tableInfo;
     }
 }
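
Callers construct the slimmed-down context by fabricating the full table metadata first. A sketch assembled from the TableInfo.of(...) and builder calls used in the tests below; the schema, the numeric ids (0, 1, 1L, 1L), the null partition, and the DataTypes import path are illustrative assumptions.

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.flink.tiering.source.TieringWriterInitContext;
import org.apache.fluss.lake.writer.WriterInitContext;
import org.apache.fluss.metadata.Schema;
import org.apache.fluss.metadata.TableBucket;
import org.apache.fluss.metadata.TableDescriptor;
import org.apache.fluss.metadata.TableInfo;
import org.apache.fluss.metadata.TablePath;
import org.apache.fluss.types.DataTypes; // import path assumed

class ContextSketch {
    // Sketch: build TableInfo once, then hand it to the context in one argument,
    // as TieringSplitReader now does with currentTable.getTableInfo().
    static WriterInitContext newContext(TablePath tablePath, TableBucket bucket) {
        TableDescriptor descriptor =
                TableDescriptor.builder()
                        .schema(Schema.newBuilder().column("c1", DataTypes.INT()).build())
                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
                        .build();
        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
        return new TieringWriterInitContext(tablePath, bucket, /* partition */ null, tableInfo);
    }
}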

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/AppendOnlyTaskWriter.java

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ public AppendOnlyTaskWriter(
         super(
                 taskWriter,
                 icebergTable.schema(),
-                writerInitContext.schema().getRowType(),
+                writerInitContext.tableInfo().getRowType(),
                 writerInitContext.tableBucket());
     }

fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/writer/DeltaTaskWriter.java

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,7 @@ public DeltaTaskWriter(
         super(
                 taskWriter,
                 icebergTable.schema(),
-                writerInitContext.schema().getRowType(),
+                writerInitContext.tableInfo().getRowType(),
                 writerInitContext.tableBucket());
     }

fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergTieringTest.java

Lines changed: 24 additions & 13 deletions
@@ -17,12 +17,15 @@
 
 package org.apache.fluss.lake.iceberg.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.LakeCommitter;
 import org.apache.fluss.lake.serializer.SimpleVersionedSerializer;
 import org.apache.fluss.lake.writer.LakeWriter;
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -119,6 +122,19 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
                         isPartitionedTable ? "partitioned" : "unpartitioned"));
         createTable(tablePath, isPrimaryKeyTable, isPartitionedTable);
 
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(
+                                org.apache.fluss.metadata.Schema.newBuilder()
+                                        .column("c1", DataTypes.INT())
+                                        .column("c2", DataTypes.STRING())
+                                        .column("c3", DataTypes.STRING())
+                                        .build())
+                        .distributedBy(BUCKET_NUM)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
         Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
 
         Map<Tuple2<String, Integer>, List<LogRecord>> recordsByBucket = new HashMap<>();
@@ -144,7 +160,7 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
         for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
             String partition = entry.getValue();
             try (LakeWriter<IcebergWriteResult> writer =
-                    createLakeWriter(tablePath, bucket, partition, entry.getKey())) {
+                    createLakeWriter(tablePath, bucket, partition, entry.getKey(), tableInfo)) {
                 Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                 Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                         isPrimaryKeyTable
@@ -198,7 +214,11 @@ void testTieringWriteTable(boolean isPrimaryKeyTable, boolean isPartitionedTable
     }
 
     private LakeWriter<IcebergWriteResult> createLakeWriter(
-            TablePath tablePath, int bucket, @Nullable String partition, @Nullable Long partitionId)
+            TablePath tablePath,
+            int bucket,
+            @Nullable String partition,
+            @Nullable Long partitionId,
+            TableInfo tableInfo)
             throws IOException {
         return icebergLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -219,17 +239,8 @@ public String partition() {
                     }
 
                     @Override
-                    public Map<String, String> customProperties() {
-                        return Collections.emptyMap();
-                    }
-
-                    @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        return org.apache.fluss.metadata.Schema.newBuilder()
-                                .column("c1", DataTypes.INT())
-                                .column("c2", DataTypes.STRING())
-                                .column("c3", DataTypes.STRING())
-                                .build();
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }

fluss-lake/fluss-lake-lance/src/main/java/org/apache/fluss/lake/lance/tiering/LanceLakeWriter.java

Lines changed: 2 additions & 2 deletions
@@ -45,7 +45,7 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
         LanceConfig config =
                 LanceConfig.from(
                         options.toMap(),
-                        writerInitContext.customProperties(),
+                        writerInitContext.tableInfo().getCustomProperties().toMap(),
                         writerInitContext.tablePath().getDatabaseName(),
                         writerInitContext.tablePath().getTableName());
         int batchSize = LanceConfig.getBatchSize(config);
@@ -56,7 +56,7 @@ public LanceLakeWriter(Configuration options, WriterInitContext writerInitContex
 
         this.arrowWriter =
                 LanceDatasetAdapter.getArrowWriter(
                        schema.get(), batchSize, writerInitContext.tableInfo().getRowType());
 
         WriteParams params = LanceConfig.genWriteParamsFromConfig(config);
         Callable<List<FragmentMetadata>> fragmentCreator =

fluss-lake/fluss-lake-lance/src/test/java/com/alibaba/fluss/lake/lance/tiering/LanceTieringTest.java

Lines changed: 16 additions & 13 deletions
@@ -17,6 +17,7 @@
 
 package org.apache.fluss.lake.lance.tiering;
 
+import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
 import org.apache.fluss.lake.committer.CommittedLakeSnapshot;
 import org.apache.fluss.lake.committer.LakeCommitter;
@@ -28,6 +29,8 @@
 import org.apache.fluss.lake.writer.WriterInitContext;
 import org.apache.fluss.metadata.Schema;
 import org.apache.fluss.metadata.TableBucket;
+import org.apache.fluss.metadata.TableDescriptor;
+import org.apache.fluss.metadata.TableInfo;
 import org.apache.fluss.metadata.TablePath;
 import org.apache.fluss.record.ChangeType;
 import org.apache.fluss.record.GenericRecord;
@@ -96,6 +99,15 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
                         tablePath.getTableName());
         Schema schema = createTable(config);
 
+        TableDescriptor descriptor =
+                TableDescriptor.builder()
+                        .schema(schema)
+                        .distributedBy(bucketNum)
+                        .property(ConfigOptions.TABLE_DATALAKE_ENABLED, true)
+                        .customProperties(customProperties)
+                        .build();
+        TableInfo tableInfo = TableInfo.of(tablePath, 0, 1, descriptor, 1L, 1L);
+
         List<LanceWriteResult> lanceWriteResults = new ArrayList<>();
         SimpleVersionedSerializer<LanceWriteResult> writeResultSerializer =
                 lanceLakeTieringFactory.getWriteResultSerializer();
@@ -126,7 +138,7 @@ void testTieringWriteTable(boolean isPartitioned) throws Exception {
         for (Map.Entry<Long, String> entry : partitionIdAndName.entrySet()) {
             String partition = entry.getValue();
             try (LakeWriter<LanceWriteResult> lakeWriter =
-                    createLakeWriter(tablePath, bucket, partition, schema, customProperties)) {
+                    createLakeWriter(tablePath, bucket, partition, tableInfo)) {
                 Tuple2<String, Integer> partitionBucket = Tuple2.of(partition, bucket);
                 Tuple2<List<LogRecord>, List<LogRecord>> writeAndExpectRecords =
                         genLogTableRecords(partition, bucket, 10);
@@ -239,11 +251,7 @@ private LakeCommitter<LanceWriteResult, LanceCommittable> createLakeCommitter(
     }
 
     private LakeWriter<LanceWriteResult> createLakeWriter(
-            TablePath tablePath,
-            int bucket,
-            @Nullable String partition,
-            Schema schema,
-            Map<String, String> customProperties)
+            TablePath tablePath, int bucket, @Nullable String partition, TableInfo tableInfo)
             throws IOException {
         return lanceLakeTieringFactory.createLakeWriter(
                 new WriterInitContext() {
@@ -265,13 +273,8 @@ public String partition() {
                     }
 
                     @Override
-                    public org.apache.fluss.metadata.Schema schema() {
-                        return schema;
-                    }
-
-                    @Override
-                    public Map<String, String> customProperties() {
-                        return customProperties;
+                    public TableInfo tableInfo() {
+                        return tableInfo;
                     }
                 });
     }
