Spark SQL Built-in and UDF Functions

Functions commonly used in Spark SQL fall into two categories: built-in functions and user-defined functions (UDFs). The examples below show how to use each.

Built-in Functions

  • Most built-in functions are defined in functions.scala; the example below uses sum to aggregate a simple log file.
    import org.apache.spark.sql.SparkSession

    object SqlBuiltinFunctionsDemo {

      def main(args: Array[String]): Unit = {
        val spark = SparkSession
          .builder().master("local[2]")
          .appName("SqlBuiltinFunctionsDemo")
          .getOrCreate()
        import spark.implicits._

        // Read the log file and map each line into a Log row.
        val logDF = spark.sparkContext.textFile("file/pvuv.log")
          .map(_.split(","))
          .map(x => Log(x(0), x(1).toInt))
          .toDF()
        /*
        file/pvuv.log:
        tv1,1001
        tv2,1001
        tv3,1002
        tv1,1001
        tv1,1003
        tv2,1003
        tv3,1003
        tv1,1003
        */

        import org.apache.spark.sql.functions._
        // sum comes from functions.scala; it returns the sum of all values in the given column.
        logDF.groupBy($"name").agg(sum("times").as("sum")).show()
        /*
        +----+----+
        |name| sum|
        +----+----+
        | tv2|2004|
        | tv3|2005|
        | tv1|4008|
        +----+----+
        */

        spark.stop()
      }

      case class Log(name: String, times: Int)
    }
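
Other aggregates exported by functions.scala follow the same groupBy/agg pattern. As a minimal sketch, assuming the spark session and logDF from the example above (and treating the second column of pvuv.log as a user id, which is an interpretation, not stated in the original), count and countDistinct compute per-channel PV and UV:

    import org.apache.spark.sql.functions._

    // count("*") counts rows per group (PV); countDistinct counts distinct
    // user ids per group (UV).
    logDF.groupBy($"name")
      .agg(count("*").as("pv"), countDistinct($"times").as("uv"))
      .show()
    /*
    Expected result (row order may vary):
    +----+---+---+
    |name| pv| uv|
    +----+---+---+
    | tv2|  2|  2|
    | tv3|  2|  2|
    | tv1|  4|  2|
    +----+---+---+
    */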

UDF Functions

  • Using a UDF involves three steps (see the example below):
    • define the function
    • register the function
    • use the function
import org.apache.spark.sql.SparkSession

object SqlUdfFunctionsDemo {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().master("local[2]")
      .appName("SqlUdfFunctionsDemo")
      .getOrCreate()
    import spark.implicits._

    // Read the hobbies file and map each line into a HobbyInfo row.
    val hobbiesDF = spark.sparkContext.textFile("file/hobbies.txt")
      .map(_.split(":"))
      .map(x => HobbyInfo(x(0), x(1)))
      .toDF()
    /*
    file/hobbies.txt:
    alice:jogging,coding,cooking
    lina:travel,dance
    */

    // Steps 1 and 2: define the UDF and register it with the session.
    spark.udf.register("hobby_num", (str: String) => str.split(",").length)
    hobbiesDF.createOrReplaceTempView("Hobbies")

    // Step 3: use the UDF in a SQL query.
    spark.sql("select name, hobby_num(hobbies) as hobby_num from Hobbies").show()
    spark.stop()
  }

  case class HobbyInfo(name: String, hobbies: String)
}
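
Registering through spark.udf.register is only needed when the function is invoked from SQL text. If you work with the DataFrame API instead, the same logic can be wrapped with functions.udf and applied directly as a column expression; a minimal sketch, assuming the hobbiesDF (and spark.implicits._) from the example above are in scope:

import org.apache.spark.sql.functions.udf

// Wrap the same lambda as a column-level function; no temp view or
// registration is required for DataFrame-API usage.
val hobbyNum = udf((str: String) => str.split(",").length)

hobbiesDF.select($"name", hobbyNum($"hobbies").as("hobby_num")).show()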