Skip to content

Commit 8b7f6d7

Browse files
authored
Spark 4.1: New Async Spark Micro Batch Planner (apache#15299)
1 parent bb8b743 commit 8b7f6d7

File tree

10 files changed

+743
-13
lines changed

10 files changed

+743
-13
lines changed

docs/docs/spark-configuration.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,7 @@ val spark = SparkSession.builder()
196196
| spark.sql.iceberg.executor-cache.locality.enabled | false | Enables locality-aware executor cache usage |
197197
| spark.sql.iceberg.merge-schema | false | Enables modifying the table schema to match the write schema. Only adds missing columns |
198198
| spark.sql.iceberg.report-column-stats | true | Report Puffin Table Statistics if available to Spark's Cost Based Optimizer. CBO must be enabled for this to be effective |
199+
| spark.sql.iceberg.async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks |
199200

200201
### Read options
201202

@@ -220,6 +221,10 @@ spark.read
220221
| stream-from-timestamp | (none) | A timestamp in milliseconds to stream from; if before the oldest known ancestor snapshot, the oldest will be used |
221222
| streaming-max-files-per-micro-batch | INT_MAX | Maximum number of files per microbatch |
222223
| streaming-max-rows-per-micro-batch | INT_MAX | "Soft maximum" number of rows per microbatch; always includes all rows in next unprocessed file, excludes additional files if their inclusion would exceed the soft max limit |
224+
| async-micro-batch-planning-enabled | false | Enables asynchronous microbatch planning to reduce planning latency by pre-fetching file scan tasks |
225+
| streaming-snapshot-polling-interval-ms | 30000 | Overrides the polling interval used by the async planner to refresh the table and detect new snapshots. Only applies when async-micro-batch-planning-enabled is set |
226+
| async-queue-preload-file-limit | 100 | Overrides the number of files initially loaded into the background queue. Tune to prevent queue starvation. Only applies when async-micro-batch-planning-enabled is set |
227+
| async-queue-preload-row-limit | 100000 | Overrides the number of rows initially loaded into the background queue. Tune to prevent queue starvation. Only applies when async-micro-batch-planning-enabled is set |
223228

224229
### Write options
225230

docs/docs/spark-structured-streaming.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,13 @@ val df = spark.readStream
6363
!!! info
6464
Note: In addition to limiting micro-batch sizes on queries that use the default trigger (i.e. `Trigger.ProcessingTime`), rate limiting options can be applied to queries that use `Trigger.AvailableNow` to split one-time processing of all available source data into multiple micro-batches for better query scalability. Rate limiting options will be ignored when using the deprecated `Trigger.Once` trigger.
6565

66+
### Asynchronous Micro-Batch Planning
67+
68+
Users can enable asynchronous micro-batch planning by setting `async-micro-batch-planning-enabled` to true. With this option enabled, Iceberg will start processing the current micro-batch while planning the next micro-batches in parallel.
69+
This can help improve query throughput by reducing idle time between micro-batches. Users should weigh the tradeoffs, which include higher memory usage and increased snapshot detection latency.
70+
71+
Users can also set additional options to control the behavior of asynchronous micro-batch planning, found in the [spark configuration](spark-configuration.md#read-options).
72+
6673
## Streaming Writes
6774

6875
To write values from streaming query to Iceberg table, use `DataStreamWriter`:

spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,39 @@ public int maxRecordsPerMicroBatch() {
225225
.parse();
226226
}
227227

228+
public boolean asyncMicroBatchPlanningEnabled() {
229+
return confParser
230+
.booleanConf()
231+
.option(SparkReadOptions.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
232+
.sessionConf(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED)
233+
.defaultValue(SparkSQLProperties.ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT)
234+
.parse();
235+
}
236+
237+
public long streamingSnapshotPollingIntervalMs() {
238+
return confParser
239+
.longConf()
240+
.option(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS)
241+
.defaultValue(SparkReadOptions.STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT)
242+
.parse();
243+
}
244+
245+
public long asyncQueuePreloadFileLimit() {
246+
return confParser
247+
.longConf()
248+
.option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT)
249+
.defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT)
250+
.parse();
251+
}
252+
253+
public long asyncQueuePreloadRowLimit() {
254+
return confParser
255+
.longConf()
256+
.option(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT)
257+
.defaultValue(SparkReadOptions.ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT)
258+
.parse();
259+
}
260+
228261
public boolean preserveDataGrouping() {
229262
return confParser
230263
.booleanConf()

spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkReadOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,19 @@ private SparkReadOptions() {}
8484
public static final String STREAMING_MAX_ROWS_PER_MICRO_BATCH =
8585
"streaming-max-rows-per-micro-batch";
8686

87+
// Enables asynchronous micro-batch planning for streaming reads (per-read option;
// a session-level equivalent exists in SparkSQLProperties).
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
    "async-micro-batch-planning-enabled";

// Polling interval, in milliseconds, at which the async planner refreshes table
// metadata to detect new snapshots; only applies when async planning is enabled.
public static final String STREAMING_SNAPSHOT_POLLING_INTERVAL_MS =
    "streaming-snapshot-polling-interval-ms";
public static final long STREAMING_SNAPSHOT_POLLING_INTERVAL_MS_DEFAULT = 30000L;

// Initial preload limits (files / rows) for the async planner's background queue;
// tune upward to prevent queue starvation. Only apply when async planning is enabled.
public static final String ASYNC_QUEUE_PRELOAD_FILE_LIMIT = "async-queue-preload-file-limit";
public static final long ASYNC_QUEUE_PRELOAD_FILE_LIMIT_DEFAULT = 100L;
public static final String ASYNC_QUEUE_PRELOAD_ROW_LIMIT = "async-queue-preload-row-limit";
public static final long ASYNC_QUEUE_PRELOAD_ROW_LIMIT_DEFAULT = 100000L;
99+
87100
// Table path
88101
public static final String PATH = "path";
89102

spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,9 @@ private SparkSQLProperties() {}
112112

113113
// Prefix for custom snapshot properties
114114
public static final String SNAPSHOT_PROPERTY_PREFIX = "spark.sql.iceberg.snapshot-property.";
115+
116+
// Controls whether async micro-batch planning is enabled for the Spark session;
// the per-read option of the same name (without the prefix) takes precedence.
public static final String ASYNC_MICRO_BATCH_PLANNING_ENABLED =
    "spark.sql.iceberg.async-micro-batch-planning-enabled";
public static final boolean ASYNC_MICRO_BATCH_PLANNING_ENABLED_DEFAULT = false;
115120
}

0 commit comments

Comments
 (0)