Read OSS

Spark Connect 与可扩展性架构

高级

前置知识

  • 第 1–4 篇(充分理解 Spark 架构与执行机制)
  • 对 gRPC 和 Protocol Buffers 有基本了解
  • 理解 API 抽象与客户端-服务端模式

Spark Connect 与可扩展性架构

在本系列中,我们从 monorepo 结构出发,依次探索了查询优化和分布式执行等 Spark 核心机制。贯穿始终有一个反复出现的主题,却尚未深入展开:Spark 在每一层都为扩展预留了空间。无论你是运营托管 Spark 服务的云厂商、需要注入自定义优化器规则的数据平台团队,还是正在构建 Go 或 Rust 客户端的语言社区——Spark 都为你提供了对应的扩展点。

本文是系列的最后一篇,聚焦两个紧密相关的主题:通过 gRPC 将客户端与服务端解耦的 Spark Connect 架构,以及使 Spark 成为目前可定制性最强的数据引擎之一的整体扩展性设计。

Classic 模式与 Connect 模式的架构分野

正如第 1 篇所介绍的,Spark 4.x 将 SparkSession 定义为 sql/api 中的抽象类,并提供两种实现:

sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala#L63

classDiagram
    class SparkSession {
        <<abstract, sql/api>>
        +sql(query): DataFrame
        +read: DataFrameReader
        +createDataFrame(): DataFrame
        +version: String
    }

    class ClassicSparkSession {
        <<sql/core/classic/>>
        +sparkContext: SparkContext
        +sessionState: SessionState
        +sharedState: SharedState
    }

    class ConnectClient {
        <<sql/connect/client/>>
        -channel: ManagedChannel
        -stub: SparkConnectServiceStub
    }

    SparkSession <|-- ClassicSparkSession : in-process JVM
    SparkSession <|-- ConnectClient : gRPC client

    class SparkConnectServer {
        <<sql/connect/server/>>
        +SparkConnectService
        +SparkConnectPlanner
    }

    ConnectClient ..> SparkConnectServer : gRPC/Protobuf
    SparkConnectServer ..> ClassicSparkSession : uses internally

Classic 模式sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala#L92-L99)将完整的 Spark 运行时嵌入到用户的 JVM 进程中。SparkSession 持有一个 SparkContext,以及包含解析器、分析器、优化器和规划器的 SessionState,还有全套调度基础设施。这是传统模型,灵活性最高,但也将应用程序与特定 Spark 版本紧密绑定。

Connect 模式则让你的应用以轻量级 gRPC 客户端的形式运行。DataFrame API 调用被序列化为 Protobuf 消息,发送给远程的 Spark Connect 服务端,后者将其反序列化,走完完整的 Catalyst 流水线,再将结果以流式方式返回。客户端既不会实例化 SparkContext,也不会加载 Spark 的 JAR 包。

这一设计的价值显而易见:Connect 模式带来了版本独立性(升级服务端无需重新编译客户端)、语言独立性(Python、Go、Rust 等客户端无需依赖 JVM),以及资源隔离(客户端进程不占用 Spark 的内存)。

Spark Connect 协议:gRPC 上的 Protobuf Plan

通信协议通过 sql/connect/common/src/main/protobuf/ 目录下的 Protobuf 文件定义:

sql/connect/common/src/main/protobuf/spark/connect/base.proto#L35-L58

message Plan {
  oneof op_type {
    Relation root = 1;
    Command command = 2;
    CompressedOperation compressed_operation = 3;
  }
}
graph TB
    subgraph Client["Client (any language)"]
        DF["df.filter('age > 25').select('name')"]
        PROTO["Protobuf Plan message"]
        DF --> PROTO
    end

    subgraph Wire["gRPC Channel"]
        REQ["ExecutePlanRequest"]
        RESP["ExecutePlanResponse\n(Arrow batches)"]
    end

    subgraph Server["Spark Connect Server"]
        SERVICE["SparkConnectService"]
        PLANNER["SparkConnectPlanner"]
        CATALYST["Catalyst Pipeline"]
        EXEC["SparkPlan.execute()"]
    end

    PROTO --> REQ
    REQ --> SERVICE
    SERVICE --> PLANNER
    PLANNER --> CATALYST
    CATALYST --> EXEC
    EXEC --> RESP
    RESP --> Client

Plan 消息有两种主要变体:

  • Relation:表示查询操作(SELECT、JOIN、FILTER 等),定义在 relations.proto 中,每种 relation 类型对应一个 DataFrame 操作。
  • Command:表示 DDL/DML 操作(CREATE TABLE、INSERT、SET CONFIGURATION 等),定义在 commands.proto 中。

第三种变体 CompressedOperation 支持对大型 plan 进行 Zstandard 压缩——这是一个务实的优化,因为复杂的 DataFrame pipeline 序列化后可能产生出乎意料的大 Protobuf 消息。

服务端在 base.proto 中暴露了若干 RPC:ExecutePlan 用于执行查询,AnalyzePlan 用于 plan 分析(explain、schema 检查),Config 用于配置管理,AddArtifacts 用于上传 JAR 包和文件。

SparkConnectPlanner:从 Proto 到 Catalyst 的翻译层

这一翻译层由超过 4000 行的 SparkConnectPlanner 类承担:

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L18-L80

Planner 将每种 Protobuf Relation 类型转换为对应的 Catalyst LogicalPlan 节点:proto.Filter 变成 Catalyst 的 Filterproto.Join 变成 Catalyst 的 Join,依此类推。表达式的转换遵循同样的模式——Protobuf 表达式树被递归地转换为 Catalyst Expression 树。

一旦 Protobuf plan 被翻译成 Catalyst LogicalPlan,就会进入我们在第 3 篇中探讨过的同一流水线:分析 → 优化 → 物理规划 → 执行。从本质上看,Spark Connect 服务端就是一个加了 gRPC 前端的 Classic SparkSession。

提示: 调试 Spark Connect 问题时,请注意翻译层可能引入微妙的差异。可以使用带 EXPLAIN 选项的 AnalyzePlan,查看服务端根据客户端操作实际生成的 Catalyst plan。

Connect 服务端:Session 管理与服务实现

gRPC 服务由 SparkConnectService 实现:

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala#L59-L80

class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  override def executePlan(
      request: proto.ExecutePlanRequest,
      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
    try {
      new SparkConnectExecutePlanHandler(responseObserver).handle(request)
    } catch {
      ErrorUtils.handleError("execute", observer = responseObserver, ...)
    }
  }
}
graph TB
    subgraph Server["Spark Connect Server Process"]
        GRPC["Netty gRPC Server"]
        SVC["SparkConnectService"]
        SESSIONS["SessionManager"]
        S1["Session 1<br/>(User A)"]
        S2["Session 2<br/>(User B)"]
        S3["Session 3<br/>(User A, session 2)"]
    end

    GRPC --> SVC
    SVC --> SESSIONS
    SESSIONS --> S1
    SESSIONS --> S2
    SESSIONS --> S3

    S1 --> SC["Shared SparkContext"]
    S2 --> SC
    S3 --> SC

服务端通过 SparkConnectSessionManager 支持多租户 session 管理。每个客户端连接通过 user_idsession_id 标识自身。Session 管理器维护一个 SparkSession 实例池(每个实例拥有独立的 SessionState,用于隔离配置和临时视图),所有 session 共享同一个 SparkContext 和集群连接。

服务端基于 Netty 的 gRPC 实现构建,支持配置最大入站消息大小、认证令牌和连接绑定地址。它既可以作为独立进程运行,也可以嵌入到现有的 Spark 应用中。

SparkSessionExtensions:注入自定义规则

对于需要定制 Catalyst 流水线但又不想 fork Spark 的用户,SparkSessionExtensions 提供了注入点:

sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala#L34-L72

扩展点覆盖流水线的每个阶段:

flowchart LR
    PARSE["Parser<br/>injectParser"] --> ANALYZE["Analyzer<br/>injectResolutionRule"]
    ANALYZE --> CHECK["Check Analysis<br/>injectCheckRule"]
    CHECK --> OPT["Optimizer<br/>injectOptimizerRule"]
    OPT --> PLAN["Planner<br/>injectPlannerStrategy"]
    PLAN --> PREP["Columnar<br/>injectColumnar"]
扩展点 方法 适用场景
Parser injectParser 自定义 SQL 语法、DSL 到 plan 的转换
Analyzer Rules injectResolutionRule 自定义名称解析、虚拟表
Check Rules injectCheckRule 自定义校验(如策略合规检查)
Optimizer Rules injectOptimizerRule 自定义改写(如索引感知优化)
Planner Strategies injectPlannerStrategy 自定义物理算子
Columnar Rules injectColumnar GPU/向量化执行适配器
AQE Rules injectRuntimeOptimizerRule 运行时重优化定制

扩展既可以通过编程方式注册,也可以通过 spark.sql.extensions 配置项指定:

SparkSession.builder()
  .config("spark.sql.extensions", "com.example.MyExtensions")
  .getOrCreate()

SparkSessionExtensionsProvider trait(通过 ServiceLoader 加载)支持从 classpath 上的 JAR 包自动发现扩展,实现真正的即插即用。

可插拔子系统:集群管理器、Shuffle 与插件

可扩展性远不止于 SQL 流水线。Spark 核心引擎在每一层都提供了可插拔的子系统:

graph TB
    subgraph "Extension Points"
        ECM["ExternalClusterManager<br/>(YARN, K8s, Standalone)"]
        SM["ShuffleManager<br/>(Sort, Custom)"]
        PLUGIN["PluginContainer<br/>(Driver/Executor Plugins)"]
        DSV2["DataSource V2<br/>(Custom Connectors)"]
    end

    subgraph "Core Engine"
        SC[SparkContext]
        TS[TaskScheduler]
        SE[SparkEnv]
        CAT[Catalog]
    end

    ECM --> TS
    SM --> SE
    PLUGIN --> SC
    DSV2 --> CAT

ExternalClusterManagercore/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62)是集群管理器的 SPI 接口。正如第 2 篇所介绍的,它通过 ServiceLoader 发现,并为指定的 master URL 提供 TaskSchedulerSchedulerBackend。YARN 和 Kubernetes 支持正是以这种方式实现的——各自作为独立模块,注册自己的 ExternalClusterManager 实现。

ShuffleManagercore/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L38)是可插拔的 shuffle 接口。默认的 SortShuffleManager 已达到生产级质量,但 Databricks、LinkedIn 等云厂商针对各自基础设施(远程 shuffle 服务、存算分离架构)构建了专属的 ShuffleManager 实现。

插件系统PluginContainer(在第 2 篇介绍 SparkContext 初始化时提及)支持 driver 插件和 executor 插件。这些插件可以拦截指标、修改配置、注册自定义 RPC 端点或添加监控钩子,非常适合可观测性和资源管理方面的集成需求。

Data Source API V2:使用最广泛的扩展点。它允许你为任意数据源——数据库、消息队列、文件格式、REST API——编写自定义连接器,并支持谓词下推、列裁剪和并行读取。V2 数据源可以参与 Catalyst 优化,接收下推的过滤条件和投影信息。

提示: 选择扩展点时,关键是思考你需要介入执行的哪个阶段。如果需要改变查询的解释方式,注入 parser 或 analyzer 规则;如果需要针对特定存储系统优化,使用 Data Source V2 的下推能力;如果需要修改执行环境,使用插件系统。

全局回顾

在这五篇文章中,我们完整地走过了 Apache Spark 的每一个层次:

  1. 架构与模块地图 — monorepo 中约 40 个模块的三层组织结构
  2. 启动流程 — 从 spark-submit 到 SparkContext 精心编排的初始化序列
  3. Catalyst 流水线 — SQL 文本经由 TreeNode 变换,最终生成优化后的逻辑计划
  4. 执行引擎 — RDD、stage、task、shuffle 与自适应重优化
  5. Connect 与可扩展性 — gRPC 客户端-服务端分离与插件架构

贯穿始终的核心洞察是:Spark 建立在一系列清晰的抽象之上——用于表示 plan 的 TreeNode、用于分布式数据的 RDD、用于变换的 Rule、用于数据交换的 ShuffleManager——每一层都定义了明确的扩展点。正是这种分层、可插拔的架构,使 Spark 从一个科研项目演进为现代数据栈的核心基础设施,并被数以千计的组织根据自身的独特基础设施和业务负载加以定制。

无论你是通过审视 Catalyst 的优化过程来排查慢查询,还是贡献一条新的优化器规则,抑或是用新语言构建 Spark Connect 客户端,或者实现一个自定义 shuffle 服务——你现在已经拥有了在 Spark 代码库中自信导航的全局视图。从入口点出发,沿着执行路径追踪,让抽象层次指引你前行。