Spark SQL DataFrame RDD

DataFrame interoperating with RDDs

  • Spark SQL supports two ways of converting an existing RDD into a DataFrame:
    • Use objects of a known type and let reflection infer the schema from the RDD, then convert it into a DataFrame
      • Limitation: in Scala 2.10 a case class can hold at most 22 fields
    • Construct a schema through a programmatic interface and apply it to an existing RDD to obtain a DataFrame

Inferring the Schema Using Reflection

  • The Scala interface of Spark SQL supports automatically converting an RDD that contains case class instances into a DataFrame
  • The case class defines the schema of the table
  • The parameters of the case class become the DataFrame's column names and types via reflection
  • A case class can also contain nested or complex types (see the nested-type sketch after the example below)
  • The RDD is implicitly converted into a DataFrame and registered as a table
  • The table can then be queried with SQL syntax
import org.apache.spark.sql.SparkSession

object DataFrameDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().master("local[2]")
      .appName("DataFrameDemo")
      .getOrCreate()
    import spark.implicits._

    // Create an RDD from a text file where each line is "name, age", e.g. "Michael, 29"
    val peopleRdd = spark.sparkContext.textFile("file/people.txt")
    val peopleDF = peopleRdd.map(_.split(",")) // split each line into fields
      .map(x => Person(x(0), x(1).trim.toInt)) // map the fields onto the case class
      .toDF() // convert to a DataFrame via reflection
    peopleDF.show()
    /*
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
    */

    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")

    // SQL statements can be run by using the sql method provided by Spark
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

    // The columns of a row in the result can be accessed by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    /*
    +------------+
    |       value|
    +------------+
    |Name: Justin|
    +------------+
    */

    // ... or accessed by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name") + ", Age: " +
      teenager.getAs[Int]("age")).show(false)
    /*
    +---------------------+
    |value                |
    +---------------------+
    |Name: Justin, Age: 19|
    +---------------------+
    */

    spark.stop()
  }

  // Case class whose fields define the table schema
  case class Person(name: String, age: Int)
}
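
The example above uses a flat case class. Since a case class may also contain nested or complex types, reflection-based inference produces a struct column for them. Below is a minimal sketch of this; the Address and Employee types are hypothetical and not part of the original example:

import org.apache.spark.sql.SparkSession

object NestedSchemaDemo {
  // Hypothetical nested types, for illustration only
  case class Address(city: String, zip: String)
  case class Employee(name: String, age: Int, address: Address)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("NestedSchemaDemo").getOrCreate()
    import spark.implicits._

    // Reflection infers a struct column for the nested case class
    val df = Seq(Employee("Michael", 29, Address("Taipei", "100"))).toDF()
    df.printSchema()
    /*
    root
     |-- name: string (nullable = true)
     |-- age: integer (nullable = false)
     |-- address: struct (nullable = true)
     |    |-- city: string (nullable = true)
     |    |-- zip: string (nullable = true)
    */
    // Nested fields can be selected with dot notation
    df.select("address.city").show()
    spark.stop()
  }
}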

Programmatically Specifying the Schema

  • In many business scenarios the case class cannot be defined in advance; in that situation the DataFrame can be constructed programmatically
  • The steps are as follows:
    1. Convert the original RDD into an RDD of Rows
    2. Create a schema represented by a StructType that matches the structure of the Rows from Step 1
    3. Apply the schema to the RDD of Rows with the createDataFrame method
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DataFrameDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().master("local[2]")
      .appName("DataFrameDemo")
      .getOrCreate()

    // Create an RDD from a text file where each line is "name, age"
    val peopleRdd = spark.sparkContext.textFile("file/people.txt")

    // Step 1. Convert records of the RDD (people) to Rows
    val rowRDD = peopleRdd.map(_.split(","))
      .map(x => Row(x(0), x(1).trim))

    // The schema is encoded in a string
    val schemaString = "name age"
    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))

    // Step 2. Create the schema represented by a StructType matching the structure of Rows in the RDD
    val schema = StructType(fields)

    // Step 3. Apply the schema to the RDD of Rows
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.show()
    /*
    +-------+---+
    |   name|age|
    +-------+---+
    |Michael| 29|
    |   Andy| 30|
    | Justin| 19|
    +-------+---+
    */

    spark.stop()
  }
}
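
The programmatic example above declares every column as StringType. As a minimal sketch, assuming the age values should be numeric, each StructField can carry its own data type, with the Rows built to match:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object TypedSchemaDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("TypedSchemaDemo").getOrCreate()

    val peopleRdd = spark.sparkContext.textFile("file/people.txt")
    // Parse age to Int while building the Rows so the values match the typed schema
    val rowRDD = peopleRdd.map(_.split(","))
      .map(x => Row(x(0), x(1).trim.toInt))

    // Give each field its own type instead of StringType for everything
    val schema = StructType(Seq(
      StructField("name", StringType, nullable = true),
      StructField("age", IntegerType, nullable = true)))

    val peopleDF = spark.createDataFrame(rowRDD, schema)
    peopleDF.printSchema() // age is integer, so numeric predicates need no casting
    peopleDF.filter("age > 20").show()
    spark.stop()
  }
}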