
Commit 9e9358a

Kimahriman authored and dongjoon-hyun committed
[SPARK-44639][SS][YARN] Use Java tmp dir for local RocksDB state storage on Yarn
### What changes were proposed in this pull request?

Update the RocksDB state store to store its local data underneath `java.io.tmpdir` instead of going through `Utils.getLocalDir` when running on YARN. This is done through a new util method, `createExecutorLocalTempDir`, as there may be other use cases for this behavior as well.

### Why are the changes needed?

On YARN, the local RocksDB directory is placed in a directory created inside the root application folder, such as

```
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/spark-<uuid>/StateStoreId(...)
```

The problem with this is that if an executor crashes for some reason (like an OOM) and the shutdown hooks don't get run, this directory stays around until the application finishes, which can cause jobs to slowly accumulate more and more temporary space until the node manager finally goes unhealthy. Because this data will only ever be accessed by the executor that created the directory, it makes sense to store it inside the container folder instead, which the node manager always removes when the YARN container is cleaned up. YARN sets `java.io.tmpdir` to a path inside this directory, such as

```
/tmp/nm-local-dir/usercache/<user>/appcache/<app_id>/<container_id>/tmp/StateStoreId(...)
```

It looks like only YARN sets the tmpdir property; the other resource managers (standalone and k8s) always rely on the local dirs setting/env vars.

### Does this PR introduce _any_ user-facing change?

Shouldn't be any effective changes, other than preventing disk space from filling up on node managers under certain scenarios.

### How was this patch tested?

New unit test.

Closes #42301 from Kimahriman/rocksdb-tmp-dir.

Authored-by: Adam Binford <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent f5b9ea8 commit 9e9358a
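
To make the behavior described above concrete, here is a minimal, illustrative Scala sketch (not part of the patch) of the decision the new helper encodes. It assumes only two facts the PR relies on: YARN sets the `CONTAINER_ID` environment variable inside containers (the new unit test fakes exactly this variable), and it points `java.io.tmpdir` at a directory the node manager removes together with the container. The function name is hypothetical:

```scala
import java.io.File
import java.nio.file.Files

// Illustrative only: approximates the "where should executor-local temp data
// live?" decision this PR introduces.
def executorLocalRoot(configuredLocalDir: String): File = {
  if (sys.env.contains("CONTAINER_ID")) {
    // On YARN, java.io.tmpdir is inside the container dir, which the node
    // manager deletes even after a hard kill where shutdown hooks never run.
    Files.createTempDirectory("state-").toFile
  } else {
    // Elsewhere, fall back to the configured local dirs.
    Files.createTempDirectory(new File(configuredLocalDir).toPath, "state-").toFile
  }
}
```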

File tree

4 files changed (+46 / -4 lines)

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 16 additions & 0 deletions

```diff
@@ -250,6 +250,22 @@ private[spark] object Utils
     dir
   }
 
+  /**
+   * Create a temporary directory that will always be cleaned up when the executor stops,
+   * even in the case of a hard shutdown when the shutdown hooks don't get run.
+   *
+   * Currently this only provides special behavior on YARN, where the local dirs are not
+   * guaranteed to be cleaned up on an executor's hard shutdown.
+   */
+  def createExecutorLocalTempDir(conf: SparkConf, namePrefix: String): File = {
+    if (Utils.isRunningInYarnContainer(conf)) {
+      // Just use the default Java tmp dir, which is set to inside the container directory on YARN
+      createTempDir(namePrefix = namePrefix)
+    } else {
+      createTempDir(getLocalDir(conf), namePrefix)
+    }
+  }
+
   /**
    * Copy the first `maxSize` bytes of data from the InputStream to an in-memory
    * buffer, primarily to check for corruption.
```
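
A hypothetical call site for the new helper, mirroring how the state store provider uses it below (note that `Utils` is `private[spark]`, so this only compiles inside Spark's own packages; the prefix is a placeholder):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

val sparkConf = new SparkConf()
// On YARN this lands under java.io.tmpdir (inside the container directory);
// everywhere else it lands under Utils.getLocalDir(sparkConf).
val localRoot = Utils.createExecutorLocalTempDir(sparkConf, namePrefix = "example-state")
```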

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 1 addition & 1 deletion

```diff
@@ -69,7 +69,7 @@ case object StoreTaskCompletionListener extends RocksDBOpType("store_task_comple
 class RocksDB(
     dfsRootDir: String,
     val conf: RocksDBConf,
-    localRootDir: File = Utils.createTempDir(),
+    val localRootDir: File = Utils.createTempDir(),
     hadoopConf: Configuration = new Configuration,
     loggingId: String = "",
     useColumnFamilies: Boolean = false,
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 4 additions & 2 deletions

```diff
@@ -876,6 +876,9 @@ private[sql] class RocksDBStateStoreProvider
   @volatile private var stateSchemaProvider: Option[StateSchemaProvider] = _
   @volatile private var rocksDBEventForwarder: Option[RocksDBEventForwarder] = _
   @volatile private var stateStoreProviderId: StateStoreProviderId = _
+  // Exposed for testing
+  @volatile private[sql] var sparkConf: SparkConf = Option(SparkEnv.get).map(_.conf)
+    .getOrElse(new SparkConf)
 
   protected def createRocksDB(
       dfsRootDir: String,
@@ -906,8 +909,7 @@ private[sql] class RocksDBStateStoreProvider
     val storeIdStr = s"StateStoreId(opId=${stateStoreId.operatorId}," +
       s"partId=${stateStoreId.partitionId},name=${stateStoreId.storeName})"
     val loggingId = stateStoreProviderId.toString
-    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf)
-    val localRootDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), storeIdStr)
+    val localRootDir = Utils.createExecutorLocalTempDir(sparkConf, storeIdStr)
     createRocksDB(dfsRootDir, RocksDBConf(storeConf), localRootDir, hadoopConf, loggingId,
       useColumnFamilies, storeConf.enableStateStoreCheckpointIds, stateStoreId.partitionId,
       rocksDBEventForwarder,
```

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 25 additions & 1 deletion

```diff
@@ -44,14 +44,15 @@ import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.streaming.CreateAtomicTestManager
 import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager, FileContextBasedCheckpointFileManager, FileSystemBasedCheckpointFileManager}
 import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager.{CancellableFSDataOutputStream, RenameBasedFSDataOutputStream}
+import org.apache.spark.sql.execution.streaming.runtime.StreamExecution
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.STREAMING_CHECKPOINT_FILE_MANAGER_CLASS
 import org.apache.spark.sql.test.{SharedSparkSession, SQLTestUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.tags.SlowSQLTest
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.UTF8String
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.{SparkConfWithEnv, ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
 
@@ -3941,6 +3942,29 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
     }}
   }
 
+  test("SPARK-44639: Use Java tmp dir instead of configured local dirs on Yarn") {
+    val conf = new Configuration()
+    conf.set(StreamExecution.RUN_ID_KEY, UUID.randomUUID().toString)
+
+    val provider = new RocksDBStateStoreProvider()
+    provider.sparkConf = new SparkConfWithEnv(Map("CONTAINER_ID" -> "1"))
+    provider.init(
+      StateStoreId(
+        "/checkpoint",
+        0,
+        0
+      ),
+      new StructType(),
+      new StructType(),
+      NoPrefixKeyStateEncoderSpec(new StructType()),
+      false,
+      new StateStoreConf(sqlConf),
+      conf
+    )
+
+    assert(provider.rocksDB.localRootDir.getParent() == System.getProperty("java.io.tmpdir"))
+  }
+
   private def dbConf = RocksDBConf(StateStoreConf(SQLConf.get.clone()))
 
 class RocksDBCheckpointFormatV2(
```
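
The test fakes a YARN environment via `SparkConfWithEnv`. A rough sketch of how such a helper can work (Spark's real test utility may differ in detail; `getenv` on `SparkConf` is `private[spark]`, so the class must live in an `org.apache.spark` package, and the class name here is illustrative):

```scala
package org.apache.spark.util

import org.apache.spark.SparkConf

// Sketch: a SparkConf whose environment lookups come from a supplied map.
// Utils.isRunningInYarnContainer checks conf.getenv("CONTAINER_ID"), so
// seeding that key makes code under test behave as if it were on YARN.
class FakeEnvSparkConf(env: Map[String, String]) extends SparkConf(loadDefaults = false) {
  override def getenv(name: String): String = env.getOrElse(name, null)
}
```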
