From Plan to Execution: RDDs, Stages, Tasks, and the Shuffle
Prerequisites
- Articles 1-3 (Architecture, Boot Sequence, Catalyst Pipeline)
- Understanding of partitioned data and hash/sort-based shuffling
- Familiarity with DAG (directed acyclic graph) concepts
In the previous article, we traced a SQL query through Catalyst's optimizer pipeline and arrived at a physical SparkPlan. Now comes the critical step: executing that plan as distributed computation across a cluster. This is where the elegant world of logical transformations meets the messy reality of network partitions, data skew, and disk I/O.
This article covers the entire execution path: from SparkPlan.execute() producing an RDD, through the DAGScheduler's stage creation algorithm, the shuffle system that moves data between stages, BlockManager's distributed storage, and finally Adaptive Query Execution's runtime re-optimization. This is the heart of Spark's distributed computing engine.
SparkPlan.execute() and the RDD Bridge
The bridge between Spark SQL and the core distributed engine is a single method call:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L197-L202
```scala
final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
  }
  executeRDD.get
}
```
Each concrete SparkPlan overrides doExecute() to produce an RDD[InternalRow]:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L328-L332
```mermaid
flowchart TD
    PLAN["SparkPlan tree<br/>(physical plan)"] --> EXEC["execute()"]
    EXEC --> RDD["RDD[InternalRow]<br/>(lazy)"]
    RDD -->|"Action<br/>e.g. collect()"| DAG["DAGScheduler.submitJob()"]
    DAG --> STAGES["Stage graph"]
    STAGES --> TASKS["TaskSets → Executors"]
```
The crucial point: this is all lazy. Calling execute() doesn't run anything — it builds an RDD lineage graph. Actual computation only starts when an action (like collect(), count(), or saveAsTable()) triggers SparkContext.runJob(), which passes the final RDD to the DAGScheduler.
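To make the laziness concrete, here is a minimal sketch in plain Python (a toy model, not Spark's API): transformations only record a chain of deferred thunks, and only an action evaluates the chain.

```python
# Hypothetical sketch (not Spark's API): transformations only record
# lineage; nothing runs until an action forces evaluation.
class Lineage:
    def __init__(self, compute):
        self._compute = compute  # deferred thunk producing a list

    def map(self, f):
        # Transformation: returns a new lineage node, runs nothing yet.
        return Lineage(lambda: [f(x) for x in self._compute()])

    def filter(self, p):
        return Lineage(lambda: [x for x in self._compute() if p(x)])

    def collect(self):
        # Action: walks the recorded chain and actually computes.
        return self._compute()

source = Lineage(lambda: [1, 2, 3, 4, 5])
pipeline = source.map(lambda x: x * 10).filter(lambda x: x > 20)
# Nothing has executed yet; only collect() evaluates the whole chain:
# pipeline.collect() == [30, 40, 50]
```

The same shape holds in Spark: `execute()` hands back the head of such a chain, and `SparkContext.runJob()` plays the role of `collect()`.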
The RDD Abstraction: Five Properties
The RDD is defined by five properties, explicitly documented in its Scaladoc:
core/src/main/scala/org/apache/spark/rdd/RDD.scala#L69-L78
These five properties are implemented as override points in the abstract class:
core/src/main/scala/org/apache/spark/rdd/RDD.scala#L116-L139
```mermaid
classDiagram
    class RDD~T~ {
        <<abstract>>
        +compute(split, context): Iterator~T~
        #getPartitions: Array[Partition]
        #getDependencies: Seq[Dependency]
        #getPreferredLocations(split): Seq[String]
        +partitioner: Option[Partitioner]
    }
    note for RDD "1. Partitions list\n2. Compute function\n3. Dependencies\n4. Optional Partitioner\n5. Preferred locations"
```
| Property | Method | Purpose |
|---|---|---|
| Partitions | getPartitions | List of splits — each becomes one task |
| Compute | compute(split, context) | The actual computation logic for a single partition |
| Dependencies | getDependencies | Links to parent RDDs — forms the lineage graph |
| Partitioner | partitioner | For key-value RDDs: how data is partitioned (hash, range) |
| Preferred Locations | getPreferredLocations | Hints for data locality (e.g., HDFS block locations) |
These five methods are all the scheduler needs. The DAGScheduler uses getDependencies to build the stage graph, getPartitions to determine parallelism, and getPreferredLocations for task placement. The compute function is serialized and shipped to executors. This minimal contract is what allows RDDs to represent anything from HDFS reads to shuffled aggregations.
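The contract is small enough to illustrate with a toy source "RDD" in plain Python (illustrative names, not Spark's classes; a partition here is just an integer index):

```python
# Minimal sketch of the five-property contract (illustrative, not
# Spark's classes). A "partition" is just an integer index here.
class RangeRDD:
    def __init__(self, data):
        self.data = list(data)

    def get_partitions(self):              # 1. list of partitions
        return [0, 1]

    def compute(self, split):              # 2. per-partition compute function
        half = len(self.data) // 2
        return iter(self.data[:half] if split == 0 else self.data[half:])

    def dependencies(self):                # 3. lineage links (none: source RDD)
        return []

    def partitioner(self):                 # 4. optional partitioner
        return None

    def preferred_locations(self, split):  # 5. locality hints
        return []

rdd = RangeRDD([1, 2, 3, 4])
# "Running a job" = computing every partition and concatenating:
# [x for p in rdd.get_partitions() for x in rdd.compute(p)] == [1, 2, 3, 4]
```

A scheduler that knows only these five methods can plan and run the job, which is exactly the point of the minimal contract.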
Narrow vs Shuffle Dependencies
The distinction between NarrowDependency and ShuffleDependency is the single most important concept for understanding Spark's execution model:
core/src/main/scala/org/apache/spark/Dependency.scala#L41-L95
```mermaid
flowchart LR
    subgraph "Narrow Dependency (pipelined)"
        P1A[Parent\nPartition 1] --> C1A[Child\nPartition 1]
        P2A[Parent\nPartition 2] --> C2A[Child\nPartition 2]
        P3A[Parent\nPartition 3] --> C3A[Child\nPartition 3]
    end
    subgraph "Shuffle Dependency (stage boundary)"
        P1B[Parent\nPartition 1] --> C1B[Child\nPartition 1]
        P1B --> C2B[Child\nPartition 2]
        P2B[Parent\nPartition 2] --> C1B
        P2B --> C2B
        P3B[Parent\nPartition 3] --> C1B
        P3B --> C2B
    end
```
NarrowDependency: Each child partition depends on a small, fixed number of parent partitions. Operations like map, filter, and union create narrow dependencies. These can be pipelined — executed in-process without writing intermediate data to disk.
ShuffleDependency: Each child partition may depend on all parent partitions. Operations like groupByKey, reduceByKey, and join create shuffle dependencies. These require a shuffle — writing data to disk on the map side and reading it over the network on the reduce side. Every shuffle dependency creates a stage boundary.
Tip: The number of shuffle boundaries directly determines the number of stages, which in turn determines the number of synchronization points where all tasks must complete before the next stage begins. Minimizing shuffles is the single highest-impact optimization in Spark.
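A small sketch makes the all-to-all pattern concrete: hash partitioning on the key decides which reduce-side partition each record lands in, so every map partition may send data to every reducer (simplified Python, not Spark's implementation):

```python
# Sketch of why groupByKey is all-to-all: each map-side partition hashes
# every record's key to pick the reduce-side partition that receives it
# (simplified hash partitioning, not Spark's implementation).
def hash_partition(key, num_reducers):
    return hash(key) % num_reducers

map_partitions = [
    [("a", 1), ("b", 2)],
    [("a", 3), ("c", 4)],
]
num_reducers = 2

# Every map partition may contribute to every reducer: the shuffle.
by_reducer = {r: [] for r in range(num_reducers)}
for partition in map_partitions:
    for key, value in partition:
        by_reducer[hash_partition(key, num_reducers)].append((key, value))
# All records for a given key land in exactly one reduce partition,
# so each reducer can group its keys locally.
```

Note that both copies of key `"a"` end up in the same reduce partition even though they started on different map partitions, which is what makes the grouping correct and the data movement unavoidable.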
DAGScheduler: Building Stages from the RDD Graph
When an action triggers a job, the DAGScheduler walks backward through the RDD dependency graph to build stages. The algorithm starts in createResultStage():
```mermaid
flowchart BT
    RDD1["RDD: textFile()"] -->|narrow| RDD2["RDD: map()"]
    RDD2 -->|narrow| RDD3["RDD: filter()"]
    RDD3 -->|"shuffle<br/>(stage boundary)"| RDD4["RDD: reduceByKey()"]
    RDD4 -->|narrow| RDD5["RDD: map()"]
    RDD5 -->|"ACTION: collect()"| RESULT[ResultStage]
    subgraph Stage0["ShuffleMapStage 0"]
        RDD1
        RDD2
        RDD3
    end
    subgraph Stage1["ResultStage 1"]
        RDD4
        RDD5
    end
```
The key method is getShuffleDependenciesAndResourceProfiles(), which walks the RDD's dependencies: narrow dependencies are traversed (staying within the same stage), while shuffle dependencies are collected as stage boundaries. It uses an explicit stack (not recursion) to prevent StackOverflowError on deep RDD lineages.
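The backward walk can be sketched in a few lines of Python (node shapes and names here are illustrative, not Spark's types): narrow edges are followed within the current stage, shuffle edges are recorded as boundaries, and an explicit stack replaces recursion.

```python
# Sketch of the backward walk (explicit stack, no recursion): narrow
# dependencies stay inside the current stage; shuffle dependencies are
# collected as stage boundaries. Node shape is illustrative.
def shuffle_boundaries(root):
    boundaries, visited = set(), set()
    stack = [root]
    while stack:
        node = stack.pop()
        if node["name"] in visited:
            continue
        visited.add(node["name"])
        for kind, parent in node["deps"]:
            if kind == "narrow":
                stack.append(parent)            # same stage: keep walking
            else:  # "shuffle"
                boundaries.add(parent["name"])  # stage boundary: stop here
    return boundaries

# textFile -> map -> filter ==shuffle==> reduceByKey -> map
text_file = {"name": "textFile", "deps": []}
mapped    = {"name": "map", "deps": [("narrow", text_file)]}
filtered  = {"name": "filter", "deps": [("narrow", mapped)]}
reduced   = {"name": "reduceByKey", "deps": [("shuffle", filtered)]}
final_map = {"name": "finalMap", "deps": [("narrow", reduced)]}
# One boundary at "filter" means two stages: a ShuffleMapStage feeding
# a ResultStage, exactly as in the diagram above.
```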
Stages come in two types:
- ShuffleMapStage: Produces shuffle output files. Created by createShuffleMapStage(), it registers with MapOutputTracker so downstream stages can locate the shuffle data.
- ResultStage: The final stage that computes the action's result. Its tasks run the user's result function (e.g., collecting rows) and return results to the driver.
Task Execution: Serialization, Shipping, and Running
Once stages are created, the DAGScheduler submits TaskSets to the TaskScheduler. Each TaskSet contains tasks for every partition in the stage — either ShuffleMapTask (writes shuffle output) or ResultTask (produces final results).
```mermaid
sequenceDiagram
    participant DAG as DAGScheduler
    participant TS as TaskSchedulerImpl
    participant SB as SchedulerBackend
    participant EX as Executor
    DAG->>TS: submitTasks(TaskSet)
    TS->>TS: Create TaskSetManager
    TS->>SB: reviveOffers()
    SB->>TS: resourceOffers(workers)
    TS->>TS: Match tasks to workers (locality)
    TS-->>SB: TaskDescriptions
    SB->>EX: LaunchTask(serialized task)
    EX->>EX: Deserialize + run task
    EX-->>SB: StatusUpdate(finished/failed)
    SB-->>TS: statusUpdate
    TS-->>DAG: taskEnded / taskFailed
```
The TaskScheduler considers data locality when assigning tasks. It prefers to run a task on a node that has its input data (PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY), waiting a configurable delay before falling back to a less-local assignment.
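The preference order reduces to a simple fallback search, sketched below (illustrative only; the real TaskSetManager additionally waits spark.locality.wait before accepting a less-local level):

```python
# Sketch of the locality preference order (illustrative, simplified):
# pick the most local level for which a matching offer currently exists.
LOCALITY_ORDER = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]

def best_level(available_levels):
    for level in LOCALITY_ORDER:
        if level in available_levels:
            return level
    return "ANY"
```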
Task serialization is worth noting: the task's closure (including the RDD compute function) is serialized using Spark's closure serializer. This means any variables captured in the closure must be serializable — a common source of NotSerializableException.
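The capture problem can be demonstrated with a Python analogue, using pickle as a stand-in for Spark's Java closure serializer (the mechanism differs, but the failure mode is the same: anything the shipped function object references must itself be serializable):

```python
import pickle
import threading

# Pickle stands in for Spark's closure serializer here: serializing a
# callable pulls in everything it references.
class GoodTask:
    def __init__(self, multiplier):
        self.multiplier = multiplier   # plain data: serializable

    def __call__(self, x):
        return x * self.multiplier

class BadTask:
    def __init__(self):
        self.lock = threading.Lock()   # OS handle: not serializable

    def __call__(self, x):
        with self.lock:
            return x

def can_serialize(obj):
    try:
        pickle.dumps(obj)
        return True
    except TypeError:
        return False
```

The fix in Spark is the same as here: keep non-serializable resources (connections, locks, clients) out of the captured state and create them inside the task instead.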
The Shuffle System
The shuffle is Spark's most complex and performance-critical subsystem. The ShuffleManager trait defines the pluggable interface:
```scala
private[spark] trait ShuffleManager {
  def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
  def getWriter[K, V](handle: ShuffleHandle, mapId: Long, context: TaskContext, ...): ShuffleWriter[K, V]
  def getReader[K, C](handle: ShuffleHandle, startMapIndex: Int, ...): ShuffleReader[K, C]
}
```
```mermaid
flowchart LR
    subgraph "Map Side (ShuffleMapTask)"
        DATA[Partition Data] --> SORT[SortShuffleWriter]
        SORT --> FILE[Data File + Index File]
    end
    subgraph "Shuffle Service"
        FILE --> RESOLVER[IndexShuffleBlockResolver]
    end
    subgraph "Reduce Side (ResultTask)"
        RESOLVER --> READER[BlockStoreShuffleReader]
        READER --> MERGE[Merge sorted streams]
        MERGE --> OUTPUT[Shuffled Partition]
    end
```
The default SortShuffleManager writes sorted, partitioned data files with an accompanying index file. The IndexShuffleBlockResolver manages the on-disk layout, enabling efficient random access to specific partitions within a shuffle file. On the reader side, BlockStoreShuffleReader fetches blocks from remote executors (or from the External Shuffle Service) and merges the sorted streams.
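The data-file-plus-index layout can be sketched in plain Python (a simplified model, not the real on-disk format): the index records cumulative byte offsets, so reading partition i is a seek-and-slice rather than a scan.

```python
# Sketch of the sort-shuffle file layout (simplified): all partitions
# concatenated into one data blob, plus an index of byte offsets so a
# reader can seek straight to partition i without scanning the file.
def write_shuffle_output(partitions):
    data = b"".join(partitions)
    offsets = [0]
    for chunk in partitions:
        offsets.append(offsets[-1] + len(chunk))  # cumulative offsets
    return data, offsets

def read_partition(data, offsets, i):
    # Reader side: slice [offsets[i], offsets[i+1]) out of the data file.
    return data[offsets[i]:offsets[i + 1]]

parts = [b"aaa", b"bb", b"cccc"]
data_file, index_file = write_shuffle_output(parts)
# read_partition(data_file, index_file, 1) == b"bb"
```

This is why a single map task can serve any reducer cheaply: the reducer asks for its partition index, and the resolver answers with one contiguous byte range.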
BlockManager: Distributed Storage
BlockManager is the distributed storage layer that manages all data blocks — shuffle data, broadcast variables, and cached RDDs:
Every executor runs a BlockManager that registers with the driver's BlockManagerMaster. The BlockManager manages two storage levels: on-heap JVM memory (managed by the MemoryStore) and local disk (managed by the DiskStore). For remote data, the BlockTransferService (a Netty-based service) handles fetching blocks from other executors.
When you call rdd.persist(StorageLevel.MEMORY_AND_DISK), BlockManager stores the computed partitions. On subsequent accesses, the DAGScheduler skips recomputation for cached partitions — this is Spark's primary mechanism for iterative computation performance.
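The memory-then-disk fallback can be modeled with a toy store (illustrative only; BlockManager's real logic also handles eviction, unrolling, and replication):

```python
# Sketch of MEMORY_AND_DISK-style fallback (illustrative, not
# BlockManager's real eviction logic): fill the memory store first and
# spill to a stand-in "disk" map once the budget is exceeded.
class TinyBlockStore:
    def __init__(self, memory_budget):
        self.memory_budget = memory_budget
        self.memory = {}   # stand-in for MemoryStore
        self.disk = {}     # stand-in for DiskStore

    def _memory_used(self):
        return sum(len(blob) for blob in self.memory.values())

    def put(self, block_id, blob):
        if self._memory_used() + len(blob) <= self.memory_budget:
            self.memory[block_id] = blob
        else:
            self.disk[block_id] = blob   # spill: memory is full

    def get(self, block_id):
        return self.memory.get(block_id) or self.disk.get(block_id)

store = TinyBlockStore(memory_budget=4)
store.put("rdd_0_0", b"abc")   # fits in memory
store.put("rdd_0_1", b"def")   # over budget: spills to disk
```

Either way, `get` succeeds, which is the property caching relies on: a cached partition is served from whichever tier holds it instead of being recomputed.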
Adaptive Query Execution (AQE)
One of Spark's most powerful features is Adaptive Query Execution, which re-optimizes the physical plan at runtime based on actual data statistics:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L53-L75
```mermaid
flowchart TD
    INITIAL["Initial Physical Plan"] --> WRAP["AdaptiveSparkPlanExec wraps plan"]
    WRAP --> MATERIALIZE["Materialize shuffle stages"]
    MATERIALIZE --> STATS["Collect runtime statistics<br/>(actual partition sizes)"]
    STATS --> REOPT["Re-optimize remaining plan"]
    REOPT --> DECIDE{More stages?}
    DECIDE -->|Yes| MATERIALIZE
    DECIDE -->|No| FINAL["Execute final stage"]
    REOPT --> COALESCE["CoalesceShufflePartitions<br/>(reduce small partitions)"]
    REOPT --> JOINSWITCH["DynamicJoinSelection<br/>(switch join strategy)"]
    REOPT --> SKEW["OptimizeSkewedJoin<br/>(split skewed partitions)"]
```
AQE works by intercepting the execution at shuffle boundaries. When a shuffle stage completes, its actual output statistics become available — including partition sizes, which are far more accurate than the catalog estimates used during static optimization. AQE then re-runs a subset of optimizer rules on the remaining plan.
Three key optimizations AQE performs:
- Coalesce Shuffle Partitions: Merges many small partitions into fewer, larger ones. This is crucial because the default 200 shuffle partitions often produces too many small tasks.
- Dynamic Join Selection: If a join input turns out to be much smaller than estimated, AQE can switch from a sort-merge join to a more efficient broadcast hash join at runtime.
- Skew Join Handling: When one partition is disproportionately large (data skew), AQE splits it into multiple smaller partitions and replicates the other side of the join to match.
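The coalescing idea can be sketched as a greedy packing pass (simplified Python; not the actual CoalesceShufflePartitions rule, which works from the shuffle's runtime size statistics): adjacent small partitions are merged until a target byte size is reached.

```python
# Sketch of AQE-style coalescing (greedy, simplified): pack adjacent
# small shuffle partitions together until a target byte size is reached,
# mirroring the idea behind CoalesceShufflePartitions.
def coalesce(partition_sizes, target_size):
    groups, current, current_bytes = [], [], 0
    for i, size in enumerate(partition_sizes):
        if current and current_bytes + size > target_size:
            groups.append(current)       # close the full group
            current, current_bytes = [], 0
        current.append(i)
        current_bytes += size
    if current:
        groups.append(current)
    return groups

# Ten 10-byte partitions with a 30-byte target collapse into 4 tasks:
# coalesce([10] * 10, 30) == [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

Each output group becomes one reduce task that reads several upstream partitions, trading per-task overhead for fewer, better-sized tasks.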
Tip: AQE is enabled by default since Spark 3.2 (spark.sql.adaptive.enabled=true). If you're still manually tuning spark.sql.shuffle.partitions, AQE's coalesce feature can handle this automatically. Set spark.sql.adaptive.coalescePartitions.enabled=true and remove your manual partition count.
What's Next
We've traced a query from SQL text through Catalyst optimization to distributed execution across a cluster. In the final article, we'll explore the Spark Connect architecture — the gRPC-based client-server model that decouples applications from the Spark runtime — and the extensibility mechanisms that let you customize every layer of the system.