From cc3bd5173b3635296ed0a4641312b60621061629 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Thu, 4 Sep 2025 15:52:05 +0200 Subject: [PATCH 1/3] Flink: Dynamic Sink: Use max write parallelism if none is supplied --- .../flink/sink/dynamic/DynamicRecord.java | 12 ++++++++++ .../flink/sink/dynamic/DynamicSinkUtil.java | 10 +++++++++ .../flink/sink/dynamic/HashKeyGenerator.java | 3 ++- .../sink/dynamic/TestHashKeyGenerator.java | 22 +++++++++++++++++++ 4 files changed, 46 insertions(+), 1 deletion(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java index 600a4d8b950c..c9f05ceed09c 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -39,6 +39,18 @@ public class DynamicRecord { private boolean upsertMode; @Nullable private Set equalityFields; + /** + * Constructs a new DynamicRecord. + * + * @param tableIdentifier The target table identifier. + * @param branch The target table branch. + * @param schema The target table schema. + * @param rowData The data matching the provided schema. + * @param partitionSpec The target table {@link PartitionSpec}. + * @param distributionMode The {@link DistributionMode}. + * @param writeParallelism The number of parallel writers. If set to <= 0, will automatically + * configure the maximum available write parallelism. + */ public DynamicRecord( TableIdentifier tableIdentifier, String branch, diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java index 6ea6dcab867a..80552b8acff9 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicSinkUtil.java @@ -62,4 +62,14 @@ static int safeAbs(int input) { return -input; } + + static int firstPositive(int first, int second) { + if (first > 0) { + return first; + } + if (second > 0) { + return second; + } + throw new IllegalArgumentException("None of the supplied ints were positive!"); + } } diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java index 91aa4a91710c..29dd96a26b3e 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/HashKeyGenerator.java @@ -99,7 +99,8 @@ int generateKey( dynamicRecord.distributionMode(), DistributionMode.NONE), MoreObjects.firstNonNull( dynamicRecord.equalityFields(), Collections.emptySet()), - dynamicRecord.writeParallelism())); + DynamicSinkUtil.firstPositive( + dynamicRecord.writeParallelism(), maxWriteParallelism))); try { return keySelector.getKey( overrideRowData != null ? overrideRowData : dynamicRecord.rowData()); diff --git a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java index 8d559e920620..ae96dcfc3a0b 100644 --- a/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java +++ b/flink/v2.1/flink/src/test/java/org/apache/iceberg/flink/sink/dynamic/TestHashKeyGenerator.java @@ -157,6 +157,28 @@ void testEqualityKeys() throws Exception { assertThat(getSubTaskId(writeKey3, writeParallelism, maxWriteParallelism)).isEqualTo(0); } + @Test + void testUseMaxWriteParallelismIfWriteParallelismUnspecified() throws Exception { + final int maxWriteParallelism = 5; + HashKeyGenerator generator = new HashKeyGenerator(16, maxWriteParallelism); + + Set writeKeys = Sets.newHashSet(); + for (int i = 0; i < maxWriteParallelism; i++) { + GenericRowData row = GenericRowData.of(i, StringData.fromString("z")); + writeKeys.add( + getWriteKey( + generator, + PartitionSpec.unpartitioned(), + DistributionMode.NONE, + // Use a writeParallelism <= 0 + -i, + Collections.emptySet(), + row)); + } + + assertThat(writeKeys).hasSize(maxWriteParallelism); + } + @Test void testCapAtMaxWriteParallelism() throws Exception { int writeParallelism = 10; From 1be0e315c0219d8ddaac3d36433e6796398f0da2 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 26 Sep 2025 17:26:16 +0200 Subject: [PATCH 2/3] fixup! javadoc --- .../org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java index c9f05ceed09c..312a1e3ea4d9 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -48,7 +48,7 @@ public class DynamicRecord { * @param rowData The data matching the provided schema. * @param partitionSpec The target table {@link PartitionSpec}. * @param distributionMode The {@link DistributionMode}. - * @param writeParallelism The number of parallel writers. If set to <= 0, will automatically + * @param writeParallelism The number of parallel writers. If set to {@literal <= 0}, will automatically * configure the maximum available write parallelism. */ public DynamicRecord( From 539afd866d5e0a910c5356980e4e66267893ab94 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 26 Sep 2025 17:30:15 +0200 Subject: [PATCH 3/3] fixup! spotless --- .../org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java index 312a1e3ea4d9..cf3efb0d9da5 100644 --- a/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java +++ b/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/sink/dynamic/DynamicRecord.java @@ -48,8 +48,8 @@ public class DynamicRecord { * @param rowData The data matching the provided schema. * @param partitionSpec The target table {@link PartitionSpec}. * @param distributionMode The {@link DistributionMode}. - * @param writeParallelism The number of parallel writers. If set to {@literal <= 0}, will automatically - * configure the maximum available write parallelism. + * @param writeParallelism The number of parallel writers. If set to {@literal <= 0}, will + * automatically configure the maximum available write parallelism. */ public DynamicRecord( TableIdentifier tableIdentifier,