Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ class ShuffleTaskManager extends Logging {
// key appId-shuffleId
// store all reducerId-EpochId for a shuffleKey
private[deploy] val shuffleEpochSetMap =
new ConcurrentHashMap[String, ConcurrentHashMap[Int, util.List[PartitionInfo]]]()
new ConcurrentHashMap[String, util.HashSet[PartitionInfo]]()
private[deploy] val batchBlacklistMap =
new ConcurrentHashMap[String, ConcurrentHashMap[Int, util.List[FailedPartitionInfoBatch]]]()

Expand All @@ -48,7 +48,7 @@ class ShuffleTaskManager extends Logging {
numMappers: Int,
numPartitions: Int): Unit = {
shuffleMapperAttempts.putIfAbsent(shuffleKey, Array.fill(numMappers)(-1))
shuffleEpochSetMap.putIfAbsent(shuffleKey, new ConcurrentHashMap[Int, util.List[PartitionInfo]]())
shuffleEpochSetMap.putIfAbsent(shuffleKey, new util.HashSet[PartitionInfo]()))
reducerFileGroupsMap.putIfAbsent(shuffleKey, new Array[Array[CommittedPartitionInfo]](numPartitions))
}

Expand All @@ -71,9 +71,9 @@ class ShuffleTaskManager extends Logging {

// epochList not be null.
val epochSet = shuffleEpochSetMap.computeWhenAbsent(shuffleKey, _ => {
new ConcurrentHashMap[Int, util.List[PartitionInfo]]()
new util.HashSet[PartitionInfo]()
})
epochSet.put(mapId, epochList)
epochSet.addAll(epochList)

if (batchBlacklist != null) {
val blacklist = batchBlacklistMap.computeWhenAbsent(shuffleKey, _ => {
Expand Down Expand Up @@ -112,10 +112,7 @@ class ShuffleTaskManager extends Logging {
commitPieces: ConcurrentHashMap[String, ConcurrentSet[CommittedPartitionInfo]]): Boolean = {

// check for data lost
val allEpochSets = new util.HashSet[PartitionInfo]()
allEpochSets.addAll(
shuffleEpochSetMap.getOrDefault(shuffleKey, new ConcurrentHashMap[Int, util.List[PartitionInfo]]())
.values().asScala.flatMap(x => x.asScala.toSet[PartitionInfo]).toSet.asJava)
val allEpochSets = shuffleEpochSetMap.getOrDefault(shuffleKey, new util.HashSet[PartitionInfo]())

var dataLost = false
val validCommitted = allEpochSets.asScala
Expand Down