計画から実行へ:RDD・ステージ・タスク・シャッフルの仕組み
前提知識
- ›第1〜3回(アーキテクチャ、起動シーケンス、Catalyst パイプライン)
- ›パーティション分割とハッシュ/ソートベースのシャッフルの基礎知識
- ›DAG(有向非巡回グラフ)の概念への理解
計画から実行へ:RDD・ステージ・タスク・シャッフルの仕組み
前回の記事では、SQL クエリが Catalyst のオプティマイザパイプラインを通過し、物理的な SparkPlan が生成されるまでを追いました。今回はいよいよ核心です。その計画をクラスター上の分散計算として実際に動かします。論理的な変換の世界を離れ、ネットワーク分断、データスキュー、ディスク I/O といった現実の問題と向き合う場面です。
この記事では、実行パス全体を追います。SparkPlan.execute() が RDD を生成するところから始まり、DAGScheduler によるステージ生成アルゴリズム、ステージ間でデータを移動するシャッフルシステム、BlockManager の分散ストレージを解説します。さらに、Adaptive Query Execution によるランタイム再最適化まで、Spark の分散コンピューティングエンジンの中枢を掘り下げます。
SparkPlan.execute() と RDD ブリッジ
Spark SQL とコア分散エンジンをつなぐ橋は、たった1つのメソッド呼び出しです。
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L197-L202
final def execute(): RDD[InternalRow] = executeQuery {
if (isCanonicalizedPlan) {
throw SparkException.internalError("A canonicalized plan is not supposed to be executed.")
}
executeRDD.get
}
具体的な SparkPlan の各サブクラスは doExecute() をオーバーライドし、RDD[InternalRow] を返します。
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L328-L332
flowchart TD
PLAN["SparkPlan tree<br/>(physical plan)"] --> EXEC["execute()"]
EXEC --> RDD["RDD[InternalRow]<br/>(lazy)"]
RDD -->|"Action<br/>e.g. collect()"| DAG["DAGScheduler.submitJob()"]
DAG --> STAGES["Stage graph"]
STAGES --> TASKS["TaskSets → Executors"]
重要なのは、これがすべて遅延評価である点です。execute() を呼んでも何も実行されません。RDD のリネージグラフを構築するだけです。実際の計算が始まるのは、collect()・count()・saveAsTable() などのアクションが SparkContext.runJob() を呼び出し、最終的な RDD が DAGScheduler に渡されたときです。
RDD の抽象化:5 つの特性
RDD は 5 つの特性によって定義されており、Scaladoc にも明示されています。
core/src/main/scala/org/apache/spark/rdd/RDD.scala#L69-L78
これら 5 つの特性は、抽象クラスのオーバーライドポイントとして実装されています。
core/src/main/scala/org/apache/spark/rdd/RDD.scala#L116-L139
classDiagram
class RDD~T~ {
<<abstract>>
+compute(split, context): Iterator~T~
#getPartitions: Array[Partition]
#getDependencies: Seq[Dependency]
#getPreferredLocations(split): Seq[String]
+partitioner: Option[Partitioner]
}
note for RDD "1. Partitions list\n2. Compute function\n3. Dependencies\n4. Optional Partitioner\n5. Preferred locations"
| 特性 | メソッド | 役割 |
|---|---|---|
| パーティション | getPartitions |
分割の一覧。それぞれが 1 つのタスクに対応する |
| コンピュート | compute(split, context) |
単一パーティションに対する実際の計算ロジック |
| 依存関係 | getDependencies |
親 RDD へのリンク。リネージグラフを構成する |
| パーティショナー | partitioner |
キーバリュー RDD におけるデータの分割方式(ハッシュ、レンジなど) |
| 優先ロケーション | getPreferredLocations |
データローカリティのヒント(例:HDFS ブロックの場所) |
スケジューラが必要とするのはこの 5 つのメソッドだけです。DAGScheduler は getDependencies でステージグラフを構築し、getPartitions で並列度を決定し、getPreferredLocations でタスクの配置を最適化します。そして compute 関数はシリアライズされてエグゼキューターに送られます。この最小限のインターフェースのおかげで、HDFS の読み取りからシャッフルを伴う集計まで、あらゆる処理を RDD で表現できます。
Narrow 依存と Shuffle 依存
NarrowDependency と ShuffleDependency の違いは、Spark の実行モデルを理解するうえで最も重要な概念です。
core/src/main/scala/org/apache/spark/Dependency.scala#L41-L95
flowchart LR
subgraph "Narrow Dependency (pipelined)"
P1A[Parent\nPartition 1] --> C1A[Child\nPartition 1]
P2A[Parent\nPartition 2] --> C2A[Child\nPartition 2]
P3A[Parent\nPartition 3] --> C3A[Child\nPartition 3]
end
subgraph "Shuffle Dependency (stage boundary)"
P1B[Parent\nPartition 1] --> C1B[Child\nPartition 1]
P1B --> C2B[Child\nPartition 2]
P2B[Parent\nPartition 2] --> C1B
P2B --> C2B
P3B[Parent\nPartition 3] --> C1B
P3B --> C2B
end
NarrowDependency:子パーティションが依存する親パーティションの数が少なく、固定されています。map・filter・union などの操作がこれにあたります。パイプライン処理が可能なため、中間データをディスクに書き出すことなくインプロセスで実行できます。
ShuffleDependency:子パーティションがすべての親パーティションに依存する可能性があります。groupByKey・reduceByKey・join などの操作がこれにあたります。データをマップ側でディスクに書き出し、リデュース側でネットワーク越しに読み込むシャッフルが必要です。シャッフル依存が発生するたびに、ステージの境界が生まれます。
ヒント: シャッフルの境界数がそのままステージ数になります。ステージ数は、次のステージが始まる前にすべてのタスクが完了しなければならない同期ポイントの数でもあります。シャッフルの削減は、Spark における最もインパクトの大きい最適化です。
DAGScheduler:RDD グラフからステージを構築する
アクションがジョブを発火させると、DAGScheduler は RDD の依存グラフを逆方向に辿ってステージを構築します。アルゴリズムは createResultStage() から始まります。
flowchart BT
RDD1["RDD: textFile()"] -->|narrow| RDD2["RDD: map()"]
RDD2 -->|narrow| RDD3["RDD: filter()"]
RDD3 -->|"shuffle<br/>(stage boundary)"| RDD4["RDD: reduceByKey()"]
RDD4 -->|narrow| RDD5["RDD: map()"]
RDD5 -->|"ACTION: collect()"| RESULT[ResultStage]
subgraph Stage0["ShuffleMapStage 0"]
RDD1
RDD2
RDD3
end
subgraph Stage1["ResultStage 1"]
RDD4
RDD5
end
中心となるのは getShuffleDependenciesAndResourceProfiles() です。このメソッドは RDD の依存関係を辿りながら、Narrow 依存は同じステージ内で通過させ、Shuffle 依存はステージ境界として収集します。深い RDD リネージで StackOverflowError が起きないよう、再帰ではなく明示的なスタックを使っています。
ステージには 2 種類あります。
ShuffleMapStage:シャッフル出力ファイルを生成します。createShuffleMapStage()で生成され、MapOutputTrackerに登録されることで、後続ステージがシャッフルデータの場所を特定できるようになります。ResultStage:アクションの結果を計算する最終ステージです。このタスクはユーザーの result 関数(例:行の収集)を実行し、結果をドライバーに返します。
タスクの実行:シリアライズ・転送・実行
ステージが生成されると、DAGScheduler は TaskSet を TaskScheduler に送信します。各 TaskSet には、ステージ内の全パーティション分のタスクが含まれています。ShuffleMapTask(シャッフル出力を書き込む)または ResultTask(最終結果を生成する)のいずれかです。
sequenceDiagram
participant DAG as DAGScheduler
participant TS as TaskSchedulerImpl
participant SB as SchedulerBackend
participant EX as Executor
DAG->>TS: submitTasks(TaskSet)
TS->>TS: Create TaskSetManager
TS->>SB: reviveOffers()
SB->>TS: resourceOffers(workers)
TS->>TS: Match tasks to workers (locality)
TS-->>SB: TaskDescriptions
SB->>EX: LaunchTask(serialized task)
EX->>EX: Deserialize + run task
EX-->>SB: StatusUpdate(finished/failed)
SB-->>TS: statusUpdate
TS-->>DAG: taskEnded / taskFailed
TaskScheduler はタスクを割り当てる際にデータローカリティを考慮します。入力データを持つノードでタスクを実行することを優先し(PROCESS_LOCAL > NODE_LOCAL > RACK_LOCAL > ANY)、設定した待機時間を超えた場合に限り、ローカリティの低い割り当てにフォールバックします。
タスクのシリアライズにも注目しましょう。タスクのクロージャ(RDD の compute 関数を含む)は Spark のクロージャシリアライザでシリアライズされます。つまり、クロージャにキャプチャされた変数はすべてシリアライズ可能でなければならず、これが NotSerializableException のよくある原因になります。
シャッフルシステム
シャッフルは Spark の中でも最も複雑でパフォーマンスに直結するサブシステムです。ShuffleManager トレイトは、プラグイン可能なインターフェースを定義しています。
private[spark] trait ShuffleManager {
def registerShuffle[K, V, C](shuffleId: Int, dependency: ShuffleDependency[K, V, C]): ShuffleHandle
def getWriter[K, V](handle: ShuffleHandle, mapId: Long, context: TaskContext, ...): ShuffleWriter[K, V]
def getReader[K, C](handle: ShuffleHandle, startMapIndex: Int, ...): ShuffleReader[K, C]
}
flowchart LR
subgraph "Map Side (ShuffleMapTask)"
DATA[Partition Data] --> SORT[SortShuffleWriter]
SORT --> FILE[Data File + Index File]
end
subgraph "Shuffle Service"
FILE --> RESOLVER[IndexShuffleBlockResolver]
end
subgraph "Reduce Side (ResultTask)"
RESOLVER --> READER[BlockStoreShuffleReader]
READER --> MERGE[Merge sorted streams]
MERGE --> OUTPUT[Shuffled Partition]
end
デフォルトの SortShuffleManager は、ソート済みのパーティション分割データファイルとインデックスファイルをセットで書き出します。IndexShuffleBlockResolver がディスク上のレイアウトを管理し、シャッフルファイル内の特定パーティションへの効率的なランダムアクセスを実現します。リーダー側では、BlockStoreShuffleReader がリモートエグゼキューター(または External Shuffle Service)からブロックを取得し、ソート済みストリームをマージします。
BlockManager:分散ストレージ
BlockManager は、シャッフルデータ・ブロードキャスト変数・キャッシュされた RDD など、あらゆるデータブロックを管理する分散ストレージ層です。
各エグゼキューターは BlockManager を持ち、ドライバーの BlockManagerMaster に登録します。BlockManager は 2 つのストレージレベルを管理しています。MemoryStore が担うヒープ上の JVM メモリと、DiskStore が担うローカルディスクです。リモートデータの取得には、Netty ベースの BlockTransferService が他のエグゼキューターへのブロック転送を処理します。
rdd.persist(StorageLevel.MEMORY_AND_DISK) を呼ぶと、BlockManager が計算済みのパーティションを保存します。次回以降のアクセス時、DAGScheduler はキャッシュ済みパーティションの再計算をスキップします。これが、Spark の反復計算におけるパフォーマンスの主な要因です。
Adaptive Query Execution(AQE)
Spark の強力な機能の一つが Adaptive Query Execution です。実際のデータ統計に基づいて、物理プランをランタイムで再最適化します。
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala#L53-L75
flowchart TD
INITIAL["Initial Physical Plan"] --> WRAP["AdaptiveSparkPlanExec wraps plan"]
WRAP --> MATERIALIZE["Materialize shuffle stages"]
MATERIALIZE --> STATS["Collect runtime statistics<br/>(actual partition sizes)"]
STATS --> REOPT["Re-optimize remaining plan"]
REOPT --> DECIDE{More stages?}
DECIDE -->|Yes| MATERIALIZE
DECIDE -->|No| FINAL["Execute final stage"]
REOPT --> COALESCE["CoalesceShufflePartitions<br/>(reduce small partitions)"]
REOPT --> JOINSWITCH["DynamicJoinSelection<br/>(switch join strategy)"]
REOPT --> SKEW["OptimizeSkewedJoin<br/>(split skewed partitions)"]
AQE はシャッフルの境界で実行に割り込みます。シャッフルステージが完了すると、実際の出力統計(パーティションサイズを含む)が得られます。これは、静的最適化時にカタログの推定値から算出した値よりもはるかに正確です。AQE はこの情報をもとに、残りのプランに対してオプティマイザルールのサブセットを再実行します。
AQE が行う主要な最適化は 3 つです。
-
シャッフルパーティションのコアレス(統合):小さなパーティションを少数の大きなパーティションにまとめます。デフォルトの 200 パーティションは往々にして小さなタスクを大量に生み出すため、この最適化が特に重要です。
-
動的なジョイン戦略の切り替え:ジョインの入力データが推定より大幅に小さいことが判明した場合、AQE はランタイムでソートマージジョインからより効率的なブロードキャストハッシュジョインに切り替えられます。
-
スキュージョインの処理:特定のパーティションが異常に大きい(データスキュー)場合、AQE はそのパーティションを複数の小さなパーティションに分割し、ジョインのもう一方のデータを複製して対応します。
ヒント: AQE は Spark 3.2 以降でデフォルト有効です(
spark.sql.adaptive.enabled=true)。spark.sql.shuffle.partitionsを手動でチューニングしているなら、AQE のコアレス機能に任せることで自動化できます。spark.sql.adaptive.coalescePartitions.enabled=trueを設定し、手動のパーティション数指定を削除しましょう。
次回予告
SQL テキストから Catalyst による最適化を経て、クラスター上の分散実行に至るまでの一連の流れを追ってきました。最終回では、Spark Connect アーキテクチャを取り上げます。アプリケーションを Spark ランタイムから切り離す gRPC ベースのクライアント・サーバーモデルと、システムの各レイヤーをカスタマイズするための拡張メカニズムについて解説します。