
Commit 8da592d

Kimahriman authored and dongjoon-hyun committed
[SPARK-44639][SS][YARN] Use Java tmp dir for local RocksDB state storage on Yarn
### What changes were proposed in this pull request?

Update the RocksDB state store to store its local data underneath `java.io.tmpdir` instead of going through `Utils.getLocalDir` when running on YARN. This is done through a new util method `createExecutorLocalTempDir`, as there may be other use cases for this behavior as well.

### Why are the changes needed?

On YARN, the local RocksDB directory is placed in a directory created inside the root application folder, such as

```
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/spark-<uuid>/StateStoreId(...)
```

The problem with this is that if an executor crashes for some reason (like OOM) and the shutdown hooks don't get run, this directory will stay around until the application finishes, which can cause jobs to slowly accumulate temporary space until the node manager eventually goes unhealthy. Because this data is only ever accessed by the executor that created the directory, it makes sense to store it inside the container folder, which the node manager always cleans up when that YARN container is cleaned up. YARN sets `java.io.tmpdir` to a path inside this directory, such as

```
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/<container_id>/tmp/StateStoreId(...)
```

It looks like only YARN sets the tmpdir property; other resource managers (standalone and k8s) always rely on the local dirs setting/env vars.

### Does this PR introduce _any_ user-facing change?

Shouldn't be any effective changes, other than preventing disk space from filling up on Node Managers under certain scenarios.

### How was this patch tested?

New UT

Closes #42301 from Kimahriman/rocksdb-tmp-dir.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 9e9358a)
Signed-off-by: Dongjoon Hyun <[email protected]>
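For intuition, here is a minimal, self-contained sketch of the selection behavior described above. This is not the patch itself: `LocalStateDirSketch`, `chooseLocalStateDir`, and `configuredLocalDir` are hypothetical names, and the `CONTAINER_ID` environment check is an assumption that mirrors what the new unit test below relies on. The real change routes this decision through `Utils.createExecutorLocalTempDir`, shown in the first diff below.

```scala
import java.io.File
import java.nio.file.Files

object LocalStateDirSketch {
  // Sketch only: decide where executor-local scratch data should live.
  // On YARN, CONTAINER_ID is set and java.io.tmpdir already points inside the
  // container directory, which the node manager removes when the container exits.
  // Elsewhere, fall back to the configured local directory.
  def chooseLocalStateDir(configuredLocalDir: String, namePrefix: String): File = {
    val runningInYarnContainer = sys.env.contains("CONTAINER_ID")
    val baseDir =
      if (runningInYarnContainer) System.getProperty("java.io.tmpdir")
      else configuredLocalDir
    Files.createTempDirectory(new File(baseDir).toPath, namePrefix).toFile
  }
}
```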
1 parent ee7a17c commit 8da592d

File tree

4 files changed (+46, -4 lines)


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 0 deletions

@@ -250,6 +250,22 @@ private[spark] object Utils
     dir
   }

+  /**
+   * Create a temporary directory that will always be cleaned up when the executor stops,
+   * even in the case of a hard shutdown when the shutdown hooks don't get run.
+   *
+   * Currently this only provides special behavior on YARN, where the local dirs are not
+   * guaranteed to be cleaned up on executors hard shutdown.
+   */
+  def createExecutorLocalTempDir(conf: SparkConf, namePrefix: String): File = {
+    if (Utils.isRunningInYarnContainer(conf)) {
+      // Just use the default Java tmp dir which is set to inside the container directory on YARN
+      createTempDir(namePrefix = namePrefix)
+    } else {
+      createTempDir(getLocalDir(conf), namePrefix)
+    }
+  }
+
   /**
    * Copy the first `maxSize` bytes of data from the InputStream to an in-memory
    * buffer, primarily to check for corruption.

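As a usage note, callers inside Spark could opt into the container-scoped cleanup behavior simply by routing their temp-dir creation through the new helper. Below is a hedged sketch: the `ExecutorScratchSpace` object and the `"scratch"` prefix are hypothetical, and the code must live under `org.apache.spark` because `Utils` is `private[spark]`.

```scala
package org.apache.spark.example

import java.io.File

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

object ExecutorScratchSpace {
  // Hypothetical call site: instead of Utils.createTempDir(Utils.getLocalDir(conf), prefix),
  // go through the new helper so the directory lands inside the YARN container dir when applicable.
  def scratchDir(conf: SparkConf): File =
    Utils.createExecutorLocalTempDir(conf, namePrefix = "scratch")
}
```

In this PR the only adopter is the `RocksDBStateStoreProvider` change further down, which replaces `Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)` with this helper.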
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 1 addition & 1 deletion

@@ -69,7 +69,7 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
 class RocksDB(
     dfsRootDir: String,
     val conf: RocksDBConf,
-    localRootDir: File = Utils.createTempDir(),
+    val localRootDir: File = Utils.createTempDir(),
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "",
     useColumnFamilies: Boolean = false,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 4 additions & 2 deletions

@@ -837,6 +837,9 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
   @volatile private var rocksDBEventForwarder: Option[RocksDBEventForwarder] = _
   @volatile private var stateStoreProviderId: StateStoreProviderId = _
+  // Exposed for testing
+  @volatile private[sql] var sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf)
+    .getOrElse(new SparkConf)

   protected def createRocksDB(
       dfsRootDir: String,
@@ -867,8 +870,7 @@ private[sql] class RocksDBStateStoreProvider
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val loggingId = stateStoreProviderId.toString
-    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
-    val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)
+    val localRootDir = Utils.createExecutorLocalTempDir(sparkConf, storeIdStr)
     createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, loggingId,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId,
       rocksDBEventForwarder,

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 25 additions & 1 deletion

@@ -44,14 +44,15 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager}
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
+import org.apache.spark.sql.execution.streaming.runtime.StreamExecution
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkConfWithEnv, ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._

 class NoOverwriteFileSystemBasedCheckpointFileManager(path: Path, hadoopConf: Configuration)
@@ -3791,6 +3792,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }}
   }

+  test("SPARK-44639: Use Java tmp dir instead of configured local dirs on Yarn") {
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+    val provider = new RocksDBStateStoreProvider()
+    provider.sparkConf = new SparkConfWithEnv(Map("CONTAINER_ID" -> "1"))
+    provider.init(
+      StateStoreId(
+        "/checkpoint",
+        0,
+        0
+      ),
+      new StructType(),
+      new StructType(),
+      NoPrefixKeyStateEncoderSpec(new StructType()),
+      false,
+      new StateStoreConf(sqlConf),
+      conf
+    )
+
+    assert(provider.rocksDB.localRootDir.getParent() == System.getProperty("java.io.tmpdir"))
+  }
+
   private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone()))

   class RocksDBCheckpointFormatV2(
