Spark SQL Thrift Server

Introduction

  • The "Standard Connectivity" feature mentioned for Spark SQL refers to the Thrift Server.
  • The Thrift Server exposes a JDBC/ODBC interface: clients connect to it over JDBC/ODBC and then access or process data through Spark SQL.
  • When the Thrift Server starts, it launches one Spark SQL application; every client that connects over JDBC/ODBC shares that application's resources. In other words, data is shared across clients.

Comparison with a Spark Application

  • spark-shell and spark-sql are both Spark applications: every submission requests its own resources, and resources are isolated between jobs.
  • The Thrift Server is a single Spark application no matter how many clients connect, so resources are requested only once and data can be shared between clients (see the sketch below).
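
For example, because every session runs inside the same Spark application, a table cached by one client is served from the same in-memory cache for all other clients. A minimal sketch, assuming the emp table used later in this post (the cache lives in the application's shared state, not in an individual session):

    -- client A (first beeline session): cache the table once
    CACHE TABLE emp;
    -- client B (second beeline session): this scan is served from the shared cache
    SELECT count(*) FROM emp;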

Start Spark SQL Thrift Server

[hadoop@testmain ~]$ cd $SPARK_HOME/sbin
## mysql-connector-java-5.1.44-bin.jar is the path to the MySQL driver; note that it ends in .jar, not .tar
[hadoop@testmain sbin]$ ./start-thriftserver.sh --master local[2] --jars ~/mysql-connector-java-5.1.44-bin.jar
starting org.apache.spark.sql.hive.thriftserver.HiveThriftServer2, logging to /opt/software/spark/logs/spark-hadoop-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-testmain.out
## Tail the corresponding log
[hadoop@testmain sbin]$ tail -200f /opt/software/spark/logs/spark-hadoop-org.apache.spark.sql.hive.thriftserver.HiveThriftServer2-1-testmain.out
## ...
## The corresponding Spark UI starts on port 4040
17/12/25 00:10:43 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
## ...
## Log line for a normal startup: the Thrift Server starts a listener on port 10000 and waits for client connections
17/12/25 00:11:03 INFO thrift.ThriftCLIService: Starting ThriftBinaryCLIService on port 10000 with 5...500 worker threads
## ...
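
By default the listener binds to port 10000. If that port is already occupied, the listening port and host can be overridden with --hiveconf when starting the server (a sketch; 10001 is just an example value):

    ## Hypothetical example: bind the Thrift listener to a different port/host
    [hadoop@testmain sbin]$ ./start-thriftserver.sh --master local[2] \
        --jars ~/mysql-connector-java-5.1.44-bin.jar \
        --hiveconf hive.server2.thrift.port=10001 \
        --hiveconf hive.server2.thrift.bind.host=testmain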

Spark Web UI

  • Browse to http://[ip]:4040 to view the Thrift Server's pages
    • The upper-right corner shows that this is the Thrift JDBC/ODBC Server application
  • If --jars is missing or points at the wrong MySQL driver, the log shows the error message below
    Caused by: org.datanucleus.exceptions.NucleusException:
    Attempt to invoke the "BONECP" plugin to create a ConnectionPool gave an error :
    The specified datastore driver ("com.mysql.jdbc.Driver") was not found in the CLASSPATH.
    Please check your CLASSPATH specification, and the name of the driver.
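
Since the metastore connection pool is created in the driver, an alternative is to put the jar on the driver classpath explicitly (a sketch; start-thriftserver.sh accepts the usual spark-submit options):

    ## Hypothetical alternative to --jars for a driver-side dependency
    [hadoop@testmain sbin]$ ./start-thriftserver.sh --master local[2] \
        --driver-class-path ~/mysql-connector-java-5.1.44-bin.jar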

Use Beeline as Client

  • Use the Beeline client that ships with Spark to connect to the Thrift Server
[hadoop@testmain ~]$ cd $SPARK_HOME/bin
## Use beeline to connect over JDBC to the Thrift Server on port 10000
[hadoop@testmain bin]$ ./beeline -u jdbc:hive2://localhost:10000 -n hadoop
Connecting to jdbc:hive2://localhost:10000
17/12/24 01:26:48 INFO jdbc.Utils: Supplied authorities: localhost:10000
17/12/24 01:26:48 INFO jdbc.Utils: Resolved authority: localhost:10000
17/12/24 01:26:48 INFO jdbc.HiveConnection: Will try to open client transport with JDBC Uri: jdbc:hive2://localhost:10000
Connected to: Spark SQL (version 2.2.0)
Driver: Hive JDBC (version 1.2.1.spark2) ## this Hive JDBC version ships with Spark
Transaction isolation: TRANSACTION_REPEATABLE_READ
Beeline version 1.2.1.spark2 by Apache Hive
## Operate on the data through the beeline client
0: jdbc:hive2://localhost:10000> show tables;
+-----------+------------+--------------+--+
| database  | tableName  | isTemporary  |
+-----------+------------+--------------+--+
| default   | dept       | false        |
| default   | emp        | false        |
| default   | test       | false        |
+-----------+------------+--------------+--+
3 rows selected (1.293 seconds)
0: jdbc:hive2://localhost:10000> select * from emp;
+--------+---------+------------+-------+-------------+----------+---------+---------+--+
| empno  | ename   | job        | mgr   | hiredate    | sal      | comm    | deptno  |
+--------+---------+------------+-------+-------------+----------+---------+---------+--+
| 7369   | SMITH   | CLERK      | 7902  | 1980-12-17  | 800.0    | NULL    | 20      |
| 7499   | ALLEN   | SALESMAN   | 7698  | 1981-2-20   | 1600.0   | 300.0   | 30      |
| 7521   | WARD    | SALESMAN   | 7698  | 1981-2-22   | 1250.0   | 500.0   | 30      |
| 7566   | JONES   | MANAGER    | 7839  | 1981-4-2    | 2975.0   | NULL    | 20      |
| 7654   | MARTIN  | SALESMAN   | 7698  | 1981-9-28   | 1250.0   | 1400.0  | 30      |
| 7698   | BLAKE   | MANAGER    | 7839  | 1981-5-1    | 2850.0   | NULL    | 30      |
| 7782   | CLARK   | MANAGER    | 7839  | 1981-6-9    | 2450.0   | NULL    | 10      |
| 7788   | SCOTT   | ANALYST    | 7566  | 1987-4-19   | 3000.0   | NULL    | 20      |
| 7839   | KING    | PRESIDENT  | NULL  | 1981-11-17  | 5000.0   | NULL    | 10      |
| 7844   | TURNER  | SALESMAN   | 7698  | 1981-9-8    | 1500.0   | 0.0     | 30      |
| 7876   | ADAMS   | CLERK      | 7788  | 1987-5-23   | 1100.0   | NULL    | 20      |
| 7900   | JAMES   | CLERK      | 7698  | 1981-12-3   | 950.0    | NULL    | 30      |
| 7902   | FORD    | ANALYST    | 7566  | 1981-12-3   | 3000.0   | NULL    | 20      |
| 7934   | MILLER  | CLERK      | 7782  | 1982-1-23   | 1300.0   | NULL    | 10      |
| 8888   | HIVE    | PROGRAM    | 7839  | 1988-1-23   | 10300.0  | NULL    | NULL    |
+--------+---------+------------+-------+-------------+----------+---------+---------+--+
15 rows selected (3.705 seconds)
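
When done, !quit closes the beeline session, and the server itself can be shut down with the companion script (a sketch, assuming the standard $SPARK_HOME/sbin layout):

    0: jdbc:hive2://localhost:10000> !quit
    [hadoop@testmain ~]$ $SPARK_HOME/sbin/stop-thriftserver.sh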

Spark Web UI

  • The Jobs page of the Spark Web UI shows the jobs triggered by the select statement
  • The SQL page shows the plan of the select statement
    • Its content matches the output of explain extended select * from emp (reproduced below)
  • The JDBC/ODBC Server page
    • In Session Statistics, User corresponds to beeline's -n argument
    • In SQL Statistics, the JobID corresponds to the Jobs page
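
For reference, the same plan can be printed directly from beeline:

    0: jdbc:hive2://localhost:10000> explain extended select * from emp;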

Access Thrift Server via Scala

pom.xml

  • Add the following dependency to the project's pom.xml

    <dependency>
      <groupId>org.apache.hive</groupId>
      <artifactId>hive-jdbc</artifactId>
      <version>1.2.2</version>
    </dependency>
  • If the version does not match the Hive JDBC version of the environment (beeline reported 1.2.1.spark2 above), the following error occurs

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hive/service/cli/thrift/TCLIService$Iface
    at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:105)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:247)
    at com.test.demo.ThriftServerDemo$.main(ThriftServerDemo.scala:9)
    at com.test.demo.ThriftServerDemo.main(ThriftServerDemo.scala)
    Caused by: java.lang.ClassNotFoundException: org.apache.hive.service.cli.thrift.TCLIService$Iface
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 5 more
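
For completeness, the equivalent dependency in sbt would be (hypothetical, if the project were built with sbt instead of Maven):

    // build.sbt: hypothetical sbt equivalent of the Maven dependency above
    libraryDependencies += "org.apache.hive" % "hive-jdbc" % "1.2.2"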

Scala Code

import java.sql.DriverManager

object ThriftServerDemo {
  def main(args: Array[String]): Unit = {
    // Register the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // Same URL and user as the beeline command; the password is empty
    val connection = DriverManager.getConnection("jdbc:hive2://testmain:10000", "hadoop", "")
    val stmt = connection.prepareStatement("select empno, ename, deptno from emp limit 5")
    val rs = stmt.executeQuery()
    while (rs.next()) {
      println(rs.getInt("empno") + " : " + rs.getString("ename") + " : " + rs.getInt("deptno"))
    }
    // Release the JDBC resources
    rs.close()
    stmt.close()
    connection.close()
    /*
    Result:
    7369 : SMITH : 20
    7499 : ALLEN : 30
    7521 : WARD : 30
    7566 : JONES : 20
    7654 : MARTIN : 30
    */
  }
}
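
A small variation (a sketch reusing the connection above; the Hive JDBC driver substitutes bound values into the SQL on the client side) binds the department number as a parameter instead of hard-coding it:

    // Hypothetical variation: parameterized query instead of an inline literal
    val byDept = connection.prepareStatement("select empno, ename from emp where deptno = ?")
    byDept.setInt(1, 20)
    val deptRs = byDept.executeQuery()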

Spark Web UI

  • The Jobs page of the Spark Web UI for the Scala project
    • The job with Job Id 1 corresponds to the Scala project
  • The SQL page for the Scala project
    • Its content matches the output of explain extended select empno, ename, deptno from emp limit 5
  • The JDBC/ODBC Server page
    • The Scala project connects as the second client
    • In Session Statistics, User corresponds to the second argument of DriverManager.getConnection
    • In SQL Statistics, the JobID corresponds to the Jobs page