Spark SQL External Data Source

Spark SQL External Data Source

Spark SQL Data Sources

External Data Source API

Why

  • 多個不同數據源使用不同的方式進行讀寫操作,非常不方便
  • 不同讀入與寫出的格式之間的轉換

What

  • 為一個擴展的方式,集成多種外部數據源到Spark SQL
  • 將外部數據源轉換成DataFrame後,透過DataFrame進行數據的操作與寫出
    • Flow: External Data Source -> External Data Source API -> DataFrame
  • 自動對外部數據源進行prune columns與push filters
    • Parquet: 忽略與操作無關的columns和blocks,減少讀入數據與減少IO
    • JDBC/Parquet: 對查詢進行優化,以便進行位次下壓(push predicates down)

Supported

  • Spark 1.x:
    • Built-In Libraries: JSON, JDBC, Parquet, Hive, MySQL, PostgreSQL, HDFS, S3, H2
    • External Libraries: AVRO, CSV, dBase, HBASE, Elasticsearch, Cassandra, Redshift
  • Spark 2.x:
    • 將CSV集成至Built-in Libraries

Benefit

  • Library Developer:
  • Library User
    • 引入對應的Package後,就可以快速且簡單的對外部數據源進行讀寫操作
  • 透過Spark SQL提供的查詢最佳化,進行高效的數據存取
  • 提供接口來進行數據下壓最佳化,避免不需要的數據存取

Specific Optimizations:

  • Column pruning
  • Pushing predicates to data source(filter pushdown)
  • Partition pruning

External Data Source API Usage

Two Steps:

  • read.load:
    • 讀取一個已經存在的外部數據源,並創建成DataFrame
    • functions:
      • format(Data source name): 外部數據源的格式
      • options: 外部數據源所需的特定參數,例如jdbc的帳號密碼
  • df.save:
    • 將DataFrame儲存成指定的外部數據源
    • functions/params:
      • format(Data sourc name): 外部數據源的儲存格式
      • save mode: 當外部數據源已存在時的寫出模式,例如追加、複寫、拋出異常
      • options: 外部數據源所需的特定參數

Example

1
2
3
4
5
6
7
8
9
10
11
val spark = SparkSession
.builder().master("local[2]")
.appName("ExternalDataSourceApiApp")
.getOrCreate()
// com.databricks.spark.avro is data source name of avro. Set data source name based on type of external data source.
val df = spark.read
.format("com.databricks.spark.avro")
.load("file/data.avro")
df.filter("number > 18").write.format("com.databricks.spark.avro").save("/tmp/output")

Run SQL on files directly

1
val sqlDF = spark.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")