Spark SQL Configuration and Performance Tuning

Configuration Properties

  • Spark SQL Performance Tuning
  • The properties worth paying attention to are listed below, together with the corresponding Spark source code
    • Spark source code: org.apache.spark.sql.internal.SQLConf.scala
    • buildConf gives the property name that can be passed in via --conf, e.g. --conf spark.sql.sources.default
    • createWithDefault gives the property's default value
  • DEFAULT_DATA_SOURCE_NAME
    • The data source format used when loading or saving external data (example below)
// This is used to set the default data source
val DEFAULT_DATA_SOURCE_NAME = buildConf("spark.sql.sources.default")
  .doc("The default data source to use in input/output.")
  .stringConf
  .createWithDefault("parquet")
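A minimal sketch of how these properties are set in practice and what the default data source affects. The app name and paths are placeholders; the SparkSession is named spark here and reused in the later sketches, as in spark-shell.

import org.apache.spark.sql.SparkSession

// Properties can also be passed on the command line, e.g.
//   spark-submit --conf spark.sql.sources.default=orc ...
val spark = SparkSession.builder()
  .appName("sqlconf-demo")                        // placeholder app name
  .config("spark.sql.sources.default", "parquet") // same as the built-in default
  .getOrCreate()

// With no explicit format, load()/save() fall back to spark.sql.sources.default
val df = spark.read.load("/tmp/input")            // reads Parquet; placeholder path
df.write.save("/tmp/output")                      // writes Parquet; placeholder path

// An explicit format always overrides the default
val js = spark.read.format("json").load("/tmp/input.json") // placeholder path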
  • COMPRESS_CACHED
    • Whether the in-memory columnar storage is compressed
val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
  .internal()
  .doc("When set to true Spark SQL will automatically select a compression codec for each " +
    "column based on statistics of the data.")
  .booleanConf
  .createWithDefault(true)
  • COLUMN_BATCH_SIZE
    • The number of rows per batch in the columnar cache (caching example below)
val COLUMN_BATCH_SIZE = buildConf("spark.sql.inMemoryColumnarStorage.batchSize")
  .internal()
  .doc("Controls the size of batches for columnar caching. Larger batch sizes can improve " +
    "memory utilization and compression, but risk OOMs when caching data.")
  .intConf
  .createWithDefault(10000)
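A sketch of how the two in-memory columnar settings above apply when caching a DataFrame; the table name is a placeholder and the values shown are simply the defaults.

// Both settings must be in place before the data is first cached
spark.conf.set("spark.sql.inMemoryColumnarStorage.compressed", "true")
spark.conf.set("spark.sql.inMemoryColumnarStorage.batchSize", "10000")

val events = spark.table("events") // placeholder table
events.cache()                     // stored as compressed columnar batches of up to 10000 rows
events.count()                     // first action materializes the cache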
  • IN_MEMORY_PARTITION_PRUNING
    • Whether partition pruning is enabled for in-memory columnar tables
val IN_MEMORY_PARTITION_PRUNING =
  buildConf("spark.sql.inMemoryColumnarStorage.partitionPruning")
    .internal()
    .doc("When true, enable partition pruning for in-memory columnar tables.")
    .booleanConf
    .createWithDefault(true)
  • PREFER_SORTMERGEJOIN
    • Whether sort merge join is preferred over shuffle hash join (example below)
val PREFER_SORTMERGEJOIN = buildConf("spark.sql.join.preferSortMergeJoin")
  .internal()
  .doc("When true, prefer sort merge join over shuffle hash join.")
  .booleanConf
  .createWithDefault(true)
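A sketch of relaxing this preference so the planner may consider shuffle hash join; whether it is actually chosen also depends on table statistics and the broadcast threshold. Table and column names are placeholders.

// Allow shuffle hash join to be considered instead of always preferring sort merge join
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val orders = spark.table("orders")    // placeholder large table
val dims   = spark.table("dim_small") // placeholder smaller table
orders.join(dims, Seq("dim_id")).explain() // inspect which join strategy was chosen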
  • AUTO_BROADCASTJOIN_THRESHOLD
    • The size threshold, in bytes, below which a table is broadcast for a join (example below)
val AUTO_BROADCASTJOIN_THRESHOLD = buildConf("spark.sql.autoBroadcastJoinThreshold")
  .doc("Configures the maximum size in bytes for a table that will be broadcast to all worker " +
    "nodes when performing a join. By setting this value to -1 broadcasting can be disabled. " +
    "Note that currently statistics are only supported for Hive Metastore tables where the " +
    "command <code>ANALYZE TABLE &lt;tableName&gt; COMPUTE STATISTICS noscan</code> has been " +
    "run, and file-based data source tables where the statistics are computed directly on " +
    "the files of data.")
  .longConf
  .createWithDefault(10L * 1024 * 1024)
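A sketch of the two usual ways to control broadcasting: adjusting the automatic threshold, or forcing a broadcast with the broadcast() hint regardless of statistics. Sizes and table names are placeholders.

import org.apache.spark.sql.functions.broadcast

// Raise the automatic threshold to 50 MB (default 10 MB); setting it to -1 disables auto broadcast
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 50L * 1024 * 1024)

val facts = spark.table("facts")       // placeholder large table
val dim   = spark.table("dim_country") // placeholder small table

// Force a broadcast hash join even when statistics are missing or the table exceeds the threshold
val result = facts.join(broadcast(dim), Seq("country_id"))
result.explain() // plan should show BroadcastHashJoin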
  • SHUFFLE_PARTITIONS
    • The default number of partitions used for the shuffle step of joins or aggregations (example below)
val SHUFFLE_PARTITIONS = buildConf("spark.sql.shuffle.partitions")
  .doc("The default number of partitions to use when shuffling data for joins or aggregations.")
  .intConf
  .createWithDefault(200)
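A sketch of tuning the shuffle parallelism; 400 is purely illustrative and would normally be sized against data volume and total executor cores.

// The default of 200 is often too low for large shuffles and too high for small ones
spark.conf.set("spark.sql.shuffle.partitions", "400")

val dailyCounts = spark.table("clicks") // placeholder table
  .groupBy("day")
  .count()                              // this aggregation's shuffle now uses 400 partitions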
  • CASE_SENSITIVE
    • Whether the query analyzer is case sensitive when resolving identifiers
val CASE_SENSITIVE = buildConf("spark.sql.caseSensitive")
  .internal()
  .doc("Whether the query analyzer should be case sensitive or not. " +
    "Default to case insensitive. It is highly discouraged to turn on case sensitive mode.")
  .booleanConf
  .createWithDefault(false)
  • PARQUET_SCHEMA_MERGING_ENABLED
    • Whether schema merging is performed when the external data source is Parquet
    • Recommended to keep it off globally, since merging is a relatively expensive operation; it can be enabled per read instead (example below)
val PARQUET_SCHEMA_MERGING_ENABLED = buildConf("spark.sql.parquet.mergeSchema")
  .doc("When true, the Parquet data source merges schemas collected from all data files, " +
    "otherwise the schema is picked from the summary file or a random data file " +
    "if no summary file is available.")
  .booleanConf
  .createWithDefault(false)
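Since global schema merging is costly, a common pattern is to leave the property off and request merging only for the reads that need it; the path is a placeholder.

// Merge schemas across part-files for this read only, without changing the global default
val merged = spark.read
  .option("mergeSchema", "true")
  .parquet("/data/events/") // placeholder path containing files written with evolving schemas
merged.printSchema()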
  • PARQUET_COMPRESSION
    • The codec used when writing out Parquet files (example below)
    • The default, snappy, has a lower compression ratio (and lower CPU cost), so the output files are larger and take more storage, which affects overall I/O; note that Parquet compresses data per column chunk, so snappy-compressed Parquet files remain splittable (the non-splittable concern applies to raw snappy-compressed files, not Parquet)
val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
  .doc("Sets the compression codec use when writing Parquet files. Acceptable values include: " +
    "uncompressed, snappy, gzip, lzo.")
  .stringConf
  .transform(_.toLowerCase(Locale.ROOT))
  .checkValues(Set("uncompressed", "snappy", "gzip", "lzo"))
  .createWithDefault("snappy")
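A sketch of choosing the codec per write instead of globally; gzip trades extra CPU for smaller files, and the table and path are placeholders.

// A per-write option overrides spark.sql.parquet.compression.codec for this write only
spark.table("events")            // placeholder table
  .write
  .option("compression", "gzip") // smaller files, more CPU than the snappy default
  .parquet("/data/events_gzip/") // placeholder output path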
  • PARQUET_FILTER_PUSHDOWN_ENABLED
    • Whether Parquet filter push-down is enabled
val PARQUET_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.parquet.filterPushdown")
  .doc("Enables Parquet filter push-down optimization when set to true.")
  .booleanConf
  .createWithDefault(true)
  • ORC_FILTER_PUSHDOWN_ENABLED
    • Whether ORC filter push-down is enabled
    • Pay particular attention to the default: unlike Parquet, it is false here (example below)
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
  .doc("When true, enable filter pushdown for ORC files.")
  .booleanConf
  .createWithDefault(false)
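Because the default shown here is false, ORC pushdown has to be enabled explicitly (newer Spark releases turn it on by default); the path and column are placeholders.

// Enable ORC filter pushdown so predicates are evaluated inside the ORC reader
spark.conf.set("spark.sql.orc.filterPushdown", "true")

val recent = spark.read.orc("/data/logs_orc/") // placeholder path
  .filter("event_date >= '2018-01-01'")        // placeholder column; the filter can now be pushed to the reader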
  • HIVE_METASTORE_PARTITION_PRUNING
    • Whether partition pruning is pushed down to the Hive metastore
    • The metastore is typically backed by a relational database such as MySQL, where each partition is one record; with many partitions, looking up all partition metadata becomes extremely slow
    • When true, partition predicates are pushed down to the Hive metastore (e.g. MySQL) so that non-matching partitions are filtered out early, improving query performance; otherwise every partition has to be fetched first (example below)
val HIVE_METASTORE_PARTITION_PRUNING =
  buildConf("spark.sql.hive.metastorePartitionPruning")
    .doc("When true, some predicates will be pushed down into the Hive metastore so that " +
      "unmatching partitions can be eliminated earlier. This only affects Hive tables " +
      "not converted to filesource relations (see HiveUtils.CONVERT_METASTORE_PARQUET and " +
      "HiveUtils.CONVERT_METASTORE_ORC for more information).")
    .booleanConf
    .createWithDefault(true)
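A sketch of the kind of query this helps: a Hive table partitioned by a date column, queried with a predicate on that column so only matching partitions are requested from the metastore. The table and columns are placeholders, and the session must be built with Hive support.

spark.conf.set("spark.sql.hive.metastorePartitionPruning", "true")

// With pruning on, only partitions where dt = '2018-06-01' are fetched from the metastore,
// instead of listing every partition of the table first
val oneDay = spark.sql(
  "SELECT user_id, action FROM web_logs WHERE dt = '2018-06-01'") // placeholder Hive table partitioned by dt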
  • COLUMN_NAME_OF_CORRUPT_RECORD
    • The column that raw JSON or CSV records are placed in when they fail to parse (example below)
val COLUMN_NAME_OF_CORRUPT_RECORD = buildConf("spark.sql.columnNameOfCorruptRecord")
  .doc("The name of internal column for storing raw/un-parsed JSON and CSV records that fail " +
    "to parse.")
  .stringConf
  .createWithDefault("_corrupt_record")
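A sketch of reading JSON in PERMISSIVE mode while keeping the raw text of malformed rows; the schema, path and values are placeholders, and the corrupt-record column has to be declared in the schema for it to be populated.

import org.apache.spark.sql.types._

val jsonSchema = new StructType()
  .add("id", LongType)
  .add("name", StringType)
  .add("_corrupt_record", StringType) // must match spark.sql.columnNameOfCorruptRecord

val parsed = spark.read
  .schema(jsonSchema)
  .option("mode", "PERMISSIVE") // keep bad rows instead of failing (FAILFAST) or dropping them (DROPMALFORMED)
  .json("/data/raw.json")       // placeholder path

parsed.cache() // caching works around a restriction on queries that reference only the corrupt column
val badRows = parsed.filter("_corrupt_record IS NOT NULL")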
  • THRIFTSERVER_UI_SESSION_LIMIT
    • The maximum number of sessions kept in the Thrift server (JDBC/ODBC) web UI history
val THRIFTSERVER_UI_SESSION_LIMIT = buildConf("spark.sql.thriftserver.ui.retainedSessions")
  .doc("The number of SQL client sessions kept in the JDBC/ODBC web UI history.")
  .intConf
  .createWithDefault(200)
  • CONVERT_CTAS
    • Whether a Hive CTAS (CREATE TABLE AS SELECT) statement is converted to a data source table (example below)
val CONVERT_CTAS = buildConf("spark.sql.hive.convertCTAS")
  .internal()
  .doc("When true, a table created by a Hive CTAS statement (no USING clause) " +
    "without specifying any storage property will be converted to a data source table, " +
    "using the data source set by spark.sql.sources.default.")
  .booleanConf
  .createWithDefault(false)
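A sketch of the statement this flag affects: a Hive CTAS with no USING clause and no storage properties. With the flag on, the new table is written with the format from spark.sql.sources.default rather than Hive's default format; the table names are placeholders and Hive support must be enabled.

spark.conf.set("spark.sql.hive.convertCTAS", "true")

// No USING clause and no STORED AS / ROW FORMAT, so the conversion applies:
// the table is created as a data source table using spark.sql.sources.default (parquet)
spark.sql(
  "CREATE TABLE user_summary AS SELECT user_id, count(*) AS cnt FROM web_logs GROUP BY user_id")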
  • CROSS_JOINS_ENABLED
    • Whether cross joins (Cartesian products) without explicit CROSS JOIN syntax are allowed (example below)
val CROSS_JOINS_ENABLED = buildConf("spark.sql.crossJoin.enabled")
  .doc("When false, we will throw an error if a query contains a cartesian product without " +
    "explicit CROSS JOIN syntax.")
  .booleanConf
  .createWithDefault(false)
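A sketch of the difference this flag makes: an explicit crossJoin (or CROSS JOIN syntax) is always allowed, while an implicit Cartesian product fails at planning time unless the flag is true. The small DataFrames are placeholders.

import spark.implicits._

val colors = Seq("red", "green").toDF("color")
val sizes  = Seq("S", "M", "L").toDF("size")

// Explicit cross join is always allowed
val combos = colors.crossJoin(sizes)

// An implicit Cartesian product, e.g. a join with no condition, throws an AnalysisException
// during planning unless the flag below is set to true
spark.conf.set("spark.sql.crossJoin.enabled", "true")
val implicitCombos = colors.join(sizes)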

Knowledge is a city to the building of which every human being brought a stone.