Spark Streaming File Stream
Spark Streaming File Stream
- Spark streaming file stream
- 支持任何兼容HDFS API的文件系統,例如HDFS、S3、NFS
- File stream不需要執行Receiver去接收數據,故不需要配置任何Core
- 使用key-value的方式來表示input的格式
- key: offset
- value: file content
- 在文件移動到受監控目錄後,使用附加的方式所新增的內容,不會被處理
- 文件必須是寫到受監控的目錄,並且是透過move的方式從其他位置寫入的文件
- 不能使用copy的原因,是因為copy可能無法完成一次性移動到受監控的目錄,導致Streaming報錯
- 文件會不會被處理,主要是根據文件最後修改時間作為依據,必須在Streaming啟動之後的時間,才會被處理
- 所有輸入的文件的格式需要相同
- 使用下列API創建File input stream,來監控新文件的產生
- streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)
- streamingContext.textFileStream(dataDirectory): For text files
1234567891011121314151617/** * Create an input stream that monitors a Hadoop-compatible filesystem * for new files and reads them using the given key-value types and input format. * Files must be written to the monitored directory by "moving" them from another * location within the same file system. File names starting with . are ignored. * @param directory HDFS directory to monitor for new file * @tparam K Key type for reading HDFS file * @tparam V Value type for reading HDFS file * @tparam F Input format for reading HDFS file */def fileStream[ K: ClassTag, V: ClassTag, F <: NewInputFormat[K, V]: ClassTag] (directory: String): InputDStream[(K, V)] = { new FileInputDStream[K, V, F](this, directory)}
|
|