Spark SQL DataFrame API Programming

  • This article walks through a number of commonly used DataFrame API examples
  • The commonly used DataFrame APIs all live in Dataset.scala and functions.scala under org.apache.spark.sql; reading that source code is a good way to get familiar with the DataFrame API (see the sketch right after this list)
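
Since the everyday transformations live in Dataset.scala and the column helpers in functions.scala, the minimal sketch below (the sample data and object name are made up for illustration, not from this article) shows the two entry points side by side. The full example from the article follows it.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.upper

object FunctionsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]").appName("FunctionsSketch").getOrCreate()
    import spark.implicits._

    // toDF and select come from Dataset.scala; upper comes from functions.scala
    val df = Seq(("clay", 1), ("suwannee", 2)).toDF("county", "id")
    df.select(upper($"county"), $"id").show()

    spark.stop()
  }
}
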
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DataFrameApiApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("DataFrameApiApp")
      .getOrCreate()
    import spark.implicits._

    val insuranceRdd = spark.sparkContext.textFile("file/FL_insurance_sample.csv")
    val rowRdd = insuranceRdd.map(_.split(","))
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8),
        x(9), x(10), x(11), x(12), x(13), x(14), x(15), x(16), x(17)))
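    // Caveat (not in the original): a naive split(",") only works when no CSV field
    // contains an embedded comma, and textFile does not skip a header row, so one
    // would have to be filtered out if the file has one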
    val schemaString = "policyID statecode county eq_site_limit hu_site_limit fl_site_limit fr_site_limit tiv_2011" +
      " tiv_2012 eq_site_deductible hu_site_deductible fl_site_deductible fr_site_deductible point_latitude point_longitude" +
      " line construction point_granularity"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType))
    val schema = StructType(fields)
    val insuranceDF = spark.createDataFrame(rowRdd, schema)
    insuranceDF.schema.printTreeString()
    /*
    root
     |-- policyID: string (nullable = true)
     |-- statecode: string (nullable = true)
     |-- county: string (nullable = true)
     |-- eq_site_limit: string (nullable = true)
     |-- hu_site_limit: string (nullable = true)
     |-- fl_site_limit: string (nullable = true)
     |-- fr_site_limit: string (nullable = true)
     |-- tiv_2011: string (nullable = true)
     |-- tiv_2012: string (nullable = true)
     |-- eq_site_deductible: string (nullable = true)
     |-- hu_site_deductible: string (nullable = true)
     |-- fl_site_deductible: string (nullable = true)
     |-- fr_site_deductible: string (nullable = true)
     |-- point_latitude: string (nullable = true)
     |-- point_longitude: string (nullable = true)
     |-- line: string (nullable = true)
     |-- construction: string (nullable = true)
     |-- point_granularity: string (nullable = true)
    */
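    // Every column is StringType because the schema above was built entirely from
    // StringType fields; the reader-based sketch after this program shows how to
    // let Spark infer numeric types instead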
    // Select a subset of columns by name
    insuranceDF.select("policyID", "county").show(3)
    /*
    +--------+-----------+
    |policyID|     county|
    +--------+-----------+
    |  119736|CLAY COUNTY|
    |  448094|CLAY COUNTY|
    |  206893|CLAY COUNTY|
    +--------+-----------+
    only showing top 3 rows
    */
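    // Equivalent Column-expression form (not in the original):
    // insuranceDF.select($"policyID", $"county").show(3)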
    // Filter rows using the given SQL expression
    insuranceDF.filter("point_granularity == 3").show(3)
    // Use a built-in function (SUBSTR) inside the filter expression; the same
    // functions are exposed to Scala through org.apache.spark.sql.functions
    val insuranceDF2 = insuranceDF.filter("SUBSTR(county,0,1) = 'S'")
    insuranceDF2.show(3)
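    // The same filter via the Scala functions API (not in the original); needs
    // import org.apache.spark.sql.functions.substring:
    // insuranceDF.filter(substring($"county", 0, 1) === "S").show(3)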
    // Sort in ascending order by policyID; the $ interpolator comes from
    // spark.implicits._ and asc is a method on Column
    insuranceDF.sort($"policyID".asc).show(3)
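    // Descending order works the same way (not in the original):
    // insuranceDF.sort($"policyID".desc).show(3)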
    // Sort by multiple columns, each ascending by default
    insuranceDF.sort("policyID", "county").show(3)
    // Join with another DataFrame on a common column; the default join type is
    // inner, and the USING column policyID appears only once in the result
    insuranceDF.join(insuranceDF2, "policyID").show(3)
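    // Other join types can be requested explicitly (not in the original), e.g.:
    // insuranceDF.join(insuranceDF2, Seq("policyID"), "left_outer").show(3)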
    spark.stop()
  }
}
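
The program above builds its schema by hand from an RDD[Row], which is why every column comes out as a string. As a minimal alternative sketch (not part of the original article; the object name is made up, and the file path and header/inferSchema options are assumptions carried over from the listing), Spark's built-in CSV reader can load the same file while taking column names from the header row and inferring numeric types:

import org.apache.spark.sql.SparkSession

object DataFrameReaderApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("DataFrameReaderApp")
      .getOrCreate()

    // spark.read.csv handles quoting correctly and, with these options,
    // uses the header row for column names and infers column types
    val insuranceDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("file/FL_insurance_sample.csv")

    insuranceDF.printSchema()
    insuranceDF.select("policyID", "county").show(3)

    spark.stop()
  }
}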