Spark SQL DataFrame API Programming

Posted on 2017-12-23 | In Big data

This article walks through several commonly used DataFrame API examples. The core DataFrame APIs live in Dataset.scala and functions.scala under org.apache.spark.sql, so reading the relevant source code is a good way to become familiar with the DataFrame API.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DataFrameApiApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().master("local[2]")
      .appName("DataFrameApiApp")
      .getOrCreate()

    import spark.implicits._

    // Load the raw CSV as an RDD of lines
    val insuranceRdd = spark.sparkContext.textFile("file/FL_insurance_sample.csv")

    // Split each line into its 18 fields and wrap them in a Row
    val rowRdd = insuranceRdd.map(_.split(","))
      .map(x => Row(x(0), x(1), x(2), x(3), x(4), x(5), x(6), x(7), x(8),
        x(9), x(10), x(11), x(12), x(13), x(14), x(15), x(16), x(17)))

    // Build the schema from a space-separated list of column names
    val schemaString = "policyID statecode county eq_site_limit hu_site_limit fl_site_limit fr_site_limit tiv_2011" +
      " tiv_2012 eq_site_deductible hu_site_deductible fl_site_deductible fr_site_deductible point_latitude point_longitude" +
      " line construction point_granularity"
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType))
    val schema = StructType(fields)

    val insuranceDF = spark.createDataFrame(rowRdd, schema)

    insuranceDF.schema.printTreeString()
    /*
    root
     |-- policyID: string (nullable = true)
     |-- statecode: string (nullable = true)
     |-- county: string (nullable = true)
     |-- eq_site_limit: string (nullable = true)
     |-- hu_site_limit: string (nullable = true)
     |-- fl_site_limit: string (nullable = true)
     |-- fr_site_limit: string (nullable = true)
     |-- tiv_2011: string (nullable = true)
     |-- tiv_2012: string (nullable = true)
     |-- eq_site_deductible: string (nullable = true)
     |-- hu_site_deductible: string (nullable = true)
     |-- fl_site_deductible: string (nullable = true)
     |-- fr_site_deductible: string (nullable = true)
     |-- point_latitude: string (nullable = true)
     |-- point_longitude: string (nullable = true)
     |-- line: string (nullable = true)
     |-- construction: string (nullable = true)
     |-- point_granularity: string (nullable = true)
    */

    // Select a subset of columns
    insuranceDF.select("policyID", "county").show(3)
    /*
    +--------+-----------+
    |policyID|     county|
    +--------+-----------+
    |  119736|CLAY COUNTY|
    |  448094|CLAY COUNTY|
    |  206893|CLAY COUNTY|
    +--------+-----------+
    only showing top 3 rows
    */

    // Filter rows using a SQL expression
    insuranceDF.filter("point_granularity == 3").show(3)

    // Built-in functions from org.apache.spark.sql.functions
    // can be used inside SQL expression strings
    val insuranceDF2 = insuranceDF.filter("SUBSTR(county, 0, 1) = 'S'")
    insuranceDF2.show(3)

    // Sort by a column in ascending order
    insuranceDF.sort($"policyID".asc).show(3)

    // Sort by multiple columns
    insuranceDF.sort("policyID", "county").show(3)

    // Join with another DataFrame; the default is an inner join
    insuranceDF.join(insuranceDF2, "policyID").show(3)

    spark.stop()
  }
}
```
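Building a Row RDD and schema by hand works, but for CSV input the DataFrameReader can produce the same DataFrame with far less code. Here is a minimal sketch, assuming Spark 2.x and that the first line of FL_insurance_sample.csv is a header row; the DataFrameReaderApp name is just for illustration:

```scala
import org.apache.spark.sql.SparkSession

object DataFrameReaderApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("DataFrameReaderApp").getOrCreate()

    // header: treat the first line as column names instead of data.
    // inferSchema: let Spark sample the file and assign numeric types,
    // rather than leaving every column as a string like the manual schema above.
    val insuranceDF = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv("file/FL_insurance_sample.csv")

    insuranceDF.printSchema()
    insuranceDF.show(3)

    spark.stop()
  }
}
```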
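The SQL-string filters above can also be written as typed Column expressions, which catch column-name typos at analysis time instead of at parse time. A sketch of the SUBSTR filter in Column form, plus a per-county aggregation added here purely as an illustration, assuming insuranceDF and import spark.implicits._ from the listing above are in scope:

```scala
import org.apache.spark.sql.functions.{substring, desc}

// Equivalent to filter("SUBSTR(county, 0, 1) = 'S'")
insuranceDF.filter(substring($"county", 0, 1) === "S").show(3)

// Count policies per county and list the largest counties first
insuranceDF.groupBy("county")
  .count()
  .sort(desc("count"))
  .show(3)
```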
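Finally, while join(right, usingColumn) defaults to an inner join, other join types can be requested through the overload that takes a Seq of join columns and a join-type string. A small sketch, again reusing insuranceDF and insuranceDF2 from the main listing:

```scala
// Keep every row of insuranceDF; rows with no match in insuranceDF2
// get nulls for insuranceDF2's columns
insuranceDF.join(insuranceDF2, Seq("policyID"), "left_outer").show(3)
```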