Skip to content

Commit 47844b4

Browse files
committed
use V2 iceberg sink
1 parent 0823fef commit 47844b4

File tree

7 files changed

+6
-32
lines changed

7 files changed

+6
-32
lines changed

sqrl-planner/src/main/java/com/datasqrl/config/CompilerConfigImpl.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,6 @@ public boolean compileFlinkPlan() {
4040
return sqrlConfig.asBool("compile-flink-plan").get();
4141
}
4242

43-
@Override
44-
public boolean addIcebergSerializationConfig() {
45-
return sqrlConfig.asBool("add-iceberg-serialization-config").get();
46-
}
47-
4843
@Override
4944
public CostModel getCostModel() {
5045
var costModel = sqrlConfig.as("cost-model", Type.class).get();

sqrl-planner/src/main/java/com/datasqrl/config/PackageJson.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,6 @@ interface CompilerConfig {
5454

5555
boolean compileFlinkPlan();
5656

57-
boolean addIcebergSerializationConfig();
58-
5957
CostModel getCostModel();
6058

6159
ExplainConfig getExplain();

sqrl-planner/src/main/java/com/datasqrl/planner/FlinkPhysicalPlan.java

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import org.apache.calcite.sql.SqlNode;
3535
import org.apache.calcite.sql.parser.SqlParserPos;
3636
import org.apache.flink.configuration.Configuration;
37-
import org.apache.flink.configuration.PipelineOptions;
3837
import org.apache.flink.sql.parser.ddl.SqlCreateFunction;
3938
import org.apache.flink.sql.parser.ddl.SqlCreateTable;
4039
import org.apache.flink.sql.parser.ddl.SqlTableOption;
@@ -56,12 +55,6 @@
5655
@Builder
5756
public class FlinkPhysicalPlan implements EnginePhysicalPlan {
5857

59-
private static final List<String> ICEBERG_SERIALIZATION_CONFIG =
60-
List.of(
61-
"org.apache.iceberg.SerializableByteBufferMap: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}",
62-
"org.apache.iceberg.GenericDataFile: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}",
63-
"org.apache.iceberg.io.WriteResult: {type: kryo, kryo-type: registered, class: com.esotericsoftware.kryo.serializers.JavaSerializer}");
64-
6558
List<String> flinkSql;
6659
Set<String> connectors;
6760
Set<String> formats;
@@ -97,11 +90,9 @@ public static class Builder {
9790
private final Set<String> fullyResolvedFunctions = new HashSet<>();
9891
private final List<List<RichSqlInsert>> statementSets = new ArrayList<>();
9992

100-
private final boolean addIcebergSerializationConfig;
10193
private Configuration config;
10294

103-
public Builder(Configuration config, boolean addIcebergSerializationConfig) {
104-
this.addIcebergSerializationConfig = addIcebergSerializationConfig;
95+
public Builder(Configuration config) {
10596
this.config = config.clone();
10697
nextBatch();
10798
}
@@ -180,15 +171,9 @@ public FlinkPhysicalPlan build(Optional<CompiledPlan> compiledPlan) {
180171
plan.explain(
181172
ExplainFormat.TEXT, ExplainDetail.CHANGELOG_MODE, ExplainDetail.PLAN_ADVICE));
182173

183-
if (connectors.contains(IcebergEngineFactory.ENGINE_NAME) && addIcebergSerializationConfig) {
184-
// We need to enforce the Kryo JavaSerializer for some built-in Iceberg classes
185-
var updatedSerConf =
186-
ImmutableList.<String>builder()
187-
.addAll(config.get(PipelineOptions.SERIALIZATION_CONFIG, new ArrayList<>()))
188-
.addAll(ICEBERG_SERIALIZATION_CONFIG)
189-
.build();
190-
191-
config.set(PipelineOptions.SERIALIZATION_CONFIG, updatedSerConf);
174+
if (connectors.contains(IcebergEngineFactory.ENGINE_NAME)) {
175+
// Make sure we use the V2 sink
176+
config.setString("table.exec.iceberg.use-v2-sink", "true");
192177
}
193178

194179
return new FlinkPhysicalPlan(

sqrl-planner/src/main/java/com/datasqrl/planner/Sqrl2FlinkSQLTranslator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ public Sqrl2FlinkSQLTranslator(
202202
jarUrls.stream().map(URL::toString).collect(Collectors.toList()));
203203
}
204204

205-
this.planBuilder = new Builder(config.clone(), compilerConfig.addIcebergSerializationConfig());
205+
this.planBuilder = new Builder(config.clone());
206206

207207
if (executionMode == RuntimeExecutionMode.STREAMING) {
208208
planBuilder.addInferredConfig(flink.getStreamingSpecificConfig());

sqrl-planner/src/main/resources/default-package.json

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"logger": "print",
66
"extended-scalar-types": true,
77
"compile-flink-plan": true,
8-
"add-iceberg-serialization-config": true,
98
"cost-model": "DEFAULT",
109
"explain": {
1110
"text": true,

sqrl-planner/src/main/resources/jsonSchema/packageSchema.json

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,6 @@
3939
"compile-flink-plan": {
4040
"type": "boolean"
4141
},
42-
"add-iceberg-serialization-config": {
43-
"type": "boolean"
44-
},
4542
"cost-model": {
4643
"type": "string",
4744
"enum": ["DEFAULT", "READ", "WRITE"],

sqrl-testing/sqrl-testing-integration/src/test/java/com/datasqrl/FullUseCaseIT.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public class FullUseCaseIT extends AbstractFullUseCaseTest {
6161
@Disabled("Intended for manual usage")
6262
@Test
6363
void specificUseCase() {
64-
var pkg = USE_CASES.resolve("duckdb").resolve("package.json");
64+
var pkg = USE_CASES.resolve("jwt-authorized").resolve("package.json");
6565

6666
var param = new UseCaseParam(pkg);
6767
fullUseCaseTest(param);

0 commit comments

Comments
 (0)