Spark Connect and the Extensibility Architecture

Advanced

Prerequisites

  • Articles 1-4 (full understanding of Spark architecture and execution)
  • Basic familiarity with gRPC and Protocol Buffers
  • Understanding of API abstraction and client-server patterns

Throughout this series, we've explored Spark's internals from monorepo structure through query optimization to distributed execution. But there's a recurring theme we've only touched on: Spark is designed to be extended at every layer. Whether you're a cloud vendor running a managed Spark service, a data platform team injecting custom optimizer rules, or a language community building a Go or Rust client — Spark has extension points for you.

This final article covers two related topics: the Spark Connect architecture that decouples clients from the server via gRPC, and the broader extensibility patterns that make Spark one of the most customizable data engines in existence.

The Classic vs Connect Architecture Split

As we introduced in Part 1, Spark 4.x defines SparkSession as an abstract class in sql/api with two implementations:

sql/api/src/main/scala/org/apache/spark/sql/SparkSession.scala#L63

classDiagram
    class SparkSession {
        <<abstract, sql/api>>
        +sql(query): DataFrame
        +read: DataFrameReader
        +createDataFrame(): DataFrame
        +version: String
    }

    class ClassicSparkSession {
        <<sql/core/classic/>>
        +sparkContext: SparkContext
        +sessionState: SessionState
        +sharedState: SharedState
    }

    class ConnectClient {
        <<sql/connect/client/>>
        -channel: ManagedChannel
        -stub: SparkConnectServiceStub
    }

    SparkSession <|-- ClassicSparkSession : in-process JVM
    SparkSession <|-- ConnectClient : gRPC client

    class SparkConnectServer {
        <<sql/connect/server/>>
        +SparkConnectService
        +SparkConnectPlanner
    }

    ConnectClient ..> SparkConnectServer : gRPC/Protobuf
    SparkConnectServer ..> ClassicSparkSession : uses internally

Classic mode (sql/core/src/main/scala/org/apache/spark/sql/classic/SparkSession.scala#L92-L99) embeds the entire Spark runtime in the user's JVM process. The SparkSession holds a SparkContext, a SessionState with the parser/analyzer/optimizer/planner, and all the scheduling infrastructure. This is the traditional model and gives maximum flexibility — but couples your application tightly to a specific Spark version.

Connect mode runs your application as a thin gRPC client. The DataFrame API calls are serialized into Protobuf messages and sent to a remote Spark Connect server, which deserializes them, runs the full Catalyst pipeline, and streams results back. The client never instantiates a SparkContext or loads Spark's JARs.

The design rationale is powerful: with Connect, you get version independence (upgrade the server without recompiling clients), language independence (Python, Go, Rust clients that don't need the JVM), and resource isolation (the client process doesn't consume Spark's memory).
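A minimal thin-client session, sketched with the JVM Connect client. This assumes a Spark Connect server is already listening at `localhost:15002` (the default port) and that the `spark-connect-client-jvm` artifact is on the classpath:

```scala
// Thin-client session via the JVM Connect client; assumes a running
// Spark Connect server at localhost:15002.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .remote("sc://localhost:15002")
  .getOrCreate()

// No SparkContext exists in this process: the plan below is serialized
// to Protobuf, executed server-side, and streamed back as Arrow batches.
val df = spark.range(10).filter("id > 5")
df.show()

spark.stop()
```

The same `sc://host:port` connection string works across client languages; in PySpark it's `SparkSession.builder.remote("sc://...")`.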

Spark Connect Protocol: Protobuf Plans over gRPC

The wire protocol is defined in Protobuf files under sql/connect/common/src/main/protobuf/:

sql/connect/common/src/main/protobuf/spark/connect/base.proto#L35-L58

message Plan {
  oneof op_type {
    Relation root = 1;
    Command command = 2;
    CompressedOperation compressed_operation = 3;
  }
}

graph TB
    subgraph Client["Client (any language)"]
        DF["df.filter('age > 25').select('name')"]
        PROTO["Protobuf Plan message"]
        DF --> PROTO
    end

    subgraph Wire["gRPC Channel"]
        REQ["ExecutePlanRequest"]
        RESP["ExecutePlanResponse\n(Arrow batches)"]
    end

    subgraph Server["Spark Connect Server"]
        SERVICE["SparkConnectService"]
        PLANNER["SparkConnectPlanner"]
        CATALYST["Catalyst Pipeline"]
        EXEC["SparkPlan.execute()"]
    end

    PROTO --> REQ
    REQ --> SERVICE
    SERVICE --> PLANNER
    PLANNER --> CATALYST
    CATALYST --> EXEC
    EXEC --> RESP
    RESP --> Client

The Plan message has two main variants:

  • Relation: Represents a query (SELECT, JOIN, FILTER, etc.). Defined in relations.proto, each relation type corresponds to a DataFrame operation.
  • Command: Represents DDL/DML operations (CREATE TABLE, INSERT, SET CONFIGURATION, etc.). Defined in commands.proto.

A third variant, CompressedOperation, supports Zstandard compression for large plans — a pragmatic optimization since complex DataFrame pipelines can produce surprisingly large Protobuf messages.

The server exposes several RPCs defined in base.proto: ExecutePlan for running queries, AnalyzePlan for plan analysis (explain, schema inspection), Config for configuration management, and AddArtifacts for uploading JARs and files.

SparkConnectPlanner: Proto to Catalyst Translation

The translation layer is the 4000+ line SparkConnectPlanner class:

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L18-L80

The planner converts every Protobuf Relation type into the corresponding Catalyst LogicalPlan node. A proto.Filter becomes a Catalyst Filter, a proto.Join becomes a Catalyst Join, and so on. Expressions follow the same pattern — protobuf expression trees are recursively converted to Catalyst Expression trees.

Once the Protobuf plan is translated to a Catalyst LogicalPlan, it enters the same pipeline we explored in Part 3: analysis → optimization → physical planning → execution. The Spark Connect server is essentially a Classic SparkSession with a gRPC frontend.
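The recursive shape of that translation can be sketched with a toy model. The types below are stand-ins, not Spark's classes — the real SparkConnectPlanner does the same dispatch over generated Protobuf messages and Catalyst's LogicalPlan, covering dozens of relation types:

```scala
// Toy model of the planner's recursive proto-to-Catalyst translation.
// All types here are hypothetical stand-ins for illustration only.
object ProtoToCatalystSketch {
  // Stand-ins for the Protobuf relation messages.
  sealed trait ProtoRelation
  case class ProtoRange(n: Long) extends ProtoRelation
  case class ProtoFilter(condition: String, input: ProtoRelation) extends ProtoRelation
  case class ProtoProject(columns: Seq[String], input: ProtoRelation) extends ProtoRelation

  // Stand-ins for Catalyst logical plan nodes.
  sealed trait LogicalPlan
  case class Range(n: Long) extends LogicalPlan
  case class Filter(condition: String, child: LogicalPlan) extends LogicalPlan
  case class Project(columns: Seq[String], child: LogicalPlan) extends LogicalPlan

  // Each proto relation maps 1:1 to a Catalyst node; child relations
  // are translated recursively, exactly as in the real planner.
  def transformRelation(rel: ProtoRelation): LogicalPlan = rel match {
    case ProtoRange(n)          => Range(n)
    case ProtoFilter(cond, in)  => Filter(cond, transformRelation(in))
    case ProtoProject(cols, in) => Project(cols, transformRelation(in))
  }
}
```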

Tip: When debugging Spark Connect issues, remember that the translation layer can introduce subtle differences. Use the AnalyzePlan RPC's explain mode — what df.explain() issues from the client APIs — to see the actual Catalyst plan the server produces from your client's operations.

Connect Server: Session Management and Service

The gRPC service is implemented by SparkConnectService:

sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala#L59-L80

class SparkConnectService(debug: Boolean) extends AsyncService with BindableService with Logging {
  override def executePlan(
      request: proto.ExecutePlanRequest,
      responseObserver: StreamObserver[proto.ExecutePlanResponse]): Unit = {
    try {
      new SparkConnectExecutePlanHandler(responseObserver).handle(request)
    } catch {
      ErrorUtils.handleError("execute", observer = responseObserver, ...)
    }
  }
}

graph TB
    subgraph Server["Spark Connect Server Process"]
        GRPC["Netty gRPC Server"]
        SVC["SparkConnectService"]
        SESSIONS["SessionManager"]
        S1["Session 1<br/>(User A)"]
        S2["Session 2<br/>(User B)"]
        S3["Session 3<br/>(User A, session 2)"]
    end

    GRPC --> SVC
    SVC --> SESSIONS
    SESSIONS --> S1
    SESSIONS --> S2
    SESSIONS --> S3

    S1 --> SC["Shared SparkContext"]
    S2 --> SC
    S3 --> SC

The server supports multi-tenant session management through SparkConnectSessionManager. Each client connection identifies itself with a user_id and session_id. The session manager maintains a pool of SparkSession instances (each with its own SessionState for isolated configuration and temporary views), all sharing a single SparkContext and cluster connection.

The server is built on Netty's gRPC implementation, with configurable max inbound message size, authentication tokens, and connection binding address. It can run as a standalone process or embedded within an existing Spark application.
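Standing up a standalone server is a one-liner from a Spark distribution (the script has shipped in sbin/ since Spark 3.4; the config keys below are illustrative of the knobs mentioned above):

```shell
# Start a standalone Spark Connect server; 15002 is the default port.
./sbin/start-connect-server.sh \
  --conf spark.connect.grpc.binding.port=15002 \
  --conf spark.connect.grpc.maxInboundMessageSize=134217728

# Stop it again when done.
./sbin/stop-connect-server.sh
```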

SparkSessionExtensions: Injecting Custom Rules

For users who need to customize the Catalyst pipeline without forking Spark, SparkSessionExtensions provides injection points:

sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala#L34-L72

The extension points cover every phase of the pipeline:

flowchart LR
    PARSE["Parser<br/>injectParser"] --> ANALYZE["Analyzer<br/>injectResolutionRule"]
    ANALYZE --> CHECK["Check Analysis<br/>injectCheckRule"]
    CHECK --> OPT["Optimizer<br/>injectOptimizerRule"]
    OPT --> PLAN["Planner<br/>injectPlannerStrategy"]
    PLAN --> PREP["Columnar<br/>injectColumnar"]

| Extension Point | Method | Use Case |
| --- | --- | --- |
| Parser | injectParser | Custom SQL syntax, DSL-to-plan translation |
| Analyzer rules | injectResolutionRule | Custom name resolution, virtual tables |
| Check rules | injectCheckRule | Custom validation (e.g., policy enforcement) |
| Optimizer rules | injectOptimizerRule | Custom rewrites (e.g., index-aware optimization) |
| Planner strategies | injectPlannerStrategy | Custom physical operators |
| Columnar rules | injectColumnar | GPU/vectorized execution adapters |
| AQE rules | injectRuntimeOptimizerRule | Runtime re-optimization customization |

Extensions are registered either programmatically or via the spark.sql.extensions configuration property:

SparkSession.builder()
  .config("spark.sql.extensions", "com.example.MyExtensions")
  .getOrCreate()

The SparkSessionExtensionsProvider trait (loaded via ServiceLoader) allows extensions to be auto-discovered from JARs on the classpath, making them completely plug-and-play.
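A minimal extensions class looks like the sketch below. The class and rule names are hypothetical; the entry point is simply a function from SparkSessionExtensions to Unit, which is what the spark.sql.extensions property instantiates:

```scala
// Minimal SparkSessionExtensions sketch; MyRewriteRule and MyExtensions
// are hypothetical names for illustration.
import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// A no-op optimizer rule: a real rule would pattern-match on plan
// nodes and return a rewritten tree.
object MyRewriteRule extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

// The class named in spark.sql.extensions; Spark calls apply() once
// per session, passing the injection registry.
class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectOptimizerRule { session => MyRewriteRule }
  }
}
```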

Pluggable Subsystems: Cluster Managers, Shuffle, Plugins

Extensibility goes far beyond the SQL pipeline. Spark's core engine has pluggable subsystems at every layer:

graph TB
    subgraph "Extension Points"
        ECM["ExternalClusterManager<br/>(YARN, K8s, Standalone)"]
        SM["ShuffleManager<br/>(Sort, Custom)"]
        PLUGIN["PluginContainer<br/>(Driver/Executor Plugins)"]
        DSV2["DataSource V2<br/>(Custom Connectors)"]
    end

    subgraph "Core Engine"
        SC[SparkContext]
        TS[TaskScheduler]
        SE[SparkEnv]
        CAT[Catalog]
    end

    ECM --> TS
    SM --> SE
    PLUGIN --> SC
    DSV2 --> CAT

ExternalClusterManager (core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala#L25-L62) is the SPI for cluster managers. As we saw in Part 2, it's discovered via ServiceLoader and provides a TaskScheduler and SchedulerBackend for the given master URL. This is how YARN and Kubernetes support is implemented — as separate modules that register their ExternalClusterManager implementation.
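A sketch of what such an implementation looks like, under stated assumptions: the "mycluster://" URL scheme is made up, and since ExternalClusterManager is private[spark], real implementations must live in a package under org.apache.spark — as YARN's and Kubernetes' do:

```scala
// Hedged sketch of a custom cluster manager; "mycluster://" is a
// made-up master URL scheme.
package org.apache.spark.scheduler.custom

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}

class MyClusterManager extends ExternalClusterManager {

  // ServiceLoader asks each registered manager whether it handles the
  // master URL passed to spark-submit.
  override def canCreate(masterURL: String): Boolean =
    masterURL.startsWith("mycluster://")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend =
    ??? // launch and track executors through your cluster's API

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```

Registration is a standard ServiceLoader provider file: META-INF/services/org.apache.spark.scheduler.ExternalClusterManager containing the implementation's fully qualified class name.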

ShuffleManager (core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala#L38) is the pluggable shuffle interface. The default SortShuffleManager is production-quality, but cloud vendors like Databricks, LinkedIn, and others have built custom shuffle managers optimized for their infrastructure (remote shuffle services, disaggregated storage).
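Swapping implementations is a single configuration change (the class name below is hypothetical; the default resolves to SortShuffleManager):

```shell
# Point Spark at a custom ShuffleManager implementation on the classpath.
# com.example.shuffle.RemoteShuffleManager is a placeholder name.
spark-submit \
  --conf spark.shuffle.manager=com.example.shuffle.RemoteShuffleManager \
  --class com.example.App app.jar
```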

Plugin System: The PluginContainer (referenced during SparkContext initialization in Part 2) supports driver plugins and executor plugins. These can intercept metrics, modify configurations, register custom RPC endpoints, or add monitoring hooks — useful for observability and resource management integrations.
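A minimal plugin, sketched with hypothetical names, enabled via `--conf spark.plugins=com.example.MyPlugin`. The driver side can pass configuration to every executor side through the map returned from init():

```scala
// Minimal observability plugin sketch; MyPlugin is a hypothetical name.
import java.util.{Collections, Map => JMap}
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}

class MyPlugin extends SparkPlugin {
  override def driverPlugin(): DriverPlugin = new DriverPlugin {
    // Values returned here are shipped to every executor plugin's init().
    override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] =
      Collections.singletonMap("myplugin.startedAt", System.currentTimeMillis().toString)
  }

  override def executorPlugin(): ExecutorPlugin = new ExecutorPlugin {
    override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
      // Register metrics sources, monitoring hooks, custom RPC endpoints, etc.
    }
  }
}
```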

Data Source API V2: The most widely used extension point. It lets you write custom connectors for any data source — databases, message queues, file formats, REST APIs — with support for predicate pushdown, column pruning, and parallel reads. V2 sources participate in Catalyst optimization, receiving pushed-down filters and projections.
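The pushdown handshake can be sketched at the ScanBuilder level. During optimization, Catalyst calls pushFilters() with candidate predicates; the builder keeps what its (here hypothetical) backend can evaluate and returns the rest for Spark to apply after the scan:

```scala
// Sketch of filter pushdown in a Data Source V2 ScanBuilder; the
// "backend" and its supported predicates are hypothetical.
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownFilters}
import org.apache.spark.sql.sources.{EqualTo, Filter, GreaterThan}
import org.apache.spark.sql.types.StructType

class MyScanBuilder(schema: StructType) extends ScanBuilder with SupportsPushDownFilters {
  private var pushed: Array[Filter] = Array.empty

  override def pushFilters(filters: Array[Filter]): Array[Filter] = {
    // Keep only predicates our source can evaluate; hand the rest
    // back to Spark for post-scan evaluation.
    val (supported, rejected) = filters.partition {
      case _: EqualTo | _: GreaterThan => true
      case _                           => false
    }
    pushed = supported
    rejected
  }

  override def pushedFilters(): Array[Filter] = pushed

  override def build(): Scan = new Scan {
    override def readSchema(): StructType = schema
    // toBatch() would wire up InputPartitions and a PartitionReaderFactory.
  }
}
```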

Tip: When deciding which extension point to use, think about which phase of execution you need to intercept. If you need to change how queries are interpreted, inject parser or analyzer rules. If you need to optimize for a specific storage system, use Data Source V2's pushdown capabilities. If you need to modify the execution environment, use the Plugin system.

Putting It All Together

Over these five articles, we've traced the complete path through Apache Spark:

  1. Architecture and Module Map — the monorepo's ~40 modules organized in three layers
  2. Boot Sequence — from spark-submit through SparkContext's carefully ordered initialization
  3. Catalyst Pipeline — SQL text through TreeNode transforms to optimized logical plans
  4. Execution Engine — RDDs, stages, tasks, shuffles, and adaptive re-optimization
  5. Connect and Extensibility — the gRPC client-server split and the plug-in architecture

The unifying insight is that Spark is built as a series of clean abstractions — TreeNode for plans, RDD for distributed data, Rule for transformations, ShuffleManager for data exchange — each with well-defined extension points. This layered, pluggable architecture is what has allowed Spark to evolve from a research project into the backbone of the modern data stack, adapted by thousands of organizations to their unique infrastructure and workloads.

Whether you're debugging a slow query by inspecting Catalyst's optimization passes, contributing a new optimizer rule, building a Spark Connect client in a new language, or implementing a custom shuffle service, you now have the map to navigate Spark's codebase with confidence. Start from the entry points, trace the execution path, and let the abstractions guide you.