启动序列:SparkContext、SparkEnv 与调度栈
前置知识
- ›第 1 篇:架构与模块全景
- ›理解 Scala 的 lazy val 与初始化模式
- ›熟悉 RPC 与序列化的基本概念
启动序列:SparkContext、SparkEnv 与调度栈
当你的应用调用 SparkSession.builder().getOrCreate() 时,水面之下已经悄然启动了一套复杂的初始化流程。各组件必须按照严格的顺序启动——listener bus 先于 environment,environment 先于 scheduler,scheduler 先于 heartbeat。一旦顺序出错,等待你的将是难以排查的空指针异常或死锁。
本文将完整追踪这一启动序列,从 SparkSubmit 调用你的 main() 开始,直到 executor 完成注册、准备好接收任务为止。无论是排查启动故障、编写 Spark plugin,还是参与核心引擎的开发,理解这一序列都是必不可少的基础。
SparkSubmit:从 CLI 到应用启动
正如第 1 篇所述,spark-submit 最终会调用 SparkSubmit.doSubmit(),而真正承担核心逻辑的方法是 prepareSubmitEnvironment():
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L243-L267
这个超过 500 行的方法通过整型枚举模式处理四种集群管理器:
val clusterManager: Int = args.maybeMaster match {
case Some(v) => v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if SparkMasterRegex.isK8s(m) => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ => error(...)
}
case None => LOCAL
}
针对不同的集群管理器,该方法会组装不同的 classpath、改写主类(例如在 cluster 模式下将其包装进 YARN 的 ApplicationMaster),并解析远程依赖 JAR。最终返回一个四元组:子进程参数、classpath 条目、SparkConf,以及要调用的主类名称。
实际的启动操作发生在 runMain() 中——它通过 Java 反射调用用户的 main() 方法,将控制权交给应用代码。随后,应用代码调用 SparkSession.builder().getOrCreate(),进而创建 SparkContext。
SparkContext 初始化序列
Spark 启动流程的核心,全部集中在 SparkContext 的一个 try 块里:
core/src/main/scala/org/apache/spark/SparkContext.scala#L409-L649
这段 240 行的代码以严格规定的顺序完成所有初始化。一旦发生异常,catch 块会调用 stop() 清理已部分初始化的状态。下面是主要的关键节点:
sequenceDiagram
participant SC as SparkContext
participant LB as LiveListenerBus
participant ENV as SparkEnv
participant HB as HeartbeatReceiver
participant TS as TaskScheduler
participant DAG as DAGScheduler
participant BM as BlockManager
SC->>SC: Validate SparkConf
SC->>LB: Create LiveListenerBus
SC->>SC: Create AppStatusStore
SC->>ENV: createSparkEnv(conf, isLocal, listenerBus)
SC->>HB: Register HeartbeatReceiver endpoint
SC->>SC: Initialize PluginContainer
SC->>ENV: initializeShuffleManager()
SC->>ENV: initializeMemoryManager()
SC->>TS: createTaskScheduler(master)
SC->>DAG: new DAGScheduler(this)
SC->>HB: Notify TaskSchedulerIsSet
SC->>TS: start()
SC->>BM: initialize(applicationId)
以下是各关键顺序约束(附源码中的注释说明):
-
LiveListenerBus 最先创建(第 492 行):需要在 SparkEnv 之前创建,以便捕获包括环境创建在内的所有事件。
-
SparkEnv 先于 HeartbeatReceiver(第 501 行):HeartbeatReceiver 依赖 SparkEnv 提供的 RPC 层。
-
HeartbeatReceiver 先于 TaskScheduler(第 588–591 行):"We need to register HeartbeatReceiver before createTaskScheduler because Executor will retrieve HeartbeatReceiver in the constructor. (SPARK-6640)"
-
DAGScheduler 在 TaskScheduler 之后创建(第 603 行):DAGScheduler 的构造函数会将自身注册到 TaskScheduler,从而建立向上回调关系。
-
TaskScheduler.start() 在 DAGScheduler 之后调用(第 629 行):"start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor."
-
BlockManager.initialize() 在获取 applicationId 之后调用(第 650 行):BlockManager 在向 driver 注册时需要用到 applicationId。
提示: 如果 Spark 启动时出现
NullPointerException,根本原因几乎都是初始化顺序问题——某个组件在其依赖项完成初始化之前就被访问了。这个try块中的注释是排查此类问题的最佳指南。
SparkEnv:运行时容器模式
SparkEnv 是一个上帝对象(God Object)——这是有意为之的设计。它持有 driver 和 executor 都需要的所有运行时服务的引用:
core/src/main/scala/org/apache/spark/SparkEnv.scala#L61-L73
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val serializerManager: SerializerManager,
val mapOutputTracker: MapOutputTracker,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf)
graph TB
subgraph SparkEnv["SparkEnv (Runtime Container)"]
RPC[RpcEnv<br/>Netty-based RPC]
SER[Serializer<br/>Kryo/Java]
MOT[MapOutputTracker<br/>Shuffle metadata]
BM[BlockManager<br/>Storage layer]
MM[MemoryManager<br/>On/off-heap allocation]
SM[ShuffleManager<br/>Sort-based shuffle]
BC[BroadcastManager<br/>Variable distribution]
MET[MetricsSystem<br/>Codahale metrics]
end
SparkEnv 的设计有两点值得关注:
Driver 与 Executor 的差异化创建: 伴生对象分别提供了 createDriverEnv() 和 createExecutorEnv() 两个方法,两者最终都委托给私有的 create() 方法,但传入的参数不同。Driver 会获得一个 LiveListenerBus,而 executor 则没有。MapOutputTracker 也有所区别:driver 使用 MapOutputTrackerMaster(负责追踪所有 shuffle 输出位置),executor 则使用 MapOutputTrackerWorker(负责从 driver 获取 shuffle 位置信息)。
延迟初始化: 注意 ShuffleManager 和 MemoryManager 并不在构造函数中初始化,而是在 SparkEnv 创建完成之后才进行初始化。ShuffleManager 之所以延迟,是因为用户 JAR 包中可能定义了自定义实现,而这些 JAR 在初始环境创建阶段尚未加入 classpath。MemoryManager 之所以延迟,是因为 driver plugin(在 SparkEnv 之后加载)可能会覆盖 executor 的内存配置。
配置系统:SparkConf 与 ConfigEntry
在深入调度机制之前,有必要先了解配置是如何在系统中流转的。Spark 采用两层配置机制:
core/src/main/scala/org/apache/spark/SparkConf.scala#L38-L56
对外暴露的是基于字符串的 API:conf.get("spark.executor.memory")。但在内部,Spark 定义了带类型的 ConfigEntry[T] 对象,提供类型安全保证、文档说明、默认值和参数校验。ReadOnlySparkConf trait 同时暴露了这两种方式:
trait ReadOnlySparkConf {
def get(key: String): String
private[spark] def get[T](entry: ConfigEntry[T]): T
}
classDiagram
class ReadOnlySparkConf {
<<trait>>
+get(key: String): String
+get(entry: ConfigEntry~T~): T
}
class SparkConf {
-settings: ConcurrentHashMap
+set(key: String, value: String)
+clone(): SparkConf
+validateSettings()
}
ReadOnlySparkConf <|-- SparkConf
ConfigEntry 系统(定义于 internal/config/)提供了类似 ConfigBuilder("spark.executor.memory").bytesConf(ByteUnit.MiB).createWithDefault(1024) 这样的类型化访问器,确保内存配置始终以字节为单位解析,具有合理的默认值,并在输入非法时给出清晰的错误提示。
配置传播通过克隆实现:SparkContext 在第 410 行克隆 SparkConf,executor 则接收一份序列化后的副本。初始化完成后对 driver 配置所做的修改,不会同步到正在运行的 executor——这是一个常见的困惑来源。
两级调度栈
Spark 的调度架构分为两个层次,各自职责清晰:
flowchart TD
JOB[User Action<br/>e.g., df.collect()] --> DAG[DAGScheduler<br/>Stage-level scheduling]
DAG -->|"Breaks RDD graph<br/>at shuffle boundaries"| STAGES[Stage 1, Stage 2, ...]
STAGES -->|"Submits TaskSet<br/>per stage"| TS[TaskScheduler<br/>Task-level scheduling]
TS -->|"Assigns tasks<br/>to executors"| SB[SchedulerBackend]
SB --> EX1[Executor 1]
SB --> EX2[Executor 2]
SB --> EX3[Executor N]
DAGScheduler(core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala)工作在 stage 层面。当 collect() 等 action 触发一个 job 时,DAGScheduler 会沿 RDD 依赖图反向遍历,在每个 ShuffleDependency 处划定 stage 边界。仅包含窄依赖的 stage 可以将多个操作流水线化。DAGScheduler 按依赖顺序依次提交 TaskSet(每个 stage 对应一个),并负责处理 stage 级别的故障(例如 shuffle 输出丢失时的重试)。
TaskScheduler(core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala#L36-L55)工作在 task 层面。它接收来自 DAGScheduler 的 TaskSet,在兼顾数据本地性偏好、资源约束以及调度策略(FIFO 或 Fair)的前提下,将单个 task 分发给 executor。它还负责 task 级别的重试,以及对慢节点的推测执行。
SchedulerBackend 是 TaskScheduler 与实际集群之间的抽象层。不同的集群管理器对应不同的 backend 实现,负责资源申请和 executor 的生命周期管理:
ExternalClusterManager(core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62)是接入 YARN、Kubernetes 及其他集群管理器的 SPI 接口。它定义了三个方法:canCreate() 用于匹配 master URL,createTaskScheduler() 用于提供 TaskScheduler,createSchedulerBackend() 用于提供 SchedulerBackend。实现类通过 Java 的 ServiceLoader 机制发现,因此添加新的集群管理器无需修改 Spark 核心代码。
提示: 两级调度的分层设计意味着 stage 级别的故障(shuffle 数据丢失)与 task 级别的故障(OOM、网络超时)有着不同的处理路径。如果看到 "FetchFailedException",这是由 DAGScheduler 处理的 stage 级故障;如果看到 "TaskKilled" 或 "ExecutorLostFailure",则是由 TaskScheduler 处理的 task 级故障。
下一步
至此,我们已经完整了解了 Spark 从 shell 脚本到 executor 就绪的整个启动过程。那么,当你执行 spark.sql("SELECT * FROM t WHERE x > 10") 时,背后又发生了什么?下一篇文章将深入 Catalyst 查询处理流水线——包括核心的 TreeNode 抽象、RuleExecutor 框架,以及从 SQL 文本到优化后物理计划的完整旅程。