As we have seen, one of the major improvements since Spark 1.6 is the Spark SQL engine, which is used extensively in every Spark component, and its underlying execution engine has been improved in every release since. Spark SQL query performance depends on the query optimizer and planner, which generate a high-quality query execution plan. As discussed in previous chapters, the cost-based optimizer is the framework that collects and leverages a variety of data statistics, including the number of rows, the number of distinct values, the number of NULL values, and the minimum and maximum values. Based on these statistics, Spark's cost-based optimizer chooses the best plan to execute the query. The following are examples of where cost-based optimization is applied (a configuration sketch follows the list):
Selecting the right join: Based on cost, it decides which of the two joins below should be used.
Broadcast Hash Join
Sort Merge Join
Selecting the build side: It helps in selecting the correct build side in a hash join.
Adjusting join order: It helps in adjusting the join order in a multi-way join.
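For context, cost-based optimization is governed by a couple of Spark SQL configurations. Below is a minimal sketch of switching it on (the application name is arbitrary):

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cbo-demo").getOrCreate()

# Enable the cost-based optimizer (disabled by default).
spark.conf.set("spark.sql.cbo.enabled", "true")

# Allow the optimizer to reorder joins in a multi-way join
# based on the collected statistics.
spark.conf.set("spark.sql.cbo.joinReorder.enabled", "true")
```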
Based on the above, you might immediately have a question in mind: what happens if the statistics are outdated? In relational databases, you have seen that a DBA or developer regularly gathers stats so that queries run faster; gathering stats means collecting the latest statistics on a table. If you have up-to-date statistics, the cost-based optimizer can work better. In Spark, however, outdated statistics and wrong cardinality estimates can lead to a wrong or sub-optimal query plan being selected. So how can this be avoided? Apache Spark 3.0 introduces a new feature called Adaptive Query Execution, i.e. AQE, which we will discuss in the next section.
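As an aside, gathering statistics in Spark works much like in a relational database: the ANALYZE TABLE command computes them and stores them in the catalog. A minimal sketch, assuming a hypothetical sales table registered in the catalog:

```python
# Collect table-level statistics (row count, size in bytes).
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS")

# Collect column-level statistics (distinct count, NULL count, min, max)
# for the columns the optimizer will see in filters and joins.
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS customer_id, amount")

# Inspect the collected statistics for one column.
spark.sql("DESCRIBE EXTENDED sales customer_id").show(truncate=False)
```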
AQE (Adaptive Query Execution) is a new framework introduced in Spark 3.0. It was introduced to address the problem of outdated statistics and imperfect cardinality estimates, which cause the cost-based optimizer to select a wrong plan.
AQE helps by re-optimizing and adjusting query plans based on runtime statistics collected during query execution.
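Enabling AQE itself is a single configuration switch; it is off by default in Spark 3.0:

```python
# Turn on Adaptive Query Execution (disabled by default in Spark 3.0).
spark.conf.set("spark.sql.adaptive.enabled", "true")
```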
A very critical point to discuss is when to re-optimize the execution plan. You may already know that Spark works in pipelines and executes all operations in parallel. So which steps break this pipeline? The pipeline is broken at the following points:
Shuffle
Broadcast
These are known as materialization points, or "query stages". They are called materialization points because each query stage materializes its intermediate results, and the following stages can only proceed once all the parallel processing is completed and the intermediate results are materialized. This is where re-optimization becomes possible, because data statistics are now available for each partition and the following operations have not yet started.
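You can see these materialization points in the physical plan: every Exchange node marks a shuffle, and therefore a query-stage boundary. A small sketch with a hypothetical aggregation:

```python
from pyspark.sql import functions as F

df = spark.range(0, 1_000_000)

# groupBy triggers a shuffle, so the physical plan contains an
# Exchange node -- the materialization point at which AQE can re-optimize.
agg = df.groupBy((F.col("id") % 10).alias("bucket")).count()

# With AQE enabled, the top node is AdaptiveSparkPlan and the
# Exchange below it is the query-stage boundary.
agg.explain()
```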
Runtime re-optimization ensures that the Spark execution engine has the most up-to-date and accurate statistics at the end of a shuffle or broadcast; these points are also referred to as stages in the AQE framework. With this, the execution engine can pick a better physical plan and select the optimal partition size and number of partitions after a shuffle, without you providing any hints. It can also switch to the required join strategy.
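The partition-related part of this behavior is controlled by the coalescing settings below; the values shown are illustrative, not recommendations:

```python
# Let AQE merge small shuffle partitions after a stage materializes.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Target size AQE aims for when coalescing post-shuffle partitions.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")

# Start with a generous number of partitions; AQE shrinks it at runtime.
spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "400")
```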
Stages that do not depend on any other stage are known as leaf stages. Whenever your query starts executing, the AQE framework looks for the leaf stages and kicks them off. As soon as one of the stages completes, meaning its materialization point is reached, the AQE framework marks it as complete in the physical query plan, and the logical query plan is updated accordingly. At this point, new runtime statistics are gathered from the completed stages.
Now that the AQE framework has new statistics, it reruns the optimizer using a selected list of logical optimization rules and produces a newly optimized plan. The AQE framework then launches any new query stages whose child stages have already been materialized, and repeats the steps above until the entire query is done, i.e. all of its stages are completed. In this way, the AQE framework supports runtime SQL query optimization.
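One way to observe this loop is through explain(). Before the query runs, the AdaptiveSparkPlan node reports isFinalPlan=false; after an action executes the query, explaining it again shows the re-optimized plan with isFinalPlan=true. A sketch reusing the hypothetical agg DataFrame from the earlier example:

```python
# Before execution: AQE has not run any stages yet.
agg.explain()   # AdaptiveSparkPlan isFinalPlan=false

# Execute the query so the stages materialize and AQE re-optimizes.
agg.collect()

# After execution: the plan shown is the final, adaptively chosen one.
agg.explain()   # AdaptiveSparkPlan isFinalPlan=true
```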
The AQE framework is delivered as part of Spark 3.0 and ships with three features:
1. Dynamically coalescing shuffle partitions.
2. Dynamically switching join strategies.
3. Dynamically optimizing skew joins.
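Each of the three features has its own configuration knobs. The sketch below lists the main ones; the values are illustrative, not recommendations:

```python
# 1. Dynamically coalescing shuffle partitions.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# 2. Dynamically switching join strategies: when a runtime size estimate
#    falls under the broadcast threshold, AQE can replace a sort-merge
#    join with a broadcast hash join mid-query.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10m")
spark.conf.set("spark.sql.adaptive.localShuffleReader.enabled", "true")

# 3. Dynamically optimizing skew joins: split partitions that are much
#    larger than their siblings into smaller sub-partitions.
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")
```

We will look at each of these three features in detail in the sections that follow.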