Spark Streaming Basic

Spark Streaming Basic

Introduction

  • Spark streaming introduction
  • Ease of Use
    • 使用高級別的API進行開發Application
    • 支持多語言的API,包含Java、Scala以及Python
    • Streaming job與Batch job的之間的實作方法沒有區別
  • Fault Tolerance
    • 開箱及用的Stateful exactly-once semantics
    • 開箱及用的恢復Lost wrok與Operator state功能,不需要開發者進行額外的開發
  • Spark Integration
    - 與Spark Core、SQL、ML等結合
    • Streaminng job與Batch job之間的處理邏輯不需變動,修改輸入與輸出就可以
  • Deployment Options:
    • 支持多個數據源,包含HDFS、Flume、Kafka、Twitter、ZeroMQ
    • 支持多個運行模式

Overview

  • Spark streaming overview
  • 為Spark core API的的擴展,建立scalable,high-throughput,fault-tolerant的流處理方案
  • 可以接受多個數據源,包含Kafka,Flume,Kinesis,或TCP sockets
  • 使用高級別的API進行複雜的處理,例如map,reduce,join與window
  • 處理後結果的可以輸出到多種外部系統,例如filesystems,databases和live dashboards
  • 可以將Spark machine learning和graph processing應用至資料流處理中
  • Spark streaming接收數據流後,會將劉切割成多個Batch操作,並交給Spark引擎,最終產生的匹處理結果
  • Spark streaming處理時,可以設置匹處理的時間間隔
    • 根據時間間隔切割成多個批次
  • Spark streaming實際上是一個Mini batch的處理流程
  • Spark streaming提供一個高階API抽象操作,稱為Discretized stream或DStream
  • DStream代表一個持續的Steam資料
  • DStream可以透過外入輸入資料流進行創建,例如Kafka, Flume, and Kinesis
  • DStream也可以透過高階操作應在其他DStream進行創建
  • 一個DStream是代表一個RDD的Sequence

A Quick Example in Spark-shell

將下列代碼依序輸入Spark-shell中


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.apache.spark._
import org.apache.spark.streaming._
// Create a local StreamingContext with batch interval of 10 second.
val ssc = new StreamingContext(sc, Seconds(10))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print a few of the counts to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate

接著在同一台電腦上的另一個Terminal輸入以下指令

1
2
3
## Install the nc rpm package: yum install nc
[hadoop@testmain ~]$ nc -lk 9999
hello world

則在原本Spark-shell的Terminal會出現對應的訊息

1
2
3
4
5
6
7
18/03/09 00:58:17 WARN storage.RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
18/03/09 00:58:17 WARN storage.BlockManager: Block input-0-1520528296800 replicated to only 0 peer(s) instead of 1 peers
-------------------------------------------
Time: 1520528300000 ms
-------------------------------------------
(hello,1)
(world,1)

在Spark頁面上以顯示對應的Receiver

在Spark頁面的Streaming分頁上對應的統計數據,其中包含剛剛輸入紀錄而且被處理的訊息

Spark Streaming Basic Logic

  1. 取得Spark streaming context
  2. 使用Input DStream定義input sources
  3. 使用Transformation與Output operation(對應Spark core的action)定義Streaming計算操做,實作業務邏輯
  4. 使用streamingContext.start()啟動接收與處理數據
  5. 使用streamingContext.awaitTermination()等待處理流程結束
  6. 使用streamingContext.stop()關閉處理流程,一般不會在程式中使用,而是透過外部命令直接kill job

Points to Rember

  • 在streamingContext.start()後面加入業務邏輯是不會被執行的
  • 一但streamingContext.stop()後,是無法再被重新啟動的
  • 在一個JVM中,一個時間區段內,只有一個streamingContext會存活