Adaptive Query Execution: Speeding Up Spark SQL at Runtime Maryann Xue Staff Engineer @ Databricks Ke Jia Software Engineer @ Intel
Agenda Maryann Xue What is Adaptive Query Execution (AQE)? Why AQE? How AQE Works The Major Optimizations in Spark 3.0 Ke Jia Live Demo TPC-DS Performance AQE in Production
Background ▪ Well-studied problem in database literature ▪ Primitive version in Spark 1.6 ▪ New AQE prototyped and experimented by Intel Big Data ▪ Databricks and Intel co-engineered new AQE in Spark 3.0
What is Adaptive Query Execution (AQE)? Dynamic query optimization that happens in the middle of query execution based on runtime statistics.
Why AQE? Cost-based optimization (CBO) aims to choose the best plan, but does NOT work well when: ▪ Stale or missing statistics lead to inaccurate estimates ▪ Statistics collection is too costly (e.g., column histograms) ▪ Predicates contain UDFs ▪ Hints do not work for rapidly evolving data. AQE bases all optimization decisions on accurate runtime statistics.
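Not on the slide, but for reference: in Spark 3.0 AQE is off by default and is turned on with a single setting. A minimal sketch for the spark-shell or a notebook (the app name is illustrative; the config key is real):

```scala
// Minimal sketch: enabling AQE in Spark 3.0.
// spark.sql.adaptive.enabled is a real config; "aqe-demo" is illustrative.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("aqe-demo")
  .config("spark.sql.adaptive.enabled", "true") // AQE is off by default in 3.0
  .getOrCreate()
```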
Query Stages ▪ Shuffle or broadcast exchanges divide a query into query stages ▪ Intermediate results are materialized at the end of a query stage ▪ Query stage boundaries are optimal for runtime optimization: they are the inherent break points of operator pipelines, and statistics are available there, e.g., data size and partition sizes [Diagram: plan for SELECT x, avg(y) FROM t GROUP BY x ORDER BY avg(y): SCAN → AGGREGATE (partial) → SHUFFLE → AGGREGATE (final) → SHUFFLE → SORT, with each SHUFFLE marking a pipeline break point and a query stage boundary]
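As an illustration (not from the slides): with AQE enabled, the physical plan for the example query is wrapped in an AdaptiveSparkPlan node, and the stage boundaries sit at the exchanges. A sketch, assuming the `spark` session from the previous snippet:

```scala
// Sketch: observing query stage boundaries for the example query.
// Assumes `spark` from the previous snippet, with AQE enabled.
spark.range(0, 1000000)
  .selectExpr("id % 100 AS x", "id AS y")
  .createOrReplaceTempView("t")

val df = spark.sql("SELECT x, avg(y) FROM t GROUP BY x ORDER BY avg(y)")
df.explain() // top node is AdaptiveSparkPlan; each Exchange marks a stage boundary
```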
How AQE works 1. Run leaf stages 2. Optimize when any stage completes -- new stats available 3. Run more stages whose dependency requirements are satisfied 4. Repeat (2)-(3) until no more stages to run [Flowchart: run query stages with dependencies cleared → optimize rest of the query → more stages? → if yes, repeat; else done]
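A conceptual sketch of this loop in Scala. All types and helper names here are hypothetical stand-ins, not Spark's internal API:

```scala
// Conceptual sketch of the AQE loop above -- all types and helpers are
// hypothetical stand-ins, NOT Spark's internal API.
trait Stage { def ready: Boolean; def done: Boolean; def materialize(): Unit }
trait Plan  { def stages: Seq[Stage] }

// stand-in for re-optimizing the remaining plan with fresh runtime stats
def reoptimize(plan: Plan): Plan = plan

def executeAdaptively(plan: Plan): Plan = {
  var current = plan
  while (current.stages.exists(!_.done)) {
    // (1)/(3): run every stage whose dependencies are satisfied
    current.stages.filter(s => s.ready && !s.done).foreach(_.materialize())
    // (2): a completed stage means new stats -- re-optimize the rest
    current = reoptimize(current)
  } // (4): repeat until no stages remain
  current
}
```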
The AQE Major Features in Spark 3.0 ▪ Dynamically coalesce shuffle partitions ▪ Dynamically switch join strategies ▪ Dynamically optimize skew joins
Dynamically coalesce shuffle partitions -- Why? (1) Shuffle partition number and sizes are crucial to query performance: ▪ Partition too small: inefficient I/O, scheduler overhead, task setup overhead ▪ Partition too large: GC pressure, disk spilling
Dynamically coalesce shuffle partitions -- Why? (2) Problem: ▪ One universal shuffle partition number is used throughout the entire query execution ▪ Data size changes at different stages of query execution Solution by AQE: ▪ Set the initial partition number high to accommodate the largest data size of the entire query execution ▪ Automatically coalesce partitions if needed after each query stage (see the config sketch below)
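A minimal sketch of the Spark 3.0 settings behind this feature. The config keys are real Spark SQL configs; the values shown are only illustrative:

```scala
// Sketch: Spark 3.0 settings behind partition coalescing.
// Keys are real Spark SQL configs; the values are illustrative.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
// start high, so the largest shuffle in the query has enough partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "1000")
// target size per coalesced partition
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64MB")
```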
Dynamically coalesce shuffle partitions -- When? [Diagram: the plan for SELECT x, avg(y) FROM t GROUP BY x ORDER BY avg(y), evolving across four steps] 1. Run leaf stages: SCAN → AGGREGATE (partial) → SHUFFLE (50 part.) 2. Optimize: Stage 1 complete (total: 650MB, avg: 13MB) → insert COALESCE (10 part.) after the shuffle 3. Run more stages: AGGREGATE (final) → SHUFFLE (50 part.) 4. Optimize: Stage 2 complete (total: 300MB, avg: 6MB) → insert COALESCE (5 part.) before the SORT
Dynamically coalesce shuffle partitions -- How? (1) Regular shuffle -- no coalescing: ▪ Partitioned into the statically specified partition number -- in this case, 5 [Diagram: MAP 1 and MAP 2 each write to REDUCE 1 through REDUCE 5]
Dynamically coalesce shuffle partitions -- How? (2) AQE coalesced shuffle: ▪ Combine adjacent small partitions -- in this case, orig. partitions 2, 3, 4 [Diagram: MAP 1 and MAP 2 feed REDUCE 1’, REDUCE 2’ (coalesced from original partitions 2, 3, 4), and REDUCE 3’]
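An illustrative sketch of the greedy rule at work here: pack adjacent partitions until the running total would exceed the advisory target size. This is not Spark's exact implementation, just the idea:

```scala
import scala.collection.mutable.ArrayBuffer

// Illustrative sketch of adjacent-partition coalescing -- not Spark's exact
// code. Greedily packs neighboring shuffle partitions until the running
// total would exceed the advisory target size.
def coalesce(sizes: Seq[Long], targetBytes: Long): Seq[Seq[Int]] = {
  val groups = ArrayBuffer(ArrayBuffer.empty[Int])
  var acc = 0L
  for ((size, idx) <- sizes.zipWithIndex) {
    if (acc > 0 && acc + size > targetBytes) {
      groups += ArrayBuffer.empty[Int] // close the current group, start a new one
      acc = 0L
    }
    groups.last += idx
    acc += size
  }
  groups.map(_.toSeq).toSeq
}

// With partitions of 60, 5, 7, 8, 55 (MB) and a 64MB target:
// coalesce(Seq(60, 5, 7, 8, 55).map(_ * 1024 * 1024L), 64 * 1024 * 1024L)
// → Seq(Seq(0), Seq(1, 2, 3), Seq(4)) -- partitions 2, 3, 4 are combined,
//   matching REDUCE 2’ above (indices here are 0-based).
```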
Dynamically switch join strategies -- Why? Spark chooses Broadcast Hash Join (BHJ) if either child of the join can fit well in memory. Problem: estimates can go wrong and the opportunity to do a BHJ can be missed: ▪ Stats not qualified for an accurate cardinality or selectivity estimate ▪ Child relation being a complex subtree of operators ▪ Black-box predicates, e.g., UDFs Solution by AQE: replan joins with runtime data sizes.
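A hedged sketch of the knob involved: spark.sql.autoBroadcastJoinThreshold is a real config (default 10MB), and in Spark 3.0 AQE reuses it when deciding at runtime whether to switch to a broadcast join:

```scala
// Sketch: the threshold AQE compares runtime sizes against when replanning.
// spark.sql.autoBroadcastJoinThreshold is a real config (default 10MB);
// in Spark 3.0, AQE reuses it for the runtime join-strategy switch.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")
// If a child's materialized output is under the threshold, a planned
// sort-merge join can be switched to a broadcast hash join mid-query.
```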
Dynamically switch join strategies -- When & How? [Diagram: plan for SELECT * FROM a JOIN b ON a.key = b.key WHERE b.value LIKE '%xyz%', evolving across three steps] 1. Run leaf stages: Stage 1 (SCAN A → SHUFFLE) and Stage 2 (SCAN B → FILTER → SHUFFLE), feeding a planned SORT MERGE JOIN 2. Optimize: Stage 2 complete (est: 25MB, actual: 8MB) → replace SORT MERGE JOIN with BROADCAST HASH JOIN, broadcasting side B 3. Run more stages: execute the BROADCAST HASH JOIN
Dynamically optimize skew joins -- Why? Problem: data skew can lead to significant performance degradation ▪ Individual long-running tasks slow down the entire stage ▪ Especially large partitions cause further slowdown due to disk spilling Solution by AQE: handle skew joins automatically using runtime statistics ▪ Detect skew from partition sizes ▪ Split skewed partitions into smaller subpartitions (see the config sketch below)
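A minimal sketch of the Spark 3.0 settings for this feature. The config keys are real; the values shown are illustrative, not necessarily the defaults:

```scala
// Sketch: Spark 3.0 skew-join settings. Keys are real Spark SQL configs;
// the values shown are illustrative.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
// a partition is skewed if larger than factor × the median partition size...
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
// ...and also larger than this absolute threshold
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")
```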
Dynamically optimize skew joins -- When? [Diagram: plan for SELECT * FROM a JOIN b ON a.col = b.col, evolving across two steps] 1. Run leaf stages: Stage 1 (SCAN A → SHUFFLE) and Stage 2 (SCAN B → SHUFFLE), feeding a SORT MERGE JOIN 2. Optimize: Stages 1 and 2 complete; Stage 1 partition sizes (med: 55MB, min: 40MB, max: 250MB) reveal skew → insert a SKEW READER on each side of the SORT MERGE JOIN
Dynamically optimize skew joins -- How? (1) Regular sort merge join -- no skew optimization: [Diagram: Table A's MAP 1-3 produce partitions A0-A3; Table B's MAP 1-2 produce partitions B0-B3; each Ai is joined with the matching Bi]
Dynamically optimize skew joins -- How? (2) Skew-optimized sort merge join -- with skew shuffle reader: [Diagram: skewed partition A0 is split into subpartitions A0-S0, A0-S1, A0-S2; the matching partition B0 is duplicated three times; each A0 subpartition joins one copy of B0, while partitions A1-A3 join B1-B3 as usual]
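The detection rule behind the split, expressed as a small self-contained predicate. Per the Spark 3.0 docs, a partition is skewed if it exceeds both factor × the median partition size and an absolute byte threshold; the factor and threshold values below are illustrative:

```scala
// Sketch of AQE's skew-detection rule (per the Spark 3.0 docs): a partition
// is skewed if it exceeds factor × the median size AND an absolute threshold.
def isSkewed(size: Long, medianSize: Long, factor: Double, thresholdBytes: Long): Boolean =
  size > medianSize * factor && size > thresholdBytes

// Stage 1 above: median 55MB, max 250MB. With factor = 3 and a 100MB
// threshold (illustrative values), the 250MB partition is flagged:
// isSkewed(250L << 20, 55L << 20, 3.0, 100L << 20) == true
```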
About Me Ke Jia Big Data Product Engineer at Intel Contributor to Spark, OAP and Hive
Demo Try this notebook in Databricks
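The notebook itself is not reproduced here; below is a minimal, standalone Scala sketch along the same lines, building a deliberately skewed join so AQE can coalesce partitions and handle the skew (all names and data are illustrative):

```scala
// Minimal, self-contained sketch (not the actual Databricks notebook):
// builds a skewed join so AQE can coalesce partitions and split the skew.
import org.apache.spark.sql.SparkSession

object AqeDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("aqe-demo")
      .config("spark.sql.adaptive.enabled", "true")
      .getOrCreate()

    // ~90% of the left side hits key 0 -> one oversized shuffle partition
    val left  = spark.range(0, 5000000)
      .selectExpr("if(id % 10 = 0, id, 0) AS key", "id AS v")
    val right = spark.range(0, 100000).selectExpr("id AS key", "id AS w")

    val joined = left.join(right, "key")
    joined.count()   // run it; watch the SQL UI for AQE re-planning
    joined.explain() // final plan shows AdaptiveSparkPlan isFinalPlan=true
    spark.stop()
  }
}
```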
TPC-DS Performance (3TB) -- Cluster Setup
Hardware (BDW):
▪ Slave nodes: 5. CPU: Intel(R) Xeon(R) Gold 6252 @ 2.10GHz (96 cores); Memory: 384 GB; Disk: 7× 1 TB SSD; Network: 10 Gigabit Ethernet
▪ Master: CPU: Intel(R) Xeon(R) Gold 6252 @ 2.10GHz (96 cores); Memory: 384 GB; Disk: 7× 1 TB SSD; Network: 10 Gigabit Ethernet
Software:
▪ OS: Fedora release 29; Kernel: 4.20.6-200.fc29.x86_64
▪ Spark*: Spark master (commit ID: 0b6aae422ba37a13531e98c8801589f5f3cb28e0)
▪ Hadoop*/HDFS*: hadoop-2.7.5
▪ JDK: 1.8.0_110 (Oracle* Corporation)
TPC-DS Performance (3TB) -- Results [Chart: top 10 query speedups with AQE: 1.76x, 1.5x, 1.41x, 1.4x, 1.38x, 1.28x, 1.27x, 1.22x, 1.21x, 1.19x] ▪ Over 1.5x speedup on 2 queries; over 1.1x speedup on 37 queries
TPC-DS Performance (3TB) -- Partition Coalescing • Less scheduler overhead and task startup time • Fewer disk IO requests • Less data written to disk, because more data is aggregated per partition [Screenshots: Q8 without AQE runs with 1000 shuffle partitions; with AQE the partition numbers are coalesced to 658 and 717]
TPC-DS Performance (3TB) -- Join Strategies • Random IO reads become sequential IO reads • Remote shuffle reads become local shuffle reads [Screenshots: Q14b runs a SortMergeJoin without AQE and a Broadcast Hash Join with AQE]
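For reference, the local-read behavior comes from AQE's local shuffle reader, which kicks in when a join is converted to broadcast. A hedged sketch of the relevant Spark 3.0 config:

```scala
// Sketch: when AQE turns a sort-merge join into a broadcast hash join, the
// already-shuffled side can be read locally instead of over the network.
// This is a real Spark 3.0 config, shown with its default value:
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")
```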
AQE in Production ▪ Performance shared by one of the largest e-commerce companies in China: AQE helped them resolve critical data skew issues and achieve significant speedups on online business queries -- 17.7x, 14.0x, 1.6x and 1.3x respectively on 4 typical skewed queries. ▪ Performance shared by one of the largest internet companies in China: AQE gained 5x and 1.38x speedups on two typical queries in their production environment.
AQE in Production -- Skew Join Optimization
[Chart: durations (s) of the 4 skewed queries, showing 17.7x, 14.0x, 1.6x and 1.3x speedups]
key            | Avg records | Skew records | Comment
sale_order_id  | 2000        | 15383717     | NULL
ivc_content_id | 9231804     | 4077995632   | Not NULL
ivc_type_id    | 360         | 3582336345   | Not NULL
▪ The queries select the user's invoice details based on the sale order id, invoice content id and invoice type id.
Feedback Your feedback is important to us. Don’t forget to rate and review the sessions.