Spark Streaming File Stream

  • Supports any file system compatible with the HDFS API, e.g., HDFS, S3, NFS
  • A file stream does not run a Receiver to ingest data, so no cores need to be reserved for receiving
  • Input is represented as key-value pairs
    • key: offset
    • value: file content
  • Content appended to a file after it has been moved into the monitored directory will not be processed
  • Files must be written into the monitored directory by moving them there from another location (see the sketch after this list)
    • copy cannot be used, because a copy may not land in the monitored directory in one atomic step, so Streaming may fail on a partially written file
  • Whether a file is processed is decided by its last-modified time, which must be later than the time the streaming application started
  • All input files must share the same format
  • Use the following APIs to create a file input stream that monitors a directory for new files
    • streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
    • streamingContext.textFileStream(dataDirectory): for text files
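A minimal sketch of the "move, not copy" requirement, assuming the file is first written in full under a staging directory on the same file system and then renamed into the watched one; the paths /tmp/staging/events.log and /data/monitored are hypothetical:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object AtomicDrop {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())

    // The file is fully written under a staging directory first (hypothetical paths).
    val staged = new Path("/tmp/staging/events.log")
    val target = new Path("/data/monitored/events.log")

    // rename() is atomic within one file system, so the streaming job only ever
    // sees a complete file; a copy could expose a partially written one.
    fs.rename(staged, target)
  }
}

The fileStream definition from Spark's StreamingContext source follows.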



/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them using the given key-value types and input format.
* Files must be written to the monitored directory by "moving" them from another
* location within the same file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[
  K: ClassTag,
  V: ClassTag,
  F <: NewInputFormat[K, V]: ClassTag
] (directory: String): InputDStream[(K, V)] = {
  new FileInputDStream[K, V, F](this, directory)
}
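As a usage sketch, the generic fileStream can be called with the classic Hadoop text types, where the key is the byte offset (LongWritable) and the value is the line (Text); the StreamingContext named ssc and the directory path are assumptions:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Assumes an existing StreamingContext named ssc; /data/monitored is a hypothetical path.
val lines = ssc
  .fileStream[LongWritable, Text, TextInputFormat]("/data/monitored")
  .map { case (_, line) => line.toString }  // drop the offset key, keep the line text

This is exactly the mapping that textFileStream, shown next, performs internally.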

/**
* Create an input stream that monitors a Hadoop-compatible filesystem
* for new files and reads them as text files (using key as LongWritable, value
* as Text and input format as TextInputFormat). Files must be written to the
* monitored directory by "moving" them from another location within the same
* file system. File names starting with . are ignored.
* @param directory HDFS directory to monitor for new file
*/
def textFileStream(directory: String): DStream[String] = withNamedScope("text file stream") {
  fileStream[LongWritable, Text, TextInputFormat](directory).map(_._2.toString)
}
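Putting the pieces together, a minimal runnable sketch of a streaming job that watches a directory with textFileStream and counts words per batch; the master, app name, batch interval, and directory are assumptions for a local test:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object FileStreamWordCount {
  def main(args: Array[String]): Unit = {
    // local[1] suffices here: file streams need no receiver core (assumption: local test run).
    val conf = new SparkConf().setMaster("local[1]").setAppName("FileStreamWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))  // hypothetical 10s batch interval

    // Only files moved into this directory after start() are processed.
    val lines = ssc.textFileStream("/data/monitored")  // hypothetical path
    val counts = lines.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}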