Spark RDD Persist

Spark RDD Persist

  • Persist/Cache是Spark中一個重要的特性,其操作是將DataSet持久化至記憶體中。
  • 當進行RDD持久化時,每個Node會儲存RDD中的每個Partition至記憶體中,並在其他Action執行時將RDD從記憶體中取出。

Use Spark-shell to demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
## 此處先展示之後要用到的本地檔案,該檔案有3行數據
[hadoop@testmain ~]$ cat testFile
Hello world first
How we roll
## 啟動Spark-shell
[hadoop@testmain bin]$ ./spark-shell --master local[2] --jars /opt/software/hive/lib/mysql-connector-java-5.1.44-bin.jar
## ...
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
Type in expressions to have them evaluated.
Type :help for more information.
## 讀入上述本地檔案,並得到一個RDD物件
scala> val lines = sc.textFile("file:///home/hadoop/testFile")
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/testFile MapPartitionsRDD[5] at textFile at <console>:24
scala> lines.count
res2: Long = 3

上面兩張圖分別表示執行lines.count後,Spark jobs頁面以及Stage details頁面
下面說明第二張圖Stage details頁面的兩個欄位:

  • 最上方的Input Size / Records: 47.0 B / 3
    • 47.0 表示讀入的Byte數,Records表示讀入的數據行數。此處的Byte數和行數皆與原始檔案相同
  • 最下方的Tasks (2)
    • 表示Partition數量,Task與Partition數量相同
    • 若此處欄位之間的Record數量相差很多,則表示發生數據傾斜,需要特別注意以及進行調整
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala> lines.cache
res3: lines.type = file:///home/hadoop/testFile MapPartitionsRDD[5] at textFile at <console>:24
## 此處仍然是從本地端讀入數據來建立RDD
## 此處的Action觸發後,才會進行Persist
scala> lines.count
res4: Long = 3
## 此處才是從Cache中讀取數據來建立RDD
scala> lines.count
res5: Long = 3
## unpersis
scala> lines.unpersist()
res6: lines.type = file:///home/hadoop/testFile MapPartitionsRDD[5] at textFile at <console>:24

此張圖是在lines.cache後,Spark stages頁面,當中的訊息解讀如下

  • Cache size(Size in Memory + Size on Disk)默認比原始數據來源大
  • Storage Level默認為MEMORY_ONLY

上述圖片為第三次執行lines.count所對應的Stage details頁面,當中的訊息解讀如下:

  • Input Size與Cache size相同

進行Persist與Unpersist需要注意的事項:

  • persist()或cache()是lazy操作,碰到action才會進行實際處理
    • 與Spark SQL相反,Spark SQL的Cache是Eager操作
  • 第一次執行Action時,會先從原始位置建立RDD,然後進行Persist;並在第二次執行Action時,才會從Cache中將RDD取出
  • unpersist()為Eager操作
  • 在某些場景下會讓Action操作更快的執行完成,實際情況需要根據場景進行測試
  • 對於存在迭代過程的演算法,或者交互式的場景下有著關鍵性的幫助
  • Cache是支持容錯機制的,若有任一個Partition遺失了,將會自動從源頭開始重新計算來建立Partition
  • cache()是調用persist(),並使用MEMORY_ONLY作為默認storage level
    • 相關程式碼位於org.apache.spark.rdd RDD.scala
1
2
3
4
5
6
7
8
9
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()

Storage Level

根據資源情況,使用不同的Storage level進行RDD持久化,較常用的Storage level為:

  • MEMORY_ONLY:
    • 將RDD當作deserialized Java物件存放在JVM中,如果RDD無法完整儲存在記憶體中,則無法進行持久化,在下次使用該RDD時,會透過重新計算來獲取RDD
    • 此為預設值
    • deserialized是指將一個partition作為一個Byte Array儲存
  • MEMORY_AND_DISK
    • 將RDD當作deserialized Java物件存放在JVM中,若記憶體中無法存放所有的Partition,則放入Disk中
  • MEMORY_ONLY_SER
    • 將RDD當作serialized Java物件存放在JVM中,serialized物件較deserialized物件更為節省記憶體空間,但會提高CPU使用率
    • 建議使用高效的序列化演算法,例如Kyro
Which Storage Level to Choose?

Storage Level的選擇主要是在CPU與Memory中做權衡,可以根據下列條件來進行挑選:

  1. 如果RDD如果可以使用MEMORY_ONLY完美處理,那麼就使用MEMORY_ONLY
  2. 如果無法使用MEMORY_ONLY,可以使用MEMORY_ONLY_SER,挑選一個快速的序列化演算法,壓縮記憶體所需佔用空間,並額外付出可接受的CPU使用率
  3. 不要將RDD拆分至Disk中,除非計算該DateSet的成本是昂貴的,或者是一個龐大的DataSet,不然一般來說重新計算Partition的速度是快於從Disk中讀取的速度
  4. 如果想要達到fast fault recovery,可以使用replicated storage level
    • 雖然所有的storage level都提供full fault recovery來重新計算損失的數據,但是有提供replicated的storage level,可以讓task無需等待重新計算partition的時間
    • replicated需要額外佔用記憶體空間

Removing Data