From 034eea4de027a606dc19a7f6820936603a77e547 Mon Sep 17 00:00:00 2001
From: Rob Reeves
Date: Tue, 12 Mar 2024 16:19:47 -0700
Subject: [PATCH 1/5] Added timeout. Still need to test

---
 .../spark/internal/config/package.scala       |  7 +++++++
 .../spark/util/ShutdownHookManager.scala      | 19 ++++++++++++++-----
 2 files changed, 21 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 1fcf75b025033..b5fdd11686d96 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2666,4 +2666,11 @@ package object config {
       .version("4.0.0")
       .booleanConf
       .createWithDefault(false)
+
+  private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
+    ConfigBuilder("spark.shutdown.timeout")
+      .doc("TODO")
+      .version("4.0.0")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createOptional
 }
diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 4db268604a3e9..d8b87c5d291af 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -19,12 +19,13 @@ package org.apache.spark.util
 
 import java.io.File
 import java.util.PriorityQueue
-
 import scala.util.Try
-
 import org.apache.hadoop.fs.FileSystem
-
+import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
+
+import java.util.concurrent.TimeUnit
 
 /**
  * Various utility methods used by Spark.
@@ -177,8 +178,16 @@ private [util] class SparkShutdownHookManager {
     val hookTask = new Runnable() {
       override def run(): Unit = runAll()
     }
-    org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
-      hookTask, FileSystem.SHUTDOWN_HOOK_PRIORITY + 30)
+    val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
+
+    timeout.fold {
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority)
+    } { t =>
+      org.apache.hadoop.util.ShutdownHookManager.get().addShutdownHook(
+        hookTask, priority, t, TimeUnit.MILLISECONDS)
+    }
   }
 
   def runAll(): Unit = {

From 1131d8f5919c8b1c04978b806f8963e64c07e5b8 Mon Sep 17 00:00:00 2001
From: Rob Reeves
Date: Wed, 13 Mar 2024 14:41:33 -0700
Subject: [PATCH 2/5] Updated config doc

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index b5fdd11686d96..f7dc3b697873b 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2669,7 +2669,9 @@ package object config {
 
   private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
     ConfigBuilder("spark.shutdown.timeout")
-      .doc("TODO")
+      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
+        "This must be passed as a system property argument in the Java options, for example " +
+        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")
       .version("4.0.0")
       .timeConf(TimeUnit.MILLISECONDS)
       .createOptional

From cfae485c16d6508c98af047715001d7c27ccd659 Mon Sep 17 00:00:00 2001
From: Rob Reeves
Date: Wed, 13 Mar 2024 14:44:17 -0700
Subject: [PATCH 3/5] code comment

---
 .../main/scala/org/apache/spark/util/ShutdownHookManager.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index d8b87c5d291af..04fed5c25a319 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -179,6 +179,9 @@ private [util] class SparkShutdownHookManager {
       override def run(): Unit = runAll()
     }
     val priority = FileSystem.SHUTDOWN_HOOK_PRIORITY + 30
+    // The timeout property must be passed as a Java system property because this
+    // is initialized before Spark configurations are registered as system
+    // properties later in initialization.
     val timeout = new SparkConf().get(SPARK_SHUTDOWN_TIMEOUT_MS)
 
     timeout.fold {

From a7e94d456231faa07793fc8ea12c13505a3b86b9 Mon Sep 17 00:00:00 2001
From: Rob Reeves
Date: Wed, 13 Mar 2024 15:28:01 -0700
Subject: [PATCH 4/5] fix imports

---
 .../scala/org/apache/spark/util/ShutdownHookManager.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
index 04fed5c25a319..c6cad94401689 100644
--- a/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
+++ b/core/src/main/scala/org/apache/spark/util/ShutdownHookManager.scala
@@ -19,13 +19,16 @@ package org.apache.spark.util
 
 import java.io.File
 import java.util.PriorityQueue
+import java.util.concurrent.TimeUnit
+
 import scala.util.Try
+
 import org.apache.hadoop.fs.FileSystem
+
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.SPARK_SHUTDOWN_TIMEOUT_MS
 
-import java.util.concurrent.TimeUnit
 
 /**
  * Various utility methods used by Spark.

From a61a139db021632a07b7971a408ce3edc7e1a957 Mon Sep 17 00:00:00 2001
From: Rob Reeves
Date: Mon, 18 Mar 2024 09:15:22 -0700
Subject: [PATCH 5/5] added internal to config

---
 .../main/scala/org/apache/spark/internal/config/package.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index f7dc3b697873b..caca73955e1ec 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -2669,6 +2669,7 @@ package object config {
 
   private[spark] val SPARK_SHUTDOWN_TIMEOUT_MS =
     ConfigBuilder("spark.shutdown.timeout")
+      .internal()
      .doc("Defines the timeout period to wait for all shutdown hooks to be executed. " +
        "This must be passed as a system property argument in the Java options, for example " +
        "spark.driver.extraJavaOptions=\"-Dspark.shutdown.timeout=60s\".")