
Commit 3d99b0b

ericm-db authored and anishshri-db committed
[SPARK-51955] Adding release() to ReadStateStore interface and reusing ReadStore for Streaming Aggregations
### What changes were proposed in this pull request?

- Adding a `release()` method to the ReadStateStore interface to properly close read stores without aborting them
- Implementing a `getWriteStore()` method that allows converting a read-only store to a writable store
- Creating a StateStoreRDDProvider interface for tracking state stores by partition ID
- Enhancing StateStoreRDD to find and reuse existing state stores through RDD lineage
- Improving task completion handling with proper cleanup listeners

### Why are the changes needed?

Currently, stateful operations like aggregations follow a pattern where both read and write stores are opened simultaneously:

```
readStore.acquire()
writeStore.acquire()
writeStore.commit()
readStore.abort()
```

This pattern creates inefficiency because:

- The `abort()` call on the read store unnecessarily invalidates the store's state, causing subsequent operations to reload the entire state store from scratch
- Having two stores open simultaneously increases memory usage and can create contention issues
- The upcoming lock hardening changes will allow only one state store to be open at a time, making this pattern incompatible

With the new approach, the usage paradigm becomes:

```
readStore = getReadStore()
writeStore = getWriteStore(readStore)
writeStore.commit()
```

This new paradigm lets us reuse an existing read store by converting it to a write store with `getWriteStore()`, and clean up resources with `release()` instead of `abort()` when operations complete successfully. This avoids unnecessarily reloading state data and improves performance, while remaining compatible with the upcoming lock hardening changes.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Unit tests

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #51566 from ericm-db/read-store.

Authored-by: Eric Marnadi <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 72ce64e commit 3d99b0b
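To make the before/after concrete, here is a minimal, hypothetical Scala sketch of the pattern from a caller's point of view. It uses the `getReadStore`, `getStore`, `upgradeReadStoreToWriteStore` (the `getWriteStore()` of the description), `commit`, and `abort` entry points that appear in the diffs below; the operator logic is elided and this is not the actual aggregation code.

```scala
import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStore}

object UsagePatternSketch {
  // Old pattern: read and write stores open at the same time; the final
  // abort() throws away the read store's loaded state.
  def oldPattern(provider: RocksDBStateStoreProvider, version: Long): Unit = {
    val readStore = provider.getReadStore(version)
    val writeStore = provider.getStore(version)
    // ... read previous state from readStore, write updates to writeStore ...
    writeStore.commit()
    readStore.abort() // forces a full state reload on the next batch
  }

  // New pattern: one store, upgraded in place from read-only to writable,
  // so the already-loaded state is reused instead of reloaded.
  def newPattern(provider: RocksDBStateStoreProvider, version: Long): Unit = {
    val readStore = provider.getReadStore(version)
    // ... read previous state ...
    val writeStore: StateStore = provider.upgradeReadStoreToWriteStore(readStore, version)
    // ... write updates ...
    writeStore.commit() // read stores that finish cleanly are release()d, not aborted
  }
}
```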

11 files changed: +505 -41 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala

Lines changed: 0 additions & 1 deletion
```diff
@@ -204,7 +204,6 @@ class StatePartitionReader(
   }
 
   override def close(): Unit = {
-    store.abort()
     super.close()
   }
 }
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala

Lines changed: 14 additions & 2 deletions
```diff
@@ -31,7 +31,7 @@ import org.apache.commons.io.IOUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs._
 
-import org.apache.spark.{SparkConf, SparkEnv, SparkException}
+import org.apache.spark.{SparkConf, SparkEnv, SparkException, TaskContext}
 import org.apache.spark.internal.{Logging, LogKeys, MDC}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -89,6 +89,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
 
     override def abort(): Unit = {}
 
+    override def release(): Unit = {}
+
     override def toString(): String = {
       s"HDFSReadStateStore[stateStoreId=$stateStoreId_, version=$version]"
     }
@@ -112,6 +114,15 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
     case object UPDATING extends STATE
     case object COMMITTED extends STATE
     case object ABORTED extends STATE
+    case object RELEASED extends STATE
+
+    Option(TaskContext.get()).foreach { ctxt =>
+      ctxt.addTaskCompletionListener[Unit](ctx => {
+        if (state == UPDATING) {
+          abort()
+        }
+      })
+    }
 
     private val newVersion = version + 1
     @volatile private var state: STATE = UPDATING
@@ -953,7 +964,8 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
   * @param endVersion checkpoint version to end with
   * @return [[HDFSBackedStateStore]]
   */
-  override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
+  override def replayStateFromSnapshot(
+      snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = {
     val newMap = replayLoadedMapFromSnapshot(snapshotVersion, endVersion)
     logInfo(log"Retrieved snapshot at version " +
       log"${MDC(LogKeys.STATE_STORE_VERSION, snapshotVersion)} and apply delta files to version " +
```

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 6 additions & 0 deletions
```diff
@@ -1025,6 +1025,12 @@ class RocksDB(
     }
   }
 
+  def release(): Unit = {
+    if (db != null) {
+      release(LoadStore)
+    }
+  }
+
   /**
   * Commit all the updates made as a version to DFS. The steps it needs to do to commits are:
   *  - Flush all changes to disk
```
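For intuition on why `release()` exists at the RocksDB layer: aborting discards the working state so the next `load()` rebuilds it from checkpoint files, while releasing only gives up the store's lock and keeps the loaded version cached. A toy illustration of that distinction (not Spark code; `CachedStore` is invented for the example):

```scala
// Toy model: abort() drops the cache, release() keeps it.
class CachedStore {
  private var cache: Option[Map[String, String]] = None

  def load(version: Long): Map[String, String] = cache.getOrElse {
    val m = Map("k" -> s"v@$version") // stand-in for an expensive DFS reload
    cache = Some(m)
    m
  }

  def abort(): Unit = { cache = None } // old read-store cleanup: next load is expensive
  def release(): Unit = {}             // new cleanup: lock freed, cache survives
}
```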

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala

Lines changed: 97 additions & 27 deletions
```diff
@@ -26,7 +26,7 @@ import scala.util.control.NonFatal
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.{SparkConf, SparkEnv}
+import org.apache.spark.{SparkConf, SparkEnv, TaskContext}
 import org.apache.spark.internal.{Logging, MDC}
 import org.apache.spark.internal.LogKeys._
 import org.apache.spark.io.CompressionCodec
@@ -43,12 +43,15 @@ private[sql] class RocksDBStateStoreProvider
   with SupportsFineGrainedReplay {
   import RocksDBStateStoreProvider._
 
-  class RocksDBStateStore(lastVersion: Long) extends StateStore {
+  class RocksDBStateStore(
+      lastVersion: Long,
+      private[RocksDBStateStoreProvider] var readOnly: Boolean) extends StateStore {
     /** Trait and classes representing the internal state of the store */
     trait STATE
     case object UPDATING extends STATE
     case object COMMITTED extends STATE
     case object ABORTED extends STATE
+    case object RELEASED extends STATE
 
     @volatile private var state: STATE = UPDATING
     @volatile private var isValidated = false
@@ -57,6 +60,23 @@ private[sql] class RocksDBStateStoreProvider
 
     override def version: Long = lastVersion
 
+    Option(TaskContext.get()).foreach { ctxt =>
+      ctxt.addTaskCompletionListener[Unit](ctx => {
+        try {
+          if (state == UPDATING) {
+            if (readOnly) {
+              release()
+            } else {
+              abort() // Abort since this is an error if stateful task completes
+            }
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning("Failed to abort state store", e)
+        }
+      })
+    }
+
     override def createColFamilyIfAbsent(
         colFamilyName: String,
         keySchema: StructType,
@@ -246,6 +266,19 @@ private[sql] class RocksDBStateStoreProvider
       }
     }
 
+    override def release(): Unit = {
+      assert(readOnly, "Release can only be called on a read-only store")
+      if (state != RELEASED) {
+        logInfo(log"Releasing ${MDC(VERSION_NUM, version + 1)} " +
+          log"for ${MDC(STATE_STORE_ID, id)}")
+        rocksDB.release()
+        state = RELEASED
+      } else {
+        // Optionally log at DEBUG level that it's already released
+        logDebug(log"State store already released")
+      }
+    }
+
     override def abort(): Unit = {
       verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
       logInfo(log"Aborting ${MDC(VERSION_NUM, version + 1)} " +
@@ -450,38 +483,49 @@ private[sql] class RocksDBStateStoreProvider
 
   override protected def logName: String = s"${super.logName} ${stateStoreProviderId}"
 
-  override def getStore(version: Long, uniqueId: Option[String] = None): StateStore = {
+  /**
+   * Creates and returns a state store with the specified parameters.
+   *
+   * @param version The version of the state store to load
+   * @param uniqueId Optional unique identifier for checkpoint
+   * @param readOnly Whether to open the store in read-only mode
+   * @param existingStore Optional existing store to reuse instead of creating a new one
+   * @return The loaded state store
+   */
+  private def loadStateStore(
+      version: Long,
+      uniqueId: Option[String] = None,
+      readOnly: Boolean,
+      existingStore: Option[RocksDBStateStore] = None): StateStore = {
     try {
       if (version < 0) {
         throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
       }
-      rocksDB.load(
-        version,
-        stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None)
-      new RocksDBStateStore(version)
-    }
-    catch {
-      case e: OutOfMemoryError =>
-        throw QueryExecutionErrors.notEnoughMemoryToLoadStore(
-          stateStoreId.toString,
-          "ROCKSDB_STORE_PROVIDER",
-          e)
-      case e: Throwable => throw StateStoreErrors.cannotLoadStore(e)
-    }
-  }
 
-  override def getReadStore(version: Long, uniqueId: Option[String] = None): StateStore = {
-    try {
-      if (version < 0) {
-        throw QueryExecutionErrors.unexpectedStateStoreVersion(version)
+      // Early validation of the existing store type before loading RocksDB
+      existingStore.foreach { store =>
+        if (!store.readOnly) {
+          throw new IllegalArgumentException(
+            s"Existing store must be readOnly, but got a read-write store")
+        }
       }
+
       rocksDB.load(
         version,
         stateStoreCkptId = if (storeConf.enableStateStoreCheckpointIds) uniqueId else None,
-        readOnly = true)
-      new RocksDBStateStore(version)
-    }
-    catch {
+        readOnly = readOnly)
+
+      // Create or reuse store instance
+      existingStore match {
+        case Some(store: RocksDBStateStore) =>
+          // Mark store as being used for write operations
+          store.readOnly = readOnly
+          store
+        case None =>
+          // Create new store instance
+          new RocksDBStateStore(version, readOnly)
+      }
+    } catch {
       case e: OutOfMemoryError =>
        throw QueryExecutionErrors.notEnoughMemoryToLoadStore(
          stateStoreId.toString,
@@ -491,6 +535,31 @@ private[sql] class RocksDBStateStoreProvider
     }
   }
 
+  override def getStore(
+      version: Long, uniqueId: Option[String] = None): StateStore = {
+    loadStateStore(version, uniqueId, readOnly = false)
+  }
+
+  override def upgradeReadStoreToWriteStore(
+      readStore: ReadStateStore,
+      version: Long,
+      uniqueId: Option[String] = None): StateStore = {
+    assert(version == readStore.version,
+      s"Can only upgrade readStore to writeStore with the same version," +
+      s" readStoreVersion: ${readStore.version}, writeStoreVersion: ${version}")
+    assert(this.stateStoreId == readStore.id, "Can only upgrade readStore to writeStore with" +
+      " the same stateStoreId")
+    assert(readStore.isInstanceOf[RocksDBStateStore], "Can only upgrade state store if it is a " +
+      "RocksDBStateStore")
+    loadStateStore(version, uniqueId, readOnly = false, existingStore =
+      Some(readStore.asInstanceOf[RocksDBStateStore]))
+  }
+
+  override def getReadStore(
+      version: Long, uniqueId: Option[String] = None): StateStore = {
+    loadStateStore(version, uniqueId, readOnly = true)
+  }
+
   override def doMaintenance(): Unit = {
     try {
       rocksDB.doMaintenance()
@@ -578,7 +647,8 @@ private[sql] class RocksDBStateStoreProvider
   * @param endVersion checkpoint version to end with
   * @return [[StateStore]]
   */
-  override def replayStateFromSnapshot(snapshotVersion: Long, endVersion: Long): StateStore = {
+  override def replayStateFromSnapshot(
+      snapshotVersion: Long, endVersion: Long, readOnly: Boolean): StateStore = {
     try {
       if (snapshotVersion < 1) {
         throw QueryExecutionErrors.unexpectedStateStoreVersion(snapshotVersion)
@@ -587,7 +657,7 @@ private[sql] class RocksDBStateStoreProvider
         throw QueryExecutionErrors.unexpectedStateStoreVersion(endVersion)
       }
       rocksDB.loadFromSnapshot(snapshotVersion, endVersion)
-      new RocksDBStateStore(endVersion)
+      new RocksDBStateStore(endVersion, readOnly)
     }
     catch {
```