#apache-spark #scala #plan

Logical and physical plans are common concepts across databases and SQL engines. Whenever we run an SQL query, it passes through several stages and intermediate representations before we get a result. Apache Spark achieves this through its Catalyst Optimizer, the component of Spark SQL responsible for parsing, analyzing, optimizing, and planning queries.

Below we’ll go through a couple of examples that consist of loading some sample data from a CSV file and performing a join. We will be using Scala.

The entry point to our demo is a very common operation: reading a CSV file from the filesystem. We also pass two options, one to infer the schema automatically and one to declare the presence of a header row.

Finally, we cache the resulting DataFrame for fast access, benefiting any subsequent actions. The concepts of actions, transformations, and, by extension, lazy evaluation are well documented elsewhere, so we omit any discussion of them here.

val table = spark
    .read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/airline-safety.csv")
    .cache()

Spark provides convenient functionality for inquiring about the generated plans, across different granularities. In Snippet #2, we are requesting the extended logical and physical plans for our statements.

As per the documentation, the extended mode prints the parsed logical plan, analyzed logical plan, optimized logical plan, and physical plan. The parsed logical plan is an unresolved plan extracted from the query. The analyzed logical plan resolves UnresolvedAttribute and UnresolvedRelation nodes into fully typed objects. The optimized logical plan is the result of applying a set of optimization rules to the analyzed plan, and it is the input from which the physical plan is generated.

table.explain(extended=true)
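Besides the Boolean flag, explain also accepts a mode name as a string in Spark 3.0 and later (simple, extended, codegen, cost, formatted), and each plan can be read individually through queryExecution. The following is a minimal sketch, assuming Spark is on the classpath; the object name ExplainModes, the helper plans, and the tiny in-memory DataFrame are made up for illustration and are not part of the demo above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ExplainModes {
  // Hypothetical helper: returns the four plans of a DataFrame as strings.
  def plans(df: DataFrame): Map[String, String] = {
    val qe = df.queryExecution
    Map(
      "parsed"    -> qe.logical.toString,       // parsed logical plan
      "analyzed"  -> qe.analyzed.toString,      // analyzed logical plan
      "optimized" -> qe.optimizedPlan.toString, // optimized logical plan
      "physical"  -> qe.executedPlan.toString   // physical plan
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("explain-modes")
      .getOrCreate()
    import spark.implicits._

    // Stand-in data (an assumption), so the sketch does not need the CSV file.
    val df = Seq(("Aer Lingus", 2), ("Aeroflot", 76))
      .toDF("airline", "incidents")
      .filter($"incidents" > 10)

    df.explain("formatted") // a more compact, sectioned layout of the physical plan
    plans(df).foreach { case (phase, plan) => println(s"== $phase ==\n$plan") }

    spark.stop()
  }
}
```

Reading the plans through queryExecution is handy when you want to compare two phases programmatically rather than by eye.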

It is apparent from the result (Snippet #3) that a few intermediate representations are constructed during planning. Every plan contains a number of operators, which can in turn be composed of other operators.

A brief description follows on the plans:

  1. Parsed Logical Plan (Unresolved)

    Here Spark just parses the query and verifies its syntax, generating an abstract syntax tree (AST) that represents the logical plan. At this point the engine has no knowledge of the columns and tables involved, so references to them remain unresolved.

  2. Analyzed Logical Plan (Resolved)

    As soon as the parsed logical plan is generated, the engine consults Spark’s Catalog and attempts to resolve (as its name implies) the referenced columns and the source tables or views. If it fails to resolve an entity, it raises an AnalysisException. In our case, you can see the columns and their data types properly inferred by the engine.

  3. Optimized Logical Plan

    It is a tree that represents both the operations on the data and its schema. The Catalyst Optimizer applies a series of optimization rules to this tree, such as pushing filters closer to the data source, pruning unused columns, and folding constant expressions.

  4. Physical Plan

    In reality, Spark creates numerous Physical Plans and employs a Cost Model for choosing the most efficient one.
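The idea of applying rewrite rules to a plan tree can be pictured with a toy model. The sketch below is not Catalyst code: Expr, Lit, Col, Add, and constantFold are hypothetical names invented for illustration. It only shows the general shape of a rule, which pattern-matches on a tree and returns an equivalent, cheaper tree.

```scala
object ToyOptimizer {
  // A tiny expression tree; these classes are illustrative, not Spark's.
  sealed trait Expr
  case class Lit(value: Int) extends Expr
  case class Col(name: String) extends Expr
  case class Add(left: Expr, right: Expr) extends Expr

  // One rewrite rule: fold the addition of two literals into a single literal.
  def constantFold(e: Expr): Expr = e match {
    case Add(l, r) =>
      (constantFold(l), constantFold(r)) match {
        case (Lit(a), Lit(b)) => Lit(a + b)
        case (fl, fr)         => Add(fl, fr)
      }
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val expr = Add(Col("incidents"), Add(Lit(1), Lit(2)))
    println(constantFold(expr)) // Add(Col(incidents),Lit(3))
  }
}
```

An optimizer in this style is just a collection of such rules applied repeatedly until the tree stops changing.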

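In general, one could think of the notions of logical and physical plans as follows: the logical plan describes what needs to be computed, while the physical plan describes how it will be computed.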

This separation of responsibilities allows Spark to evaluate various execution strategies independently of the logical operations. Additionally, it allows the use of external operators. One such example is Delta’s Deletion Vectors, a storage optimization feature that tracks, in separate files called “deletion vectors”, the records that have been logically deleted or updated in a Delta table, without rewriting the underlying data files. So, whenever we operate on a Delta table, the corresponding operators (e.g., the one that loads a Parquet file) also load the relevant deletion vector files and skip the rows marked there, resolving the current state of the table.
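To make the row-skipping idea concrete, here is a deliberately simplified sketch. It is not Delta's implementation or file format: DeletionVectorSketch and readVisibleRows are hypothetical names, and a plain BitSet of row positions stands in for the real deletion vector.

```scala
import scala.collection.immutable.BitSet

object DeletionVectorSketch {
  // Returns the rows of one data file, skipping positions marked as deleted.
  def readVisibleRows[A](fileRows: Vector[A], deletedPositions: BitSet): Vector[A] =
    fileRows.zipWithIndex.collect {
      case (row, pos) if !deletedPositions.contains(pos) => row
    }

  def main(args: Array[String]): Unit = {
    val rows = Vector("Aer Lingus", "Aeroflot", "Air Canada", "Air France")
    // Positions 1 and 3 were logically deleted; the data file itself is untouched.
    println(readVisibleRows(rows, BitSet(1, 3))) // Vector(Aer Lingus, Air Canada)
  }
}
```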

<aside> ℹ️ We read the lines in each plan from BOTTOM to TOP.

</aside>

== Parsed Logical Plan ==
Relation [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] csv

== Analyzed Logical Plan ==
airline: string, avail_seat_km_per_week: bigint, incidents_85_99: int, fatal_accidents_85_99: int, fatalities_85_99: int, incidents_00_14: int, fatal_accidents_00_14: int, fatalities_00_14: int
Relation [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] csv

== Optimized Logical Plan ==
InMemoryRelation [airline#17, avail_seat_km_per_week#18L, incidents_85_99#19, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24], StorageLevel(disk, memory, deserialized, 1 replicas)
   +- FileScan csv [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/makism/Jupyter/plans/data/airline-safety.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<airline:string,avail_seat_km_per_week:bigint,incidents_85_99:int,fatal_accidents_85_99:int,fatalities_85_99:int,incidents_00_14:int,fatal_accidents_00_14:int,fatalities_00_14:int>

== Physical Plan ==
InMemoryTableScan [airline#17, avail_seat_km_per_week#18L, incidents_85_99#19, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24]
   +- InMemoryRelation [airline#17, avail_seat_km_per_week#18L, incidents_85_99#19, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- FileScan csv [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/makism/Jupyter/plans/data/airline-safety.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<airline:string,avail_seat_km_per_week:bigint,incidents_85_99:int,fatal_accidents_85_99:int,fatalities_85_99:int,incidents_00_14:int,fatal_accidents_00_14:int,fatalities_00_14:int>

Back to our working example, let us continue with a more realistic query. Snippet #4 demonstrates two common units of work:

➊ Creating a small lookup-like table, and,

➋ Performing a broadcast inner-join with the lookup table.

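import org.apache.spark.sql.functions.broadcast
import spark.implicits._ // enables the $"column" syntax

// ➊ A small lookup-like table holding up to 10 distinct airline names.
val airline_names = table
    .select($"airline".as("airline_name"))
    .distinct()
    .limit(10)

// ➋ Broadcast inner join of the full table against the lookup table.
val filtered = table
    .as("data")
    .join(
        broadcast(airline_names.as("filter")),
        $"data.airline".equalTo($"filter.airline_name")
    )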

Parsed and Analyzed Logical Plans

Let’s consider the plans separately. First, the series of logical plans.

The parsed logical plan is the very first plan Spark constructs. The root of the plan (or tree) represents an inner join (Join Inner), with the join condition given within the parentheses: airline = airline_name.

We can easily spot two branches in the plan.

SubqueryAlias data

It just aliases or renames the given table under a new name, data in this case. The table it uses is denoted by the operator Relation downstream.

ResolvedHint (strategy=broadcast)

Hints Spark to perform a broadcast join. We read this branch from bottom to top. First, we identify the table we will be using from the operator Relation. Then, we select the fields specified by the operator Project; in this case, we select the field airline and alias it to airline_name. Subsequently, we perform deduplication (Deduplicate) on that field (airline_name). The next two operators, LocalLimit and GlobalLimit, are responsible for limiting the results. LocalLimit applies the limit per partition (leaving at most #num_partitions x 10 rows), while GlobalLimit triggers a shuffle (if needed), is applied to those intermediate results, and limits the final output to 10 rows. When a random sample is acceptable in place of the first N rows, TABLESAMPLE can serve as a more efficient, single-stage alternative to LIMIT. Finally, we alias the resulting table to filter.
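The interplay between the two limit operators can be sketched in plain Scala, with a sequence of sequences standing in for partitions. This is a conceptual model only; LimitSketch, localLimit, and globalLimit are hypothetical names, not Spark operators.

```scala
object LimitSketch {
  // LocalLimit: keep at most n rows in every partition independently.
  def localLimit[A](partitions: Seq[Seq[A]], n: Int): Seq[Seq[A]] =
    partitions.map(_.take(n))

  // GlobalLimit: gather the surviving rows into one partition and keep n.
  def globalLimit[A](partitions: Seq[Seq[A]], n: Int): Seq[A] =
    partitions.flatten.take(n)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2, 3, 4), Seq(5, 6, 7), Seq(8))
    val local = localLimit(partitions, 2) // at most 3 x 2 rows remain
    println(local)                        // List(List(1, 2), List(5, 6), List(8))
    println(globalLimit(local, 2))        // List(1, 2)
  }
}
```

The local step is what keeps the shuffle small: only a handful of rows per partition ever need to move.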

In our simple case, since we are not using functions, expressions, or any lazy evaluations, the Parsed and Analyzed logical plans are identical.

Optimized Logical Plan

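The plan is largely the same, with a few changes: the SubqueryAlias nodes are gone, the broadcast hint is now attached to the join (rightHint), the cached table appears as an InMemoryRelation on top of the FileScan, and a couple of operators, described below, have been added or replaced.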

Inner Join

An inner join merges the rows of the two tables on the common key. Since NULL never compares equal to anything, rows with a NULL key can never produce a match. This is why the optimizer adds a Filter isnotnull(KEY) operator on both sides of the join, discarding NULL keys up front.
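Why the added filter is safe can be checked with a small model in which None plays the role of SQL NULL. The names NullKeyJoinSketch, sqlEquals, and innerJoin are hypothetical; the sketch only mirrors the semantics, not Spark's implementation.

```scala
object NullKeyJoinSketch {
  // None stands in for SQL NULL: a comparison involving NULL is never true.
  def sqlEquals(a: Option[String], b: Option[String]): Boolean = (a, b) match {
    case (Some(x), Some(y)) => x == y
    case _                  => false
  }

  // Naive inner join on the key: keep every pair whose keys compare equal.
  def innerJoin(
      left: Seq[Option[String]],
      right: Seq[Option[String]]
  ): Seq[(Option[String], Option[String])] =
    for {
      l <- left
      r <- right
      if sqlEquals(l, r)
    } yield (l, r)

  def main(args: Array[String]): Unit = {
    val left  = Seq(Some("KLM"), None, Some("SAS"))
    val right = Seq(None, Some("KLM"))
    // Dropping NULL keys first (the isnotnull filter) gives the same result.
    val filtered = innerJoin(left.filter(_.isDefined), right.filter(_.isDefined))
    println(innerJoin(left, right) == filtered) // true
  }
}
```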

Deduplicate

Deduplication is implemented by an Aggregate operator: it groups the rows by the keys (i.e., airline_name in our case) and emits one row per group, producing the unique values.

<aside> ℹ️ Something minute, but of interest. A Left Join will be represented by an Inner Join operator when a condition such as WHERE columnX IS NOT NULL on a column of the right-hand table is in place, since that condition discards the unmatched rows anyway.

</aside>

== Parsed Logical Plan ==
Join Inner, (airline#17 = airline_name#193)
:- SubqueryAlias data
:  +- Relation [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] csv
+- ResolvedHint (strategy=broadcast)
   +- SubqueryAlias filter
      +- GlobalLimit 10
         +- LocalLimit 10
            +- Deduplicate [airline_name#193]
               +- Project [airline#409 AS airline_name#193]
                  +- Relation [airline#409,avail_seat_km_per_week#410L,incidents_85_99#411,fatal_accidents_85_99#412,fatalities_85_99#413,incidents_00_14#414,fatal_accidents_00_14#415,fatalities_00_14#416] csv

== Analyzed Logical Plan ==
airline: string, avail_seat_km_per_week: bigint, incidents_85_99: int, fatal_accidents_85_99: int, fatalities_85_99: int, incidents_00_14: int, fatal_accidents_00_14: int, fatalities_00_14: int, airline_name: string
Join Inner, (airline#17 = airline_name#193)
:- SubqueryAlias data
:  +- Relation [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] csv
+- ResolvedHint (strategy=broadcast)
   +- SubqueryAlias filter
      +- GlobalLimit 10
         +- LocalLimit 10
            +- Deduplicate [airline_name#193]
               +- Project [airline#409 AS airline_name#193]
                  +- Relation [airline#409,avail_seat_km_per_week#410L,incidents_85_99#411,fatal_accidents_85_99#412,fatalities_85_99#413,incidents_00_14#414,fatal_accidents_00_14#415,fatalities_00_14#416] csv

== Optimized Logical Plan ==
Join Inner, (airline#17 = airline_name#193), rightHint=(strategy=broadcast)
:- Filter isnotnull(airline#17)
:  +- InMemoryRelation [airline#17, avail_seat_km_per_week#18L, incidents_85_99#19, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24], StorageLevel(disk, memory, deserialized, 1 replicas)
:        +- FileScan csv [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/makism/Jupyter/plans/data/airline-safety.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<airline:string,avail_seat_km_per_week:bigint,incidents_85_99:int,fatal_accidents_85_99:int,fatalities_85_99:int,incidents_00_14:int,fatal_accidents_00_14:int,fatalities_00_14:int>
+- Filter isnotnull(airline_name#193)
   +- GlobalLimit 10
      +- LocalLimit 10
         +- Aggregate [airline_name#193], [airline_name#193]
            +- Project [airline#409 AS airline_name#193]
               +- InMemoryRelation [airline#409, avail_seat_km_per_week#410L, incidents_85_99#411, fatal_accidents_85_99#412, fatalities_85_99#413, incidents_00_14#414, fatal_accidents_00_14#415, fatalities_00_14#416], StorageLevel(disk, memory, deserialized, 1 replicas)
                     +- FileScan csv [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/makism/Jupyter/plans/data/airline-safety.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<airline:string,avail_seat_km_per_week:bigint,incidents_85_99:int,fatal_accidents_85_99:int,fatalities_85_99:int,incidents_00_14:int,fatal_accidents_00_14:int,fatalities_00_14:int>

Physical Plan

As we briefly noted earlier, this plan contains specific implementation details on how Spark will process the data and execute our queries.

The first thing one can notice is that the root node is now the operator AdaptiveSparkPlan with the parameter isFinalPlan=false. This operator comes from Adaptive Query Execution (AQE), a query re-optimization feature. Using runtime statistics collected at shuffle and broadcast boundaries, Spark can modify the plan during execution. The parameter isFinalPlan=false denotes that the plan has not yet been executed; it is set to true once execution has completed.

Next, we see that the strategy chosen for our Inner Join is BroadcastHashJoin, a hash-based join over a broadcast dataset. As a reminder, the significantly smaller dataset is broadcast to all the nodes and kept in memory in the form of a hash table keyed on the join keys; each task then scans its partition of the larger dataset and probes the hash table for matching keys. Finally, its parameters are Left Key=[airline#17], Right Key=[airline_name#193], Join Type=Inner, BuildSide=BuildRight, and isNullAwareAntiJoin=false.
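The build-and-probe mechanics can be sketched without Spark. The model below uses a Map as the hash table and nested sequences as partitions; BroadcastHashJoinSketch and its methods are hypothetical names, and the code illustrates the general technique rather than Spark's operator.

```scala
object BroadcastHashJoinSketch {
  // Build side: the small table, hashed once by join key and shared with every worker.
  def buildHashTable[K, V](small: Seq[(K, V)]): Map[K, Seq[V]] =
    small.groupBy(_._1).map { case (k, rows) => k -> rows.map(_._2) }

  // Probe side: one partition of the large table looks its keys up locally.
  def probe[K, L, R](partition: Seq[(K, L)], hashTable: Map[K, Seq[R]]): Seq[(K, L, R)] =
    for {
      (key, leftRow) <- partition
      rightRow <- hashTable.getOrElse(key, Seq.empty)
    } yield (key, leftRow, rightRow)

  // Inner join: no shuffle of the large side, only lookups per partition.
  def join[K, L, R](largePartitions: Seq[Seq[(K, L)]], small: Seq[(K, R)]): Seq[(K, L, R)] = {
    val hashTable = buildHashTable(small)
    largePartitions.flatMap(p => probe(p, hashTable))
  }

  def main(args: Array[String]): Unit = {
    val small = Seq("A" -> "Alpha", "B" -> "Beta")
    val large = Seq(Seq("A" -> 1, "C" -> 2), Seq("B" -> 3, "A" -> 4))
    println(join(large, small)) // List((A,1,Alpha), (B,3,Beta), (A,4,Alpha))
  }
}
```

Because every partition of the large side is processed where it already lives, only the small table has to travel across the network.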

Additionally, under the same node, we have a small branch, FileScan -> InMemoryRelation -> InMemoryTableScan -> Filter, that is identical to the previous branches we saw in the earlier plans.

The other node that participates in the join, the BroadcastExchange, hosts numerous operators. The first sequence of operators FileScan -> InMemoryRelation -> InMemoryTableScan -> Project is identical to the previous ones and is responsible for loading the CSV file in memory and selecting the fields.

Going a step upwards, the operator HashAggregate, with the given set of parameters (keys, functions, and output), performs a partial grouping on the keys identified by the keys parameter. Since no function is defined, it just collects the unique key values, thereby implementing the distinct() call in our code segment above. It is important to note that this node works on a per-partition basis.

The next operator, Exchange, exchanges data (shuffles) among the workers using a hashPartitioning function, making sure that all the rows with the same airline_name land in the same partition.

The final HashAggregate operator performs the same grouping as before, but this time on the shuffled data to ensure global uniqueness.
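The partial aggregate, exchange, and final aggregate sequence can be modelled in a few lines of plain Scala. This is a sketch under simplifying assumptions: partitions are nested sequences, the hash function is String.hashCode rather than the one Spark uses, and all names (TwoPhaseDistinctSketch and its methods) are hypothetical.

```scala
object TwoPhaseDistinctSketch {
  // Phase 1 (partial HashAggregate): deduplicate inside each partition.
  def partialDistinct(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.map(_.distinct)

  // Exchange: route every key to the partition chosen by its hash,
  // so equal keys always end up in the same partition.
  def shuffleByHash(partitions: Seq[Seq[String]], numPartitions: Int): Seq[Seq[String]] = {
    val routed = partitions.flatten.groupBy(key => Math.floorMod(key.hashCode, numPartitions))
    (0 until numPartitions).map(i => routed.getOrElse(i, Seq.empty))
  }

  // Phase 2 (final HashAggregate): deduplicate again, now globally correct.
  def finalDistinct(partitions: Seq[Seq[String]]): Seq[Seq[String]] =
    partitions.map(_.distinct)

  def distinct(partitions: Seq[Seq[String]], numPartitions: Int): Seq[String] =
    finalDistinct(shuffleByHash(partialDistinct(partitions), numPartitions)).flatten

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("KLM", "SAS", "KLM"), Seq("SAS", "TAP"))
    println(distinct(partitions, 4).sorted) // List(KLM, SAS, TAP)
  }
}
```

The partial step matters for cost: duplicates are removed before the shuffle, so fewer rows cross the network.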

Since we have already introduced the operators LocalLimit, Exchange, GlobalLimit, and Filter, we skip them and finish with the BroadcastExchange. This is the operator that physically broadcasts the smaller dataset to the workers, while HashedRelationBroadcastMode indicates the mode of broadcasting: as discussed earlier, the broadcast dataset takes the form of a hash table built on the join keys. The keys are identified by the parameter List(input[0, string, false]): use the column at index 0, which is of type string and cannot be null (just like when we build a StructField). The last false parameter indicates that the hashed relation is not built in null-aware mode, which is only needed for null-aware anti joins.

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [airline#17], [airline_name#193], Inner, BuildRight, false
   :- Filter isnotnull(airline#17)
   :  +- InMemoryTableScan [airline#17, avail_seat_km_per_week#18L, incidents_85_99#19, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24], [isnotnull(airline#17)]
   :        +- InMemoryRelation [airline#17, avail_seat_km_per_week#18L, incidents_85_99#19, fatal_accidents_85_99#20, fatalities_85_99#21, incidents_00_14#22, fatal_accidents_00_14#23, fatalities_00_14#24], StorageLevel(disk, memory, deserialized, 1 replicas)
   :              +- FileScan csv [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/makism/Jupyter/plans/data/airline-safety.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<airline:string,avail_seat_km_per_week:bigint,incidents_85_99:int,fatal_accidents_85_99:int,fatalities_85_99:int,incidents_00_14:int,fatal_accidents_00_14:int,fatalities_00_14:int>
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, string, false]),false), [id=#407]
      +- Filter isnotnull(airline_name#193)
         +- GlobalLimit 10
            +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#403]
               +- LocalLimit 10
                  +- HashAggregate(keys=[airline_name#193], functions=[], output=[airline_name#193])
                     +- Exchange hashpartitioning(airline_name#193, 200), ENSURE_REQUIREMENTS, [id=#399]
                        +- HashAggregate(keys=[airline_name#193], functions=[], output=[airline_name#193])
                           +- Project [airline#409 AS airline_name#193]
                              +- InMemoryTableScan [airline#409]
                                    +- InMemoryRelation [airline#409, avail_seat_km_per_week#410L, incidents_85_99#411, fatal_accidents_85_99#412, fatalities_85_99#413, incidents_00_14#414, fatal_accidents_00_14#415, fatalities_00_14#416], StorageLevel(disk, memory, deserialized, 1 replicas)
                                          +- FileScan csv [airline#17,avail_seat_km_per_week#18L,incidents_85_99#19,fatal_accidents_85_99#20,fatalities_85_99#21,incidents_00_14#22,fatal_accidents_00_14#23,fatalities_00_14#24] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/makism/Jupyter/plans/data/airline-safety.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<airline:string,avail_seat_km_per_week:bigint,incidents_85_99:int,fatal_accidents_85_99:int,fatalities_85_99:int,incidents_00_14:int,fatal_accidents_00_14:int,fatalities_00_14:int>

