
Commit 1c671ca

Shuffle System and BlockStoreShuffleReader
1 parent 78afade commit 1c671ca

File tree

6 files changed: +140 -94 lines


docs/shuffle/BaseShuffleHandle.md

Lines changed: 43 additions & 23 deletions
@@ -12,40 +12,60 @@

## Demo

Start a Spark application (e.g., spark-shell) with the following Spark properties to trigger selection of `BaseShuffleHandle`:

* `spark.shuffle.spill.numElementsForceSpillThreshold=1`
* `spark.shuffle.sort.bypassMergeThreshold=1`

```bash
./bin/spark-shell \
  --conf spark.shuffle.spill.numElementsForceSpillThreshold=1 \
  --conf spark.shuffle.sort.bypassMergeThreshold=1
```

Create an RDD with the number of partitions (`numSlices`) greater than the value of the [spark.shuffle.sort.bypassMergeThreshold](../configuration-properties.md#spark.shuffle.sort.bypassMergeThreshold) configuration property.

```scala
val rdd = sc.parallelize(0 to 4, numSlices = 2).groupBy(_ % 2)
```

```scala
assert(rdd.getNumPartitions == 2)
```

```text
scala> rdd.dependencies
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle 0 because an aggregator is defined
res0: Seq[org.apache.spark.Dependency[_]] = List(org.apache.spark.ShuffleDependency@1160c54b)
```

```scala
import org.apache.spark.ShuffleDependency

val shuffleDep = rdd.dependencies(0).asInstanceOf[ShuffleDependency[Int, Int, Int]]
```

```scala
// mapSideCombine is disabled
assert(shuffleDep.mapSideCombine == false)
```

```scala
// aggregator defined
assert(shuffleDep.aggregator.isDefined)
```

```text
scala> shuffleDep.aggregator.get.getClass
val res11: Class[_ <: org.apache.spark.Aggregator[Int,Int,Int]] = class org.apache.spark.Aggregator
```

Note that the number of reduce partitions is greater than the value of the [spark.shuffle.sort.bypassMergeThreshold](../configuration-properties.md#spark.shuffle.sort.bypassMergeThreshold) configuration property.

```scala
assert(shuffleDep.partitioner.numPartitions == 2)
```

```text
scala> print(shuffleDep.shuffleHandle)
org.apache.spark.shuffle.BaseShuffleHandle@22b0fe7e
```
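For context, the handle chosen above follows `SortShuffleManager`'s registration-time decision chain. Below is a simplified, self-contained sketch of that chain (the `selectHandle` helper, its parameters, and the `Handle` stand-ins are illustrative assumptions, not Spark's actual code):

```scala
// Simplified sketch of SortShuffleManager's shuffle handle selection
// (illustrative stand-ins; the real logic lives in org.apache.spark.shuffle.sort)
sealed trait Handle
case object BypassMergeSortShuffleHandle extends Handle
case object SerializedShuffleHandle extends Handle
case object BaseShuffleHandle extends Handle

def selectHandle(
    numPartitions: Int,
    bypassMergeThreshold: Int,
    mapSideCombine: Boolean,
    aggregatorDefined: Boolean,
    serializerRelocatable: Boolean): Handle = {
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
    // few reduce partitions and no map-side combine: bypass merge-sort
    BypassMergeSortShuffleHandle
  } else if (serializerRelocatable && !aggregatorDefined && numPartitions <= (1 << 24)) {
    // serialized (Tungsten) shuffle needs a relocatable serializer and no aggregator
    SerializedShuffleHandle
  } else {
    // fallback: sort-based shuffle with a BaseShuffleHandle
    BaseShuffleHandle
  }
}

// The demo's settings: 2 reduce partitions > bypassMergeThreshold=1, aggregator defined
val chosen = selectHandle(
  numPartitions = 2, bypassMergeThreshold = 1,
  mapSideCombine = false, aggregatorDefined = true, serializerRelocatable = true)
```

With the demo's configuration the bypass path is ruled out by the threshold and the serialized path by the aggregator, which leaves `BaseShuffleHandle`.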

docs/shuffle/BlockStoreShuffleReader.md

Lines changed: 26 additions & 7 deletions
@@ -19,26 +19,44 @@

* `SortShuffleManager` is requested for a [ShuffleReader](SortShuffleManager.md#getReader) (for a `ShuffleHandle` and a range of reduce partitions)

## Reading Combined Records (for Reduce Task) { #read }

??? note "ShuffleReader"

    ```scala
    read(): Iterator[Product2[K, C]]
    ```

    `read` is part of the [ShuffleReader](ShuffleReader.md#read) abstraction.

`read` creates a [ShuffleBlockFetcherIterator](../storage/ShuffleBlockFetcherIterator.md).

`read`...FIXME

### fetchContinuousBlocksInBatch { #fetchContinuousBlocksInBatch }

```scala
fetchContinuousBlocksInBatch: Boolean
```

`fetchContinuousBlocksInBatch` reads the following configuration properties to determine whether continuous shuffle block fetching can be used:

* [spark.io.encryption.enabled](../configuration-properties.md#spark.io.encryption.enabled)
* [spark.shuffle.compress](../configuration-properties.md#spark.shuffle.compress)
* [spark.shuffle.useOldFetchProtocol](../configuration-properties.md#spark.shuffle.useOldFetchProtocol)
* [supportsRelocationOfSerializedObjects](../serializer/Serializer.md#supportsRelocationOfSerializedObjects) (of the [Serializer](../rdd/ShuffleDependency.md#serializer) of the [ShuffleDependency](BaseShuffleHandle.md#dependency) of this [BaseShuffleHandle](#handle))

`fetchContinuousBlocksInBatch` prints out the following DEBUG message when continuous shuffle block fetching is requested yet not satisfied by the configuration:

```text
The feature tag of continuous shuffle block fetching is set to true, but
we can not enable the feature because other conditions are not satisfied.
Shuffle compress: [compressed], serializer relocatable: [serializerRelocatable],
codec concatenation: [codecConcatenation], use old shuffle fetch protocol:
[useOldFetchProtocol], io encryption: [ioEncryption].
```
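Taken together, the checks can be sketched as a pure function over those settings. This is an illustrative, self-contained model (the `FetchEnv` type and its field names are assumptions, not Spark's internals):

```scala
// Hypothetical sketch of the batch-fetch eligibility check
// (names and structure are illustrative; the real logic is in BlockStoreShuffleReader)
final case class FetchEnv(
    ioEncryptionEnabled: Boolean,    // spark.io.encryption.enabled
    shuffleCompress: Boolean,        // spark.shuffle.compress
    useOldFetchProtocol: Boolean,    // spark.shuffle.useOldFetchProtocol
    serializerRelocatable: Boolean,  // Serializer.supportsRelocationOfSerializedObjects
    codecConcatenation: Boolean)     // compression codec supports stream concatenation

def fetchContinuousBlocksInBatch(env: FetchEnv): Boolean = {
  // compressed blocks can only be fetched as one batch if the codec
  // allows concatenated compressed streams
  val compressOk = !env.shuffleCompress || env.codecConcatenation
  env.serializerRelocatable && compressOk &&
    !env.useOldFetchProtocol && !env.ioEncryptionEnabled
}
```

Continuous (batch) fetching is viable only when adjacent blocks can be concatenated byte-wise: a relocatable serializer, a concatenation-friendly codec (or no compression), the new fetch protocol, and no IO encryption.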

<!---
## Review Me

=== [[read]] Reading Combined Records For Reduce Task

@@ -65,3 +83,4 @@

2. shuffle:ExternalSorter.md#insertAll[Inserts all the records] into the `ExternalSorter`
3. Updates context `TaskMetrics`
4. Returns a `CompletionIterator` for the `ExternalSorter`
-->

docs/shuffle/ShuffleHandle.md

Lines changed: 6 additions & 1 deletion
@@ -1,3 +1,8 @@

---
tags:
  - DeveloperApi
---

# ShuffleHandle

`ShuffleHandle` is an abstraction of [shuffle handles](#implementations) for [ShuffleManager](ShuffleManager.md) to pass information about shuffles to tasks.

@@ -14,5 +19,5 @@

* <span id="shuffleId"> Shuffle ID

??? note "Abstract Class"
    `ShuffleHandle` is an abstract class and cannot be created directly. It is created indirectly for the [concrete ShuffleHandles](#implementations).
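The contract itself is small; here is a minimal sketch of the shape of such a handle (simplified and hypothetical; the real class is `org.apache.spark.shuffle.ShuffleHandle`):

```scala
// Minimal sketch: a ShuffleHandle is an opaque, serializable token that carries
// the shuffle ID from the driver to tasks (simplified, not Spark's definition)
abstract class ShuffleHandle(val shuffleId: Int) extends Serializable

// A hypothetical concrete handle, as a ShuffleManager implementation would define one
class DemoShuffleHandle(shuffleId: Int) extends ShuffleHandle(shuffleId)

val handle: ShuffleHandle = new DemoShuffleHandle(0)
```

Being `Serializable` matters because the handle is shipped to executors as part of the tasks of a shuffle stage.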

docs/shuffle/index.md

Lines changed: 1 addition & 1 deletion
@@ -1,6 +1,6 @@

# Shuffle System

**Shuffle System** is one of the core services of Apache Spark, responsible for shuffle blocks of data.

The main abstraction is [ShuffleManager](ShuffleManager.md), with [SortShuffleManager](SortShuffleManager.md) as the default and only known implementation.

docs/storage/ShuffleBlockFetcherIterator.md

Lines changed: 22 additions & 22 deletions
@@ -32,7 +32,7 @@

* `BlockStoreShuffleReader` is requested to [read combined key-value records for a reduce task](../shuffle/BlockStoreShuffleReader.md#read)

### Initializing { #initialize }

```scala
initialize(): Unit
```

@@ -46,15 +46,15 @@

`initialize`...FIXME

### partitionBlocksByFetchMode { #partitionBlocksByFetchMode }

```scala
partitionBlocksByFetchMode(): ArrayBuffer[FetchRequest]
```

`partitionBlocksByFetchMode`...FIXME

### collectFetchRequests { #collectFetchRequests }

@@ -65,7 +65,7 @@

`collectFetchRequests`...FIXME

### createFetchRequests { #createFetchRequests }

@@ -77,7 +77,7 @@

`createFetchRequests`...FIXME

## fetchUpToMaxBytes { #fetchUpToMaxBytes }

```scala
fetchUpToMaxBytes(): Unit
```

@@ -89,7 +89,7 @@

* `ShuffleBlockFetcherIterator` is requested to [initialize](#initialize) and [next](#next)

## Sending Remote Shuffle Block Fetch Request { #sendRequest }

@@ -110,11 +110,11 @@

* `ShuffleBlockFetcherIterator` is requested to [fetch remote shuffle blocks](#fetchUpToMaxBytes)

### BlockFetchingListener { #BlockFetchingListener }

`sendRequest` creates a new [BlockFetchingListener](../core/BlockFetchingListener.md) to be notified about [successes](#onBlockFetchSuccess) or [failures](#onBlockFetchFailure) of shuffle block fetch requests.

#### onBlockFetchSuccess { #onBlockFetchSuccess }

On [onBlockFetchSuccess](../core/BlockFetchingListener.md#onBlockFetchSuccess) the `BlockFetchingListener` adds a `SuccessFetchResult` to the [results](#results) registry and prints out the following DEBUG message to the logs (when not a [zombie](#isZombie)):

@@ -128,15 +128,15 @@

In the end, `onBlockFetchSuccess` prints out the following TRACE message to the logs:

```text
Got remote block [blockId] after [time]
```

#### onBlockFetchFailure { #onBlockFetchFailure }

On [onBlockFetchFailure](../core/BlockFetchingListener.md#onBlockFetchFailure) the `BlockFetchingListener` adds a `FailureFetchResult` to the [results](#results) registry and prints out the following ERROR message to the logs:

```text
Failed to get block(s) from [host]:[port]
```

## FetchResults { #results }

```scala
results: LinkedBlockingQueue[FetchResult]
```

@@ -165,7 +165,7 @@

Cleaned up in [cleanup](#cleanup)
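The `results` registry is a classic producer-consumer hand-off: fetch callbacks enqueue results while the iterator's `next` dequeues them. A self-contained sketch with hypothetical result types (not Spark's classes):

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical stand-ins for the FetchResult hierarchy (illustrative only)
sealed trait FetchResult
final case class SuccessFetchResult(blockId: String, size: Long) extends FetchResult
final case class FailureFetchResult(blockId: String, error: String) extends FetchResult

val results = new LinkedBlockingQueue[FetchResult]()

// A fetch callback (producer) enqueues a result as a block arrives...
results.put(SuccessFetchResult("shuffle_0_0_0", 1024L))

// ...while the consumer blocks on take() until a result is available
val result = results.take()
```

`LinkedBlockingQueue` gives the iterator back-pressure for free: `take()` blocks the reduce task until some fetch (success or failure) has completed.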

## hasNext { #hasNext }

```scala
hasNext: Boolean
```

@@ -175,7 +175,7 @@

`hasNext` is `true` when [numBlocksProcessed](#numBlocksProcessed) is below [numBlocksToFetch](#numBlocksToFetch).

## Retrieving Next Element { #next }

```scala
next(): (BlockId, InputStream)
```

@@ -193,11 +193,11 @@

`next` is part of the `Iterator` ([Scala]({{ scala.api }}/scala/collection/Iterator.html#next():A)) abstraction (to produce the next element of this iterator).

## numBlocksProcessed { #numBlocksProcessed }

The number of blocks [fetched and consumed](#next)

## numBlocksToFetch { #numBlocksToFetch }

Total number of blocks to [fetch and consume](#next)

@@ -209,7 +209,7 @@

```text
Getting [numBlocksToFetch] non-empty blocks out of [totalBlocks] blocks
```

## releaseCurrentResultBuffer { #releaseCurrentResultBuffer }

```scala
releaseCurrentResultBuffer(): Unit
```

@@ -222,13 +222,13 @@

* `ShuffleBlockFetcherIterator` is requested to [cleanup](#cleanup)
* `BufferReleasingInputStream` is requested to `close`

## ShuffleFetchCompletionListener { #onCompleteCallback }

`ShuffleBlockFetcherIterator` creates a [ShuffleFetchCompletionListener](ShuffleFetchCompletionListener.md) when [created](#creating-instance).

`ShuffleFetchCompletionListener` is used when [initialize](#initialize) and [toCompletionIterator](#toCompletionIterator).

## Cleaning Up { #cleanup }

```scala
cleanup(): Unit
```

@@ -240,7 +240,7 @@

`cleanup` iterates over the [results](#results) internal queue and, for every `SuccessFetchResult`, increments remote bytes read and blocks fetched shuffle task metrics, and eventually releases the managed buffer.

## bytesInFlight { #bytesInFlight }

The bytes of fetched remote shuffle blocks in flight

@@ -250,7 +250,7 @@

`ShuffleBlockFetcherIterator` makes sure that the invariant of `bytesInFlight` being below [maxBytesInFlight](#maxBytesInFlight) holds on every [remote shuffle block fetch](#fetchUpToMaxBytes).

## reqsInFlight { #reqsInFlight }

The number of remote shuffle block fetch requests in flight.

@@ -260,7 +260,7 @@

`ShuffleBlockFetcherIterator` makes sure that the invariant of `reqsInFlight` being below [maxReqsInFlight](#maxReqsInFlight) holds on every [remote shuffle block fetch](#fetchUpToMaxBytes).
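Together, the two in-flight invariants act as admission control on outstanding fetch requests. A simplified, hypothetical sketch of such a check (not Spark's actual `fetchUpToMaxBytes` logic):

```scala
// Hypothetical sketch: only send the next fetch request while both
// in-flight invariants would still hold (simplified admission control)
final case class FlightLimits(maxBytesInFlight: Long, maxReqsInFlight: Int)

def canSendRequest(
    bytesInFlight: Long,
    reqsInFlight: Int,
    nextRequestBytes: Long,
    limits: FlightLimits): Boolean =
  reqsInFlight + 1 <= limits.maxReqsInFlight &&
    bytesInFlight + nextRequestBytes <= limits.maxBytesInFlight
```

Throttling on both bytes and request count keeps a reduce task from overwhelming its own memory (bytes) or the serving block managers (concurrent requests).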

## isZombie { #isZombie }

Controls whether `ShuffleBlockFetcherIterator` is still active and records `SuccessFetchResult`s on [successful shuffle block fetches](#onBlockFetchSuccess).

@@ -270,11 +270,11 @@

Enabled (`true`) in [cleanup](#cleanup).

When enabled, [registerTempFileToClean](#registerTempFileToClean) is a noop.

## DownloadFileManager { #DownloadFileManager }

`ShuffleBlockFetcherIterator` is a [DownloadFileManager](../shuffle/DownloadFileManager.md).

## throwFetchFailedException { #throwFetchFailedException }

0 commit comments
