Spark RDD Persist
- Persist/Cache是Spark中一個重要的特性,其操作是將DataSet持久化至記憶體中。
- 當進行RDD持久化時,每個Node會儲存RDD中的每個Partition至記憶體中,並在其他Action執行時將RDD從記憶體中取出。
Use Spark-shell to demo
|
|
上面兩張圖分別表示執行lines.count後,Spark jobs頁面以及Stage details頁面
下面說明第二張圖Stage details頁面的兩個欄位:
- 最上方的Input Size / Records: 47.0 B / 3
- 47.0 表示讀入的Byte數,Records表示讀入的數據行數。此處的Byte數和行數皆與原始檔案相同
- 最下方的Tasks (2)
- 表示Partition數量,Task與Partition數量相同
- 若此處欄位之間的Record數量相差很多,則表示發生數據傾斜,需要特別注意以及進行調整
|
|
此張圖是在lines.cache後,Spark stages頁面,當中的訊息解讀如下
- Cache size(Size in Memory + Size on Disk)默認比原始數據來源大
- Storage Level默認為MEMORY_ONLY
上述圖片為第三次執行lines.count所對應的Stage details頁面,當中的訊息解讀如下:
- Input Size與Cache size相同
進行Persist與Unpersist需要注意的事項:
- persist()或cache()是lazy操作,碰到action才會進行實際處理
- 與Spark SQL相反,Spark SQL的Cache是Eager操作
- 第一次執行Action時,會先從原始位置建立RDD,然後進行Persist;並在第二次執行Action時,才會從Cache中將RDD取出
- unpersist()為Eager操作
- 在某些場景下會讓Action操作更快的執行完成,實際情況需要根據場景進行測試
- 對於存在迭代過程的演算法,或者交互式的場景下有著關鍵性的幫助
- Cache是支持容錯機制的,若有任一個Partition遺失了,將會自動從源頭開始重新計算來建立Partition
- cache()是調用persist(),並使用MEMORY_ONLY作為默認storage level
- 相關程式碼位於org.apache.spark.rdd RDD.scala
|
|
Storage Level
根據資源情況,使用不同的Storage level進行RDD持久化,較常用的Storage level為:
- MEMORY_ONLY:
- 將RDD當作deserialized Java物件存放在JVM中,如果RDD無法完整儲存在記憶體中,則無法進行持久化,在下次使用該RDD時,會透過重新計算來獲取RDD
- 此為預設值
- deserialized是指將一個partition作為一個Byte Array儲存
- MEMORY_AND_DISK
- 將RDD當作deserialized Java物件存放在JVM中,若記憶體中無法存放所有的Partition,則放入Disk中
- MEMORY_ONLY_SER
- 將RDD當作serialized Java物件存放在JVM中,serialized物件較deserialized物件更為節省記憶體空間,但會提高CPU使用率
- 建議使用高效的序列化演算法,例如Kyro
Which Storage Level to Choose?
Storage Level的選擇主要是在CPU與Memory中做權衡,可以根據下列條件來進行挑選:
- 如果RDD如果可以使用MEMORY_ONLY完美處理,那麼就使用MEMORY_ONLY
- 如果無法使用MEMORY_ONLY,可以使用MEMORY_ONLY_SER,挑選一個快速的序列化演算法,壓縮記憶體所需佔用空間,並額外付出可接受的CPU使用率
- 不要將RDD拆分至Disk中,除非計算該DateSet的成本是昂貴的,或者是一個龐大的DataSet,不然一般來說重新計算Partition的速度是快於從Disk中讀取的速度
- 如果想要達到fast fault recovery,可以使用replicated storage level
- 雖然所有的storage level都提供full fault recovery來重新計算損失的數據,但是有提供replicated的storage level,可以讓task無需等待重新計算partition的時間
- replicated需要額外佔用記憶體空間
Removing Data
- Spark將自動監控cache使用率,並根據LRU(Least-recently-used)挑選犧牲的Partition進行刪除。
- 建議使用RDD.unpersist()來進行手動刪除