The Boot Sequence: SparkContext, SparkEnv, and the Scheduling Stack
Prerequisites
- Article 1: Architecture and Module Map
- Understanding of Scala lazy vals and initialization patterns
- Familiarity with RPC and serialization concepts
When your application calls SparkSession.builder().getOrCreate(), a complex initialization sequence kicks off beneath the surface. Components must start in a precise order — the listener bus before the environment, the environment before the scheduler, the scheduler before the heartbeat. Get it wrong, and you'll see cryptic null pointer exceptions or deadlocks.
This article traces the entire boot sequence, from the moment SparkSubmit invokes your main() to the point where executors are registered and ready to run tasks. Understanding this sequence is essential for debugging startup failures, writing Spark plugins, or contributing to the core engine.
SparkSubmit: From CLI to Application Launch
As we saw in Part 1, spark-submit ultimately calls SparkSubmit.doSubmit(). The method that does the real work is prepareSubmitEnvironment():
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L243-L267
This 500+ line method handles four cluster manager types through an integer enum pattern:
```scala
val clusterManager: Int = args.maybeMaster match {
  case Some(v) => v match {
    case "yarn" => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("k8s") => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ => error(...)
  }
  case None => LOCAL
}
```
For each cluster manager, it assembles a different classpath, rewrites the main class (e.g., wrapping it in a YARN ApplicationMaster for cluster mode), and resolves remote dependency JARs. The method returns a 4-tuple: child arguments, classpath entries, SparkConf, and the main class to invoke.
The actual launch happens in runMain(), which uses Java reflection to call the user's main() method — and at that point, control passes to your application code. Your code then calls SparkSession.builder().getOrCreate(), which creates a SparkContext.
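As a toy illustration of that reflection step (the names here are invented, not Spark's), a `main(Array[String])` method can be resolved by class name and invoked as a static method:

```scala
// Hypothetical stand-in for a user application. For a top-level Scala object,
// the compiler emits a static main forwarder on the companion class.
object DemoApp {
  @volatile var ran = false
  def main(args: Array[String]): Unit = { ran = true }
}

// Loosely mirrors what runMain does: resolve the class by name, look up
// main(Array[String]), and invoke it reflectively (null receiver = static call).
object ReflectiveLauncher {
  def launch(mainClassName: String, args: Array[String]): Unit = {
    val klass = Class.forName(mainClassName)
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args) // Scala passes the array as one varargs element
  }
}
```

The real `runMain()` additionally distinguishes classes implementing `SparkApplication` (instantiated and started directly) from plain Java-style main classes.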
SparkContext Initialization Sequence
The heart of Spark's boot process lives in a single try block inside SparkContext:
core/src/main/scala/org/apache/spark/SparkContext.scala#L409-L649
This 240-line block initializes everything in a carefully ordered sequence. If anything fails, the catch block calls stop() to clean up partially initialized state. Let's walk through the major milestones:
```mermaid
sequenceDiagram
    participant SC as SparkContext
    participant LB as LiveListenerBus
    participant ENV as SparkEnv
    participant HB as HeartbeatReceiver
    participant TS as TaskScheduler
    participant DAG as DAGScheduler
    participant BM as BlockManager
    SC->>SC: Validate SparkConf
    SC->>LB: Create LiveListenerBus
    SC->>SC: Create AppStatusStore
    SC->>ENV: createSparkEnv(conf, isLocal, listenerBus)
    SC->>HB: Register HeartbeatReceiver endpoint
    SC->>SC: Initialize PluginContainer
    SC->>ENV: initializeShuffleManager()
    SC->>ENV: initializeMemoryManager()
    SC->>TS: createTaskScheduler(master)
    SC->>DAG: new DAGScheduler(this)
    SC->>HB: Notify TaskSchedulerIsSet
    SC->>TS: start()
    SC->>BM: initialize(applicationId)
```
Key ordering constraints (with inline comments from the source):
- LiveListenerBus first (line 492): Created before SparkEnv so it captures all events, including environment creation.
- SparkEnv before HeartbeatReceiver (line 501): The environment provides the RPC layer that HeartbeatReceiver needs.
- HeartbeatReceiver before TaskScheduler (lines 588-591): "We need to register HeartbeatReceiver before createTaskScheduler because Executor will retrieve HeartbeatReceiver in the constructor. (SPARK-6640)"
- DAGScheduler after TaskScheduler (line 603): The DAGScheduler's constructor sets itself on the TaskScheduler, establishing the upcall relationship.
- TaskScheduler.start() after DAGScheduler (line 629): "start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor."
- BlockManager.initialize() after applicationId (line 650): Block manager needs the application ID for registration with the driver.
Tip: If you see a `NullPointerException` during Spark startup, the root cause is almost always an ordering violation — something was accessed before its dependency was initialized. The comments in this `try` block are your best debugging guide.
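The init-or-clean-up shape of that `try` block can be sketched as follows (a simplified model with invented fields, not Spark's actual code):

```scala
// Simplified model of SparkContext's pattern: initialize components in order
// inside a try; on any failure, call stop() to release whatever was already
// started, then rethrow.
class BootSequence {
  var listenerBusStarted = false
  var envStarted = false
  var schedulerStarted = false

  def start(failAt: Option[String] = None): Unit = {
    try {
      listenerBusStarted = true                              // 1. listener bus first
      if (failAt.contains("env")) sys.error("env init failed")
      envStarted = true                                      // 2. then the environment
      if (failAt.contains("scheduler")) sys.error("scheduler init failed")
      schedulerStarted = true                                // 3. then the scheduler
    } catch {
      case e: Throwable =>
        stop() // tear down partially initialized state before rethrowing
        throw e
    }
  }

  // stop() must tolerate components that never started.
  def stop(): Unit = {
    schedulerStarted = false
    envStarted = false
    listenerBusStarted = false
  }
}
```

The key property is that `stop()` is safe to call at any point in the sequence, which is why a half-initialized SparkContext does not leak threads or sockets.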
SparkEnv: The Runtime Container Pattern
SparkEnv is a God Object — deliberately so. It holds references to every runtime service that both drivers and executors need:
core/src/main/scala/org/apache/spark/SparkEnv.scala#L61-L73
```scala
class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf)
```
```mermaid
graph TB
    subgraph SparkEnv["SparkEnv (Runtime Container)"]
        RPC[RpcEnv<br/>Netty-based RPC]
        SER[Serializer<br/>Kryo/Java]
        MOT[MapOutputTracker<br/>Shuffle metadata]
        BM[BlockManager<br/>Storage layer]
        MM[MemoryManager<br/>On/off-heap allocation]
        SM[ShuffleManager<br/>Sort-based shuffle]
        BC[BroadcastManager<br/>Variable distribution]
        MET[MetricsSystem<br/>Codahale metrics]
    end
```
Two things are notable about SparkEnv's design:
Driver vs. Executor creation: The companion object provides createDriverEnv() and createExecutorEnv(). Both delegate to a private create() method but with different parameters — the driver gets a LiveListenerBus, while the executor does not. The MapOutputTracker is also different: the driver gets MapOutputTrackerMaster (which tracks all shuffle output locations), while executors get MapOutputTrackerWorker (which fetches shuffle locations from the driver).
Deferred initialization: Note that ShuffleManager and MemoryManager are initialized after SparkEnv creation, not in the constructor. The ShuffleManager is deferred because user JARs might define custom implementations, and those JARs aren't on the classpath during initial environment creation. The MemoryManager is deferred because driver plugins (loaded after SparkEnv) can override executor memory configurations.
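The deferral can be sketched as a settable slot on the environment object (illustrative names, not the real SparkEnv members):

```scala
// Sketch of deferred initialization: the container object exists first, and
// the service is installed later, once its prerequisites (user JARs, plugins)
// are available on the classpath.
class RuntimeEnvSketch {
  @volatile private var _shuffleManager: Option[AnyRef] = None

  def initializeShuffleManager(impl: AnyRef): Unit = {
    require(_shuffleManager.isEmpty, "shuffle manager already initialized")
    _shuffleManager = Some(impl)
  }

  // Accessing the service before initialization is a hard error -- the same
  // class of failure the ordering comments in SparkContext guard against.
  def shuffleManager: AnyRef = _shuffleManager.getOrElse(
    throw new IllegalStateException("shuffleManager accessed before init"))
}
```

Failing fast on early access turns a silent ordering bug into an immediate, diagnosable exception.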
Configuration System: SparkConf and ConfigEntry
Before diving deeper into scheduling, it's worth understanding how configuration flows through the system. Spark uses a two-tier configuration approach:
core/src/main/scala/org/apache/spark/SparkConf.scala#L38-L56
The public API is string-based: conf.get("spark.executor.memory"). But internally, Spark defines typed ConfigEntry[T] objects that provide type safety, documentation, default values, and validation. The ReadOnlySparkConf trait exposes both:
```scala
trait ReadOnlySparkConf {
  def get(key: String): String
  private[spark] def get[T](entry: ConfigEntry[T]): T
}
```
```mermaid
classDiagram
    class ReadOnlySparkConf {
        <<trait>>
        +get(key: String): String
        +get(entry: ConfigEntry~T~): T
    }
    class SparkConf {
        -settings: ConcurrentHashMap
        +set(key: String, value: String)
        +clone(): SparkConf
        +validateSettings()
    }
    ReadOnlySparkConf <|-- SparkConf
```
The ConfigEntry system (defined in internal/config/) provides typed accessors like ConfigBuilder("spark.executor.memory").bytesConf(ByteUnit.MiB).createWithDefault(1024). This ensures that memory configuration is always parsed as bytes, has a sensible default, and produces clear error messages on invalid input.
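A stripped-down version of the two-tier idea looks like this (simplified types, not the real ConfigEntry API):

```scala
// A typed entry pairs a key with a parser and a default, so call sites get a
// T instead of a raw String (the spirit of ConfigEntry[T]).
final case class Entry[T](key: String, default: T, parse: String => T)

class MiniConf(settings: Map[String, String]) {
  // String tier: the public, stringly-typed API.
  def get(key: String): Option[String] = settings.get(key)

  // Typed tier: parse the stored string, or fall back to the entry's default.
  def get[T](entry: Entry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
}

object MiniConf {
  // Illustrative entry: executor memory in MiB, defaulting to 1024.
  val ExecutorMemoryMiB: Entry[Long] =
    Entry("spark.executor.memory", 1024L, _.toLong)
}
```

The payoff is that parsing and defaulting happen in exactly one place per setting, instead of at every call site.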
Configuration propagation works by cloning: SparkContext clones the SparkConf at line 410, and executors receive a serialized copy. Changes to the driver's conf after initialization do not propagate to running executors — this is a common source of confusion.
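The clone semantics can be demonstrated with a toy conf (not SparkConf itself):

```scala
import scala.collection.mutable

// Cloning snapshots the settings map; later mutations of the original are
// invisible to the copy -- the same isolation running executors observe.
class CloneConf private (settings: mutable.Map[String, String]) {
  def this() = this(mutable.Map.empty)
  def set(k: String, v: String): this.type = { settings(k) = v; this }
  def getOption(k: String): Option[String] = settings.get(k)
  override def clone: CloneConf = new CloneConf(settings.clone())
}
```

So a `conf.set(...)` issued after the SparkContext is up only affects the driver's copy, which is exactly the confusion the paragraph above describes.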
The Two-Level Scheduling Stack
Spark's scheduling architecture has two distinct levels, each with a well-defined responsibility:
```mermaid
flowchart TD
    JOB["User Action<br/>e.g., df.collect()"] --> DAG[DAGScheduler<br/>Stage-level scheduling]
    DAG -->|"Breaks RDD graph<br/>at shuffle boundaries"| STAGES[Stage 1, Stage 2, ...]
    STAGES -->|"Submits TaskSet<br/>per stage"| TS[TaskScheduler<br/>Task-level scheduling]
    TS -->|"Assigns tasks<br/>to executors"| SB[SchedulerBackend]
    SB --> EX1[Executor 1]
    SB --> EX2[Executor 2]
    SB --> EX3[Executor N]
```
DAGScheduler (core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala) operates at the stage level. When an action like collect() triggers a job, the DAGScheduler walks backward through the RDD lineage graph. At each ShuffleDependency, it creates a stage boundary. Stages with only narrow dependencies can pipeline operations together. The DAGScheduler submits TaskSets — one per stage — in dependency order, and handles stage-level failures (retrying when shuffle output is lost).
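The boundary rule reduces to something simple: every shuffle dependency starts a new stage, while narrow dependencies pipeline into the current one. A toy version over a linear lineage (invented types, not Spark's dependency classes):

```scala
// Toy lineage: a chain of dependencies, each either Narrow or Shuffle.
sealed trait Dep
case object Narrow extends Dep
case object Shuffle extends Dep

object StageCounter {
  // One final (result) stage, plus one extra stage per shuffle boundary.
  // Narrow dependencies pipeline into their stage and add nothing.
  def countStages(lineage: List[Dep]): Int = 1 + lineage.count(_ == Shuffle)
}
```

The real DAGScheduler walks the (possibly branching) RDD graph recursively and also deduplicates shared shuffle stages, but the splitting criterion is the same.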
TaskScheduler (core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala#L36-L55) operates at the task level. It receives TaskSets from the DAGScheduler and distributes individual tasks to executors, respecting data locality preferences, resource constraints, and scheduling policies (FIFO or Fair). It handles task-level retries and speculative execution for stragglers.
The SchedulerBackend is the abstraction between the TaskScheduler and the actual cluster. For different cluster managers, different backends handle resource acquisition and executor lifecycle:
ExternalClusterManager (core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62) is the SPI that plugs in YARN, Kubernetes, and other cluster managers. It defines three methods: canCreate() to match a master URL, createTaskScheduler() to provide a TaskScheduler, and createSchedulerBackend() to provide a SchedulerBackend. Implementations are discovered via Java's ServiceLoader, so new cluster managers can be added without modifying Spark's core.
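The discovery contract can be mimicked with a tiny trait (simplified; the real interface also wires the scheduler and backend together, and implementations are registered via `META-INF/services` files):

```scala
// Simplified shape of the ExternalClusterManager SPI: each implementation
// claims master URLs via canCreate, and the first match wins.
trait MiniClusterManager {
  def canCreate(masterURL: String): Boolean
  def name: String
}

object YarnLike extends MiniClusterManager {
  def canCreate(masterURL: String): Boolean = masterURL == "yarn"
  def name: String = "yarn"
}

object K8sLike extends MiniClusterManager {
  def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s://")
  def name: String = "k8s"
}

object ManagerRegistry {
  // In Spark this sequence comes from java.util.ServiceLoader; here it is static.
  val managers: Seq[MiniClusterManager] = Seq(YarnLike, K8sLike)
  def pick(master: String): Option[MiniClusterManager] =
    managers.find(_.canCreate(master))
}
```

An unmatched master URL yields no manager, which is when SparkContext falls back to its built-in standalone and local modes.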
Tip: The two-level split means stage-level failures (shuffle data lost) are handled differently from task-level failures (OOM, network timeout). If you see "FetchFailedException," that's a stage-level failure handled by DAGScheduler. If you see "TaskKilled" or "ExecutorLostFailure," that's task-level, handled by TaskScheduler.
What's Next
We've seen how Spark boots up, from shell script to running executors. But what happens when you write spark.sql("SELECT * FROM t WHERE x > 10")? In the next article, we'll dive into the Catalyst query pipeline — the foundational TreeNode abstraction, the RuleExecutor framework, and the complete journey from SQL text to an optimized physical plan.