Read OSS

Apache Spark のコードベースを読み解く:アーキテクチャとモジュールマップ

中級

前提知識

  • 分散システムの基本的な概念を理解していること
  • Scala の構文(case クラス、トレイト、パターンマッチング)に慣れていること
  • Maven または SBT などのビルドツールの基礎知識があること

Apache Spark のコードベースを読み解く:アーキテクチャとモジュールマップ

Apache Spark は、現存する最大かつ最も影響力のあるオープンソースのデータ処理エンジンのひとつです。そのコードベースは約40の Maven モジュールにまたがり、4つのプログラミング言語をサポートし、数千台ものマシン上で計算を調整します。初めてリポジトリを開いたとき、その規模に圧倒されてしまうのは無理もありません。一体どこから手をつければいいのでしょうか?

この記事では、コードベース全体を俯瞰するメンタルマップを提供します。モノリポの構造を概観し、低レベルのユーティリティから SQL オプティマイザに至る依存関係の階層をたどります。spark-submit からアプリケーション起動までの流れを追い、Spark 4.x で導入された Classic と Connect という2つの実行モードの分岐を理解します。読み終えるころには、どの関心事がどのディレクトリに属するかを把握できるはずです。

モノリポの構造とモジュール構成

Spark のリポジトリは典型的なモノリポです。すべてのコードがひとつの Git リポジトリに集約され、ルートの POM ひとつでビルドされます。ルートの pom.xml にすべてのモジュールが列挙されています。

graph TD
    subgraph "Common Utilities"
        A[common/sketch]
        B[common/kvstore]
        C[common/network-common]
        D[common/network-shuffle]
        E[common/unsafe]
        F[common/utils]
        G[common/variant]
    end

    subgraph "Core Engine"
        H[core]
    end

    subgraph "SQL Stack"
        I[sql/api]
        J[sql/catalyst]
        K[sql/core]
        L[sql/hive]
        M[sql/pipelines]
    end

    subgraph "Connect"
        N[sql/connect/common]
        O[sql/connect/server]
        P[sql/connect/client/jvm]
    end

    subgraph "Connectors"
        Q[connector/kafka-0-10-sql]
        R[connector/avro]
        S[connector/protobuf]
    end

    subgraph "Resource Managers"
        T[resource-managers/kubernetes]
        U[resource-managers/yarn]
    end

    C --> H
    E --> H
    H --> J
    I --> K
    J --> K
    K --> O
    K --> L

モジュールは6つの論理的なグループに分類されます。

グループ ディレクトリ 役割
Common common/* 低レベルユーティリティ:ネットワーク、メモリ管理、スケッチ、Key-Value ストア
Core core/ RDD 抽象化、スケジューリング(DAGScheduler、TaskScheduler)、ストレージ(BlockManager)、RPC
SQL sql/api, sql/catalyst, sql/core, sql/hive DataFrame/Dataset API、Catalyst オプティマイザ、クエリ実行エンジン
Connect sql/connect/* クライアント・サーバーアーキテクチャ:Protobuf プロトコル、gRPC サーバー、JVM/JDBC クライアント
Connectors connector/* データソース連携:Kafka、Avro、Protobuf
Resource Managers resource-managers/* クラスタマネージャープラグイン:Kubernetes、YARN

このほかに、言語バインディング(python/R/)、レガシーモジュール(streaming/graphx/mllib/)、ビルドインフラ(assembly/launcher/examples/repl/)も含まれています。

ヒント: common/unsafe モジュールは、Spark のオフヒープメモリ層です。生メモリアクセスのための Platform と、SQL エンジン全体で使われるバイナリ行フォーマットの UnsafeRow を提供しています。パフォーマンスに関わる作業をするなら、まずここを理解しておくことが重要です。

3層アーキテクチャ

Spark のコードベースは、厳格な階層型の依存モデルに従っています。3つの同心円状の層として捉えるとわかりやすいでしょう。

flowchart TB
    subgraph Layer1["Layer 1: Core Engine"]
        direction LR
        RDD["RDD Abstraction"]
        SCHED["DAGScheduler + TaskScheduler"]
        STORE["BlockManager + Shuffle"]
        RPC["RpcEnv"]
    end

    subgraph Layer2["Layer 2: SQL/Catalyst"]
        direction LR
        PARSE["Parser (ANTLR4)"]
        ANALYZE["Analyzer"]
        OPT["Optimizer"]
        PLAN["Physical Planner"]
    end

    subgraph Layer3["Layer 3: User-Facing API"]
        direction LR
        SESSION["SparkSession"]
        DF["DataFrame / Dataset"]
        CONNECT["Spark Connect (gRPC)"]
    end

    Layer3 --> Layer2
    Layer2 --> Layer1

Layer 1 — Core Enginecore/)は、分散コンピューティングのプリミティブを提供します。RDD(Resilient Distributed Dataset)が根本的な抽象化です。これは不変かつパーティション分割されたコレクションで、リネージベースの耐障害性を持ちます。DAGScheduler が計算をステージに分割し、TaskScheduler がタスクをエグゼキューターに送り、BlockManager がストレージを担います。

Layer 2 — SQL/Catalystsql/catalyst/sql/core/)は Spark SQL の中核です。Catalyst オプティマイザは、SQL や DataFrame の操作を「パース → 解析 → 最適化 → 物理プランニング」というパイプラインで変換します。最終的な物理プラン(SparkPlan)の execute() メソッドが RDD[InternalRow] を生成し、Layer 1 へと橋渡しします。

Layer 3 — User-Facing APIssql/api/sql/connect/)は、開発者が実際に触れるエントリーポイントです。SparkSessionDataFrameDataset がここに属します。Spark 4.x ではこの層が2つの実装に分岐しました。Classic(インプロセス JVM)と Connect(gRPC クライアント・サーバー)です。

この階層構造は Maven モジュールの依存関係によって強制されています。sql/catalystcore に依存しますが、core は SQL について一切関知しません。この明確な分離があるからこそ、RDD エンジンを汎用的に保ちながら、SQL 層がより高レベルな抽象化を提供できるのです。

CLI エントリーポイント:シェルから JVM へ

spark-submit my-app.jar を実行したとき、実際には何が起きているのでしょうか。その流れをたどってみましょう。

sequenceDiagram
    participant User
    participant spark_submit as bin/spark-submit
    participant spark_class as bin/spark-class
    participant Launcher as o.a.s.launcher.Main
    participant SparkSubmit as SparkSubmit.doSubmit()
    participant App as User's main()

    User->>spark_submit: spark-submit --class MyApp app.jar
    spark_submit->>spark_class: exec spark-class o.a.s.deploy.SparkSubmit "$@"
    spark_class->>Launcher: java -cp ... o.a.s.launcher.Main SparkSubmit ...
    Launcher-->>spark_class: Returns JVM command with full classpath
    spark_class->>SparkSubmit: exec java -cp <full-classpath> SparkSubmit args...
    SparkSubmit->>SparkSubmit: prepareSubmitEnvironment()
    SparkSubmit->>App: Reflectively invoke main()

Step 1: bin/spark-submit は薄いシェルスクリプトで、実質的なコードはわずか8行です。SPARK_HOME を特定し、Python のハッシュランダム化を無効にし、クラス名 org.apache.spark.deploy.SparkSubmit を指定して bin/spark-class に処理を委ねます。

Step 2: bin/spark-class がシェル側の重要な処理を担います。Java を探し出し、$SPARK_HOME/jars/* から初期クラスパスを組み立て、org.apache.spark.launcher.Main を呼び出します。これは、正しいクラスパス・メモリ設定・JVM オプションを含む最終的な java コマンドを出力する小さな Java プログラムです。出力はパースされ、exec されます。

Step 3: SparkSubmit.doSubmit() が引数を解析し、アクション(SUBMIT、KILL、REQUEST_STATUS、PRINT_VERSION)に応じて処理を振り分けます。SUBMIT の場合は prepareSubmitEnvironment() が呼ばれます。これが本当の要です。クラスタマネージャー(YARN、Kubernetes、Standalone、Local)を判定し、デプロイモード(client または cluster)を解決し、依存 JAR をダウンロードし、最終的にユーザーのメインクラスをリフレクションで呼び出します。

ヒント: サブミット時のトラブルを調査するときは、spark-submit コマンドに --verbose を追加しましょう。doSubmit() 内で appArgs.verbose が有効になり、アプリケーション起動前に解決済みのクラスパス・デプロイモード・すべての設定が出力されます。

SparkSession:ユーザー向けエントリーポイント

Spark SQL を書いたことがあれば、必ず SparkSession を使っているはずです。Spark 4.x では、これは sql/api で定義された抽象クラスになっています。

sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala#L63-L72

classDiagram
    class SparkSession {
        <<abstract>>
        +sparkContext: SparkContext*
        +version: String
        +sql(query: String): DataFrame
        +read: DataFrameReader
        +createDataFrame(): DataFrame
    }

    class ClassicSparkSession {
        +sparkContext: SparkContext
        -sharedState: SharedState
        -sessionState: SessionState
        +extensions: SparkSessionExtensions
    }

    class ConnectSparkSession {
        -client: SparkConnectClient
        -channel: ManagedChannel
    }

    SparkSession <|-- ClassicSparkSession
    SparkSession <|-- ConnectSparkSession

抽象クラス SparkSession は、ユーザー向けの API(sql()readcreateDataFrame() など)を定義します。そして2つの具体的な実装を持ちます。

Classicsql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala)はすべてをインプロセスで実行します。SparkContext をラップし、パーサー・アナライザー・オプティマイザ・プランナーを内包する SessionState を保持し、クエリをドライバー JVM 上で直接実行します。これが従来の Spark 実行モデルです。

Connect は薄い gRPC クライアントとして動作します。DataFrame 操作は Protobuf メッセージにシリアライズされ、リモートの Spark Connect サーバーに送信されて実行されます。クライアント側は RDD も SparkContext も一切見ることなく、Arrow フォーマットの結果をネットワーク越しに受け取るだけです。

この分岐は Spark 4.x における重要なアーキテクチャ上の決断です。その動機はシンプルです。Classic のインプロセスモデルでは、ユーザーのアプリケーションが Spark ランタイムに強く結びついてしまいます。Spark をアップグレードするたびに、アプリケーションを再コンパイルしなければなりません。Connect ではクライアントが切り離されているため、サーバーを独立してアップグレードでき、Python・Go・Rust などの軽量クライアントが JVM を埋め込まずに Spark と通信できます。

sparkContextsharedState などのメソッドには @ClassicOnly アノテーションが付いています。これらは Classic モードでしか利用できず、Connect では動作しないので注意が必要です。

ビルドシステムと開発時のナビゲーション

Spark は2つのビルドシステムをサポートしており、それぞれ役割が異なります。

flowchart LR
    subgraph Maven["Maven (CI / Releases)"]
        MPOM[pom.xml] --> MBUILD[mvn package -DskipTests]
        MBUILD --> MJARS[assembly/target/jars/]
    end

    subgraph SBT["SBT (Development)"]
        SBUILD[project/SparkBuild.scala] --> SCOMPILE["build/sbt compile"]
        SCOMPILE --> SINCR[Incremental compilation]
    end

Maven は CI や公式リリースで使われる正式なビルドシステムです。ルートの pom.xml がすべてのモジュール依存関係とプラグイン設定を定義しています。完全で再現性のあるビルドが必要なときはこちらを使いましょう。

SBT は開発時に推奨されます。AGENTS.md ガイドにも「インクリメンタルコンパイルを高速に行うために、Maven より SBT を優先してください」と明記されています。SBT ビルドは project/SparkBuild.scala で定義されており、Maven モジュール名を SBT プロジェクト名にマッピングしています。たとえば、sql/core は SBT プロジェクト sqlsql/catalystcatalyst になります。

コードベースを探索するための主なコマンドは以下のとおりです。

やりたいこと コマンド
単一モジュールのコンパイル build/sbt catalyst/compile
ワイルドカードでテストを実行 build/sbt 'catalyst/testOnly *AnalyzerSuite'
特定のテストを実行 build/sbt 'catalyst/testOnly *AnalyzerSuite -- -z "test name"'
SBT インタラクティブシェル build/sbt を起動後に project catalyst

ヒント: Spark のコードを読み解くときは、エントリーポイントから内側に向かってたどっていくのが効果的です。SQL 関連のコードなら SparkSession を、コアエンジンの内部なら SparkContext を出発点にしましょう。コードベース全体を最初から読もうとするのは得策ではありません。自分が関心を持つ機能の実行パスを追うことに集中するのが一番の近道です。

次のステップ

このマップがあれば、全体像をつかんだことになります。次の記事では、アプリケーションが実際に起動するときの内部を深掘りします。SparkContext の初期化シーケンス、SparkEnv というランタイムコンテナ、そしてあなたのコードをクラスター上の分散タスクへと変換する2段階のスケジューリングスタックを詳しく見ていきましょう。