Spark RDD Lineage and Checkpoint

Lineage

  • A compute chain is built from a sequence of RDD transformations; the dependency graph among the RDDs in this chain is called the lineage
  • If a partition of an RDD in the chain is lost, Spark can consult the lineage to find the parent RDDs and recompute just the lost partition, instead of recomputing the whole chain from the data source (see the sketch below)
  • This is how RDD fault tolerance is achieved
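As a rough illustration, every RDD exposes its parents through the dependencies method, and this parent graph is exactly what lineage-based recovery walks. A minimal spark-shell sketch (the variable names are only illustrative):

val rdd1 = sc.parallelize(0 to 9)
val rdd2 = rdd1.filter(_ > 5)
// Each dependency points back to a parent RDD in the chain; this is the
// information Spark uses to recompute a lost partition of rdd2.
rdd2.dependencies.foreach(dep => println(dep.rdd))
// ParallelCollectionRDD[0] at parallelize at <console>:...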

Checkpoint

  • When the compute chain grows too long, checkpointing can be used to persist an RDD to storage and provide fault tolerance
  • A checkpoint saves the RDD to files and removes its parent RDDs; in other words, the checkpointed RDD becomes the start of a new chain (a lighter-weight variant is sketched below)
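Besides the reliable checkpoint demonstrated in the next section, Spark also provides localCheckpoint(), which truncates the lineage in the same way but keeps the data in executor storage instead of writing it to the checkpoint directory; it is faster, but not safe against executor loss. A minimal sketch:

// localCheckpoint truncates the lineage like checkpoint(), but stores the
// partitions with the executors rather than on a reliable file system.
val rdd = sc.parallelize(0 to 9).map(x => x * x)
rdd.localCheckpoint()
rdd.count()                  // an action materializes the checkpoint
println(rdd.toDebugString)   // the printed lineage is now truncated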

Lineage and Checkpoint Code

scala> val rdd1 = sc.parallelize(0 to 9)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd2 = rdd1.filter(x => x > 5)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at filter at <console>:26
scala> val rdd3 = rdd2.map(x => (x, x))
rdd3: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[2] at map at <console>:28
## Use toDebugString to list an RDD's lineage
scala> rdd2.toDebugString
res4: String =
(2) MapPartitionsRDD[1] at filter at <console>:26 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
scala> rdd3.toDebugString
res5: String =
(2) MapPartitionsRDD[2] at map at <console>:28 []
 |  MapPartitionsRDD[1] at filter at <console>:26 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:24 []
## Checkpoint rdd3
scala> sc.setCheckpointDir("file:///home/hadoop/checkpoint")
scala> rdd3.checkpoint
## checkpoint is lazy, so it only executes once an action is triggered
scala> rdd3.count()
res11: Long = 4
## After the checkpoint, rdd3 becomes the start of a new chain
scala> rdd3.toDebugString
res8: String =
(2) MapPartitionsRDD[2] at map at <console>:28 []
 |  ReliableCheckpointRDD[3] at count at <console>:31 []
## After the action runs, the checkpoint files appear under the configured directory
[hadoop@testmain checkpoint]$ ls
f5546cf9-81bf-4bbc-a00b-cf5b87c384d5
[hadoop@testmain checkpoint]$ ls f5546cf9-81bf-4bbc-a00b-cf5b87c384d5/
rdd-2
[hadoop@testmain checkpoint]$ ls f5546cf9-81bf-4bbc-a00b-cf5b87c384d5/rdd-2/
part-00000 part-00001
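One caveat worth adding: checkpoint() writes these files by recomputing the RDD from its lineage, so in the session above the chain leading to rdd3 is computed twice (once for count(), once for the checkpoint). The RDD API docs therefore recommend persisting the RDD before checkpointing it; using the same names as above, the recommended pattern (applied before the first action) looks like this:

// Persist before checkpointing so the checkpoint writer reads the cached
// partitions instead of recomputing the whole chain a second time.
rdd3.cache()
rdd3.checkpoint()
rdd3.count()   // one computation: the action fills the cache, and the
               // checkpoint files are then written from the cached data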