Dynamically coalescing shuffle partitions.
Dynamically switching join strategies.
Dynamically optimizing skew joins.
Dynamically optimizing broadcast joins.
It can automatically pick an optimal post shuffle partition size and number
Correct Answer : 1,2,3,5
Explanation : Runtime re-optimization ensures that the Spark execution engine has the most up-to-date and accurate statistics at the end of a shuffle or broadcast exchange; these materialization points are also referred to as stages in the AQE framework. With these statistics the engine can pick a better physical plan, select an optimal post-shuffle partition size and number of partitions, and switch to the required join strategy, often without you having to provide any hints.
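For instance, a minimal PySpark sketch of this behaviour (the configuration names are from Spark 3.0; the data, target size and application name are illustrative assumptions):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("aqe-coalesce-demo").getOrCreate()

# Turn on AQE and let it pick the post-shuffle partition size and count
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
# Advisory target size for each coalesced partition (64 MB is only an example value)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")

df = spark.range(0, 1000000).withColumnRenamed("id", "key")
grouped = df.groupBy((df.key % 10).alias("bucket")).count()

# Without AQE this shuffle would use spark.sql.shuffle.partitions (200 by default);
# with AQE the many tiny post-shuffle partitions are coalesced into far fewer ones.
grouped.collect()
print(grouped.rdd.getNumPartitions())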
Leaf stages
Stages that do not depend on any other stage are known as leaf stages. When a query starts executing, the AQE (Adaptive Query Execution) framework looks for the leaf stages and kicks them off first. As soon as one of these stages completes, its materialization point has been reached, and the AQE framework marks it as complete in the physical query plan; the logical query plan is updated accordingly. At this point new runtime statistics are gathered from the completed stages.
Run optimizer
Now that the AQE framework has new statistics, it re-runs the optimizer using a selected list of logical optimization rules, and a newly optimized plan is created. The framework then launches the query stages whose child stages have already completed (materialized), and repeats the steps above until the entire query is done, i.e. all of its stages have completed. In this way the AQE framework supports runtime SQL query optimization.
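A rough PySpark sketch of watching this re-planning happen (it assumes AQE is enabled and a SparkSession named spark; the exact explain output will vary):

spark.conf.set("spark.sql.adaptive.enabled", "true")

joined = spark.range(0, 100000).join(spark.range(0, 100), "id")

# Before any action runs, the physical plan is wrapped in AdaptiveSparkPlan with isFinalPlan=false
joined.explain()

# Running an action materializes the query stages; afterwards explain() shows isFinalPlan=true
# together with the plan the AQE framework settled on using the runtime statistics.
joined.collect()
joined.explain()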
Features in AQE (Adaptive Query Execution) Framework
The AQE framework is delivered as part of Spark 3.0 and ships with 3 features:
1. Dynamically coalescing shuffle partitions.
2. Dynamically switching join strategies.
3. Dynamically optimizing skew joins.
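A short sketch of the Spark 3.0 configuration switches behind these three features (assuming a SparkSession named spark; the values shown are illustrative):

# AQE itself is disabled by default in Spark 3.0 and must be switched on first
spark.conf.set("spark.sql.adaptive.enabled", "true")

# 1. Dynamically coalescing shuffle partitions
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# 2. Dynamically switching join strategies (e.g. sort-merge join to broadcast hash join
#    when the runtime size of one side falls below the broadcast threshold)
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10MB")

# 3. Dynamically optimizing skew joins
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")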
BROADCAST Hint
MERGE Hint
SHUFFLE_HASH Hint
SHUFFLE_REPLICATE_NL hint
Correct Answer : 1,2,3,4
Explanation : The join strategy hints, namely BROADCAST, MERGE, SHUFFLE_HASH and SHUFFLE_REPLICATE_NL, instruct Spark to use the hinted strategy on each specified relation when joining them with another relation. For example, when the BROADCAST hint is used on table ‘t1’, broadcast join (either broadcast hash join or broadcast nested loop join depending on whether there is any equi-join key) with ‘t1’ as the build side will be prioritized by Spark even if the size of table ‘t1’ suggested by the statistics is above the configuration spark.sql.autoBroadcastJoinThreshold.
When different join strategy hints are specified on both sides of a join, Spark prioritizes the BROADCAST hint over the MERGE hint over the SHUFFLE_HASH hint over the SHUFFLE_REPLICATE_NL hint. When both sides are specified with the BROADCAST hint or the SHUFFLE_HASH hint, Spark will pick the build side based on the join type and the sizes of the relations.
Note that there is no guarantee that Spark will choose the join strategy specified in the hint since a specific strategy may not support all join types.
Sample using SparkSQL
-- We accept BROADCAST, BROADCASTJOIN and MAPJOIN for broadcast hint
SELECT /*+ BROADCAST(r) */ * FROM records r JOIN src s ON r.key = s.key
Sample using PySpark
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
Sample using Spark Scala
spark.table("src").join(spark.table("records").hint("broadcast"), "key").show()
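The other three hints follow the same pattern; a brief PySpark sketch reusing the src and records tables from the samples above:

records = spark.table("records")
src = spark.table("src")

src.join(records.hint("merge"), "key").show()                  # shuffle sort merge join
src.join(records.hint("shuffle_hash"), "key").show()           # shuffle hash join
src.join(records.hint("shuffle_replicate_nl"), "key").show()   # shuffle-and-replicate nested loop join

# The equivalent SQL hints can also be embedded in spark.sql(), for example:
spark.sql("SELECT /*+ MERGE(r) */ * FROM records r JOIN src s ON r.key = s.key").show()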
A smaller number of small tasks or partitions
A smaller number of big tasks or partitions
A larger number of small tasks or partitions
A larger number of big tasks or partitions
Correct Answer : 2
Explanation : Shuffle Optimizations
As you know, shuffling is one of the most critical parts of any query that involves it. If you can find the correct number of shuffle partitions, great, but that has always been challenging, because the amount of data varies from query to query and from stage to stage (filters, joins, etc. change the data volume within a query). If every stage uses the same number of shuffle partitions, a stage may end up with many small tasks, which is inefficient for the Spark scheduler, or with a smaller number of big tasks, which causes excessive garbage collection overhead and disk spilling.
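A small PySpark sketch of that trade-off (assuming a SparkSession named spark; the row counts and partition setting are illustrative):

from pyspark.sql import functions as F

# Disable AQE so the fixed shuffle partition count is actually used
spark.conf.set("spark.sql.adaptive.enabled", "false")
spark.conf.set("spark.sql.shuffle.partitions", "200")

df = spark.range(0, 1000000)
small_agg = df.filter(F.col("id") % 1000 == 0).groupBy((F.col("id") % 10).alias("bucket")).count()

# Only about 1000 rows survive the filter, yet the shuffle still produces 200 tiny partitions,
# i.e. many small tasks for the scheduler. A very low fixed count on a large stage would instead
# give a few big tasks, with excessive GC overhead and disk spilling.
small_agg.collect()
print(small_agg.rdd.getNumPartitions())   # 200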
Adaptive query execution (AQE) is query re-optimization that occurs during query execution.
Adaptive query execution (AQE) is query re-optimization that occurs after query execution.
Adaptive query execution (AQE) is query re-optimization that occurs before query execution.
Correct Answer : 1
Explanation : AQE (Adaptive Query Execution) is a new framework introduced in Spark 3.0. It was introduced to avoid the problems caused by outdated statistics and imperfect cardinality estimates, which can lead the cost-based optimizer to select the wrong plan.
So AQE helps by re-optimizing and adjusting query plans based on runtime statistics collected during query execution.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import desc

he_schema = StructType(
[
StructField("sr" , IntegerType(), True),
StructField("phrase" , StringType(), True),
StructField("title" , StringType(), True),
StructField("url" , StringType(), True),
StructField("year" , IntegerType(), True),
StructField("month" , IntegerType(), True),
StructField("day" , IntegerType(), True),
StructField("fee" , IntegerType(), True),
]
)
# Create an RDD from the given data
quickTechieRDD = sc.parallelize([(0,"Amazing","Datastacks","https://www.datastax.com/",2015,6,14,7000),
(1,"Amazing","Cloudera","https://www.cloudera.com/",2010,8,29,9000),
(2,"Great","Hortonworks","https://hortonworks.com/",2012,3,23,8000),
(3,"Great","MapR","https://mapr.com/",2013,2,27,6000),
(4,"Great","QuickTechie","http://quicktechie.com/cs/",2008,9,19,9000),
(5,"Good","Acmeshell","https://acmeshell.com",2004,11,18,10000),
(6,"Awsome","HadoopExam","http://www.hadoopexam.com/",2012,3,22,4000),
(7,"Amazing","Databricks","https://databricks.com",2014,7,30,30000)])
# Convert the RDD to a DataFrame and assign the schema to it
heCoursesDF = spark.createDataFrame(quickTechieRDD, he_schema)
quuickTechieDF = heCoursesDF.groupBy("fee")
quuickTechieCountDF = quuickTechieDF.count()
quuickTechieCountDFSorted= quuickTechieCountDF.sort(desc("count"))
quuickTechieTop5Fee=quuickTechieCountDFSorted.take(5)
quuickTechieCountDFSorted.write.format("json").mode("overwrite").save("/home/hadoopexam/spark2/data/dataset1")
If the folder "/home/hadoopexam/spark2/data/dataset1" already exists, then it will throw an error.
If the folder "/home/hadoopexam/spark2/data/dataset1" already exists, then it will not throw an error.
If the folder "/home/hadoopexam/spark2/data/dataset1" does not exist, then it will throw an error.
If the folder "/home/hadoopexam/spark2/data/dataset1" does not exist, then it will run successfully.
Correct Answer : Check Here
Explanation : The write mode is very important when saving data. If the output folder "/home/hadoopexam/spark2/data/dataset1" does not exist, the save method will create it; hence the 4th option is correct. If the folder already exists, "overwrite" mode does not fail on the existing folder; instead, any existing output in that folder is overwritten. If the folder already exists and you do not use overwrite mode, you will get an error.
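For reference, a short sketch of the four DataFrameWriter save modes, reusing the DataFrame and path from the question code; each save call is meant to be run on its own, not as a sequence (the first one fails once the folder exists):

out = "/home/hadoopexam/spark2/data/dataset1"

# "error" (the default, also accepted as "errorifexists"): fails if the output folder already exists
quuickTechieCountDFSorted.write.format("json").mode("error").save(out)

# "overwrite": creates the folder if it is missing and replaces any existing output in it
quuickTechieCountDFSorted.write.format("json").mode("overwrite").save(out)

# "append": adds new files alongside any existing output
quuickTechieCountDFSorted.write.format("json").mode("append").save(out)

# "ignore": silently does nothing if output already exists at the path
quuickTechieCountDFSorted.write.format("json").mode("ignore").save(out)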