从计划到执行:RDD、Stage、Task 与 Shuffle
前置知识
- ›第 1–3 篇(架构、启动流程、Catalyst Pipeline)
- ›了解分区数据与基于哈希/排序的 Shuffle 机制
- ›熟悉 DAG(有向无环图)的基本概念
从计划到执行:RDD、Stage、Task 与 Shuffle
上一篇文章中,我们跟随一条 SQL 查询走完了 Catalyst 优化器的完整 pipeline,最终得到了一个物理 SparkPlan。接下来才是真正的考验:将这个计划转化为跨集群的分布式计算。这里是逻辑变换的优雅世界与网络分区、数据倾斜、磁盘 I/O 等现实问题正面交锋的地方。
本文将覆盖完整的执行链路:从 SparkPlan.execute() 生成 RDD,到 DAGScheduler 的 Stage 划分算法,再到在 Stage 之间搬运数据的 Shuffle 系统、BlockManager 的分布式存储,最后是自适应查询执行(AQE)的运行时再优化。这是 Spark 分布式计算引擎的核心所在。
SparkPlan.execute() 与 RDD 的桥接
Spark SQL 与底层分布式引擎之间的桥梁,只是一次方法调用:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L197-L202
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
}
executeRDD.get
}
每个具体的 SparkPlan 子类都会覆写 doExecute() 来生成 RDD[InternalRow]:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L328-L332
flowchart TD
PLAN["SparkPlan tree<br/>(physical plan)"] --> EXEC["execute()"]
EXEC --> RDD["RDD[InternalRow]<br/>(lazy)"]
RDD -->|"Action<br/>e.g. collect()"| DAG["DAGScheduler.submitJob()"]
DAG --> STAGES["Stage graph"]
STAGES --> TASKS["TaskSets → Executors"]
关键点在于:这一切都是惰性的。调用 execute() 本身不会触发任何计算,它只是构建了一张 RDD 的血缘图。真正的计算只有在行动算子(如 collect()、count() 或 saveAsTable())触发 SparkContext.runJob() 时才会启动,此时最终的 RDD 才会被提交给 DAGScheduler。
RDD 抽象:五大属性
RDD 由五个属性定义,Scaladoc 中对此有明确说明:
core/src/main/scala/org/apache/spark/rdd/RDD.scala#L69-L78
这五个属性在抽象类中以可覆写的方法形式呈现:
core/src/main/scala/org/apache/spark/rdd/RDD.scala#L116-L139
classDiagram
class RDD~T~ {
<<abstract>>
+compute(split, context): Iterator~T~
#getPartitions: Array[Partition]
#getDependencies: Seq[Dependency]
#getPreferredLocations(split): Seq[String]
+partitioner: Option[Partitioner]
}
note for RDD "1. Partitions list\n2. Compute function\n3. Dependencies\n4. Optional Partitioner\n5. Preferred locations"
| 属性 | 方法 | 作用 |
|---|---|---|
| 分区列表 | getPartitions |
每个分片对应一个 Task |
| 计算函数 | compute(split, context) |
单个分区的实际计算逻辑 |
| 依赖关系 | getDependencies |
指向父 RDD,构成血缘图 |
| 分区器 | partitioner |
对于键值对 RDD:数据的分区方式(哈希或范围) |
| 首选位置 | getPreferredLocations |
数据本地性提示(如 HDFS Block 位置) |
调度器只需这五个方法即可工作。DAGScheduler 通过 getDependencies 构建 Stage 图,通过 getPartitions 确定并行度,通过 getPreferredLocations 决定 Task 的放置位置。计算函数则被序列化后发送到 Executor 执行。正是这个精简的契约,使得 RDD 能够表示从 HDFS 读取到 Shuffle 聚合的任意计算。
窄依赖与 Shuffle 依赖
理解 Spark 执行模型,最重要的概念就是 NarrowDependency 与 ShuffleDependency 的区别:
core/src/main/scala/org/apache/spark/Dependency.scala#L41-L95
flowchart LR
subgraph "Narrow Dependency (pipelined)"
P1A[Parent\nPartition 1] --> C1A[Child\nPartition 1]
P2A[Parent\nPartition 2] --> C2A[Child\nPartition 2]
P3A[Parent\nPartition 3] --> C3A[Child\nPartition 3]
end
subgraph "Shuffle Dependency (stage boundary)"
P1B[Parent\nPartition 1] --> C1B[Child\nPartition 1]
P1B --> C2B[Child\nPartition 2]
P2B[Parent\nPartition 2] --> C1B
P2B --> C2B
P3B[Parent\nPartition 3] --> C1B
P3B --> C2B
end
NarrowDependency(窄依赖):每个子分区只依赖固定数量的父分区。map、filter、union 等操作产生窄依赖。这类依赖可以流水线化执行——在进程内完成,无需将中间数据写入磁盘。
ShuffleDependency(Shuffle 依赖):每个子分区可能依赖所有父分区。groupByKey、reduceByKey、join 等操作产生 Shuffle 依赖。这类依赖需要经历一次 Shuffle——在 Map 端将数据写入磁盘,再在 Reduce 端通过网络读取。每一个 Shuffle 依赖都会产生一个 Stage 边界。
提示: Shuffle 边界的数量直接决定了 Stage 的数量,进而决定了同步点的数量——在每个同步点,所有 Task 都必须完成才能进入下一个 Stage。减少 Shuffle 次数,是 Spark 中单项收益最高的优化手段。
DAGScheduler:从 RDD 图构建 Stage
当行动算子触发一个 Job 时,DAGScheduler 会沿着 RDD 依赖图反向遍历,逐步构建 Stage。整个算法从 createResultStage() 开始:
flowchart BT
RDD1["RDD: textFile()"] -->|narrow| RDD2["RDD: map()"]
RDD2 -->|narrow| RDD3["RDD: filter()"]
RDD3 -->|"shuffle<br/>(stage boundary)"| RDD4["RDD: reduceByKey()"]
RDD4 -->|narrow| RDD5["RDD: map()"]
RDD5 -->|"ACTION: collect()"| RESULT[ResultStage]
subgraph Stage0["ShuffleMapStage 0"]
RDD1
RDD2
RDD3
end
subgraph Stage1["ResultStage 1"]
RDD4
RDD5
end
核心方法是 getShuffleDependenciesAndResourceProfiles():遍历 RDD 的依赖关系时,窄依赖会被直接穿透(保持在同一个 Stage 内),而 Shuffle 依赖则被收集为 Stage 边界。该方法使用显式栈而非递归,以避免在 RDD 血缘较深时出现 StackOverflowError。
Stage 分为两种类型:
ShuffleMapStage:负责生成 Shuffle 输出文件。由createShuffleMapStage()创建,并向MapOutputTracker注册,供下游 Stage 定位 Shuffle 数据。ResultStage:最终 Stage,负责计算行动算子的结果。其 Task 执行用户定义的结果函数(如收集行数据),并将结果返回给 Driver。
Task 执行:序列化、分发与运行
Stage 创建完成后,DAGScheduler 将 TaskSet 提交给 TaskScheduler。每个 TaskSet 包含该 Stage 中每个分区对应的 Task——要么是 ShuffleMapTask(写入 Shuffle 输出),要么是 ResultTask(产生最终结果)。
sequenceDiagram
participant DAG as DAGScheduler
participant TS as TaskSchedulerImpl
participant SB as SchedulerBackend
participant EX as Executor
DAG->>TS: submitTasks(TaskSet)
TS->>TS: Create TaskSetManager
TS->>SB: reviveOffers()
SB->>TS: resourceOffers(workers)
TS->>TS: Match tasks to workers (locality)
TS-->>SB: TaskDescriptions
SB->>EX: LaunchTask(serialized task)
EX->>EX: Deserialize + run task
EX-->>SB: StatusUpdate(finished/failed)
SB-->>TS: statusUpdate
TS-->>DAG: taskEnded / taskFailed
TaskScheduler 在分配 Task 时会考虑数据本地性,优先将 Task 调度到持有输入数据的节点上(本地性优先级:PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY)。在回退到较低本地性级别之前,调度器会等待一段可配置的时间。
Task 序列化也值得关注:Task 的闭包(包括 RDD 的计算函数)会使用 Spark 的闭包序列化器进行序列化。这意味着闭包中捕获的所有变量都必须是可序列化的——这是 NotSerializableException 的常见根因。
Shuffle 系统
Shuffle 是 Spark 最复杂、也是对性能影响最大的子系统。ShuffleManager trait 定义了可插拔的接口:
private[spark] trait ShuffleManager {
def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
def getWriter[K, V](handle: ShuffleHandle, mapId: Long, context: TaskContext, ...): ShuffleWriter[K, V]
def getReader[K, C](handle: ShuffleHandle, startMapIndex: Int, ...): ShuffleReader[K, C]
}
flowchart LR
subgraph "Map Side (ShuffleMapTask)"
DATA[Partition Data] --> SORT[SortShuffleWriter]
SORT --> FILE[Data File + Index File]
end
subgraph "Shuffle Service"
FILE --> RESOLVER[IndexShuffleBlockResolver]
end
subgraph "Reduce Side (ResultTask)"
RESOLVER --> READER[BlockStoreShuffleReader]
READER --> MERGE[Merge sorted streams]
MERGE --> OUTPUT[Shuffled Partition]
end
默认的 SortShuffleManager 会将数据按分区排序后写入数据文件,并附带一个索引文件。IndexShuffleBlockResolver 负责管理磁盘上的文件布局,支持对 Shuffle 文件中特定分区的高效随机访问。在读取端,BlockStoreShuffleReader 从远端 Executor(或 External Shuffle Service)拉取数据块,并合并排好序的数据流。
BlockManager:分布式存储
BlockManager 是分布式存储层,负责管理所有数据块——包括 Shuffle 数据、广播变量和已缓存的 RDD:
每个 Executor 都运行一个 BlockManager,并向 Driver 端的 BlockManagerMaster 注册。BlockManager 管理两种存储介质:JVM 堆内存(由 MemoryStore 管理)和本地磁盘(由 DiskStore 管理)。对于远端数据,BlockTransferService(基于 Netty 的服务)负责从其他 Executor 拉取数据块。
当你调用 rdd.persist(StorageLevel.MEMORY_AND_DISK) 时,BlockManager 会存储已计算的分区。后续访问时,DAGScheduler 会跳过已缓存分区的重新计算——这是 Spark 支持迭代计算高性能的核心机制。
自适应查询执行(AQE)
Spark 最强大的特性之一是自适应查询执行(AQE)——它能够根据运行时收集到的真实数据统计信息,对物理计划进行动态再优化:
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L53-L75
flowchart TD
INITIAL["Initial Physical Plan"] --> WRAP["AdaptiveSparkPlanExec wraps plan"]
WRAP --> MATERIALIZE["Materialize shuffle stages"]
MATERIALIZE --> STATS["Collect runtime statistics<br/>(actual partition sizes)"]
STATS --> REOPT["Re-optimize remaining plan"]
REOPT --> DECIDE{More stages?}
DECIDE -->|Yes| MATERIALIZE
DECIDE -->|No| FINAL["Execute final stage"]
REOPT --> COALESCE["CoalesceShufflePartitions<br/>(reduce small partitions)"]
REOPT --> JOINSWITCH["DynamicJoinSelection<br/>(switch join strategy)"]
REOPT --> SKEW["OptimizeSkewedJoin<br/>(split skewed partitions)"]
AQE 的工作方式是在 Shuffle 边界处介入执行流程。当一个 Shuffle Stage 完成后,其实际输出统计信息便可获取——包括各分区的大小,这比静态优化阶段依赖 catalog 估算的数值要精准得多。AQE 随即对剩余计划重新运行一部分优化规则。
AQE 执行的三项关键优化:
-
合并 Shuffle 分区(Coalesce Shuffle Partitions):将大量小分区合并为数量更少、体量更大的分区。默认的 200 个 Shuffle 分区往往会产生过多的小 Task,这项优化至关重要。
-
动态 Join 策略切换(Dynamic Join Selection):如果某个 Join 输入的实际大小远小于估算值,AQE 可以在运行时将 Sort-Merge Join 切换为效率更高的 Broadcast Hash Join。
-
数据倾斜 Join 处理(Skew Join Handling):当某个分区数据量异常偏大(数据倾斜)时,AQE 会将其拆分为多个较小的分区,并复制 Join 另一侧的对应数据来匹配。
提示: 自 Spark 3.2 起,AQE 默认开启(
spark.sql.adaptive.enabled=true)。如果你还在手动调整spark.sql.shuffle.partitions,AQE 的分区合并功能完全可以自动处理这件事。只需启用spark.sql.adaptive.coalescePartitions.enabled=true,就可以移除手动设置的分区数了。
下一步
至此,我们已经完整追踪了一条查询从 SQL 文本,经过 Catalyst 优化,到跨集群分布式执行的全链路。在最后一篇文章中,我们将探索 Spark Connect 架构——这个基于 gRPC 的客户端-服务端模型将应用程序与 Spark 运行时解耦——以及允许你定制系统各个层次的扩展机制。