Spark SQL Architecture

Spark SQL architecture

Spark SQL Execution Plan

The following walks through a Spark SQL execution plan and maps each of its stages to the Spark SQL architecture diagram.

spark-sql> select * from test;
10 ACCOUNTING NEW YORK
20 RESEARCH DALLAS
30 SALES CHICAGO
40 OPERATIONS BOSTON
Time taken: 6.141 seconds, Fetched 4 row(s)
spark-sql> explain extended
> select
> a.key*(4+5),
> b.value
> from
> test a join test b
> on a.key=b.key and a.key>10;
17/11/27 23:53:06 INFO execution.SparkSqlParser: Parsing command: explain extended
select
a.key*(4+5),
b.value
from
test a join test b
on a.key=b.key and a.key>10
17/11/27 23:53:06 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=test
17/11/27 23:53:06 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=test
17/11/27 23:53:06 INFO parser.CatalystSqlParser: Parsing command: string
17/11/27 23:53:06 INFO parser.CatalystSqlParser: Parsing command: string
17/11/27 23:53:06 INFO metastore.HiveMetaStore: 0: get_table : db=default tbl=test
17/11/27 23:53:06 INFO HiveMetaStore.audit: ugi=hadoop ip=unknown-ip-addr cmd=get_table : db=default tbl=test
17/11/27 23:53:06 INFO parser.CatalystSqlParser: Parsing command: string
17/11/27 23:53:06 INFO parser.CatalystSqlParser: Parsing command: string
17/11/27 23:53:06 INFO codegen.CodeGenerator: Code generated in 36.760962 ms
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (4 + 5)), None), 'b.value]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 10))
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation `test`
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation `test`
== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE)): double, value: string
Project [(cast(key#93 as double) * cast((4 + 5) as double)) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
+- Join Inner, ((key#93 = key#95) && (cast(key#93 as int) > 10))
   :- SubqueryAlias a
   :  +- SubqueryAlias test
   :     +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#93, value#94]
   +- SubqueryAlias b
      +- SubqueryAlias test
         +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#95, value#96]
== Optimized Logical Plan ==
Project [(cast(key#93 as double) * 9.0) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
+- Join Inner, (key#93 = key#95)
   :- Project [key#93]
   :  +- Filter (isnotnull(key#93) && (cast(key#93 as int) > 10))
   :     +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#93, value#94]
   +- Filter (isnotnull(key#95) && (cast(key#95 as int) > 10))
      +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#95, value#96]
== Physical Plan ==
*Project [(cast(key#93 as double) * 9.0) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
+- *SortMergeJoin [key#93], [key#95], Inner
   :- *Sort [key#93 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#93, 200)
   :     +- *Filter (isnotnull(key#93) && (cast(key#93 as int) > 10))
   :        +- HiveTableScan [key#93], CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#93, value#94]
   +- *Sort [key#95 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#95, 200)
         +- *Filter (isnotnull(key#95) && (cast(key#95 as int) > 10))
            +- HiveTableScan [key#95, value#96], CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#95, value#96]
Time taken: 0.541 seconds, Fetched 1 row(s)
17/11/27 23:53:06 INFO CliDriver: Time taken: 0.541 seconds, Fetched 1 row(s)
spark-sql>
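
The four `== ... ==` headers in the output above are what separate one stage from the next. As a minimal sketch (the function name `split_plan_sections` and the shortened `sample` text are illustrative assumptions, not part of Spark), the plan text can be split into its stages like this:

```python
import re


def split_plan_sections(explain_output):
    """Group the lines of EXPLAIN EXTENDED output under their '== ... ==' headers."""
    sections = {}
    current = None
    for line in explain_output.splitlines():
        header = re.fullmatch(r"== (.+) ==", line.strip())
        if header:
            current = header.group(1)
            sections[current] = []
        elif current is not None and line.strip():
            sections[current].append(line)
    return sections


# Shortened sample: only the first line of each stage from the output above.
sample = """\
== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (4 + 5)), None), 'b.value]
== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE)): double, value: string
== Optimized Logical Plan ==
Project [(cast(key#93 as double) * 9.0) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
== Physical Plan ==
*Project [(cast(key#93 as double) * 9.0) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
"""

sections = split_plan_sections(sample)
for title in sections:
    print(title)
```

The sketch expects only the plan text; log lines and the trailing "Time taken" line would need to be removed first.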


Each stage is explained in turn below.

== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (4 + 5)), None), 'b.value]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 10))
   :- 'SubqueryAlias a
   :  +- 'UnresolvedRelation `test`
   +- 'SubqueryAlias b
      +- 'UnresolvedRelation `test`
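  • Corresponds to the Unresolved Logical Plan stage.
  • Project lists the expressions that make up the final output columns.
  • Join Inner indicates an inner join whose condition is (('a.key = 'b.key) && ('a.key > 10)).
  • SubqueryAlias names the tables taking part in the operation, here under the aliases a and b.
  • At this stage the SQL text has been parsed into a tree, but the table and column names have not yet been resolved against the catalog (hence "Unresolved"). The leading ' on a node marks it as unresolved.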
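
Spark prints each plan as an indented tree: `+-` introduces the last child of a node and `:-` a child that still has siblings below it. To make that layout easier to read, here is a small sketch that renders a nested structure in the same style. The `(label, children)` tuple representation and the `render` function are assumptions made for illustration; they are not Catalyst classes.

```python
def render(node, last_flags=()):
    """Return the lines of an indented tree for a (label, children) tuple."""
    label, children = node
    # One three-character column per ancestor below the root: blank under a
    # last child, ":  " under a child that still has siblings to come.
    prefix = "".join("   " if last else ":  " for last in last_flags[:-1])
    if last_flags:
        prefix += "+- " if last_flags[-1] else ":- "
    lines = [prefix + label]
    for index, child in enumerate(children):
        is_last = index == len(children) - 1
        lines.extend(render(child, last_flags + (is_last,)))
    return lines


# The parsed plan from above, written as nested (label, children) tuples.
plan = (
    "'Project [unresolvedalias(('a.key * (4 + 5)), None), 'b.value]",
    [
        (
            "'Join Inner, (('a.key = 'b.key) && ('a.key > 10))",
            [
                ("'SubqueryAlias a", [("'UnresolvedRelation `test`", [])]),
                ("'SubqueryAlias b", [("'UnresolvedRelation `test`", [])]),
            ],
        )
    ],
)

lines = render(plan)
print("\n".join(lines))
```

Reading the result bottom-up gives the order of evaluation: the two relations are read first, joined, and then projected.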

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE)): double, value: string
Project [(cast(key#93 as double) * cast((4 + 5) as double)) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
+- Join Inner, ((key#93 = key#95) && (cast(key#93 as int) > 10))
   :- SubqueryAlias a
   :  +- SubqueryAlias test
   :     +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#93, value#94]
   +- SubqueryAlias b
      +- SubqueryAlias test
         +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#95, value#96]
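  • Corresponds to the Logical Plan stage.
  • The input to this plan comes from the Schema Catalog lookup, so the database and table are now known explicitly.
  • CatalogRelation gives the database and table taking part in the operation and the columns they contain. For example, a comes from the test table in the default database, which has the columns key and value.
  • org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe is the SerDe (serializer/deserializer) class the table uses.
  • At this stage the names in the SQL have been resolved: the database and table are known, and every column carries a unique ID such as key#93.
  • Type casts are inserted at this stage: in the final output (Project) key is cast to double, and in the join condition it is cast to int for the comparison with 10.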
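
The resolution step described above can be pictured with a toy model. This is only a sketch under stated assumptions: `CATALOG` is a hypothetical stand-in for the Hive metastore, `resolve_relation` is an invented helper, and the counter starts at 93 purely to echo the IDs in the plan above. It is not how the Catalyst analyzer is implemented.

```python
import itertools

# Hypothetical stand-in for the metastore: table name -> column names.
CATALOG = {"test": ["key", "value"]}

# Source of unique attribute IDs; 93 is chosen only to match the plan above.
_next_id = itertools.count(93)


def resolve_relation(table, alias):
    """Look the table up in the catalog and give each of its columns a unique ID."""
    if table not in CATALOG:
        raise LookupError(f"Table or view not found: {table}")
    return {
        f"{alias}.{column}": f"{column}#{next(_next_id)}"
        for column in CATALOG[table]
    }


# "test a join test b": the same table is resolved twice, once per alias,
# so the two sides of the self-join get different attribute IDs.
scope = {}
scope.update(resolve_relation("test", "a"))
scope.update(resolve_relation("test", "b"))

unresolved = ["a.key", "b.key", "b.value"]
resolved = [scope[name] for name in unresolved]
print(resolved)  # ['key#93', 'key#95', 'value#96']
```

Because each alias receives its own IDs, key#93 and key#95 can be told apart in the join condition even though both refer to the same column of the same table.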

== Optimized Logical Plan ==
Project [(cast(key#93 as double) * 9.0) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
+- Join Inner, (key#93 = key#95)
   :- Project [key#93]
   :  +- Filter (isnotnull(key#93) && (cast(key#93 as int) > 10))
   :     +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#93, value#94]
   +- Filter (isnotnull(key#95) && (cast(key#95 as int) > 10))
      +- CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#95, value#96]
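  • Corresponds to the Optimized Logical Plan stage.
  • Filter shows that each input is filtered before the join: the a.key > 10 condition has moved out of the join condition, which is now just (key#93 = key#95). In other words, this is predicate pushdown, which reduces the amount of data to be processed and improves performance.
  • The constant expression (4 + 5) has also been folded into 9.0, and the left input is pruned to the single column key#93.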
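
The effect of pushing the filter below the join can be seen in a small sketch. The sample rows are illustrative (modelled on the table shown earlier), and `nested_loop_join` is a deliberately naive helper used only to count comparisons; Spark itself chooses a different join algorithm for this query.

```python
# Illustrative (key, value) rows; not read from the real table.
rows = [(10, "ACCOUNTING"), (20, "RESEARCH"), (30, "SALES"), (40, "OPERATIONS")]


def nested_loop_join(left, right, condition):
    """Naive inner join that also reports how many row pairs were compared."""
    matches, compared = [], 0
    for left_row in left:
        for right_row in right:
            compared += 1
            if condition(left_row, right_row):
                matches.append((left_row, right_row))
    return matches, compared


# As written in the query: the whole ON clause is evaluated inside the join.
unoptimized, pairs_before = nested_loop_join(
    rows, rows, lambda l, r: l[0] == r[0] and l[0] > 10
)

# After pushdown: filter both inputs first, then join on key equality only.
left = [row for row in rows if row[0] is not None and row[0] > 10]
right = [row for row in rows if row[0] is not None and row[0] > 10]
optimized, pairs_after = nested_loop_join(left, right, lambda l, r: l[0] == r[0])

print(unoptimized == optimized)   # True: same result either way
print(pairs_before, pairs_after)  # 16 9
```

Both versions return the same three matches, but the filtered inputs leave fewer rows to compare, which is the saving the optimizer is after.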

== Physical Plan ==
*Project [(cast(key#93 as double) * 9.0) AS (CAST(key AS DOUBLE) * CAST((4 + 5) AS DOUBLE))#97, value#96]
+- *SortMergeJoin [key#93], [key#95], Inner
   :- *Sort [key#93 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#93, 200)
   :     +- *Filter (isnotnull(key#93) && (cast(key#93 as int) > 10))
   :        +- HiveTableScan [key#93], CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#93, value#94]
   +- *Sort [key#95 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#95, 200)
         +- *Filter (isnotnull(key#95) && (cast(key#95 as int) > 10))
            +- HiveTableScan [key#95, value#96], CatalogRelation `default`.`test`, org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, [key#95, value#96]
  • Corresponds to the Physical Plans stage.
  • The Inner Join is converted internally into a SortMergeJoin.
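
In the physical plan each side of the join is first repartitioned by key (Exchange hashpartitioning), then sorted (Sort), and only then merged. The sketch below shows the general sort-merge idea for a single partition. The function `sort_merge_join` and the sample rows are assumptions for illustration; this is not Spark's SortMergeJoin implementation, and the shuffle step is left out.

```python
def sort_merge_join(left, right, key=lambda row: row[0]):
    """Inner join of two row lists by sorting both and merging them in one pass."""
    left = sorted(left, key=key)
    right = sorted(right, key=key)
    joined = []
    i = j = 0
    while i < len(left) and j < len(right):
        left_key, right_key = key(left[i]), key(right[j])
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of right rows sharing this key ...
            run_end = j
            while run_end < len(right) and key(right[run_end]) == left_key:
                run_end += 1
            # ... and pair it with every left row that has the same key.
            while i < len(left) and key(left[i]) == left_key:
                joined.extend((left[i], r) for r in right[j:run_end])
                i += 1
            j = run_end
    return joined


# Illustrative inputs after the key > 10 filter: the left side keeps only key,
# the right side keeps key and value, as in the plan above.
left_rows = [(30,), (20,), (40,)]
right_rows = [(40, "OPERATIONS"), (20, "RESEARCH"), (30, "SALES")]

result = sort_merge_join(left_rows, right_rows)
print(result)
```

Because both inputs are sorted on the join key, each side is scanned once and matching rows are found without comparing every pair.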