Spark Connect 与可扩展性架构
前置知识
- ›第 1–4 篇(充分理解 Spark 架构与执行机制)
- ›对 gRPC 和 Protocol Buffers 有基本了解
- ›理解 API 抽象与客户端-服务端模式
Spark Connect 与可扩展性架构
在本系列中,我们从 monorepo 结构出发,依次探索了查询优化和分布式执行等 Spark 核心机制。贯穿始终有一个反复出现的主题,却尚未深入展开:Spark 在每一层都为扩展预留了空间。无论你是运营托管 Spark 服务的云厂商、需要注入自定义优化器规则的数据平台团队,还是正在构建 Go 或 Rust 客户端的语言社区——Spark 都为你提供了对应的扩展点。
本文是系列的最后一篇,聚焦两个紧密相关的主题:通过 gRPC 将客户端与服务端解耦的 Spark Connect 架构,以及使 Spark 成为目前可定制性最强的数据引擎之一的整体扩展性设计。
Classic 模式与 Connect 模式的架构分野
正如第 1 篇所介绍的,Spark 4.x 将 SparkSession 定义为 sql/api 中的抽象类,并提供两种实现:
sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala#L63
classDiagram
class SparkSession {
<<abstract, sql/api>>
+sql(query): DataFrame
+read: DataFrameReader
+createDataFrame(): DataFrame
+version: String
}
class ClassicSparkSession {
<<sql/core/classic/>>
+sparkContext: SparkContext
+sessionState: SessionState
+sharedState: SharedState
}
class ConnectClient {
<<sql/connect/client/>>
-channel: ManagedChannel
-stub: SparkConnectServiceStub
}
SparkSession <|-- ClassicSparkSession : in-process JVM
SparkSession <|-- ConnectClient : gRPC client
class SparkConnectServer {
<<sql/connect/server/>>
+SparkConnectService
+SparkConnectPlanner
}
ConnectClient ..> SparkConnectServer : gRPC/Protobuf
SparkConnectServer ..> ClassicSparkSession : uses internally
Classic 模式(sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala#L92-L99)将完整的 Spark 运行时嵌入到用户的 JVM 进程中。SparkSession 持有一个 SparkContext,以及包含解析器、分析器、优化器和规划器的 SessionState,还有全套调度基础设施。这是传统模型,灵活性最高,但也将应用程序与特定 Spark 版本紧密绑定。
Connect 模式则让你的应用以轻量级 gRPC 客户端的形式运行。DataFrame API 调用被序列化为 Protobuf 消息,发送给远程的 Spark Connect 服务端,后者将其反序列化,走完完整的 Catalyst 流水线,再将结果以流式方式返回。客户端既不会实例化 SparkContext,也不会加载 Spark 的 JAR 包。
这一设计的价值显而易见:Connect 模式带来了版本独立性(升级服务端无需重新编译客户端)、语言独立性(Python、Go、Rust 等客户端无需依赖 JVM),以及资源隔离(客户端进程不占用 Spark 的内存)。
Spark Connect 协议:gRPC 上的 Protobuf Plan
通信协议通过 sql/connect/common/src/main/protobuf/ 目录下的 Protobuf 文件定义:
sql/connect/common/src/main/protobuf/spark/connect/base.proto#L35-L58
message Plan {
oneof op_type {
Relation root = 1;
Command command = 2;
CompressedOperation compressed_operation = 3;
}
}
graph TB
subgraph Client["Client (any language)"]
DF["df.filter('age > 25').select('name')"]
PROTO["Protobuf Plan message"]
DF --> PROTO
end
subgraph Wire["gRPC Channel"]
REQ["ExecutePlanRequest"]
RESP["ExecutePlanResponse\n(Arrow batches)"]
end
subgraph Server["Spark Connect Server"]
SERVICE["SparkConnectService"]
PLANNER["SparkConnectPlanner"]
CATALYST["Catalyst Pipeline"]
EXEC["SparkPlan.execute()"]
end
PROTO --> REQ
REQ --> SERVICE
SERVICE --> PLANNER
PLANNER --> CATALYST
CATALYST --> EXEC
EXEC --> RESP
RESP --> Client
Plan 消息有两种主要变体:
Relation:表示查询操作(SELECT、JOIN、FILTER 等),定义在relations.proto中,每种 relation 类型对应一个 DataFrame 操作。Command:表示 DDL/DML 操作(CREATE TABLE、INSERT、SET CONFIGURATION 等),定义在commands.proto中。
第三种变体 CompressedOperation 支持对大型 plan 进行 Zstandard 压缩——这是一个务实的优化,因为复杂的 DataFrame pipeline 序列化后可能产生出乎意料的大 Protobuf 消息。
服务端在 base.proto 中暴露了若干 RPC:ExecutePlan 用于执行查询,AnalyzePlan 用于 plan 分析(explain、schema 检查),Config 用于配置管理,AddArtifacts 用于上传 JAR 包和文件。
SparkConnectPlanner:从 Proto 到 Catalyst 的翻译层
这一翻译层由超过 4000 行的 SparkConnectPlanner 类承担:
Planner 将每种 Protobuf Relation 类型转换为对应的 Catalyst LogicalPlan 节点:proto.Filter 变成 Catalyst 的 Filter,proto.Join 变成 Catalyst 的 Join,依此类推。表达式的转换遵循同样的模式——Protobuf 表达式树被递归地转换为 Catalyst Expression 树。
一旦 Protobuf plan 被翻译成 Catalyst LogicalPlan,就会进入我们在第 3 篇中探讨过的同一流水线:分析 → 优化 → 物理规划 → 执行。从本质上看,Spark Connect 服务端就是一个加了 gRPC 前端的 Classic SparkSession。
提示: 调试 Spark Connect 问题时,请注意翻译层可能引入微妙的差异。可以使用带
EXPLAIN选项的AnalyzePlan,查看服务端根据客户端操作实际生成的 Catalyst plan。
Connect 服务端:Session 管理与服务实现
gRPC 服务由 SparkConnectService 实现:
class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
override def executePlan(
request: proto.ExecutePlanRequest,
responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
try {
new SparkConnectExecutePlanHandler(responseObserver).handle(request)
} catch {
ErrorUtils.handleError("execute", observer = responseObserver, ...)
}
}
}
graph TB
subgraph Server["Spark Connect Server Process"]
GRPC["Netty gRPC Server"]
SVC["SparkConnectService"]
SESSIONS["SessionManager"]
S1["Session 1<br/>(User A)"]
S2["Session 2<br/>(User B)"]
S3["Session 3<br/>(User A, session 2)"]
end
GRPC --> SVC
SVC --> SESSIONS
SESSIONS --> S1
SESSIONS --> S2
SESSIONS --> S3
S1 --> SC["Shared SparkContext"]
S2 --> SC
S3 --> SC
服务端通过 SparkConnectSessionManager 支持多租户 session 管理。每个客户端连接通过 user_id 和 session_id 标识自身。Session 管理器维护一个 SparkSession 实例池(每个实例拥有独立的 SessionState,用于隔离配置和临时视图),所有 session 共享同一个 SparkContext 和集群连接。
服务端基于 Netty 的 gRPC 实现构建,支持配置最大入站消息大小、认证令牌和连接绑定地址。它既可以作为独立进程运行,也可以嵌入到现有的 Spark 应用中。
SparkSessionExtensions:注入自定义规则
对于需要定制 Catalyst 流水线但又不想 fork Spark 的用户,SparkSessionExtensions 提供了注入点:
sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala#L34-L72
扩展点覆盖流水线的每个阶段:
flowchart LR
PARSE["Parser<br/>injectParser"] --> ANALYZE["Analyzer<br/>injectResolutionRule"]
ANALYZE --> CHECK["Check Analysis<br/>injectCheckRule"]
CHECK --> OPT["Optimizer<br/>injectOptimizerRule"]
OPT --> PLAN["Planner<br/>injectPlannerStrategy"]
PLAN --> PREP["Columnar<br/>injectColumnar"]
| 扩展点 | 方法 | 适用场景 |
|---|---|---|
| Parser | injectParser |
自定义 SQL 语法、DSL 到 plan 的转换 |
| Analyzer Rules | injectResolutionRule |
自定义名称解析、虚拟表 |
| Check Rules | injectCheckRule |
自定义校验(如策略合规检查) |
| Optimizer Rules | injectOptimizerRule |
自定义改写(如索引感知优化) |
| Planner Strategies | injectPlannerStrategy |
自定义物理算子 |
| Columnar Rules | injectColumnar |
GPU/向量化执行适配器 |
| AQE Rules | injectRuntimeOptimizerRule |
运行时重优化定制 |
扩展既可以通过编程方式注册,也可以通过 spark.sql.extensions 配置项指定:
SparkSession.builder()
.config("spark.sql.extensions", "com.example.MyExtensions")
.getOrCreate()
SparkSessionExtensionsProvider trait(通过 ServiceLoader 加载)支持从 classpath 上的 JAR 包自动发现扩展,实现真正的即插即用。
可插拔子系统:集群管理器、Shuffle 与插件
可扩展性远不止于 SQL 流水线。Spark 核心引擎在每一层都提供了可插拔的子系统:
graph TB
subgraph "Extension Points"
ECM["ExternalClusterManager<br/>(YARN, K8s, Standalone)"]
SM["ShuffleManager<br/>(Sort, Custom)"]
PLUGIN["PluginContainer<br/>(Driver/Executor Plugins)"]
DSV2["DataSource V2<br/>(Custom Connectors)"]
end
subgraph "Core Engine"
SC[SparkContext]
TS[TaskScheduler]
SE[SparkEnv]
CAT[Catalog]
end
ECM --> TS
SM --> SE
PLUGIN --> SC
DSV2 --> CAT
ExternalClusterManager(core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62)是集群管理器的 SPI 接口。正如第 2 篇所介绍的,它通过 ServiceLoader 发现,并为指定的 master URL 提供 TaskScheduler 和 SchedulerBackend。YARN 和 Kubernetes 支持正是以这种方式实现的——各自作为独立模块,注册自己的 ExternalClusterManager 实现。
ShuffleManager(core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L38)是可插拔的 shuffle 接口。默认的 SortShuffleManager 已达到生产级质量,但 Databricks、LinkedIn 等云厂商针对各自基础设施(远程 shuffle 服务、存算分离架构)构建了专属的 ShuffleManager 实现。
插件系统:PluginContainer(在第 2 篇介绍 SparkContext 初始化时提及)支持 driver 插件和 executor 插件。这些插件可以拦截指标、修改配置、注册自定义 RPC 端点或添加监控钩子,非常适合可观测性和资源管理方面的集成需求。
Data Source API V2:使用最广泛的扩展点。它允许你为任意数据源——数据库、消息队列、文件格式、REST API——编写自定义连接器,并支持谓词下推、列裁剪和并行读取。V2 数据源可以参与 Catalyst 优化,接收下推的过滤条件和投影信息。
提示: 选择扩展点时,关键是思考你需要介入执行的哪个阶段。如果需要改变查询的解释方式,注入 parser 或 analyzer 规则;如果需要针对特定存储系统优化,使用 Data Source V2 的下推能力;如果需要修改执行环境,使用插件系统。
全局回顾
在这五篇文章中,我们完整地走过了 Apache Spark 的每一个层次:
- 架构与模块地图 — monorepo 中约 40 个模块的三层组织结构
- 启动流程 — 从
spark-submit到 SparkContext 精心编排的初始化序列 - Catalyst 流水线 — SQL 文本经由 TreeNode 变换,最终生成优化后的逻辑计划
- 执行引擎 — RDD、stage、task、shuffle 与自适应重优化
- Connect 与可扩展性 — gRPC 客户端-服务端分离与插件架构
贯穿始终的核心洞察是:Spark 建立在一系列清晰的抽象之上——用于表示 plan 的 TreeNode、用于分布式数据的 RDD、用于变换的 Rule、用于数据交换的 ShuffleManager——每一层都定义了明确的扩展点。正是这种分层、可插拔的架构,使 Spark 从一个科研项目演进为现代数据栈的核心基础设施,并被数以千计的组织根据自身的独特基础设施和业务负载加以定制。
无论你是通过审视 Catalyst 的优化过程来排查慢查询,还是贡献一条新的优化器规则,抑或是用新语言构建 Spark Connect 客户端,或者实现一个自定义 shuffle 服务——你现在已经拥有了在 Spark 代码库中自信导航的全局视图。从入口点出发,沿着执行路径追踪,让抽象层次指引你前行。