Read OSS

从计划到执行:RDD、Stage、Task 与 Shuffle

高级

前置知识

  • 第 1–3 篇(架构、启动流程、Catalyst Pipeline)
  • 了解分区数据与基于哈希/排序的 Shuffle 机制
  • 熟悉 DAG(有向无环图)的基本概念

从计划到执行:RDD、Stage、Task 与 Shuffle

上一篇文章中,我们跟随一条 SQL 查询走完了 Catalyst 优化器的完整 pipeline,最终得到了一个物理 SparkPlan。接下来才是真正的考验:将这个计划转化为跨集群的分布式计算。这里是逻辑变换的优雅世界与网络分区、数据倾斜、磁盘 I/O 等现实问题正面交锋的地方。

本文将覆盖完整的执行链路:从 SparkPlan.execute() 生成 RDD,到 DAGScheduler 的 Stage 划分算法,再到在 Stage 之间搬运数据的 Shuffle 系统、BlockManager 的分布式存储,最后是自适应查询执行(AQE)的运行时再优化。这是 Spark 分布式计算引擎的核心所在。

SparkPlan.execute() 与 RDD 的桥接

Spark SQL 与底层分布式引擎之间的桥梁,只是一次方法调用:

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L197-L202

final def execute(): RDD[InternalRow] = executeQuery {
  if (isCanonicalizedPlan) {
    throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
  }
  executeRDD.get
}

每个具体的 SparkPlan 子类都会覆写 doExecute() 来生成 RDD[InternalRow]

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L328-L332

flowchart TD
    PLAN["SparkPlan tree<br/>(physical plan)"] --> EXEC["execute()"]
    EXEC --> RDD["RDD[InternalRow]<br/>(lazy)"]
    RDD -->|"Action<br/>e.g. collect()"| DAG["DAGScheduler.submitJob()"]
    DAG --> STAGES["Stage graph"]
    STAGES --> TASKS["TaskSets → Executors"]

关键点在于:这一切都是惰性的。调用 execute() 本身不会触发任何计算,它只是构建了一张 RDD 的血缘图。真正的计算只有在行动算子(如 collect()count()saveAsTable())触发 SparkContext.runJob() 时才会启动,此时最终的 RDD 才会被提交给 DAGScheduler。

RDD 抽象:五大属性

RDD 由五个属性定义,Scaladoc 中对此有明确说明:

core/src/main/scala/org/apache/spark/rdd/RDD.scala#L69-L78

这五个属性在抽象类中以可覆写的方法形式呈现:

core/src/main/scala/org/apache/spark/rdd/RDD.scala#L116-L139

classDiagram
    class RDD~T~ {
        <<abstract>>
        +compute(split, context): Iterator~T~
        #getPartitions: Array[Partition]
        #getDependencies: Seq[Dependency]
        #getPreferredLocations(split): Seq[String]
        +partitioner: Option[Partitioner]
    }

    note for RDD "1. Partitions list\n2. Compute function\n3. Dependencies\n4. Optional Partitioner\n5. Preferred locations"
属性 方法 作用
分区列表 getPartitions 每个分片对应一个 Task
计算函数 compute(split, context) 单个分区的实际计算逻辑
依赖关系 getDependencies 指向父 RDD,构成血缘图
分区器 partitioner 对于键值对 RDD:数据的分区方式(哈希或范围)
首选位置 getPreferredLocations 数据本地性提示(如 HDFS Block 位置)

调度器只需这五个方法即可工作。DAGScheduler 通过 getDependencies 构建 Stage 图,通过 getPartitions 确定并行度,通过 getPreferredLocations 决定 Task 的放置位置。计算函数则被序列化后发送到 Executor 执行。正是这个精简的契约,使得 RDD 能够表示从 HDFS 读取到 Shuffle 聚合的任意计算。

窄依赖与 Shuffle 依赖

理解 Spark 执行模型,最重要的概念就是 NarrowDependencyShuffleDependency 的区别:

core/src/main/scala/org/apache/spark/Dependency.scala#L41-L95

flowchart LR
    subgraph "Narrow Dependency (pipelined)"
        P1A[Parent\nPartition 1] --> C1A[Child\nPartition 1]
        P2A[Parent\nPartition 2] --> C2A[Child\nPartition 2]
        P3A[Parent\nPartition 3] --> C3A[Child\nPartition 3]
    end

    subgraph "Shuffle Dependency (stage boundary)"
        P1B[Parent\nPartition 1] --> C1B[Child\nPartition 1]
        P1B --> C2B[Child\nPartition 2]
        P2B[Parent\nPartition 2] --> C1B
        P2B --> C2B
        P3B[Parent\nPartition 3] --> C1B
        P3B --> C2B
    end

NarrowDependency(窄依赖):每个子分区只依赖固定数量的父分区。mapfilterunion 等操作产生窄依赖。这类依赖可以流水线化执行——在进程内完成,无需将中间数据写入磁盘。

ShuffleDependency(Shuffle 依赖):每个子分区可能依赖所有父分区。groupByKeyreduceByKeyjoin 等操作产生 Shuffle 依赖。这类依赖需要经历一次 Shuffle——在 Map 端将数据写入磁盘,再在 Reduce 端通过网络读取。每一个 Shuffle 依赖都会产生一个 Stage 边界

提示: Shuffle 边界的数量直接决定了 Stage 的数量,进而决定了同步点的数量——在每个同步点,所有 Task 都必须完成才能进入下一个 Stage。减少 Shuffle 次数,是 Spark 中单项收益最高的优化手段。

DAGScheduler:从 RDD 图构建 Stage

当行动算子触发一个 Job 时,DAGScheduler 会沿着 RDD 依赖图反向遍历,逐步构建 Stage。整个算法从 createResultStage() 开始:

flowchart BT
    RDD1["RDD: textFile()"] -->|narrow| RDD2["RDD: map()"]
    RDD2 -->|narrow| RDD3["RDD: filter()"]
    RDD3 -->|"shuffle<br/>(stage boundary)"| RDD4["RDD: reduceByKey()"]
    RDD4 -->|narrow| RDD5["RDD: map()"]
    RDD5 -->|"ACTION: collect()"| RESULT[ResultStage]

    subgraph Stage0["ShuffleMapStage 0"]
        RDD1
        RDD2
        RDD3
    end

    subgraph Stage1["ResultStage 1"]
        RDD4
        RDD5
    end

核心方法是 getShuffleDependenciesAndResourceProfiles():遍历 RDD 的依赖关系时,窄依赖会被直接穿透(保持在同一个 Stage 内),而 Shuffle 依赖则被收集为 Stage 边界。该方法使用显式栈而非递归,以避免在 RDD 血缘较深时出现 StackOverflowError

Stage 分为两种类型:

  • ShuffleMapStage:负责生成 Shuffle 输出文件。由 createShuffleMapStage() 创建,并向 MapOutputTracker 注册,供下游 Stage 定位 Shuffle 数据。
  • ResultStage:最终 Stage,负责计算行动算子的结果。其 Task 执行用户定义的结果函数(如收集行数据),并将结果返回给 Driver。

Task 执行:序列化、分发与运行

Stage 创建完成后,DAGScheduler 将 TaskSet 提交给 TaskScheduler。每个 TaskSet 包含该 Stage 中每个分区对应的 Task——要么是 ShuffleMapTask(写入 Shuffle 输出),要么是 ResultTask(产生最终结果)。

sequenceDiagram
    participant DAG as DAGScheduler
    participant TS as TaskSchedulerImpl
    participant SB as SchedulerBackend
    participant EX as Executor

    DAG->>TS: submitTasks(TaskSet)
    TS->>TS: Create TaskSetManager
    TS->>SB: reviveOffers()
    SB->>TS: resourceOffers(workers)
    TS->>TS: Match tasks to workers (locality)
    TS-->>SB: TaskDescriptions
    SB->>EX: LaunchTask(serialized task)
    EX->>EX: Deserialize + run task
    EX-->>SB: StatusUpdate(finished/failed)
    SB-->>TS: statusUpdate
    TS-->>DAG: taskEnded / taskFailed

TaskScheduler 在分配 Task 时会考虑数据本地性,优先将 Task 调度到持有输入数据的节点上(本地性优先级:PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY)。在回退到较低本地性级别之前,调度器会等待一段可配置的时间。

Task 序列化也值得关注:Task 的闭包(包括 RDD 的计算函数)会使用 Spark 的闭包序列化器进行序列化。这意味着闭包中捕获的所有变量都必须是可序列化的——这是 NotSerializableException 的常见根因。

Shuffle 系统

Shuffle 是 Spark 最复杂、也是对性能影响最大的子系统。ShuffleManager trait 定义了可插拔的接口:

private[spark] trait ShuffleManager {
  def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
  def getWriter[K, V](handle: ShuffleHandle, mapId: Long, context: TaskContext, ...): ShuffleWriter[K, V]
  def getReader[K, C](handle: ShuffleHandle, startMapIndex: Int, ...): ShuffleReader[K, C]
}
flowchart LR
    subgraph "Map Side (ShuffleMapTask)"
        DATA[Partition Data] --> SORT[SortShuffleWriter]
        SORT --> FILE[Data File + Index File]
    end

    subgraph "Shuffle Service"
        FILE --> RESOLVER[IndexShuffleBlockResolver]
    end

    subgraph "Reduce Side (ResultTask)"
        RESOLVER --> READER[BlockStoreShuffleReader]
        READER --> MERGE[Merge sorted streams]
        MERGE --> OUTPUT[Shuffled Partition]
    end

默认的 SortShuffleManager 会将数据按分区排序后写入数据文件,并附带一个索引文件。IndexShuffleBlockResolver 负责管理磁盘上的文件布局,支持对 Shuffle 文件中特定分区的高效随机访问。在读取端,BlockStoreShuffleReader 从远端 Executor(或 External Shuffle Service)拉取数据块,并合并排好序的数据流。

BlockManager:分布式存储

BlockManager 是分布式存储层,负责管理所有数据块——包括 Shuffle 数据、广播变量和已缓存的 RDD:

每个 Executor 都运行一个 BlockManager,并向 Driver 端的 BlockManagerMaster 注册。BlockManager 管理两种存储介质:JVM 堆内存(由 MemoryStore 管理)和本地磁盘(由 DiskStore 管理)。对于远端数据,BlockTransferService(基于 Netty 的服务)负责从其他 Executor 拉取数据块。

当你调用 rdd.persist(StorageLevel.MEMORY_AND_DISK) 时,BlockManager 会存储已计算的分区。后续访问时,DAGScheduler 会跳过已缓存分区的重新计算——这是 Spark 支持迭代计算高性能的核心机制。

自适应查询执行(AQE)

Spark 最强大的特性之一是自适应查询执行(AQE)——它能够根据运行时收集到的真实数据统计信息,对物理计划进行动态再优化:

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L53-L75

flowchart TD
    INITIAL["Initial Physical Plan"] --> WRAP["AdaptiveSparkPlanExec wraps plan"]
    WRAP --> MATERIALIZE["Materialize shuffle stages"]
    MATERIALIZE --> STATS["Collect runtime statistics<br/>(actual partition sizes)"]
    STATS --> REOPT["Re-optimize remaining plan"]
    REOPT --> DECIDE{More stages?}
    DECIDE -->|Yes| MATERIALIZE
    DECIDE -->|No| FINAL["Execute final stage"]

    REOPT --> COALESCE["CoalesceShufflePartitions<br/>(reduce small partitions)"]
    REOPT --> JOINSWITCH["DynamicJoinSelection<br/>(switch join strategy)"]
    REOPT --> SKEW["OptimizeSkewedJoin<br/>(split skewed partitions)"]

AQE 的工作方式是在 Shuffle 边界处介入执行流程。当一个 Shuffle Stage 完成后,其实际输出统计信息便可获取——包括各分区的大小,这比静态优化阶段依赖 catalog 估算的数值要精准得多。AQE 随即对剩余计划重新运行一部分优化规则。

AQE 执行的三项关键优化:

  1. 合并 Shuffle 分区(Coalesce Shuffle Partitions):将大量小分区合并为数量更少、体量更大的分区。默认的 200 个 Shuffle 分区往往会产生过多的小 Task,这项优化至关重要。

  2. 动态 Join 策略切换(Dynamic Join Selection):如果某个 Join 输入的实际大小远小于估算值,AQE 可以在运行时将 Sort-Merge Join 切换为效率更高的 Broadcast Hash Join。

  3. 数据倾斜 Join 处理(Skew Join Handling):当某个分区数据量异常偏大(数据倾斜)时,AQE 会将其拆分为多个较小的分区,并复制 Join 另一侧的对应数据来匹配。

提示: 自 Spark 3.2 起,AQE 默认开启(spark.sql.adaptive.enabled=true)。如果你还在手动调整 spark.sql.shuffle.partitions,AQE 的分区合并功能完全可以自动处理这件事。只需启用 spark.sql.adaptive.coalescePartitions.enabled=true,就可以移除手动设置的分区数了。

下一步

至此,我们已经完整追踪了一条查询从 SQL 文本,经过 Catalyst 优化,到跨集群分布式执行的全链路。在最后一篇文章中,我们将探索 Spark Connect 架构——这个基于 gRPC 的客户端-服务端模型将应用程序与 Spark 运行时解耦——以及允许你定制系统各个层次的扩展机制。