Spark Shuffle
Spark Shuffle Introduction
- 將某中具有相同特徵(key)的數據匯聚到同一個計算節點上進行計算的過程稱之為Shuffle
- 一定存在Wide dependencies
- 一個Action將被切割成2個Stage
- Shuffle會產生一系列的Map task去組織數據, 以及一系列Reduece task去聚合數據
- 可能產生Shuffle的算子
- KeyBy transformations
- Join with inputs not co-partitioned
- Repartition
- Coalesce(Maybe)
Performance Impact
- 數據的存儲
- 序列化與反序列化
- 數據壓縮
- Disk/Network IO
- 跨節點/跨行程數據傳輸
Spark Shuffle Manager
- Spark在進行Shuffle時,會根據shuffleManager的instance的不同,使用不同的Shuffle方法
- 在Spark1.2之前,預設使用Hash shuffle,Spark1.2以後,預設使用Sort shuffle
- 可以在conf中的spark.shuffle.manager指定要使用的Shuffle方法
- 高版本中已經沒有Hash shuffle
- 相關程式碼在org.apache.spark.shuffle.sort的SortShuffleManager.scala中123456789// Let the user specify short names for shuffle managersval shortShuffleMgrNames = Map("sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")val shuffleMgrClass =shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
Hash Shuffle
- 基礎條件
- 在1個Excutor有2個Core
- 共有4個Map Task
- 共有2個Reduce Task
- Hash Shuffle執行步驟
- 因為只有2個core,所以任一挑2個Map Task開始執行
- Shuffle Write Phase
- Map Task將相同key的Data寫入Bucket中(對應①)
- 每當Bucket滿了,作為一個批次寫入Disk文件中,Disk文件稱為FileSegment,不斷追加後,同一個key所有的Data都會寫到Disk文件中(對應②)
- Shuffle Read Phase
- Reduce Task將符合目標key的Data拉回來進行處理,每次拉取與自己Buffer相同大小的Data,並且透過記憶體中的一個Map進行聚合操作(aggregation),週而復始的直到所有Data拉取結束,並得到最終結果(對應③)
- 執行另外2個Map Task,並重複Step2跟3
- 每個Map Task下所需的Bucket與File數量等於Reduce Task數量
- 這不難理解,因為最後這些分片是要被Reduce Task所拉取的,所以與Reduce Task數量一一對應
- Bucket大小可以透過spark.shuffle.file.buffer調整,預設為32K,一般會調整為128K
Disadvantages
- 若有M個Map Task,R個Reduce Task,C個Core
- 總共存在M*R個FileSegment
- 本案例中是4*2=8個FileSegment
- 若M與R都到了一定量級,則會產生大量的FileSegment,並且進行大量的Disk IO,進而影響性能
- 同時存在M*C個Bucket
- 本案例中是2*2=4個Bucket
- 若將Bucket設為128KB,則需要MC128KB的記憶體空間
- 同樣的,若M與R的數量上升,同時所需的記憶體空間也會到無法接受的大小
Hash Shuffle with ConsolidateFile
將Spark.suffle.consolidateFiles參數設為True,可以打開consolidate機制
- Consolidate Hash Shuffle執行步驟
- 因為只有2個core,所以任一挑2個Map Task開始執行
- Shuffle Write Phase
- Map Task將相同key的Data寫入Bucket中(對應①)
- 每當Bucket滿了,創建一個稱為ShuffleFileGroup的Disk文件,並將Data寫入ShuffleFileGroup,不斷追加後,同一個key所有的Data都會寫到Disk文件中(對應②)
- Shuffle Read Phase
- Reduce Task將符合目標key的Data拉回來進行處理,每次拉取與自己Buffer相同大小的Data,並且透過記憶體中的一個Map進行聚合操作(aggregation),週而復始的直到所有Data拉取結束,並得到最終結果(對應③)
- 執行另外2個Map Task,並重複Step2跟3,與一般的Hash Shuffle不同的是,每個Map Task不會自行建立自己的Disk文件,而是寫入第一批Map Task所創建的ShuffleFileGroup中
Advantages
- Consolidate機制允許不同的Task復用同一批Disk文件,這樣可以有效將多個Task的Disk文件進行一定程度上的合併,從而大幅度減少Disk文件的數量,進而提升Shuffle write性能
- ShuffleFileGroup數量為C*R
- 本案例中是2*2=4個ShuffleFileGroup
- 如果是使用Spark 1.3,建議開啟consolidateFiles,並且將Bucket放大至128K
Sort Suffle
- Sort-based shuffle implementation
- Sort Suffle執行步驟
- Shuffle Write Phase
- Map Task先將數據寫入記憶體資料結構中,根據不同的Shuffle算子,可能選用不同的資料結構(對應①②③)
- 如果是ReduceByKey此類聚合的Shuffle算子,則會使用Map資料結構,一邊透過Map進行聚合,一邊寫入記憶體
- 如果是Join此類普通的Shuffle算子,則會選用Array資料結構,直接寫入記憶體
- 每寫一條數據到記憶體的資料結構時,都會判斷是否到達某個臨界閥值,如果達到,就將記憶體中的所有數據寫入到Disk的Data File中(對應④)
- 在寫入Data File之前會根據Key對記憶體內的資料結構進行排序
- 根據排序結果,分批將數據寫入Data File,默認Batch數量為1萬條數據。換句話說,排列好的數據,會以每批1萬條的形式分批寫入Data File
- 使用Java的緩衝輸出流BufferedOutputStream來實作Data File寫入操作
- 一個Map Task將所有記憶體中的資料結構過程中,會發生多次Disk溢寫操作,也就是會產生多個臨時文件,最後會將所有的臨時文件都進行合併,稱之為Merge操作(對應⑤)
- Map Task先將數據寫入記憶體資料結構中,根據不同的Shuffle算子,可能選用不同的資料結構(對應①②③)
- Shuffle Read Phase
- 每個Map Task對應一個Merge後的Data File和Index File,Index File標示了每個Reduce Task在Data File中,所需要拉取數據的Start offset與End offset(對應⑥)
- Shuffle Write Phase
Advantages
- 數據會先緩衝在記憶體內的資料結構中,當Cache滿了之後再一次寫入Data File中,有效減少Disk IO次數,提升性能
Spark Shell Demo
|
|
- 由DAG可得知此Job分割成2個Stage,即對應上述所說的Shuffle write/read stage,
- Stage0與Stage1分別各有2個Task,分別對應上述的Map task與Reduce Task,
- Map task數量是由sc.textFile(…)的第二個參數minPartitions決定,預設為2(Partition數與Map Task數相對應)
- Reduce Task數量沒有指定,故由reduceByKey(…)的內部機制計算得出,此處為2
- 可以透過spark.default.parallelism參數設定,或是指定textFile的第二個參數來設定
|
|
- 根據上述描述,Sort suffle會產生Data與Index file
- 此處分別產生2個Data與Index file,對應2個Map Task
- shuffle_x_y_z:
- x: shuffle的次數
- y: 第幾次map產生的
- z: 第幾次reduce產生的
|
|
- Stage0與Stage1分別對應2個Map task與5個Reduce Task,
- Map task數量是由sc.textFile(…)的第二個參數minPartitions決定,預設為2(Partition數與Map Task數相對應)
- Reduce Task數量由reduceByKey(…)的第二個參數設定
|
|
- 此處為累積第二次的Shuffle,所以對應的Data與Index文件為shuffle1 開頭
- 此處分別產生2個Data與Index file,對應2個Map Task
Sort Suffle Bypass Mechanism
- Bypass的觸發條件如下
- Shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值
- 不是聚合類的Shuffle算子
Advantages
- 在Shuffle write過程中,不會對數據進行排序操作,節省此部分的性能開銷
Spark Shuffle Tuning
- spark.shuffle.file.buffer
- 默認值: 32KB
- 建議值: 128KB
- 此參數用於設置Shuffle write task的Buffer大小,每次寫入Disk File前,會先將數據寫入此Buffer,待Buffer滿了之後,再寫入Disk File
- 可以Shuffle wirte過程中溢寫Disk文件的次數,進而減少Disk I/O次數
- spark.reducer.maxSizeInFlight
- 默認值: 48m
- 建議值: 96m
- 此參數用於設置Shuffle read task的Buffer大小,此Buffer決定每次能夠拉取多少數據
- 減少拉取數據的次數,進而減少網路傳輸的次數
- spark.shuffle.io.maxRetries
- 默認值: 3
- 建議值: 60
- 當Shuffle read task從Shuffle write task拉取屬於自己的數據時,可能因為JVM的GC或者網路異常導致拉取數據失敗,會自動進行重試,此參數代表最大重試次數
- 對於超大數據量(數十億至上百億)的Shuffle,調優此參數可以大幅提升穩定性
- spark.shuffle.io.retryWait
- 默認值: 5s
- 建議值: 60s
- 每次重試拉取數據的間隔時常