Spark Shared Variables
- 如果有一個操作(map, reduce)是執行在遠端集群節點上,而此操作內使用到的外部變量,Spark會通過複製的方式,從Driver端傳遞到每台執行該操作的節點上,
而且每台遠端節點上的變量,是無法進行更新對應在Driver那份原始變量的。 - 在上述的情況下,讀寫共享變量在跨Task的情況下就不是這麼的高效了,所以Spark提供了兩種機制的共享變量機制:
- Broadcast Variables
- Accumulators
Broadcast Variables
- Broadcast variables
- 透過SparkContext構建Broadcast variables
- 可以大幅降低序列化任務的大小,以及降低每個跨集群啟動Job的成本
- 對較大的變量,透過Broadcast variable傳遞到每個Job,有效降低記憶體使用量
- 當多個跨Stage的Task需要相同一份Data,或使用Deserialized將Data cache時,使用Broadcast Variables才是有用的
- 每台Work node中第一個使用到Broadcast variable的Task,會透過BlockManager向Driver端拉取Broadcast variable到自己的Executor上
- 而相同Work node上但不同Executor的其他Task需要使用到Broadcast variable時,會透過BlockManager向已經有Broadcast variable的Executor拉取
- Spark提供兩種Broadcast機制
- TorrentBroadcast: 預設使用
- HttpBroadcast
- 若現在有50 Executor, 1000 Tasks, Data 100M
- 未使用Broadcast Variables所使用的記憶體空間: 1000 * 100M
- 使用Broadcast Variables所使用的記憶體空間: 50 * 1000M
- 未使用Broadcast Variables的示意圖
- 使用Broadcast Variables的示意圖
Accumulators
- Accumulators
- 透過SparkContext構建Accumulators
- 一個只能進行add的操作的變量,透過關聯和交換操作,達成高效的並行處理
- Spark原生支援Numeric類型的Accumulators,可以透用繼承的方式進行擴展
Broadcast Variables Code
|
|
- 可以透過上述的程式達成peopleInfo與departmentInfo的join操作
- 另外也可以透過Broadcast Variables達成相同的操作
|
|
- 上述透過Broadcast Variables與map類算子實作join操作,並與join操作達成相同效果
- 此方法在數據傾斜時,而且操作對象是一個大表、一個小表時,可以完全避免Shuffle操作,也就不會發生數據傾斜的問題
- join操作產生Shuffle操作時,會將擁有相同key的數據拉到一個Shuffle read task中再進行join,也稱之為reduce join,但如果某個key的數據太多時,則會導致該Task處理時間很長,甚至處理不完
- 可以將操作對象中的小表(RDD)透過Broadcast Variables進行
廣播,然後對大表進行map類算子,並在算子中使用小表的Broadcast Variables,並將原本的join邏輯實作在算子中,此流程也稱之為map join
Accumulators Code
此處使用Accumulator計算文件中為空的數據
|
|
- 在執行Action之前,需要先對RDD進行cache,避免每次執行Action時,重複對Transformation內的Accumulators操作重複進行add,導致非預期的輸出
- 在對應的Job頁面中的Tasks欄位,可以看到longAccumulator創建時所指定的Accumulator名稱