Spark Shared Variables

Spark Shared Variables

  • 如果有一個操作(map, reduce)是執行在遠端集群節點上,而此操作內使用到的外部變量,Spark會通過複製的方式,從Driver端傳遞到每台執行該操作的節點上,
    而且每台遠端節點上的變量,是無法進行更新對應在Driver那份原始變量的。
  • 在上述的情況下,讀寫共享變量在跨Task的情況下就不是這麼的高效了,所以Spark提供了兩種機制的共享變量機制:
    • Broadcast Variables
    • Accumulators

Broadcast Variables

  • Broadcast variables
  • 透過SparkContext構建Broadcast variables
  • 可以大幅降低序列化任務的大小,以及降低每個跨集群啟動Job的成本
  • 對較大的變量,透過Broadcast variable傳遞到每個Job,有效降低記憶體使用量
  • 當多個跨Stage的Task需要相同一份Data,或使用Deserialized將Data cache時,使用Broadcast Variables才是有用的
  • 每台Work node中第一個使用到Broadcast variable的Task,會透過BlockManager向Driver端拉取Broadcast variable到自己的Executor上
  • 而相同Work node上但不同Executor的其他Task需要使用到Broadcast variable時,會透過BlockManager向已經有Broadcast variable的Executor拉取
  • Spark提供兩種Broadcast機制
    • TorrentBroadcast: 預設使用
    • HttpBroadcast
  • 若現在有50 Executor, 1000 Tasks, Data 100M
    • 未使用Broadcast Variables所使用的記憶體空間: 1000 * 100M
    • 使用Broadcast Variables所使用的記憶體空間: 50 * 1000M
  • 未使用Broadcast Variables的示意圖
  • 使用Broadcast Variables的示意圖

Accumulators

  • Accumulators
  • 透過SparkContext構建Accumulators
  • 一個只能進行add的操作的變量,透過關聯和交換操作,達成高效的並行處理
  • Spark原生支援Numeric類型的Accumulators,可以透用繼承的方式進行擴展

Broadcast Variables Code

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
/*
peopleInfo:
110 Smith
130 Allen
departmentInfo:
110 Sales 001
111 Accounting 002
result:
110,Smith,Sales
*/
val peopleInfo = sc.parallelize(Array(("110", "Smith"),("222", "Allen"))).map(x=>(x._1, x))
val departmentInfo = sc.parallelize(Array(("110", "Sales", "001"),("111", "Accounting", "002"))).map(x => (x._1, x))
peopleInfo.join(departmentInfo).map(x=>{
x._1 + "," + x._2._1._2 + "," + x._2._2._2
}).collect
  • 可以透過上述的程式達成peopleInfo與departmentInfo的join操作
  • 另外也可以透過Broadcast Variables達成相同的操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
val peopleInfo = sc.parallelize(Array(("110", "Smith"),("120", "Allen"))).collectAsMap()
val broadcastValue = sc.broadcast(peopleInfo)
val departmentInfo = sc.parallelize(Array(("110", "Sales", "001"),("111", "Accounting", "002")))
departmentInfo.mapPartitions(iter => {
val peopleMap = broadcastValue.value
var res = List[(String, String, String)]()
iter.foreach { case (x, y, z) => {
if (peopleMap.contains(x)) {
res .::= ((x, peopleMap.getOrElse(x, ""), y))
}
}
}
//arrayBuffer.iterator
res.iterator
}).collect
  • 上述透過Broadcast Variables與map類算子實作join操作,並與join操作達成相同效果
  • 此方法在數據傾斜時,而且操作對象是一個大表、一個小表時,可以完全避免Shuffle操作,也就不會發生數據傾斜的問題
  • join操作產生Shuffle操作時,會將擁有相同key的數據拉到一個Shuffle read task中再進行join,也稱之為reduce join,但如果某個key的數據太多時,則會導致該Task處理時間很長,甚至處理不完
  • 可以將操作對象中的小表(RDD)透過Broadcast Variables進行
    廣播,然後對大表進行map類算子,並在算子中使用小表的Broadcast Variables,並將原本的join邏輯實作在算子中,此流程也稱之為map join

Accumulators Code

此處使用Accumulator計算文件中為空的數據

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
val lines = sc.textFile("file:///home/hadoop/testFile")
val nullNum = sc.longAccumulator("null line counter")
val nonNullData = lines.filter(line => {
var isNullLine = true
val splitLines = line.split("\t")
for (splitLine <- splitLines){
if ("".equals(splitLine)){
isNullLine = false
nullNum.add(1)
}
}
isNullLine
})
// 避免多次執行Action後,Transformation內的nullNum.add(1)被多次觸發
nonNullData.cache()
nonNullData.count
println("Null line count: " + nullNum.value)
// Null line count: 1
nonNullData.count
println("Null line count: " + nullNum.value)
// 若不進行cache,此處會輸出 Null line count: 2
  • 在執行Action之前,需要先對RDD進行cache,避免每次執行Action時,重複對Transformation內的Accumulators操作重複進行add,導致非預期的輸出
  • 在對應的Job頁面中的Tasks欄位,可以看到longAccumulator創建時所指定的Accumulator名稱

Reference