Spark Connect と拡張性アーキテクチャ
前提知識
- ›記事 1〜4(Spark アーキテクチャと実行モデルの十分な理解)
- ›gRPC と Protocol Buffers の基礎知識
- ›API 抽象化とクライアント・サーバーパターンの理解
Spark Connect と拡張性アーキテクチャ
このシリーズでは、モノレポ構成からクエリ最適化、分散実行まで、Spark の内部を順を追って見てきました。その過程で繰り返し登場しながらも深く掘り下げられなかったテーマがあります。それは「Spark はあらゆるレイヤーで拡張できるように設計されている」という点です。マネージド Spark サービスを提供するクラウドベンダー、カスタムオプティマイザールールを注入するデータプラットフォームチーム、Go や Rust で Spark クライアントを構築する言語コミュニティ — それぞれに向けた拡張ポイントが Spark には用意されています。
最終回となるこの記事では、2 つの関連するトピックを取り上げます。gRPC を介してクライアントとサーバーを切り離す Spark Connect のアーキテクチャと、Spark を現存する最もカスタマイズ性の高いデータエンジンたらしめる拡張パターン全般です。
Classic モードと Connect モード:2 つのアーキテクチャ
Part 1 で紹介したように、Spark 4.x では SparkSession を sql/api 内の抽象クラスとして定義し、2 つの実装を持ちます。
sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala#L63
classDiagram
class SparkSession {
<<abstract, sql/api>>
+sql(query): DataFrame
+read: DataFrameReader
+createDataFrame(): DataFrame
+version: String
}
class ClassicSparkSession {
<<sql/core/classic/>>
+sparkContext: SparkContext
+sessionState: SessionState
+sharedState: SharedState
}
class ConnectClient {
<<sql/connect/client/>>
-channel: ManagedChannel
-stub: SparkConnectServiceStub
}
SparkSession <|-- ClassicSparkSession : in-process JVM
SparkSession <|-- ConnectClient : gRPC client
class SparkConnectServer {
<<sql/connect/server/>>
+SparkConnectService
+SparkConnectPlanner
}
ConnectClient ..> SparkConnectServer : gRPC/Protobuf
SparkConnectServer ..> ClassicSparkSession : uses internally
Classic モード (sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala#L92-L99) は、Spark ランタイム全体をユーザーの JVM プロセスに組み込みます。SparkSession は SparkContext と、パーサー・アナライザー・オプティマイザー・プランナーを含む SessionState、そしてスケジューリング基盤のすべてを保持します。これは従来の方式で最大限の柔軟性を提供しますが、アプリケーションが特定の Spark バージョンに強く依存する形になります。
Connect モード では、アプリケーションは軽量な gRPC クライアントとして動作します。DataFrame API の呼び出しは Protobuf メッセージにシリアライズされてリモートの Spark Connect サーバーに送信され、サーバー側で Catalyst パイプライン全体を実行して結果をストリームで返します。クライアント側では SparkContext のインスタンス化も Spark の JAR のロードも行いません。
この設計の意義は大きいです。Connect を使うと、バージョン独立性(クライアントを再コンパイルせずにサーバーをアップグレード可能)、言語独立性(JVM 不要の Python・Go・Rust クライアント)、リソース分離(クライアントプロセスが Spark のメモリを消費しない)といった恩恵が得られます。
Spark Connect プロトコル:gRPC 上の Protobuf プラン
ワイヤープロトコルは sql/connect/common/src/main/protobuf/ 以下の Protobuf ファイルで定義されています。
sql/connect/common/src/main/protobuf/spark/connect/base.proto#L35-L58
message Plan {
oneof op_type {
Relation root = 1;
Command command = 2;
CompressedOperation compressed_operation = 3;
}
}
graph TB
subgraph Client["Client (any language)"]
DF["df.filter('age > 25').select('name')"]
PROTO["Protobuf Plan message"]
DF --> PROTO
end
subgraph Wire["gRPC Channel"]
REQ["ExecutePlanRequest"]
RESP["ExecutePlanResponse\n(Arrow batches)"]
end
subgraph Server["Spark Connect Server"]
SERVICE["SparkConnectService"]
PLANNER["SparkConnectPlanner"]
CATALYST["Catalyst Pipeline"]
EXEC["SparkPlan.execute()"]
end
PROTO --> REQ
REQ --> SERVICE
SERVICE --> PLANNER
PLANNER --> CATALYST
CATALYST --> EXEC
EXEC --> RESP
RESP --> Client
Plan メッセージには主に 2 つのバリアントがあります。
Relation: クエリ(SELECT・JOIN・FILTER など)を表します。relations.protoで定義され、各リレーション型が 1 つの DataFrame 操作に対応します。Command: DDL/DML 操作(CREATE TABLE・INSERT・SET CONFIGURATION など)を表します。commands.protoで定義されています。
3 つ目のバリアント CompressedOperation は、大きなプランに対して Zstandard 圧縮をサポートします。複雑な DataFrame パイプラインが予想外に大きな Protobuf メッセージを生成することがあるため、これは実用的な最適化です。
サーバーは base.proto で定義された複数の RPC を公開しています。クエリ実行用の ExecutePlan、プラン解析(explain・スキーマ確認)用の AnalyzePlan、設定管理用の Config、JAR やファイルのアップロード用の AddArtifacts です。
SparkConnectPlanner:Protobuf から Catalyst への変換
変換レイヤーの中心は 4000 行超の SparkConnectPlanner クラスです。
このプランナーは、すべての Protobuf Relation 型を対応する Catalyst の LogicalPlan ノードに変換します。proto.Filter は Catalyst の Filter に、proto.Join は Catalyst の Join になるといった具合です。式についても同様で、Protobuf の式ツリーは再帰的に Catalyst の Expression ツリーへと変換されます。
Protobuf プランが Catalyst の LogicalPlan に変換されると、Part 3 で詳しく見たのと同じパイプラインに入ります。解析 → 最適化 → 物理プランニング → 実行という流れです。Spark Connect サーバーは本質的に、gRPC フロントエンドを持つ Classic SparkSession です。
ヒント: Spark Connect の問題をデバッグするときは、変換レイヤーで微妙な差異が生じる可能性があることを念頭に置きましょう。
EXPLAINオプション付きのAnalyzePlanを使うと、クライアントの操作からサーバーが実際に生成した Catalyst プランを確認できます。
Connect サーバー:セッション管理とサービス
gRPC サービスは SparkConnectService として実装されています。
class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
override def executePlan(
request: proto.ExecutePlanRequest,
responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
try {
new SparkConnectExecutePlanHandler(responseObserver).handle(request)
} catch {
ErrorUtils.handleError("execute", observer = responseObserver, ...)
}
}
}
graph TB
subgraph Server["Spark Connect Server Process"]
GRPC["Netty gRPC Server"]
SVC["SparkConnectService"]
SESSIONS["SessionManager"]
S1["Session 1<br/>(User A)"]
S2["Session 2<br/>(User B)"]
S3["Session 3<br/>(User A, session 2)"]
end
GRPC --> SVC
SVC --> SESSIONS
SESSIONS --> S1
SESSIONS --> S2
SESSIONS --> S3
S1 --> SC["Shared SparkContext"]
S2 --> SC
S3 --> SC
サーバーは SparkConnectSessionManager を通じてマルチテナントのセッション管理をサポートしています。各クライアント接続は user_id と session_id で識別されます。セッションマネージャーは SparkSession インスタンスのプールを管理し、各セッションは独立した設定と一時ビューを持つ SessionState を持ちながら、単一の SparkContext とクラスター接続を共有します。
サーバーは Netty の gRPC 実装上に構築されており、最大インバウンドメッセージサイズ、認証トークン、接続バインドアドレスを設定できます。スタンドアロンプロセスとして起動することも、既存の Spark アプリケーションに組み込むことも可能です。
SparkSessionExtensions:カスタムルールの注入
Spark をフォークせずに Catalyst パイプラインをカスタマイズしたい場合、SparkSessionExtensions が注入ポイントを提供します。
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala#L34-L72
拡張ポイントはパイプラインの全フェーズをカバーしています。
flowchart LR
PARSE["Parser<br/>injectParser"] --> ANALYZE["Analyzer<br/>injectResolutionRule"]
ANALYZE --> CHECK["Check Analysis<br/>injectCheckRule"]
CHECK --> OPT["Optimizer<br/>injectOptimizerRule"]
OPT --> PLAN["Planner<br/>injectPlannerStrategy"]
PLAN --> PREP["Columnar<br/>injectColumnar"]
| 拡張ポイント | メソッド | ユースケース |
|---|---|---|
| Parser | injectParser |
カスタム SQL 構文、DSL からプランへの変換 |
| Analyzer Rules | injectResolutionRule |
カスタム名前解決、仮想テーブル |
| Check Rules | injectCheckRule |
カスタムバリデーション(例:ポリシー適用) |
| Optimizer Rules | injectOptimizerRule |
カスタム書き換え(例:インデックスを考慮した最適化) |
| Planner Strategies | injectPlannerStrategy |
カスタム物理演算子 |
| Columnar Rules | injectColumnar |
GPU・ベクトル化実行アダプター |
| AQE Rules | injectRuntimeOptimizerRule |
ランタイム再最適化のカスタマイズ |
拡張機能はプログラム的に登録するか、spark.sql.extensions 設定プロパティ経由で指定します。
SparkSession.builder()
.config("spark.sql.extensions", "com.example.MyExtensions")
.getOrCreate()
SparkSessionExtensionsProvider トレイト(ServiceLoader 経由でロード)を使えば、クラスパス上の JAR から拡張を自動検出できます。これにより、完全にプラグアンドプレイな運用が可能になります。
プラグイン可能なサブシステム:クラスターマネージャー・Shuffle・プラグイン
拡張性は SQL パイプラインにとどまりません。Spark のコアエンジンはあらゆるレイヤーにプラグイン可能なサブシステムを持っています。
graph TB
subgraph "Extension Points"
ECM["ExternalClusterManager<br/>(YARN, K8s, Standalone)"]
SM["ShuffleManager<br/>(Sort, Custom)"]
PLUGIN["PluginContainer<br/>(Driver/Executor Plugins)"]
DSV2["DataSource V2<br/>(Custom Connectors)"]
end
subgraph "Core Engine"
SC[SparkContext]
TS[TaskScheduler]
SE[SparkEnv]
CAT[Catalog]
end
ECM --> TS
SM --> SE
PLUGIN --> SC
DSV2 --> CAT
ExternalClusterManager (core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62) はクラスターマネージャーの SPI です。Part 2 で見たように ServiceLoader で検出され、指定のマスター URL に対応する TaskScheduler と SchedulerBackend を提供します。YARN や Kubernetes のサポートはこの仕組みで実装されており、それぞれが ExternalClusterManager 実装を登録する独立したモジュールとして存在しています。
ShuffleManager (core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L38) はプラグイン可能な shuffle インターフェースです。デフォルトの SortShuffleManager はプロダクション品質ですが、Databricks や LinkedIn などのクラウドベンダーは自社インフラに最適化したカスタム ShuffleManager(リモートシャッフルサービスや分離ストレージなど)を独自に構築しています。
Plugin システム: Part 2 での SparkContext 初期化時に登場した PluginContainer は、ドライバープラグインとエグゼキュータープラグインをサポートします。メトリクスの傍受、設定の変更、カスタム RPC エンドポイントの登録、モニタリングフックの追加が可能で、オブザーバビリティやリソース管理の統合に役立ちます。
Data Source API V2: 最も広く使われている拡張ポイントです。データベース、メッセージキュー、ファイルフォーマット、REST API など、あらゆるデータソースに対してカスタムコネクターを記述できます。述語プッシュダウン、カラムプルーニング、並列読み取りもサポートしています。V2 ソースは Catalyst の最適化に参加し、プッシュダウンされたフィルターや射影を受け取ります。
ヒント: どの拡張ポイントを使うかは、実行のどのフェーズに介入したいかで考えましょう。クエリの解釈方法を変えたいならパーサーやアナライザーのルールを注入します。特定のストレージシステム向けに最適化するなら Data Source V2 のプッシュダウン機能を使います。実行環境そのものを変更したいなら Plugin システムが適しています。
まとめ:5 つの記事を振り返る
この 5 回のシリーズを通じて、Apache Spark の完全な実行経路をたどってきました。
- アーキテクチャとモジュールマップ — 3 つのレイヤーに整理されたモノレポの約 40 モジュール
- 起動シーケンス —
spark-submitから SparkContext の慎重に順序付けられた初期化まで - Catalyst パイプライン — SQL テキストから TreeNode の変換を経て最適化された論理プランへ
- 実行エンジン — RDD・ステージ・タスク・シャッフル・アダプティブ再最適化
- Connect と拡張性 — gRPC クライアント・サーバー分割とプラグインアーキテクチャ
このシリーズを貫く洞察があります。Spark は一連のクリーンな抽象レイヤーとして構築されているということです。プランのための TreeNode、分散データのための RDD、変換のための Rule、データ交換のための ShuffleManager — それぞれに明確に定義された拡張ポイントが備わっています。この重層的でプラグイン可能なアーキテクチャこそが、Spark を研究プロジェクトからモダンデータスタックの基盤へと進化させ、何千もの組織がそれぞれのインフラとワークロードに合わせて使い続けている理由です。
Catalyst の最適化パスを調べて遅いクエリをデバッグするときも、新しいオプティマイザールールを作るときも、新言語で Spark Connect クライアントを構築するときも同様です。カスタムシャッフルサービスを実装する場合でも、Spark のコードベースを自信を持って読み進めるための地図が手に入りました。エントリーポイントから始め、実行経路をたどり、抽象レイヤーを道標にして進んでいきましょう。