Read OSS

Spark Connect と拡張性アーキテクチャ

上級

前提知識

  • 記事 1〜4(Spark アーキテクチャと実行モデルの十分な理解)
  • gRPC と Protocol Buffers の基礎知識
  • API 抽象化とクライアント・サーバーパターンの理解

Spark Connect と拡張性アーキテクチャ

このシリーズでは、モノレポ構成からクエリ最適化、分散実行まで、Spark の内部を順を追って見てきました。その過程で繰り返し登場しながらも深く掘り下げられなかったテーマがあります。それは「Spark はあらゆるレイヤーで拡張できるように設計されている」という点です。マネージド Spark サービスを提供するクラウドベンダー、カスタムオプティマイザールールを注入するデータプラットフォームチーム、Go や Rust で Spark クライアントを構築する言語コミュニティ — それぞれに向けた拡張ポイントが Spark には用意されています。

最終回となるこの記事では、2 つの関連するトピックを取り上げます。gRPC を介してクライアントとサーバーを切り離す Spark Connect のアーキテクチャと、Spark を現存する最もカスタマイズ性の高いデータエンジンたらしめる拡張パターン全般です。

Classic モードと Connect モード:2 つのアーキテクチャ

Part 1 で紹介したように、Spark 4.x では SparkSessionsql/api 内の抽象クラスとして定義し、2 つの実装を持ちます。

sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala#L63

classDiagram
    class SparkSession {
        <<abstract, sql/api>>
        +sql(query): DataFrame
        +read: DataFrameReader
        +createDataFrame(): DataFrame
        +version: String
    }

    class ClassicSparkSession {
        <<sql/core/classic/>>
        +sparkContext: SparkContext
        +sessionState: SessionState
        +sharedState: SharedState
    }

    class ConnectClient {
        <<sql/connect/client/>>
        -channel: ManagedChannel
        -stub: SparkConnectServiceStub
    }

    SparkSession <|-- ClassicSparkSession : in-process JVM
    SparkSession <|-- ConnectClient : gRPC client

    class SparkConnectServer {
        <<sql/connect/server/>>
        +SparkConnectService
        +SparkConnectPlanner
    }

    ConnectClient ..> SparkConnectServer : gRPC/Protobuf
    SparkConnectServer ..> ClassicSparkSession : uses internally

Classic モード (sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala#L92-L99) は、Spark ランタイム全体をユーザーの JVM プロセスに組み込みます。SparkSessionSparkContext と、パーサー・アナライザー・オプティマイザー・プランナーを含む SessionState、そしてスケジューリング基盤のすべてを保持します。これは従来の方式で最大限の柔軟性を提供しますが、アプリケーションが特定の Spark バージョンに強く依存する形になります。

Connect モード では、アプリケーションは軽量な gRPC クライアントとして動作します。DataFrame API の呼び出しは Protobuf メッセージにシリアライズされてリモートの Spark Connect サーバーに送信され、サーバー側で Catalyst パイプライン全体を実行して結果をストリームで返します。クライアント側では SparkContext のインスタンス化も Spark の JAR のロードも行いません。

この設計の意義は大きいです。Connect を使うと、バージョン独立性(クライアントを再コンパイルせずにサーバーをアップグレード可能)、言語独立性(JVM 不要の Python・Go・Rust クライアント)、リソース分離(クライアントプロセスが Spark のメモリを消費しない)といった恩恵が得られます。

Spark Connect プロトコル:gRPC 上の Protobuf プラン

ワイヤープロトコルは sql/connect/common/src/main/protobuf/ 以下の Protobuf ファイルで定義されています。

sql/connect/common/src/main/protobuf/spark/connect/base.proto#L35-L58

message Plan {
  oneof op_type {
    Relation root = 1;
    Command command = 2;
    CompressedOperation compressed_operation = 3;
  }
}
graph TB
    subgraph Client["Client (any language)"]
        DF["df.filter('age > 25').select('name')"]
        PROTO["Protobuf Plan message"]
        DF --> PROTO
    end

    subgraph Wire["gRPC Channel"]
        REQ["ExecutePlanRequest"]
        RESP["ExecutePlanResponse\n(Arrow batches)"]
    end

    subgraph Server["Spark Connect Server"]
        SERVICE["SparkConnectService"]
        PLANNER["SparkConnectPlanner"]
        CATALYST["Catalyst Pipeline"]
        EXEC["SparkPlan.execute()"]
    end

    PROTO --> REQ
    REQ --> SERVICE
    SERVICE --> PLANNER
    PLANNER --> CATALYST
    CATALYST --> EXEC
    EXEC --> RESP
    RESP --> Client

Plan メッセージには主に 2 つのバリアントがあります。

  • Relation: クエリ(SELECT・JOIN・FILTER など)を表します。relations.proto で定義され、各リレーション型が 1 つの DataFrame 操作に対応します。
  • Command: DDL/DML 操作(CREATE TABLE・INSERT・SET CONFIGURATION など)を表します。commands.proto で定義されています。

3 つ目のバリアント CompressedOperation は、大きなプランに対して Zstandard 圧縮をサポートします。複雑な DataFrame パイプラインが予想外に大きな Protobuf メッセージを生成することがあるため、これは実用的な最適化です。

サーバーは base.proto で定義された複数の RPC を公開しています。クエリ実行用の ExecutePlan、プラン解析(explain・スキーマ確認)用の AnalyzePlan、設定管理用の Config、JAR やファイルのアップロード用の AddArtifacts です。

SparkConnectPlanner:Protobuf から Catalyst への変換

変換レイヤーの中心は 4000 行超の SparkConnectPlanner クラスです。

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L18-L80

このプランナーは、すべての Protobuf Relation 型を対応する Catalyst の LogicalPlan ノードに変換します。proto.Filter は Catalyst の Filter に、proto.Join は Catalyst の Join になるといった具合です。式についても同様で、Protobuf の式ツリーは再帰的に Catalyst の Expression ツリーへと変換されます。

Protobuf プランが Catalyst の LogicalPlan に変換されると、Part 3 で詳しく見たのと同じパイプラインに入ります。解析 → 最適化 → 物理プランニング → 実行という流れです。Spark Connect サーバーは本質的に、gRPC フロントエンドを持つ Classic SparkSession です。

ヒント: Spark Connect の問題をデバッグするときは、変換レイヤーで微妙な差異が生じる可能性があることを念頭に置きましょう。EXPLAIN オプション付きの AnalyzePlan を使うと、クライアントの操作からサーバーが実際に生成した Catalyst プランを確認できます。

Connect サーバー:セッション管理とサービス

gRPC サービスは SparkConnectService として実装されています。

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala#L59-L80

class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  override def executePlan(
      request: proto.ExecutePlanRequest,
      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
    try {
      new SparkConnectExecutePlanHandler(responseObserver).handle(request)
    } catch {
      ErrorUtils.handleError("execute", observer = responseObserver, ...)
    }
  }
}
graph TB
    subgraph Server["Spark Connect Server Process"]
        GRPC["Netty gRPC Server"]
        SVC["SparkConnectService"]
        SESSIONS["SessionManager"]
        S1["Session 1<br/>(User A)"]
        S2["Session 2<br/>(User B)"]
        S3["Session 3<br/>(User A, session 2)"]
    end

    GRPC --> SVC
    SVC --> SESSIONS
    SESSIONS --> S1
    SESSIONS --> S2
    SESSIONS --> S3

    S1 --> SC["Shared SparkContext"]
    S2 --> SC
    S3 --> SC

サーバーは SparkConnectSessionManager を通じてマルチテナントのセッション管理をサポートしています。各クライアント接続は user_idsession_id で識別されます。セッションマネージャーは SparkSession インスタンスのプールを管理し、各セッションは独立した設定と一時ビューを持つ SessionState を持ちながら、単一の SparkContext とクラスター接続を共有します。

サーバーは Netty の gRPC 実装上に構築されており、最大インバウンドメッセージサイズ、認証トークン、接続バインドアドレスを設定できます。スタンドアロンプロセスとして起動することも、既存の Spark アプリケーションに組み込むことも可能です。

SparkSessionExtensions:カスタムルールの注入

Spark をフォークせずに Catalyst パイプラインをカスタマイズしたい場合、SparkSessionExtensions が注入ポイントを提供します。

sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala#L34-L72

拡張ポイントはパイプラインの全フェーズをカバーしています。

flowchart LR
    PARSE["Parser<br/>injectParser"] --> ANALYZE["Analyzer<br/>injectResolutionRule"]
    ANALYZE --> CHECK["Check Analysis<br/>injectCheckRule"]
    CHECK --> OPT["Optimizer<br/>injectOptimizerRule"]
    OPT --> PLAN["Planner<br/>injectPlannerStrategy"]
    PLAN --> PREP["Columnar<br/>injectColumnar"]
拡張ポイント メソッド ユースケース
Parser injectParser カスタム SQL 構文、DSL からプランへの変換
Analyzer Rules injectResolutionRule カスタム名前解決、仮想テーブル
Check Rules injectCheckRule カスタムバリデーション(例:ポリシー適用)
Optimizer Rules injectOptimizerRule カスタム書き換え(例:インデックスを考慮した最適化)
Planner Strategies injectPlannerStrategy カスタム物理演算子
Columnar Rules injectColumnar GPU・ベクトル化実行アダプター
AQE Rules injectRuntimeOptimizerRule ランタイム再最適化のカスタマイズ

拡張機能はプログラム的に登録するか、spark.sql.extensions 設定プロパティ経由で指定します。

SparkSession.builder()
  .config("spark.sql.extensions", "com.example.MyExtensions")
  .getOrCreate()

SparkSessionExtensionsProvider トレイト(ServiceLoader 経由でロード)を使えば、クラスパス上の JAR から拡張を自動検出できます。これにより、完全にプラグアンドプレイな運用が可能になります。

プラグイン可能なサブシステム:クラスターマネージャー・Shuffle・プラグイン

拡張性は SQL パイプラインにとどまりません。Spark のコアエンジンはあらゆるレイヤーにプラグイン可能なサブシステムを持っています。

graph TB
    subgraph "Extension Points"
        ECM["ExternalClusterManager<br/>(YARN, K8s, Standalone)"]
        SM["ShuffleManager<br/>(Sort, Custom)"]
        PLUGIN["PluginContainer<br/>(Driver/Executor Plugins)"]
        DSV2["DataSource V2<br/>(Custom Connectors)"]
    end

    subgraph "Core Engine"
        SC[SparkContext]
        TS[TaskScheduler]
        SE[SparkEnv]
        CAT[Catalog]
    end

    ECM --> TS
    SM --> SE
    PLUGIN --> SC
    DSV2 --> CAT

ExternalClusterManager (core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62) はクラスターマネージャーの SPI です。Part 2 で見たように ServiceLoader で検出され、指定のマスター URL に対応する TaskSchedulerSchedulerBackend を提供します。YARN や Kubernetes のサポートはこの仕組みで実装されており、それぞれが ExternalClusterManager 実装を登録する独立したモジュールとして存在しています。

ShuffleManager (core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L38) はプラグイン可能な shuffle インターフェースです。デフォルトの SortShuffleManager はプロダクション品質ですが、Databricks や LinkedIn などのクラウドベンダーは自社インフラに最適化したカスタム ShuffleManager(リモートシャッフルサービスや分離ストレージなど)を独自に構築しています。

Plugin システム: Part 2 での SparkContext 初期化時に登場した PluginContainer は、ドライバープラグインとエグゼキュータープラグインをサポートします。メトリクスの傍受、設定の変更、カスタム RPC エンドポイントの登録、モニタリングフックの追加が可能で、オブザーバビリティやリソース管理の統合に役立ちます。

Data Source API V2: 最も広く使われている拡張ポイントです。データベース、メッセージキュー、ファイルフォーマット、REST API など、あらゆるデータソースに対してカスタムコネクターを記述できます。述語プッシュダウン、カラムプルーニング、並列読み取りもサポートしています。V2 ソースは Catalyst の最適化に参加し、プッシュダウンされたフィルターや射影を受け取ります。

ヒント: どの拡張ポイントを使うかは、実行のどのフェーズに介入したいかで考えましょう。クエリの解釈方法を変えたいならパーサーやアナライザーのルールを注入します。特定のストレージシステム向けに最適化するなら Data Source V2 のプッシュダウン機能を使います。実行環境そのものを変更したいなら Plugin システムが適しています。

まとめ:5 つの記事を振り返る

この 5 回のシリーズを通じて、Apache Spark の完全な実行経路をたどってきました。

  1. アーキテクチャとモジュールマップ — 3 つのレイヤーに整理されたモノレポの約 40 モジュール
  2. 起動シーケンスspark-submit から SparkContext の慎重に順序付けられた初期化まで
  3. Catalyst パイプライン — SQL テキストから TreeNode の変換を経て最適化された論理プランへ
  4. 実行エンジン — RDD・ステージ・タスク・シャッフル・アダプティブ再最適化
  5. Connect と拡張性 — gRPC クライアント・サーバー分割とプラグインアーキテクチャ

このシリーズを貫く洞察があります。Spark は一連のクリーンな抽象レイヤーとして構築されているということです。プランのための TreeNode、分散データのための RDD、変換のための Rule、データ交換のための ShuffleManager — それぞれに明確に定義された拡張ポイントが備わっています。この重層的でプラグイン可能なアーキテクチャこそが、Spark を研究プロジェクトからモダンデータスタックの基盤へと進化させ、何千もの組織がそれぞれのインフラとワークロードに合わせて使い続けている理由です。

Catalyst の最適化パスを調べて遅いクエリをデバッグするときも、新しいオプティマイザールールを作るときも、新言語で Spark Connect クライアントを構築するときも同様です。カスタムシャッフルサービスを実装する場合でも、Spark のコードベースを自信を持って読み進めるための地図が手に入りました。エントリーポイントから始め、実行経路をたどり、抽象レイヤーを道標にして進んでいきましょう。