Spark Shuffle

Spark Shuffle

Spark Shuffle Introduction

  • 將某中具有相同特徵(key)的數據匯聚到同一個計算節點上進行計算的過程稱之為Shuffle
  • 一定存在Wide dependencies
  • 一個Action將被切割成2個Stage
  • Shuffle會產生一系列的Map task去組織數據, 以及一系列Reduece task去聚合數據
  • 可能產生Shuffle的算子
    • KeyBy transformations
    • Join with inputs not co-partitioned
    • Repartition
    • Coalesce(Maybe)

Performance Impact

  • 數據的存儲
  • 序列化與反序列化
  • 數據壓縮
  • Disk/Network IO
  • 跨節點/跨行程數據傳輸

Spark Shuffle Manager

  • Spark在進行Shuffle時,會根據shuffleManager的instance的不同,使用不同的Shuffle方法
  • 在Spark1.2之前,預設使用Hash shuffle,Spark1.2以後,預設使用Sort shuffle
  • 可以在conf中的spark.shuffle.manager指定要使用的Shuffle方法
  • 高版本中已經沒有Hash shuffle
  • 相關程式碼在org.apache.spark.shuffle.sort的SortShuffleManager.scala中
    1
    2
    3
    4
    5
    6
    7
    8
    9
    // Let the user specify short names for shuffle managers
    val shortShuffleMgrNames = Map(
    "sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
    "tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
    val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
    val shuffleMgrClass =
    shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
    val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)

Hash Shuffle

  • 基礎條件
    • 在1個Excutor有2個Core
    • 共有4個Map Task
    • 共有2個Reduce Task
  • Hash Shuffle執行步驟
    1. 因為只有2個core,所以任一挑2個Map Task開始執行
    2. Shuffle Write Phase
      • Map Task將相同key的Data寫入Bucket中(對應①)
      • 每當Bucket滿了,作為一個批次寫入Disk文件中,Disk文件稱為FileSegment,不斷追加後,同一個key所有的Data都會寫到Disk文件中(對應②)
    3. Shuffle Read Phase
      • Reduce Task將符合目標key的Data拉回來進行處理,每次拉取與自己Buffer相同大小的Data,並且透過記憶體中的一個Map進行聚合操作(aggregation),週而復始的直到所有Data拉取結束,並得到最終結果(對應③)
    4. 執行另外2個Map Task,並重複Step2跟3
  • 每個Map Task下所需的Bucket與File數量等於Reduce Task數量
    • 這不難理解,因為最後這些分片是要被Reduce Task所拉取的,所以與Reduce Task數量一一對應
  • Bucket大小可以透過spark.shuffle.file.buffer調整,預設為32K,一般會調整為128K

Disadvantages

  • 若有M個Map Task,R個Reduce Task,C個Core
  • 總共存在M*R個FileSegment
    • 本案例中是4*2=8個FileSegment
    • 若M與R都到了一定量級,則會產生大量的FileSegment,並且進行大量的Disk IO,進而影響性能
  • 同時存在M*C個Bucket
    • 本案例中是2*2=4個Bucket
    • 若將Bucket設為128KB,則需要MC128KB的記憶體空間
    • 同樣的,若M與R的數量上升,同時所需的記憶體空間也會到無法接受的大小

Hash Shuffle with ConsolidateFile

將Spark.suffle.consolidateFiles參數設為True,可以打開consolidate機制

  • Consolidate Hash Shuffle執行步驟
    1. 因為只有2個core,所以任一挑2個Map Task開始執行
    2. Shuffle Write Phase
      • Map Task將相同key的Data寫入Bucket中(對應①)
      • 每當Bucket滿了,創建一個稱為ShuffleFileGroup的Disk文件,並將Data寫入ShuffleFileGroup,不斷追加後,同一個key所有的Data都會寫到Disk文件中(對應②)
    3. Shuffle Read Phase
      • Reduce Task將符合目標key的Data拉回來進行處理,每次拉取與自己Buffer相同大小的Data,並且透過記憶體中的一個Map進行聚合操作(aggregation),週而復始的直到所有Data拉取結束,並得到最終結果(對應③)
    4. 執行另外2個Map Task,並重複Step2跟3,與一般的Hash Shuffle不同的是,每個Map Task不會自行建立自己的Disk文件,而是寫入第一批Map Task所創建的ShuffleFileGroup中

Advantages

  • Consolidate機制允許不同的Task復用同一批Disk文件,這樣可以有效將多個Task的Disk文件進行一定程度上的合併,從而大幅度減少Disk文件的數量,進而提升Shuffle write性能
  • ShuffleFileGroup數量為C*R
    • 本案例中是2*2=4個ShuffleFileGroup
  • 如果是使用Spark 1.3,建議開啟consolidateFiles,並且將Bucket放大至128K

Sort Suffle

  • Sort-based shuffle implementation
  • Sort Suffle執行步驟
    1. Shuffle Write Phase
      •  Map Task先將數據寫入記憶體資料結構中,根據不同的Shuffle算子,可能選用不同的資料結構(對應①②③)
        • 如果是ReduceByKey此類聚合的Shuffle算子,則會使用Map資料結構,一邊透過Map進行聚合,一邊寫入記憶體
        • 如果是Join此類普通的Shuffle算子,則會選用Array資料結構,直接寫入記憶體
      • 每寫一條數據到記憶體的資料結構時,都會判斷是否到達某個臨界閥值,如果達到,就將記憶體中的所有數據寫入到Disk的Data File中(對應④)
        • 在寫入Data File之前會根據Key對記憶體內的資料結構進行排序
        • 根據排序結果,分批將數據寫入Data File,默認Batch數量為1萬條數據。換句話說,排列好的數據,會以每批1萬條的形式分批寫入Data File
        • 使用Java的緩衝輸出流BufferedOutputStream來實作Data File寫入操作
      • 一個Map Task將所有記憶體中的資料結構過程中,會發生多次Disk溢寫操作,也就是會產生多個臨時文件,最後會將所有的臨時文件都進行合併,稱之為Merge操作(對應⑤)
    2. Shuffle Read Phase
      • 每個Map Task對應一個Merge後的Data File和Index File,Index File標示了每個Reduce Task在Data File中,所需要拉取數據的Start offset與End offset(對應⑥)

Advantages

  • 數據會先緩衝在記憶體內的資料結構中,當Cache滿了之後再一次寫入Data File中,有效減少Disk IO次數,提升性能

Spark Shell Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[hadoop@testmain bin]$ ./spark-shell --master local[3] --conf spark.huffle.manager=sort --conf spark.local.dir=/home/hadoop/suffle_output --jars /opt/software/hive/lib/mysql-connector-java-5.1.44-bin.jar
SLF4J: Class path contains multiple SLF4J bindings.
## ...
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.
scala> val lines = sc.textFile("file:////home/hadoop/testFile")
lines: org.apache.spark.rdd.RDD[String] = file:////home/hadoop/testFile MapPartitionsRDD[1] at textFile at <console>:24
scala> val words = lines.flatMap(_.split("\t"))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
scala> val pairs = words.map((_,1))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
scala> val wordcount = pairs.reduceByKey(_+_)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
scala> wordcount.collect
res0: Array[(String, Int)] = Array((Hello,1), (we,1), ("",1), (first,1), (How,1), (world,1), (roll,1))
  • 由DAG可得知此Job分割成2個Stage,即對應上述所說的Shuffle write/read stage,
  • Stage0與Stage1分別各有2個Task,分別對應上述的Map task與Reduce Task,
  • Map task數量是由sc.textFile(…)的第二個參數minPartitions決定,預設為2(Partition數與Map Task數相對應)
  • Reduce Task數量沒有指定,故由reduceByKey(…)的內部機制計算得出,此處為2
    • 可以透過spark.default.parallelism參數設定,或是指定textFile的第二個參數來設定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
## 進入到spark.local.dir參數所指定的路徑
[hadoop@testmain ~]$ cd /home/hadoop/suffle_output
[hadoop@testmain suffle_output]$ ls
blockmgr-9883d706-3c6c-4b96-9093-010fa120649c spark-c90041c3-9032-44e0-91fe-41637951b75a
## 在blockmgr開頭的資夾內,可以看到對應的Data與Index文件
[hadoop@testmain suffle_output]$ cd blockmgr-9883d706-3c6c-4b96-9093-010fa120649c
[hadoop@testmain blockmgr-9883d706-3c6c-4b96-9093-010fa120649c]$ tree
.
|____11
|____0e
|____30
| |____shuffle_0_0_0.index
|____0d
|____13
|____15
| |____shuffle_0_1_0.data
|____0c
| |____shuffle_0_0_0.data
|____0f
| |____shuffle_0_1_0.index
  • 根據上述描述,Sort suffle會產生Data與Index file
  • 此處分別產生2個Data與Index file,對應2個Map Task
  • shuffle_x_y_z:
    • x: shuffle的次數
    • y: 第幾次map產生的
    • z: 第幾次reduce產生的
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
``` bash
scala> val lines = sc.textFile("file:////home/hadoop/testFile")
lines: org.apache.spark.rdd.RDD[String] = file:////home/hadoop/testFile MapPartitionsRDD[6] at textFile at <console>:24
scala> val words = lines.flatMap(_.split("\t"))
words: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[7] at flatMap at <console>:26
scala> val pairs = words.map((_,1))
pairs: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[8] at map at <console>:28
## 設定Reduce task數量為5
scala> val wordcount = pairs.reduceByKey(_+_, 5)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[9] at reduceByKey at <console>:30
scala> wordcount.collect
res1: Array[(String, Int)] = Array((Hello,1), (we,1), ("",1), (roll,1), (first,1), (How,1), (world,1))
  • Stage0與Stage1分別對應2個Map task與5個Reduce Task,
  • Map task數量是由sc.textFile(…)的第二個參數minPartitions決定,預設為2(Partition數與Map Task數相對應)
  • Reduce Task數量由reduceByKey(…)的第二個參數設定
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[hadoop@testmain blockmgr-2bce7fb7-2a45-45ba-8fb9-96be7976e8f4]$ tree
.
|____11
|____0e
|____30
| |____shuffle_0_0_0.index
|____0d
|____36
|____0a
| |____shuffle_1_1_0.data
|____09
|____0b
|____32
| |____shuffle_1_1_0.index
|____13
|____15
| |____shuffle_0_1_0.data
|____0c
| |____shuffle_0_0_0.data
|____0f
| |____shuffle_0_1_0.index
| |____shuffle_1_0_0.index
|____2b
| |____shuffle_1_0_0.data
  • 此處為累積第二次的Shuffle,所以對應的Data與Index文件為shuffle1 開頭
  • 此處分別產生2個Data與Index file,對應2個Map Task

Sort Suffle Bypass Mechanism

  • Bypass的觸發條件如下
    • Shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值
    • 不是聚合類的Shuffle算子

Advantages

  • 在Shuffle write過程中,不會對數據進行排序操作,節省此部分的性能開銷

Spark Shuffle Tuning

  • spark.shuffle.file.buffer
    • 默認值: 32KB
    • 建議值: 128KB
    • 此參數用於設置Shuffle write task的Buffer大小,每次寫入Disk File前,會先將數據寫入此Buffer,待Buffer滿了之後,再寫入Disk File
    • 可以Shuffle wirte過程中溢寫Disk文件的次數,進而減少Disk I/O次數
  • spark.reducer.maxSizeInFlight
    • 默認值: 48m
    • 建議值: 96m
    • 此參數用於設置Shuffle read task的Buffer大小,此Buffer決定每次能夠拉取多少數據
    • 減少拉取數據的次數,進而減少網路傳輸的次數
  • spark.shuffle.io.maxRetries
    • 默認值: 3
    • 建議值: 60
    • 當Shuffle read task從Shuffle write task拉取屬於自己的數據時,可能因為JVM的GC或者網路異常導致拉取數據失敗,會自動進行重試,此參數代表最大重試次數
    • 對於超大數據量(數十億至上百億)的Shuffle,調優此參數可以大幅提升穩定性
  • spark.shuffle.io.retryWait
    • 默認值: 5s
    • 建議值: 60s
    • 每次重試拉取數據的間隔時常

Referance