起動シーケンス:SparkContext、SparkEnv、そしてスケジューリングスタック
前提知識
- ›第1回:アーキテクチャとモジュールマップ
- ›Scala の lazy val と初期化パターンの理解
- ›RPC とシリアライゼーションの基本知識
起動シーケンス:SparkContext、SparkEnv、そしてスケジューリングスタック
アプリケーションが SparkSession.builder().getOrCreate() を呼び出した瞬間、その裏側では複雑な初期化シーケンスが始まります。各コンポーネントは決められた順序で起動しなければなりません。リスナーバスが環境より先、環境がスケジューラより先、スケジューラがハートビートより先です。この順序を誤ると、原因不明の NullPointerException やデッドロックが発生します。
この記事では、SparkSubmit がアプリケーションの main() を呼び出す瞬間から、エグゼキュータが登録されてタスクを実行できる状態になるまでの起動シーケンス全体を追います。この流れを理解しておくと、起動時の障害デバッグ、Spark プラグインの開発、コアエンジンへのコントリビューションに大いに役立ちます。
SparkSubmit:CLI からアプリケーション起動まで
第1回で見たとおり、spark-submit は最終的に SparkSubmit.doSubmit() を呼び出します。実際の処理を担うのは prepareSubmitEnvironment() メソッドです。
core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L243-L267
500行を超えるこのメソッドは、整数の enum パターンで4種類のクラスタマネージャーに対応しています。
val clusterManager: Int = args.maybeMaster match {
case Some(v) => v match {
case "yarn" => YARN
case m if m.startsWith("spark") => STANDALONE
case m if SparkMasterRegex.isK8s(m) => KUBERNETES
case m if m.startsWith("local") => LOCAL
case _ => error(...)
}
case None => LOCAL
}
クラスタマネージャーごとに、異なるクラスパスを構築し、メインクラスを書き換え(たとえばクラスターモードでは YARN の ApplicationMaster でラップする)、リモートの依存 JAR を解決します。メソッドの戻り値は4要素のタプルで、子の引数、クラスパスエントリ、SparkConf、そして起動するメインクラスが返されます。
実際の起動は runMain() 内で Java リフレクションを使って行われ、ユーザーの main() メソッドが呼び出されます。これ以降はアプリケーションコードに制御が移り、SparkSession.builder().getOrCreate() を経て SparkContext が生成されます。
SparkContext の初期化シーケンス
Spark の起動処理の核心は、SparkContext 内のひとつの try ブロックに集約されています。
core/src/main/scala/org/apache/spark/SparkContext.scala#L409-L649
240行にわたるこのブロックが、すべてのコンポーネントを厳密な順序で初期化します。途中でエラーが発生した場合、catch ブロックが stop() を呼び出し、中途半端に初期化された状態をクリーンアップします。主要なマイルストーンを順に見ていきましょう。
sequenceDiagram
participant SC as SparkContext
participant LB as LiveListenerBus
participant ENV as SparkEnv
participant HB as HeartbeatReceiver
participant TS as TaskScheduler
participant DAG as DAGScheduler
participant BM as BlockManager
SC->>SC: Validate SparkConf
SC->>LB: Create LiveListenerBus
SC->>SC: Create AppStatusStore
SC->>ENV: createSparkEnv(conf, isLocal, listenerBus)
SC->>HB: Register HeartbeatReceiver endpoint
SC->>SC: Initialize PluginContainer
SC->>ENV: initializeShuffleManager()
SC->>ENV: initializeMemoryManager()
SC->>TS: createTaskScheduler(master)
SC->>DAG: new DAGScheduler(this)
SC->>HB: Notify TaskSchedulerIsSet
SC->>TS: start()
SC->>BM: initialize(applicationId)
ソースコードのコメントにも記載されている、重要な順序制約を確認しておきましょう。
-
LiveListenerBus を最初に(492行目):環境の生成を含むすべてのイベントをキャプチャできるよう、SparkEnv より先に生成します。
-
SparkEnv を HeartbeatReceiver より先に(501行目):HeartbeatReceiver が必要とする RPC レイヤーは環境が提供します。
-
HeartbeatReceiver を TaskScheduler より先に(588〜591行目):"Executor はコンストラクタで HeartbeatReceiver を取得するため、createTaskScheduler の前に HeartbeatReceiver を登録する必要があります。(SPARK-6640)"
-
DAGScheduler を TaskScheduler の後に(603行目):DAGScheduler のコンストラクタが TaskScheduler に自分自身をセットし、アップコールの関係を確立します。
-
TaskScheduler.start() を DAGScheduler の後に(629行目):"DAGScheduler のコンストラクタが DAGScheduler への参照を taskScheduler にセットした後で TaskScheduler を起動します。"
-
BlockManager.initialize() をアプリケーション ID の取得後に(650行目):BlockManager はドライバへの登録にアプリケーション ID を必要とします。
ヒント: Spark 起動時に
NullPointerExceptionが発生する場合、根本原因はほぼ必ずこの順序の違反です。依存関係が初期化される前に何かにアクセスしようとしています。このtryブロック内のコメントが、デバッグの最良の手がかりになります。
SparkEnv:ランタイムコンテナパターン
SparkEnv はいわゆる God Object です。これは意図的な設計で、ドライバとエグゼキュータの両方が必要とするすべてのランタイムサービスへの参照を保持しています。
core/src/main/scala/org/apache/spark/SparkEnv.scala#L61-L73
class SparkEnv (
val executorId: String,
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val serializerManager: SerializerManager,
val mapOutputTracker: MapOutputTracker,
val broadcastManager: BroadcastManager,
val blockManager: BlockManager,
val securityManager: SecurityManager,
val metricsSystem: MetricsSystem,
val outputCommitCoordinator: OutputCommitCoordinator,
val conf: SparkConf)
graph TB
subgraph SparkEnv["SparkEnv (Runtime Container)"]
RPC[RpcEnv<br/>Netty-based RPC]
SER[Serializer<br/>Kryo/Java]
MOT[MapOutputTracker<br/>Shuffle metadata]
BM[BlockManager<br/>Storage layer]
MM[MemoryManager<br/>On/off-heap allocation]
SM[ShuffleManager<br/>Sort-based shuffle]
BC[BroadcastManager<br/>Variable distribution]
MET[MetricsSystem<br/>Codahale metrics]
end
SparkEnv の設計には、特筆すべき点が2つあります。
ドライバとエグゼキュータで異なる生成方法: コンパニオンオブジェクトは createDriverEnv() と createExecutorEnv() の両方を提供しており、どちらもプライベートな create() メソッドに処理を委譲しますが、引数が異なります。ドライバには LiveListenerBus が渡されますが、エグゼキュータには渡されません。MapOutputTracker も異なります。ドライバはすべてのシャッフル出力場所を追跡する MapOutputTrackerMaster を持ち、エグゼキュータはドライバからシャッフル場所を取得する MapOutputTrackerWorker を持ちます。
遅延初期化: ShuffleManager と MemoryManager は SparkEnv のコンストラクタ内ではなく、生成後に初期化されます。ShuffleManager が遅延される理由は、ユーザーの JAR にカスタム実装が定義されている可能性があり、それらの JAR は初期環境生成時点ではクラスパスに含まれていないためです。MemoryManager が遅延される理由は、SparkEnv の後にロードされるドライバプラグインがエグゼキュータのメモリ設定を上書きできるようにするためです。
設定システム:SparkConf と ConfigEntry
スケジューリングの詳細に進む前に、設定がシステム全体にどのように流れるかを理解しておきましょう。Spark は二層の設定アプローチを採用しています。
core/src/main/scala/org/apache/spark/SparkConf.scala#L38-L56
公開 API は文字列ベースです(conf.get("spark.executor.memory"))。ただし内部では、型安全性・ドキュメント・デフォルト値・バリデーションを提供する型付きの ConfigEntry[T] オブジェクトが使われています。ReadOnlySparkConf トレイトはその両方を公開しています。
trait ReadOnlySparkConf {
def get(key: String): String
private[spark] def get[T](entry: ConfigEntry[T]): T
}
classDiagram
class ReadOnlySparkConf {
<<trait>>
+get(key: String): String
+get(entry: ConfigEntry~T~): T
}
class SparkConf {
-settings: ConcurrentHashMap
+set(key: String, value: String)
+clone(): SparkConf
+validateSettings()
}
ReadOnlySparkConf <|-- SparkConf
ConfigEntry システム(internal/config/ で定義)は、ConfigBuilder("spark.executor.memory").bytesConf(ByteUnit.MiB).createWithDefault(1024) のような型付きアクセサを提供します。これにより、メモリ設定が常にバイト単位で解析され、適切なデフォルト値を持ち、無効な入力に対して明確なエラーメッセージを出せるようになっています。
設定の伝播はクローンによって行われます。SparkContext は410行目で SparkConf をクローンし、エグゼキュータはシリアライズされたコピーを受け取ります。初期化後にドライバの設定を変更しても、実行中のエグゼキュータには反映されません。これはよくある混乱の原因のひとつです。
二層スケジューリングスタック
Spark のスケジューリングアーキテクチャには、明確な責務を持つ2つの層があります。
flowchart TD
JOB[User Action<br/>e.g., df.collect()] --> DAG[DAGScheduler<br/>Stage-level scheduling]
DAG -->|"Breaks RDD graph<br/>at shuffle boundaries"| STAGES[Stage 1, Stage 2, ...]
STAGES -->|"Submits TaskSet<br/>per stage"| TS[TaskScheduler<br/>Task-level scheduling]
TS -->|"Assigns tasks<br/>to executors"| SB[SchedulerBackend]
SB --> EX1[Executor 1]
SB --> EX2[Executor 2]
SB --> EX3[Executor N]
DAGScheduler(core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala)はステージレベルで動作します。collect() などのアクションがジョブをトリガーすると、DAGScheduler は RDD の系譜グラフを逆向きに辿ります。ShuffleDependency が見つかるたびにステージの境界を設けます。ナロー依存のみのステージは処理をパイプライン化できます。DAGScheduler は依存関係の順序に従って TaskSet(ステージごとに1つ)を送出し、シャッフル出力が失われた場合のステージレベルの障害に対応します。
TaskScheduler(core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala#L36-L55)はタスクレベルで動作します。DAGScheduler から受け取った TaskSet をもとに、データローカリティの優先度・リソース制約・スケジューリングポリシー(FIFO または Fair)を考慮しながら、個々のタスクをエグゼキュータに割り当てます。タスクレベルのリトライや、処理の遅いタスクに対する投機的実行も担当します。
SchedulerBackend は TaskScheduler と実際のクラスターの間の抽象化層です。クラスタマネージャーごとに異なるバックエンドがリソースの取得とエグゼキュータのライフサイクルを管理します。
ExternalClusterManager は YARN や Kubernetes などのクラスタマネージャーを接続する SPI です。
core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62
定義するメソッドは3つあります。マスター URL に対応するかを確認する canCreate()、TaskScheduler を提供する createTaskScheduler()、SchedulerBackend を提供する createSchedulerBackend() です。実装は Java の ServiceLoader で検出されるため、Spark のコアを変更せずに新しいクラスタマネージャーを追加できます。
ヒント: この二層構造により、ステージレベルの障害(シャッフルデータの喪失)とタスクレベルの障害(OOM、ネットワークタイムアウト)は異なる仕組みで処理されます。"FetchFailedException" が表示された場合はステージレベルの障害で DAGScheduler が対処します。"TaskKilled" や "ExecutorLostFailure" はタスクレベルの障害で、TaskScheduler が処理します。
次回予告
ここまでで、シェルスクリプトからエグゼキュータの起動に至るまでの Spark の起動プロセスを追ってきました。では spark.sql("SELECT * FROM t WHERE x > 10") と書いたとき、何が起こるのでしょうか?次回は Catalyst クエリパイプラインを掘り下げます。基盤となる TreeNode 抽象化、RuleExecutor フレームワーク、そして SQL テキストから最適化された物理プランに至るまでの全行程を解説します。