diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1fcf75b025033..caca73955e1ec 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2666,4 +2666,14 @@ package object config {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
+    ConfigBuilder("spark.shutdown.timeout")
+      .internal()
+      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
+        "This must be passed as a system property argument in the Java options, for example " +
+        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 4db268604a3e9..c6cad94401689 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -19,12 +19,16 @@ package org.apache.spark.util

 import java.io.File
 import java.util.PriorityQueue
+import java.util.concurrent.TimeUnit

 import scala.util.Try

 import org.apache.hadoop.fs.FileSystem

+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
+

 /**
  * Various utility methods used by Spark.
@@ -177,8 +181,19 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
+    val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    // The timeout property must be passed as a Java system property because this
+    // is initialized before Spark configurations are registered as system
+    // properties later in initialization.
+    val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
+
+    timeout.fold {
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority)
+    } { t =>
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority, t, TimeUnit.MILLISECONDS)
+    }
   }

   def runAll(): Unit = {
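Outside the patch, the control flow reduces to a plain Option.fold over an optional millisecond value: when the property is absent, register the hook with Hadoop's default timeout; when it is present, call the overload that takes an explicit timeout. Below is a minimal, self-contained sketch of that dispatch. The addHook overloads and the object name are hypothetical stand-ins for Hadoop's org.apache.hadoop.util.ShutdownHookManager.addShutdownHook overloads, not the real Hadoop API, and the sketch reads raw milliseconds from the system property, whereas the real conf goes through Spark's timeConf parser, which also accepts suffixed values like "60s".

import java.util.concurrent.TimeUnit

object ShutdownTimeoutSketch {
  // Hypothetical stand-in: register a hook with the default shutdown timeout.
  def addHook(task: Runnable, priority: Int): Unit =
    println(s"hook registered at priority $priority with default timeout")

  // Hypothetical stand-in: register a hook with an explicit timeout.
  def addHook(task: Runnable, priority: Int, timeout: Long, unit: TimeUnit): Unit =
    println(s"hook registered at priority $priority with timeout $timeout ms")

  def main(args: Array[String]): Unit = {
    val hookTask: Runnable = () => println("running all shutdown hooks")
    val priority = 30

    // Read the timeout from a plain system property, mirroring the requirement
    // that it arrive via -Dspark.shutdown.timeout=... in the Java options.
    // Simplification: expects raw milliseconds rather than Spark's duration syntax.
    val timeoutMs: Option[Long] = sys.props.get("spark.shutdown.timeout").map(_.toLong)

    // Option.fold: the first argument runs when the Option is empty, the second
    // when a value is present, matching the two-overload dispatch in the patch.
    timeoutMs.fold(addHook(hookTask, priority)) { t =>
      addHook(hookTask, priority, t, TimeUnit.MILLISECONDS)
    }
  }
}

Running this with -Dspark.shutdown.timeout=60000 takes the second branch; without the property it takes the first. The same reason the sketch uses sys.props applies in the patch: the hook is installed before Spark configurations are copied into system properties, so only a -D flag set at JVM launch is visible at that point.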