Adaptive Query Execution: Speeding Up Spark SQL at Runtime
This talk covers Adaptive Query Execution (AQE) in Spark SQL, a dynamic optimization feature that improves query performance by adjusting the plan based on runtime statistics. Key enhancements introduced in Spark 3.0 include dynamically coalescing shuffle partitions, dynamically switching join strategies, and dynamically optimizing skew joins based on partition sizes. The effectiveness of AQE is demonstrated through TPC-DS benchmarks and production deployments, which show significant speedups on a range of queries.
Agenda
Maryann Xue:
▪ What is Adaptive Query Execution (AQE)?
▪ Why AQE?
▪ How AQE Works
▪ The Major Optimizations in Spark 3.0
Ke Jia:
▪ Live Demo
▪ TPC-DS Performance
▪ AQE in Production
Background
▪ Well-studied problem in database literature
▪ A primitive version existed in Spark 1.6
▪ New AQE prototyped and experimented with by Intel Big Data
▪ Databricks and Intel co-engineered the new AQE in Spark 3.0
What is Adaptive Query Execution (AQE)? Dynamic query optimization that happens in the middle of query execution, based on runtime statistics.
Why AQE? Cost-based optimization (CBO) aims to choose the best plan, but does NOT work well when:
▪ Stale or missing statistics lead to inaccurate estimates
▪ Statistics collection is too costly (e.g., column histograms)
▪ Predicates contain UDFs
▪ Hints do not work for rapidly evolving data
AQE bases all optimization decisions on accurate runtime statistics.
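In Spark 3.0, AQE is off by default and is switched on through configuration. A minimal PySpark setup might look like the following configuration fragment (the key names are standard Spark 3.0 settings; the app name is illustrative):

```python
from pyspark.sql import SparkSession

# Configuration sketch: enable AQE and its Spark 3.0 optimizations.
spark = (
    SparkSession.builder
    .appName("aqe-demo")
    # Master switch for Adaptive Query Execution (off by default in 3.0).
    .config("spark.sql.adaptive.enabled", "true")
    # Coalesce small shuffle partitions after each query stage.
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    # Detect and split skewed partitions in sort merge joins.
    .config("spark.sql.adaptive.skewJoin.enabled", "true")
    .getOrCreate()
)
```

With only the master switch set, the individual optimizations use their own defaults; the two sub-flags are shown explicitly here for clarity.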
Query Stages
▪ Shuffle or broadcast exchanges divide a query into query stages
▪ Intermediate results are materialized at the end of a query stage
▪ Query stage boundaries are optimal for runtime optimization:
  - The inherent break points of operator pipelines
  - Statistics available, e.g., data size, partition sizes
Example: SELECT x, avg(y) FROM t GROUP BY x ORDER BY avg(y)
[Plan diagram: SCAN → AGGREGATE (partial) → SHUFFLE forms one query stage; AGGREGATE (final) → SHUFFLE forms the next; SORT sits above the final pipeline break point.]
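To make the stage boundaries concrete, here is a small Python sketch (not Spark internals; `Node` and `split_stages` are illustrative names) that cuts a physical plan tree into query stages at exchange nodes, leaf stages first:

```python
# Sketch: split a physical plan into query stages at shuffle/broadcast
# exchange boundaries; each exchange is the root of its own stage.
class Node:
    def __init__(self, name, *children):
        self.name, self.children = name, list(children)

def split_stages(root):
    stages = []

    def walk(node, current):
        current.append(node.name)
        for child in node.children:
            if child.name in ("SHUFFLE", "BROADCAST"):
                new_stage = []          # exchange starts a new stage
                walk(child, new_stage)
                stages.append(new_stage)
            else:
                walk(child, current)

    final_stage = []
    walk(root, final_stage)
    stages.append(final_stage)
    return stages

# Plan for: SELECT x, avg(y) FROM t GROUP BY x ORDER BY avg(y)
plan = Node("SORT",
            Node("SHUFFLE",
                 Node("AGGREGATE (final)",
                      Node("SHUFFLE",
                           Node("AGGREGATE (partial)",
                                Node("SCAN"))))))
stages = split_stages(plan)
```

The leaf stage (partial aggregate over the scan) comes out first, matching the order in which AQE runs stages.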
How AQE Works
1. Run leaf stages
2. Optimize when any stage completes -- new stats available
3. Run more stages whose dependency requirements are satisfied
4. Repeat (2) and (3) until no more stages to run
[Flowchart: Run query stages with dependencies cleared → Optimize rest of the query → more stages? → Done]
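The loop above can be sketched in Python (illustrative only; the stage and statistics representations are assumptions, not Spark's API):

```python
def run_with_aqe(stages, reoptimize):
    """AQE driver-loop sketch: run every stage whose dependencies are
    complete, collect its runtime stats, re-optimize the remaining
    plan, and repeat until all stages have run."""
    completed, stats = set(), {}
    while len(completed) < len(stages):
        ready = [s for s in stages
                 if s["id"] not in completed
                 and all(dep in completed for dep in s["deps"])]
        for stage in ready:
            stats[stage["id"]] = stage["run"]()  # materialize, get stats
            completed.add(stage["id"])
        reoptimize(stats)  # new stats available -> replan rest of query
    return stats

# Example: two leaf scan stages feeding a join stage.
events = []
stages = [
    {"id": "scan_a", "deps": [], "run": lambda: {"size_mb": 100}},
    {"id": "scan_b", "deps": [], "run": lambda: {"size_mb": 8}},
    {"id": "join", "deps": ["scan_a", "scan_b"],
     "run": lambda: {"size_mb": 40}},
]
result = run_with_aqe(stages, lambda stats: events.append(len(stats)))
```

The two leaf scans run in the first round, re-optimization fires with their stats, and only then does the join stage run.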
Dynamically coalesce shuffle partitions -- Why? (1)
Shuffle partition number and sizes are crucial to query performance.
▪ Partition too small: inefficient I/O, scheduler overhead, task setup overhead
▪ Partition too large: GC pressure, disk spilling
Dynamically coalesce shuffle partitions -- Why? (2)
Problem:
▪ One universal partition number throughout the entire query execution
▪ Data size changes at different times of query execution
Solution by AQE:
▪ Set the initial partition number high to accommodate the largest data size of the entire query execution
▪ Automatically coalesce partitions if needed after each query stage
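A simplified sketch of the coalescing rule in Python (illustrative, not Spark's implementation; the 64MB target mirrors the default of spark.sql.adaptive.advisoryPartitionSizeInBytes):

```python
def coalesce_partitions(sizes_mb, target_mb=64):
    """Greedily merge adjacent shuffle partitions until each merged
    group reaches roughly the target size (simplified sketch of
    AQE's partition-coalescing rule)."""
    groups, current, current_size = [], [], 0
    for idx, size in enumerate(sizes_mb):
        current.append(idx)
        current_size += size
        if current_size >= target_mb:
            groups.append(current)
            current, current_size = [], 0
    if current:                 # leftover tail group
        groups.append(current)
    return groups

# 8 post-shuffle partitions collapse into 3 reduce-side partitions.
groups = coalesce_partitions([20, 20, 30, 128, 5, 5, 5, 70])
```

Only adjacent partitions are merged, so the coalesced reader can still consume contiguous ranges of map output without an extra shuffle.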
Dynamically switch join strategies -- Why?
Spark chooses Broadcast Hash Join if either child of the join can fit well in memory.
Problem: estimates can go wrong and the opportunity of doing BHJ can be missed:
▪ Stats not qualified for accurate cardinality or selectivity estimates
▪ Child relation being a complex subtree of operators
▪ Blackbox predicates, e.g., UDFs
Solution by AQE: replan joins with runtime data sizes.
Dynamically switch join strategies -- When & How?
Example: SELECT * FROM a JOIN b ON a.key = b.key WHERE b.value LIKE '%xyz%'
1. Run leaf stages: the query is planned as a sort merge join (stage 1: SCAN A → SHUFFLE; stage 2: SCAN B → FILTER → SHUFFLE; then SORT on each side).
2. Optimize: stage 2 completes with an actual size of 8MB versus the 25MB estimate, so the join is replanned as a broadcast hash join.
3. Run more stages: stage 2's output is broadcast and hash-joined with SCAN A's shuffle output.
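The replanning decision itself is simple once the actual size is known; a sketch (function name is illustrative, and the 10MB default mirrors spark.sql.autoBroadcastJoinThreshold):

```python
def pick_join_strategy(build_side_bytes,
                       broadcast_threshold=10 * 1024 * 1024):
    """Replan the join once the child's materialized size is known:
    broadcast when the build side fits under the threshold."""
    if build_side_bytes <= broadcast_threshold:
        return "broadcast hash join"
    return "sort merge join"

# The planner estimated 25MB (sort merge join), but the finished
# stage materialized only 8MB, so AQE switches to broadcast.
planned = pick_join_strategy(25 * 1024 * 1024)
replanned = pick_join_strategy(8 * 1024 * 1024)
```

The difference from static planning is only *when* the decision runs: at a stage boundary, `build_side_bytes` is an exact measurement rather than an estimate.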
Dynamically optimize skew joins -- Why?
Problem: data skew can lead to significant performance degradation
▪ Individual long-running tasks slow down the entire stage
▪ Especially large partitions cause even more slowdown with disk spilling
Solution by AQE: handle skew joins automatically using runtime statistics
▪ Detect skew from partition sizes
▪ Split skewed partitions into smaller subpartitions
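Detection and splitting can be sketched as follows (illustrative Python; the factor and threshold defaults mirror spark.sql.adaptive.skewJoin.skewedPartitionFactor and skewedPartitionThresholdInBytes, while the 64MB split target is an assumption for this sketch):

```python
import math

def is_skewed(size_mb, median_mb, factor=5, threshold_mb=256):
    """A partition is treated as skewed when it is both `factor`
    times larger than the median partition size and above an
    absolute size threshold."""
    return size_mb > factor * median_mb and size_mb > threshold_mb

def split_skewed(size_mb, target_mb=64):
    """Split a skewed partition into roughly target-sized subpartitions."""
    n = max(1, math.ceil(size_mb / target_mb))
    base, remainder = divmod(size_mb, n)
    return [base + (1 if i < remainder else 0) for i in range(n)]

# With the stats from the next slide (median 55MB, max 250MB) and
# tuned-down thresholds, the 250MB partition is flagged and split.
flagged = is_skewed(250, 55, factor=2, threshold_mb=100)
pieces = split_skewed(250)
```

Each subpartition is then joined against the full matching partition of the other side, so the result is unchanged but the work is spread across several tasks.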
Dynamically optimize skew joins -- When?
Example: SELECT * FROM a JOIN b ON a.col = b.col
1. Run leaf stages: both sides (stage 1: SCAN A → SHUFFLE; stage 2: SCAN B → SHUFFLE) feed a sort merge join.
2. Optimize: stage 1 completes with partition sizes of min 40MB, median 55MB, max 250MB; the oversized partition is detected as skewed, and a skew reader is inserted on each side of the sort merge join to split it.
Dynamically optimize skew joins -- How? (1)
Regular sort merge join -- no skew optimization: table A's map outputs (maps 1-3) and table B's map outputs (maps 1-2) are shuffled into partitions A0-A3 and B0-B3, and each partition Ai is joined with partition Bi regardless of size.
TPC-DS Performance (3TB) -- Results
▪ Top per-query speedups: 1.76x, 1.5x, 1.41x, 1.4x, 1.38x, 1.28x, 1.27x, 1.22x, 1.21x, 1.19x
▪ Over 1.5x speedup on 2 queries; over 1.1x speedup on 37 queries
TPC-DS Performance (3TB) -- Partition Coalescing
• Less scheduler overhead and task startup time
• Fewer disk I/O requests
• Less data written to disk, because more data is aggregated per partition
Example (Q8): 1000 shuffle partitions without AQE; coalesced to 658 and 717 with AQE
TPC-DS Performance (3TB) -- Join Strategies
• Random I/O reads become sequential I/O reads
• Remote shuffle reads become local shuffle reads
Example (Q14b): sort merge join without AQE, replanned as a broadcast hash join with AQE
AQE in Production
▪ Performance shared by one of the largest e-commerce companies in China: AQE helped them resolve critical data skew issues and achieve significant speedups for online business queries -- 17.7x, 14.0x, 1.6x and 1.3x respectively on 4 typical skewed queries.
▪ Performance shared by one of the largest internet companies in China: AQE gained 5x and 1.38x speedups on two typical queries in their production environment.
AQE in Production -- Skew Join Optimization
▪ Use case: select the user's invoice details based on the sale order id, invoice content id and invoice type id.
▪ Speedups (by query duration in seconds): 17.7x, 14.0x, 1.6x, 1.3x

key            | Avg records per key | Skew records | comment
sale_order_id  | 2000                | 15383717     | NULL
ivc_content_id | 9231804             | 4077995632   | Not NULL
ivc_type_id    | 360                 | 3582336345   | Not NULL