Read OSS

启动序列:SparkContext、SparkEnv 与调度栈

高级

前置知识

  • 第 1 篇:架构与模块全景
  • 理解 Scala 的 lazy val 与初始化模式
  • 熟悉 RPC 与序列化的基本概念

启动序列:SparkContext、SparkEnv 与调度栈

当你的应用调用 SparkSession.builder().getOrCreate() 时,水面之下已经悄然启动了一套复杂的初始化流程。各组件必须按照严格的顺序启动——listener bus 先于 environment,environment 先于 scheduler,scheduler 先于 heartbeat。一旦顺序出错,等待你的将是难以排查的空指针异常或死锁。

本文将完整追踪这一启动序列,从 SparkSubmit 调用你的 main() 开始,直到 executor 完成注册、准备好接收任务为止。无论是排查启动故障、编写 Spark plugin,还是参与核心引擎的开发,理解这一序列都是必不可少的基础。

SparkSubmit:从 CLI 到应用启动

正如第 1 篇所述,spark-submit 最终会调用 SparkSubmit.doSubmit(),而真正承担核心逻辑的方法是 prepareSubmitEnvironment()

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L243-L267

这个超过 500 行的方法通过整型枚举模式处理四种集群管理器:

val clusterManager: Int = args.maybeMaster match {
  case Some(v) => v match {
    case "yarn" => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if SparkMasterRegex.isK8s(m) => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ => error(...)
  }
  case None => LOCAL
}

针对不同的集群管理器,该方法会组装不同的 classpath、改写主类(例如在 cluster 模式下将其包装进 YARN 的 ApplicationMaster),并解析远程依赖 JAR。最终返回一个四元组:子进程参数、classpath 条目、SparkConf,以及要调用的主类名称。

实际的启动操作发生在 runMain() 中——它通过 Java 反射调用用户的 main() 方法,将控制权交给应用代码。随后,应用代码调用 SparkSession.builder().getOrCreate(),进而创建 SparkContext

SparkContext 初始化序列

Spark 启动流程的核心,全部集中在 SparkContext 的一个 try 块里:

core/src/main/scala/org/apache/spark/SparkContext.scala#L409-L649

这段 240 行的代码以严格规定的顺序完成所有初始化。一旦发生异常,catch 块会调用 stop() 清理已部分初始化的状态。下面是主要的关键节点:

sequenceDiagram
    participant SC as SparkContext
    participant LB as LiveListenerBus
    participant ENV as SparkEnv
    participant HB as HeartbeatReceiver
    participant TS as TaskScheduler
    participant DAG as DAGScheduler
    participant BM as BlockManager

    SC->>SC: Validate SparkConf
    SC->>LB: Create LiveListenerBus
    SC->>SC: Create AppStatusStore
    SC->>ENV: createSparkEnv(conf, isLocal, listenerBus)
    SC->>HB: Register HeartbeatReceiver endpoint
    SC->>SC: Initialize PluginContainer
    SC->>ENV: initializeShuffleManager()
    SC->>ENV: initializeMemoryManager()
    SC->>TS: createTaskScheduler(master)
    SC->>DAG: new DAGScheduler(this)
    SC->>HB: Notify TaskSchedulerIsSet
    SC->>TS: start()
    SC->>BM: initialize(applicationId)

以下是各关键顺序约束(附源码中的注释说明):

  1. LiveListenerBus 最先创建(第 492 行):需要在 SparkEnv 之前创建,以便捕获包括环境创建在内的所有事件。

  2. SparkEnv 先于 HeartbeatReceiver(第 501 行):HeartbeatReceiver 依赖 SparkEnv 提供的 RPC 层。

  3. HeartbeatReceiver 先于 TaskScheduler(第 588–591 行):"We need to register HeartbeatReceiver before createTaskScheduler because Executor will retrieve HeartbeatReceiver in the constructor. (SPARK-6640)"

  4. DAGScheduler 在 TaskScheduler 之后创建(第 603 行):DAGScheduler 的构造函数会将自身注册到 TaskScheduler,从而建立向上回调关系。

  5. TaskScheduler.start() 在 DAGScheduler 之后调用(第 629 行):"start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor."

  6. BlockManager.initialize() 在获取 applicationId 之后调用(第 650 行):BlockManager 在向 driver 注册时需要用到 applicationId。

提示: 如果 Spark 启动时出现 NullPointerException,根本原因几乎都是初始化顺序问题——某个组件在其依赖项完成初始化之前就被访问了。这个 try 块中的注释是排查此类问题的最佳指南。

SparkEnv:运行时容器模式

SparkEnv 是一个上帝对象(God Object)——这是有意为之的设计。它持有 driver 和 executor 都需要的所有运行时服务的引用:

core/src/main/scala/org/apache/spark/SparkEnv.scala#L61-L73

class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf)
graph TB
    subgraph SparkEnv["SparkEnv (Runtime Container)"]
        RPC[RpcEnv<br/>Netty-based RPC]
        SER[Serializer<br/>Kryo/Java]
        MOT[MapOutputTracker<br/>Shuffle metadata]
        BM[BlockManager<br/>Storage layer]
        MM[MemoryManager<br/>On/off-heap allocation]
        SM[ShuffleManager<br/>Sort-based shuffle]
        BC[BroadcastManager<br/>Variable distribution]
        MET[MetricsSystem<br/>Codahale metrics]
    end

SparkEnv 的设计有两点值得关注:

Driver 与 Executor 的差异化创建: 伴生对象分别提供了 createDriverEnv()createExecutorEnv() 两个方法,两者最终都委托给私有的 create() 方法,但传入的参数不同。Driver 会获得一个 LiveListenerBus,而 executor 则没有。MapOutputTracker 也有所区别:driver 使用 MapOutputTrackerMaster(负责追踪所有 shuffle 输出位置),executor 则使用 MapOutputTrackerWorker(负责从 driver 获取 shuffle 位置信息)。

延迟初始化: 注意 ShuffleManagerMemoryManager 并不在构造函数中初始化,而是在 SparkEnv 创建完成之后才进行初始化。ShuffleManager 之所以延迟,是因为用户 JAR 包中可能定义了自定义实现,而这些 JAR 在初始环境创建阶段尚未加入 classpath。MemoryManager 之所以延迟,是因为 driver plugin(在 SparkEnv 之后加载)可能会覆盖 executor 的内存配置。

配置系统:SparkConf 与 ConfigEntry

在深入调度机制之前,有必要先了解配置是如何在系统中流转的。Spark 采用两层配置机制:

core/src/main/scala/org/apache/spark/SparkConf.scala#L38-L56

对外暴露的是基于字符串的 API:conf.get("spark.executor.memory")。但在内部,Spark 定义了带类型的 ConfigEntry[T] 对象,提供类型安全保证、文档说明、默认值和参数校验。ReadOnlySparkConf trait 同时暴露了这两种方式:

trait ReadOnlySparkConf {
  def get(key: String): String
  private[spark] def get[T](entry: ConfigEntry[T]): T
}
classDiagram
    class ReadOnlySparkConf {
        <<trait>>
        +get(key: String): String
        +get(entry: ConfigEntry~T~): T
    }
    class SparkConf {
        -settings: ConcurrentHashMap
        +set(key: String, value: String)
        +clone(): SparkConf
        +validateSettings()
    }
    ReadOnlySparkConf <|-- SparkConf

ConfigEntry 系统(定义于 internal/config/)提供了类似 ConfigBuilder("spark.executor.memory").bytesConf(ByteUnit.MiB).createWithDefault(1024) 这样的类型化访问器,确保内存配置始终以字节为单位解析,具有合理的默认值,并在输入非法时给出清晰的错误提示。

配置传播通过克隆实现:SparkContext 在第 410 行克隆 SparkConf,executor 则接收一份序列化后的副本。初始化完成后对 driver 配置所做的修改,不会同步到正在运行的 executor——这是一个常见的困惑来源。

两级调度栈

Spark 的调度架构分为两个层次,各自职责清晰:

flowchart TD
    JOB[User Action<br/>e.g., df.collect&#40;&#41;] --> DAG[DAGScheduler<br/>Stage-level scheduling]
    DAG -->|"Breaks RDD graph<br/>at shuffle boundaries"| STAGES[Stage 1, Stage 2, ...]
    STAGES -->|"Submits TaskSet<br/>per stage"| TS[TaskScheduler<br/>Task-level scheduling]
    TS -->|"Assigns tasks<br/>to executors"| SB[SchedulerBackend]
    SB --> EX1[Executor 1]
    SB --> EX2[Executor 2]
    SB --> EX3[Executor N]

DAGSchedulercore/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala)工作在 stage 层面。当 collect() 等 action 触发一个 job 时,DAGScheduler 会沿 RDD 依赖图反向遍历,在每个 ShuffleDependency 处划定 stage 边界。仅包含窄依赖的 stage 可以将多个操作流水线化。DAGScheduler 按依赖顺序依次提交 TaskSet(每个 stage 对应一个),并负责处理 stage 级别的故障(例如 shuffle 输出丢失时的重试)。

TaskSchedulercore/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala#L36-L55)工作在 task 层面。它接收来自 DAGScheduler 的 TaskSet,在兼顾数据本地性偏好、资源约束以及调度策略(FIFO 或 Fair)的前提下,将单个 task 分发给 executor。它还负责 task 级别的重试,以及对慢节点的推测执行。

SchedulerBackend 是 TaskScheduler 与实际集群之间的抽象层。不同的集群管理器对应不同的 backend 实现,负责资源申请和 executor 的生命周期管理:

ExternalClusterManagercore/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62)是接入 YARN、Kubernetes 及其他集群管理器的 SPI 接口。它定义了三个方法:canCreate() 用于匹配 master URL,createTaskScheduler() 用于提供 TaskScheduler,createSchedulerBackend() 用于提供 SchedulerBackend。实现类通过 Java 的 ServiceLoader 机制发现,因此添加新的集群管理器无需修改 Spark 核心代码。

提示: 两级调度的分层设计意味着 stage 级别的故障(shuffle 数据丢失)与 task 级别的故障(OOM、网络超时)有着不同的处理路径。如果看到 "FetchFailedException",这是由 DAGScheduler 处理的 stage 级故障;如果看到 "TaskKilled" 或 "ExecutorLostFailure",则是由 TaskScheduler 处理的 task 级故障。

下一步

至此,我们已经完整了解了 Spark 从 shell 脚本到 executor 就绪的整个启动过程。那么,当你执行 spark.sql("SELECT * FROM t WHERE x > 10") 时,背后又发生了什么?下一篇文章将深入 Catalyst 查询处理流水线——包括核心的 TreeNode 抽象、RuleExecutor 框架,以及从 SQL 文本到优化后物理计划的完整旅程。