Skip to content

Commit 0982041

Browse files
liviazhu authored and anishshri-db committed
[SPARK-52637][SS] Fix version ID mismatch issue for RocksDB compaction leading to incorrect file mapping
### What changes were proposed in this pull request?

We found a bug leading to checkpoint corruption with a RocksDB VersionID Mismatch error, caused by local file mappings not being cleared correctly when native RocksDB performs a compaction and the original SST file is not deleted. As a result, the same DFS file UUID was reused for two different SST files, leading to the version ID mismatch. This change purges from the local file mappings any mapping that was created by a version equal to or greater than the one being loaded.

### Why are the changes needed?

The described bug fails streaming queries.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New unit test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #51520 from liviazhu/liviazhu-db/versionid-mismatch.

Authored-by: Livia Zhu <[email protected]>
Signed-off-by: Anish Shrigondekar <[email protected]>
1 parent 9204b05 commit 0982041

File tree

3 files changed

+79
-0
lines changed

3 files changed

+79
-0
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1717,6 +1717,24 @@ class RocksDBFileMapping {
17171717
}.getOrElse(None)
17181718
}
17191719

1720+
/**
 * Purge every local file mapping that was recorded by a version equal to or greater than
 * the version we are about to load, since such mappings are incompatible with that load.
 *
 * @param versionToLoad the version that is being loaded
 * @return seq of purged mappings
 */
def purgeIncompatibleMappingsForLoad(versionToLoad: Long):
    Seq[(String, (Long, RocksDBImmutableFile))] = {
  // Collect the incompatible entries up front, then drop each of them by local file name.
  val staleEntries = localFileMappings.toSeq.filter { entry =>
    val (mappedVersion, _) = entry._2
    mappedVersion >= versionToLoad
  }
  for ((staleLocalFileName, _) <- staleEntries) {
    remove(staleLocalFileName)
  }
  staleEntries
}
1737+
17201738
def mapToDfsFile(
17211739
localFileName: String,
17221740
dfsFile: RocksDBImmutableFile,

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import org.json4s.jackson.Serialization
3838

3939
import org.apache.spark.{SparkConf, SparkEnv, SparkException}
4040
import org.apache.spark.internal.{Logging, LogKeys, MDC, MessageWithContext}
41+
import org.apache.spark.internal.LogKeys.{DFS_FILE, VERSION_NUM}
4142
import org.apache.spark.io.CompressionCodec
4243
import org.apache.spark.sql.errors.QueryExecutionErrors
4344
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
@@ -785,6 +786,17 @@ class RocksDBFileManager(
785786
}
786787
}
787788

789+
// Delete remaining unnecessary local immutable file mappings.
790+
// Files present in the file mapping but not the filesystem may lead to
791+
// versionID mismatch error (SPARK-52637), so we should explicitly delete
792+
// them.
793+
rocksDBFileMapping.purgeIncompatibleMappingsForLoad(version).foreach {
794+
case (_, (dfsFileMappedVersion, dfsFile)) =>
795+
logInfo(log"Deleted local fileMapping to ${MDC(DFS_FILE, dfsFile)} because " +
796+
log"mapped file version ${MDC(VERSION_NUM, dfsFileMappedVersion)} was " +
797+
log"incompatible with versionToLoad ${MDC(VERSION_NUM, version)}")
798+
}
799+
788800
var filesCopied = 0L
789801
var bytesCopied = 0L
790802
var filesReused = 0L

sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3566,6 +3566,55 @@ class RocksDBSuite extends AlsoTestWithRocksDBFeatures with SharedSparkSession
35663566
}
35673567
}
35683568

3569+
test("SPARK-52637: RocksDB compaction leading to incorrect file mapping during load " +
  "does not lead to versionID mismatch") {
  // Configure the minimum number of deltas per snapshot to 1 for this test.
  val conf = new SQLConf
  conf.setConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT, 1)
  val rocksDbConf = RocksDBConf(StateStoreConf(conf))

  withTempDir { remoteDir =>
    withTempDir { localDir =>
      withDB(remoteDir.toString, localDir = localDir, conf = rocksDbConf) { db =>
        db.load(0)
        db.commit()

        val workingDirCandidates =
          localDir.listFiles().filter(_.getName.startsWith("workingDir"))
        val workingDir = workingDirCandidates.head

        logInfo(s"files: ${db.fileManager.listRocksDBFiles(workingDir)}")

        db.load(1)
        db.put("0", "0")
        db.commit()

        // upload snapshot to remoteDir
        db.doMaintenance()

        // confirm that sst files exist
        assert(db.fileManager.listRocksDBFiles(workingDir)._1.nonEmpty)
        // simulate rocksdb compaction by removing SST files
        for (sstFile <- db.fileManager.listRocksDBFiles(workingDir)._1) {
          sstFile.delete()
        }

        // confirm that there are entries in the mapping
        val fileMappingAccessor = PrivateMethod[RocksDBFileMapping](Symbol("rocksDBFileMapping"))
        val localMappingsAccessor =
          PrivateMethod[mutable.Map[String, (Long, RocksDBImmutableFile)]](
            Symbol("localFileMappings"))
        val fileMapping = db.invokePrivate(fileMappingAccessor())
        val localMappings = fileMapping.invokePrivate(localMappingsAccessor())
        assert(localMappings.values.exists { case (version, _) => version >= 1 })

        // reload version 1
        db.load(1)

        // ensure that there are no leftover fileMappings from the first load of version 1
        assert(localMappings.values.forall { case (version, _) => version < 1 })
      }
    }
  }
}
3617+
35693618
private def assertAcquiredThreadIsCurrentThread(db: RocksDB): Unit = {
35703619
val threadInfo = db.getAcquiredThreadInfo()
35713620
assert(threadInfo != None,

0 commit comments

Comments
 (0)