The Boot Sequence: SparkContext, SparkEnv, and the Scheduling Stack

Advanced

Prerequisites

  • Article 1: Architecture and Module Map
  • Understanding of Scala lazy vals and initialization patterns
  • Familiarity with RPC and serialization concepts

When your application calls SparkSession.builder().getOrCreate(), a complex initialization sequence kicks off beneath the surface. Components must start in a precise order — the listener bus before the environment, the environment before the scheduler, the scheduler before the heartbeat. Get it wrong, and you'll see cryptic null pointer exceptions or deadlocks.

This article traces the entire boot sequence, from the moment SparkSubmit invokes your main() to the point where executors are registered and ready to run tasks. Understanding this sequence is essential for debugging startup failures, writing Spark plugins, or contributing to the core engine.

SparkSubmit: From CLI to Application Launch

As we saw in Part 1, spark-submit ultimately calls SparkSubmit.doSubmit(). The method that does the real work is prepareSubmitEnvironment():

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L243-L267

This 500+ line method handles four cluster manager types through an integer enum pattern:

val clusterManager: Int = args.maybeMaster match {
  case Some(v) => v match {
    case "yarn" => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if SparkMasterRegex.isK8s(m) => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ => error(...)
  }
  case None => LOCAL
}

For each cluster manager, it assembles a different classpath, rewrites the main class (e.g., wrapping it in a YARN ApplicationMaster for cluster mode), and resolves remote dependency JARs. The method returns a 4-tuple: child arguments, classpath entries, SparkConf, and the main class to invoke.

The actual launch happens in runMain(), which uses Java reflection to call the user's main() method — and at that point, control passes to your application code. Your code then calls SparkSession.builder().getOrCreate(), which creates a SparkContext.
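The reflective hand-off can be sketched in plain Scala with no Spark dependency (`DemoApp` and `ReflectiveLauncher` are illustrative names, not Spark classes):

```scala
// A stripped-down sketch of runMain-style launching: resolve the user's
// main class by name and invoke its static main(Array[String]) reflectively.
object DemoApp {
  var received: String = ""
  def main(args: Array[String]): Unit = { received = args.mkString(" ") }
}

object ReflectiveLauncher {
  def launch(mainClass: String, args: Array[String]): Unit = {
    val clazz = Class.forName(mainClass)
    // Scala objects compile with a static main forwarder, so invoke on null.
    val mainMethod = clazz.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args)
  }
}
```

From the launcher's point of view, the user application is just a class name and an argument array; everything after `invoke` runs as ordinary application code.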

SparkContext Initialization Sequence

The heart of Spark's boot process lives in a single try block inside SparkContext:

core/src/main/scala/org/apache/spark/SparkContext.scala#L409-L649

This 240-line block initializes everything in a carefully ordered sequence. If anything fails, the catch block calls stop() to clean up partially initialized state. Let's walk through the major milestones:

sequenceDiagram
    participant SC as SparkContext
    participant LB as LiveListenerBus
    participant ENV as SparkEnv
    participant HB as HeartbeatReceiver
    participant TS as TaskScheduler
    participant DAG as DAGScheduler
    participant BM as BlockManager

    SC->>SC: Validate SparkConf
    SC->>LB: Create LiveListenerBus
    SC->>SC: Create AppStatusStore
    SC->>ENV: createSparkEnv(conf, isLocal, listenerBus)
    SC->>HB: Register HeartbeatReceiver endpoint
    SC->>SC: Initialize PluginContainer
    SC->>ENV: initializeShuffleManager()
    SC->>ENV: initializeMemoryManager()
    SC->>TS: createTaskScheduler(master)
    SC->>DAG: new DAGScheduler(this)
    SC->>HB: Notify TaskSchedulerIsSet
    SC->>TS: start()
    SC->>BM: initialize(applicationId)

Key ordering constraints (with inline comments from the source):

  1. LiveListenerBus first (line 492): Created before SparkEnv so it captures all events, including environment creation.

  2. SparkEnv before HeartbeatReceiver (line 501): The environment provides the RPC layer that HeartbeatReceiver needs.

  3. HeartbeatReceiver before TaskScheduler (lines 588-591): "We need to register HeartbeatReceiver before createTaskScheduler because Executor will retrieve HeartbeatReceiver in the constructor. (SPARK-6640)"

  4. DAGScheduler after TaskScheduler (line 603): The DAGScheduler's constructor sets itself on the TaskScheduler, establishing the upcall relationship.

  5. TaskScheduler.start() after DAGScheduler (line 629): "start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor."

  6. BlockManager.initialize() after applicationId (line 650): Block manager needs the application ID for registration with the driver.

Tip: If you see NullPointerException during Spark startup, the root cause is almost always an ordering violation — something was accessed before its dependency was initialized. The comments in this try block are your best debugging guide.
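The same hazard is easy to reproduce with plain Scala field initialization, which is what the lazy-val prerequisite is about (class names here are illustrative):

```scala
// Plain `val`s initialize strictly in declaration order, so a field that
// reads a later field observes null -- the same class of bug as a component
// started before its dependency.
class BadOrder {
  val greeting: String = service.toUpperCase // NPE: `service` is still null here
  val service: String = "rpc"
}

// `lazy val` defers evaluation until first access, after construction finishes.
class GoodOrder {
  lazy val greeting: String = service.toUpperCase
  val service: String = "rpc"
}
```

Constructing `BadOrder` throws a NullPointerException, while `GoodOrder` works because the dependent field is evaluated on demand.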

SparkEnv: The Runtime Container Pattern

SparkEnv is a God Object — deliberately so. It holds references to every runtime service that both drivers and executors need:

core/src/main/scala/org/apache/spark/SparkEnv.scala#L61-L73

class SparkEnv (
    val executorId: String,
    private[spark] val rpcEnv: RpcEnv,
    val serializer: Serializer,
    val closureSerializer: Serializer,
    val serializerManager: SerializerManager,
    val mapOutputTracker: MapOutputTracker,
    val broadcastManager: BroadcastManager,
    val blockManager: BlockManager,
    val securityManager: SecurityManager,
    val metricsSystem: MetricsSystem,
    val outputCommitCoordinator: OutputCommitCoordinator,
    val conf: SparkConf)

graph TB
    subgraph SparkEnv["SparkEnv (Runtime Container)"]
        RPC[RpcEnv<br/>Netty-based RPC]
        SER[Serializer<br/>Kryo/Java]
        MOT[MapOutputTracker<br/>Shuffle metadata]
        BM[BlockManager<br/>Storage layer]
        MM[MemoryManager<br/>On/off-heap allocation]
        SM[ShuffleManager<br/>Sort-based shuffle]
        BC[BroadcastManager<br/>Variable distribution]
        MET[MetricsSystem<br/>Codahale metrics]
    end

Two things are notable about SparkEnv's design:

Driver vs. Executor creation: The companion object provides createDriverEnv() and createExecutorEnv(). Both delegate to a private create() method but with different parameters — the driver gets a LiveListenerBus, while the executor does not. The MapOutputTracker is also different: the driver gets MapOutputTrackerMaster (which tracks all shuffle output locations), while executors get MapOutputTrackerWorker (which fetches shuffle locations from the driver).

Deferred initialization: Note that ShuffleManager and MemoryManager are initialized after SparkEnv creation, not in the constructor. The ShuffleManager is deferred because user JARs might define custom implementations, and those JARs aren't on the classpath during initial environment creation. The MemoryManager is deferred because driver plugins (loaded after SparkEnv) can override executor memory configurations.
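A minimal sketch of this deferred-slot pattern, with illustrative names rather than Spark's actual fields or signatures:

```scala
trait ShuffleLike { def name: String }
class SortShuffle extends ShuffleLike { val name = "sort" }

class Env {
  // The slot is empty at construction time and filled in later, once user
  // JARs are on the classpath and the concrete class can be loaded.
  @volatile private var _shuffle: Option[ShuffleLike] = None

  def initializeShuffleManager(className: String): Unit = {
    val instance = Class.forName(className)
      .getDeclaredConstructor().newInstance().asInstanceOf[ShuffleLike]
    _shuffle = Some(instance)
  }

  def shuffleManager: ShuffleLike =
    _shuffle.getOrElse(throw new IllegalStateException("shuffle manager not initialized"))
}
```

Accessing the slot before initialization fails fast with a clear message instead of surfacing a null somewhere downstream.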

Configuration System: SparkConf and ConfigEntry

Before diving deeper into scheduling, it's worth understanding how configuration flows through the system. Spark uses a two-tier configuration approach:

core/src/main/scala/org/apache/spark/SparkConf.scala#L38-L56

The public API is string-based: conf.get("spark.executor.memory"). But internally, Spark defines typed ConfigEntry[T] objects that provide type safety, documentation, default values, and validation. The ReadOnlySparkConf trait exposes both:

trait ReadOnlySparkConf {
  def get(key: String): String
  private[spark] def get[T](entry: ConfigEntry[T]): T
}

classDiagram
    class ReadOnlySparkConf {
        <<trait>>
        +get(key: String): String
        +get(entry: ConfigEntry~T~): T
    }
    class SparkConf {
        -settings: ConcurrentHashMap
        +set(key: String, value: String)
        +clone(): SparkConf
        +validateSettings()
    }
    ReadOnlySparkConf <|-- SparkConf

The ConfigEntry system (defined in internal/config/) provides typed accessors like ConfigBuilder("spark.executor.memory").bytesConf(ByteUnit.MiB).createWithDefault(1024). This ensures that memory configuration is always parsed as bytes, has a sensible default, and produces clear error messages on invalid input.
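A minimal sketch of the idea behind typed entries (illustrative names, not Spark's actual ConfigEntry API):

```scala
// A typed config entry bundles the key, a parser, and a default, so callers
// get a typed value instead of a raw string.
final case class ConfigEntry[T](key: String, parse: String => T, default: T)

object ConfigBuilder {
  // Parse memory sizes into MiB, accepting "2g", "512m", or a bare number.
  def bytesMiB(key: String, defaultMiB: Long): ConfigEntry[Long] =
    ConfigEntry(key, s => {
      val lower = s.trim.toLowerCase
      if (lower.endsWith("g")) lower.dropRight(1).toLong * 1024
      else if (lower.endsWith("m")) lower.dropRight(1).toLong
      else lower.toLong
    }, defaultMiB)
}

class Conf(settings: Map[String, String]) {
  def get[T](entry: ConfigEntry[T]): T =
    settings.get(entry.key).map(entry.parse).getOrElse(entry.default)
}
```

The parsing and default logic is written once, next to the key's definition, so every call site that reads the entry agrees on units and fallback behavior.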

Configuration propagation works by cloning: SparkContext clones the SparkConf at line 410, and executors receive a serialized copy. Changes to the driver's conf after initialization do not propagate to running executors — this is a common source of confusion.

The Two-Level Scheduling Stack

Spark's scheduling architecture has two distinct levels, each with a well-defined responsibility:

flowchart TD
    JOB[User Action<br/>e.g., df.collect&#40;&#41;] --> DAG[DAGScheduler<br/>Stage-level scheduling]
    DAG -->|"Breaks RDD graph<br/>at shuffle boundaries"| STAGES[Stage 1, Stage 2, ...]
    STAGES -->|"Submits TaskSet<br/>per stage"| TS[TaskScheduler<br/>Task-level scheduling]
    TS -->|"Assigns tasks<br/>to executors"| SB[SchedulerBackend]
    SB --> EX1[Executor 1]
    SB --> EX2[Executor 2]
    SB --> EX3[Executor N]

DAGScheduler (core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala) operates at the stage level. When an action like collect() triggers a job, the DAGScheduler walks backward through the RDD lineage graph. At each ShuffleDependency, it creates a stage boundary. Stages with only narrow dependencies can pipeline operations together. The DAGScheduler submits TaskSets — one per stage — in dependency order, and handles stage-level failures (retrying when shuffle output is lost).
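The backward walk can be caricatured in a few lines of plain Scala: operators joined by narrow dependencies pipeline into one stage, and each shuffle dependency opens a new one (a toy over a linear lineage, not the real DAGScheduler algorithm):

```scala
sealed trait Dep
case object Narrow extends Dep
case object Shuffle extends Dep // a ShuffleDependency forces a stage boundary

// Group a linear chain of (operator, dependency-to-previous) pairs into stages.
def cutStages(ops: List[(String, Dep)]): List[List[String]] =
  ops.foldLeft(List(List.empty[String])) {
    case (stages, (op, Shuffle)) => List(op) :: stages                 // new stage
    case (stages, (op, Narrow))  => (op :: stages.head) :: stages.tail // pipeline
  }.filter(_.nonEmpty).map(_.reverse).reverse
```

For a lineage like `map → filter → groupByKey → mapValues`, the shuffle introduced by `groupByKey` splits the chain into two stages.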

TaskScheduler (core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala#L36-L55) operates at the task level. It receives TaskSets from the DAGScheduler and distributes individual tasks to executors, respecting data locality preferences, resource constraints, and scheduling policies (FIFO or Fair). It handles task-level retries and speculative execution for stragglers.

The SchedulerBackend is the abstraction between the TaskScheduler and the actual cluster: each cluster manager supplies its own backend to handle resource acquisition and executor lifecycle.

ExternalClusterManager (core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62) is the SPI that plugs in YARN, Kubernetes, and other cluster managers. It defines three methods: canCreate() to match a master URL, createTaskScheduler() to provide a TaskScheduler, and createSchedulerBackend() to provide a SchedulerBackend. Implementations are discovered via Java's ServiceLoader, so new cluster managers can be added without modifying Spark's core.
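The lookup logic can be sketched as follows (simplified; real discovery goes through java.util.ServiceLoader and a META-INF/services entry, and the actual trait carries more methods than shown):

```scala
trait ClusterManagerSpi {
  // Each implementation claims the master URLs it understands.
  def canCreate(masterURL: String): Boolean
}
class KubernetesManagerStub extends ClusterManagerSpi {
  def canCreate(masterURL: String): Boolean = masterURL.startsWith("k8s://")
}

// Exactly one registered manager may claim a given master URL.
def findManager(loaded: Seq[ClusterManagerSpi], masterURL: String): Option[ClusterManagerSpi] =
  loaded.filter(_.canCreate(masterURL)) match {
    case Seq()       => None
    case Seq(single) => Some(single)
    case _           => throw new IllegalStateException(s"multiple managers match $masterURL")
  }
```

Because matching is by URL prefix rather than by hard-coded class names, dropping a new manager JAR on the classpath is enough to make its scheme resolvable.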

Tip: The two-level split means stage-level failures (shuffle data lost) are handled differently from task-level failures (OOM, network timeout). If you see "FetchFailedException," that's a stage-level failure handled by DAGScheduler. If you see "TaskKilled" or "ExecutorLostFailure," that's task-level, handled by TaskScheduler.

What's Next

We've seen how Spark boots up, from shell script to running executors. But what happens when you write spark.sql("SELECT * FROM t WHERE x > 10")? In the next article, we'll dive into the Catalyst query pipeline — the foundational TreeNode abstraction, the RuleExecutor framework, and the complete journey from SQL text to an optimized physical plan.