
Commit c415f90

robreeves authored and sweisdb committed
[SPARK-47383][CORE] Support spark.shutdown.timeout config
### What changes were proposed in this pull request?
Make the shutdown hook timeout configurable. If `spark.shutdown.timeout` is not defined, the existing behavior is preserved: a default timeout of 30 seconds, or whatever is defined in core-site.xml for the `hadoop.service.shutdown.timeout` property.

### Why are the changes needed?
Spark sometimes times out during the shutdown process. When that happens, data still sitting in the queues is dropped and metadata is lost (e.g. event logs, anything written by custom listeners). Before this change the timeout was not easily configurable: the underlying `org.apache.hadoop.util.ShutdownHookManager` defaults to 30 seconds, and while it can be configured via `hadoop.service.shutdown.timeout`, that must be done in core-site.xml/core-default.xml because a new Hadoop conf object is created and there is no opportunity to modify it.

### Does this PR introduce _any_ user-facing change?
Yes, a new config `spark.shutdown.timeout` is added.

### How was this patch tested?
Manual testing in spark-shell. This behavior is not practical to cover with a unit test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#45504 from robreeves/sc_shutdown_timeout.

Authored-by: Rob Reeves <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
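As an illustration of the user-facing change (not part of the commit itself), here is a minimal sketch of how the new entry resolves at runtime. It assumes the JVM was started with `-Dspark.shutdown.timeout=60s` (for example via `spark.driver.extraJavaOptions`) and that the caller sits inside the `org.apache.spark` package, since the entry is `private[spark]`:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS

// SparkConf copies spark.* JVM system properties, so the optional time config
// resolves to milliseconds when the -D flag is present and to None otherwise.
val timeoutMs: Option[Long] = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
// With -Dspark.shutdown.timeout=60s => Some(60000L)
// Without the flag                  => None (Hadoop's 30-second default applies)
```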
1 parent 65a0e0b commit c415f90

File tree: 2 files changed, +27 −2 lines changed


core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 10 additions & 0 deletions
@@ -2683,4 +2683,14 @@ package object config {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
+    ConfigBuilder("spark.shutdown.timeout")
+      .internal()
+      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
+        "This must be passed as a system property argument in the Java options, for example " +
+        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
 }
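A brief hedged aside on `.timeConf(TimeUnit.MILLISECONDS)`: Spark time configs accept duration strings and convert them to the requested unit, which is why the doc string's example value is `60s`. A sketch using Spark's `JavaUtils` time-string parser (the helper the config machinery relies on):

```scala
import java.util.concurrent.TimeUnit
import org.apache.spark.network.util.JavaUtils

// Duration strings accept suffixes such as ms, s, min, h, d.
JavaUtils.timeStringAs("60s", TimeUnit.MILLISECONDS)   // 60000
JavaUtils.timeStringAs("1min", TimeUnit.MILLISECONDS)  // 60000
```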

core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala

Lines changed: 17 additions & 2 deletions
@@ -19,12 +19,16 @@ package org.apache.spark.util
 
 import java.io.File
 import java.util.PriorityQueue
+import java.util.concurrent.TimeUnit
 
 import scala.util.Try
 
 import org.apache.hadoop.fs.FileSystem
 
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
+
 
 /**
  * Various utility methods used by Spark.
@@ -177,8 +181,19 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
+    val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    // The timeout property must be passed as a Java system property because this
+    // is initialized before Spark configurations are registered as system
+    // properties later in initialization.
+    val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
+
+    timeout.fold {
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority)
+    } { t =>
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority, t, TimeUnit.MILLISECONDS)
+    }
   }
 
   def runAll(): Unit = {
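The `timeout.fold { ... } { t => ... }` dispatch above can read oddly next to the more common pattern match: `Option.fold(ifEmpty)(f)` evaluates the first block when the option is empty and applies `f` to the value otherwise. A small stand-alone illustration (not part of the patch):

```scala
// fold(ifEmpty)(f): returns ifEmpty when None, f(value) when Some.
val unset: Option[Long] = None
val configured: Option[Long] = Some(60000L)

unset.fold("register hook without timeout")(t => s"register hook with $t ms timeout")
// => "register hook without timeout"
configured.fold("register hook without timeout")(t => s"register hook with $t ms timeout")
// => "register hook with 60000 ms timeout"
```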
