The Catalyst Query Pipeline: From SQL Text to Optimized Plan
Prerequisites
- Articles 1-2 (Architecture and Boot Sequence)
- Understanding of tree data structures and recursive transformations
- Familiarity with Scala pattern matching and partial functions
- Basic SQL query processing concepts (parsing, logical/physical plans)
Catalyst is Spark SQL's crown jewel. Named after the chemical process that accelerates reactions without being consumed, the Catalyst optimizer transforms raw SQL or DataFrame operations into efficient distributed execution plans. It's the reason Spark SQL can take a naive query and rewrite it into something that runs orders of magnitude faster.
At its core, Catalyst is built on two foundational abstractions: immutable trees (TreeNode) and rule-based transformations (RuleExecutor). Everything else — parsing, analysis, optimization, physical planning — is composed from these primitives. Understanding Catalyst means understanding these building blocks.
The TreeNode Abstraction
Every plan in Spark — logical or physical — is a tree of nodes. The TreeNode abstract class provides the machinery for manipulating these trees immutably:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L70-L75
```scala
abstract class TreeNode[BaseType <: TreeNode[BaseType]]
  extends Product
  with TreePatternBits
  with WithOrigin {
  self: BaseType =>
```
```mermaid
classDiagram
    class TreeNode~BaseType~ {
        <<abstract>>
        +children: Seq[BaseType]
        +transformDown(rule): BaseType
        +transformUp(rule): BaseType
        +transformDownWithPruning(cond, rule): BaseType
        +withNewChildren(newChildren): BaseType
        +fastEquals(other): Boolean
        +find(f): Option
        +foreach(f): Unit
        +map(f): Seq
    }
    class LogicalPlan {
        +output: Seq[Attribute]
        +resolved: Boolean
    }
    class SparkPlan {
        +execute(): RDD[InternalRow]
        +doExecute(): RDD[InternalRow]
    }
    class Expression {
        +dataType: DataType
        +eval(input): Any
    }
    TreeNode <|-- LogicalPlan
    TreeNode <|-- SparkPlan
    TreeNode <|-- Expression
```
TreeNode's key methods are the transform family. They apply a PartialFunction to each node and return a new tree with matching nodes replaced — copy-on-write semantics:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L470-L488
transformDown applies the rule pre-order (parent before children), while transformUp applies it post-order. The choice matters: bottom-up transforms are useful when a rule needs to see already-transformed children.
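The difference between the two orders can be sketched with a toy expression tree — hypothetical `Lit`/`Add` types, not Spark's classes, but the same copy-on-write `PartialFunction` pattern:

```scala
// Minimal sketch of pre-order vs post-order transforms over an immutable tree.
sealed trait Expr {
  def children: Seq[Expr]
  def withNewChildren(newChildren: Seq[Expr]): Expr
  // Pre-order: apply the rule to this node first, then recurse into children.
  def transformDown(rule: PartialFunction[Expr, Expr]): Expr = {
    val afterRule = rule.applyOrElse(this, identity[Expr])
    afterRule.withNewChildren(afterRule.children.map(_.transformDown(rule)))
  }
  // Post-order: transform children first, then apply the rule to this node.
  def transformUp(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNew = withNewChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(withNew, identity[Expr])
  }
}
case class Lit(value: Int) extends Expr {
  def children = Nil
  def withNewChildren(c: Seq[Expr]) = this
}
case class Add(left: Expr, right: Expr) extends Expr {
  def children = Seq(left, right)
  def withNewChildren(c: Seq[Expr]) = Add(c(0), c(1))
}

// A constant-folding rule: bottom-up it sees already-folded children, so
// nested constants collapse fully in one pass; top-down the parent is
// visited before its children fold, so the outer Add survives.
val fold: PartialFunction[Expr, Expr] = {
  case Add(Lit(a), Lit(b)) => Lit(a + b)
}
val tree = Add(Add(Lit(1), Lit(2)), Lit(3))
assert(tree.transformUp(fold) == Lit(6))
assert(tree.transformDown(fold) == Add(Lit(3), Lit(3)))
```

This is why folding-style rules in Catalyst tend to be bottom-up: a single post-order pass can collapse an arbitrarily deep constant subtree.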
The TreePatternBits mixin is a performance optimization. Each tree node maintains a bitset indicating which TreePattern enum values exist in its subtree. Rules declare which patterns they care about via transformDownWithPruning, and entire subtrees can be skipped if they don't contain the relevant pattern. With 50+ optimizer rules each traversing the entire tree, this pruning is critical for performance.
Tip: When adding a custom optimizer rule, always use `transformDownWithPruning` with the appropriate `TreePattern` instead of bare `transformDown`. Skipping irrelevant subtrees can reduce optimization time by 10-50x on complex query plans.
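The pruning mechanism can be illustrated with a self-contained sketch — hypothetical types modeled loosely on `TreePatternBits`, using a `Set` where Spark uses a bitset:

```scala
// Each node caches which patterns occur anywhere in its subtree; a transform
// that only cares about literals skips subtrees whose pattern set lacks them.
object Pattern extends Enumeration { val LITERAL, ATTRIBUTE = Value }

sealed trait Node {
  def children: Seq[Node]
  def nodePatterns: Set[Pattern.Value]
  // Union of this node's patterns and all descendants' (computed once).
  lazy val treePatterns: Set[Pattern.Value] =
    children.foldLeft(nodePatterns)(_ ++ _.treePatterns)
  def withNewChildren(c: Seq[Node]): Node

  def transformDownWithPruning(cond: Set[Pattern.Value] => Boolean)
                              (rule: PartialFunction[Node, Node]): Node =
    if (!cond(treePatterns)) this  // prune: nothing relevant below this node
    else {
      val afterRule = rule.applyOrElse(this, identity[Node])
      afterRule.withNewChildren(
        afterRule.children.map(_.transformDownWithPruning(cond)(rule)))
    }
}
case class Lit(v: Int) extends Node {
  val children = Nil
  val nodePatterns = Set(Pattern.LITERAL)
  def withNewChildren(c: Seq[Node]) = this
}
case class Attr(name: String) extends Node {
  val children = Nil
  val nodePatterns = Set(Pattern.ATTRIBUTE)
  def withNewChildren(c: Seq[Node]) = this
}
case class Add(l: Node, r: Node) extends Node {
  val children = Seq(l, r)
  val nodePatterns = Set.empty[Pattern.Value]
  def withNewChildren(c: Seq[Node]) = Add(c(0), c(1))
}

// The rule declares it only touches literals; the all-attribute subtree
// on the left is never descended into.
val incLiterals: PartialFunction[Node, Node] = { case Lit(v) => Lit(v + 1) }
val plan = Add(Add(Attr("a"), Attr("b")), Lit(41))
val out = plan.transformDownWithPruning(_.contains(Pattern.LITERAL))(incLiterals)
assert(out == Add(Add(Attr("a"), Attr("b")), Lit(42)))
```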
RuleExecutor: Batches and Fixed-Point Iteration
Individual rules are useful, but they need orchestration. The RuleExecutor framework organizes rules into named Batches, each with an execution strategy:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala#L125-L165
```scala
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(override val maxIterations: Int, ...) extends Strategy
  protected[catalyst] case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  protected def batches: Seq[Batch]
}
```
```mermaid
flowchart TD
    START[Input Plan] --> B1
    subgraph B1["Batch: Resolution (FixedPoint 100)"]
        R1[Rule 1] --> R2[Rule 2] --> R3[Rule N]
        R3 -->|"Plan changed?"| CHECK1{Converged?}
        CHECK1 -->|No| R1
        CHECK1 -->|Yes| B1OUT[Batch Output]
    end
    B1OUT --> B2
    subgraph B2["Batch: Optimization (Once)"]
        R4[Rule A] --> R5[Rule B]
    end
    B2 --> VALIDATE[Validate Plan Changes]
    VALIDATE --> END[Output Plan]
```
Two strategies are available:
- `Once`: The batch runs exactly once. Used for rules that are naturally idempotent or that shouldn't repeat (like subquery optimization).
- `FixedPoint(n)`: The batch repeats until the plan stops changing (convergence) or `n` iterations are reached. Used for resolution and optimization where rules can enable each other.
After each Once batch, the executor optionally runs an idempotence check — applying the batch again and verifying the result is unchanged. This catches bugs where a rule isn't truly idempotent.
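The batch loop itself is compact. Here is a minimal sketch — hypothetical `MiniRuleExecutor` name, rules simplified to plain functions — of the fixed-point iteration described above:

```scala
// Each batch reruns its rules until a full pass changes nothing, or until
// the strategy's iteration budget is exhausted.
sealed trait Strategy { def maxIterations: Int }
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(maxIterations: Int) extends Strategy

case class Batch[T](name: String, strategy: Strategy, rules: Seq[T => T])

abstract class MiniRuleExecutor[T] {
  protected def batches: Seq[Batch[T]]

  def execute(plan: T): T = batches.foldLeft(plan) { (cur, batch) =>
    var result = cur
    var iteration = 0
    var continue = true
    while (continue && iteration < batch.strategy.maxIterations) {
      val next = batch.rules.foldLeft(result)((p, rule) => rule(p))
      continue = next != result  // converged when a pass is a no-op
      result = next
      iteration += 1
    }
    result
  }
}

// Toy domain: a single FixedPoint batch that halves even numbers.
object Halver extends MiniRuleExecutor[Int] {
  val batches = Seq(Batch[Int]("halve", FixedPoint(100),
    Seq(n => if (n % 2 == 0) n / 2 else n)))
}
assert(Halver.execute(40) == 5)  // 40 -> 20 -> 10 -> 5, then converges
```

The convergence check relies on plan equality, which is why Spark invests in cheap `fastEquals` comparisons on `TreeNode`.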
Parsing: SQL Text to Unresolved Plan
The first stage of the pipeline converts SQL text into an unresolved logical plan. Spark uses ANTLR4 to generate a parser from a grammar definition, then the AstBuilder visits the parse tree to construct Catalyst nodes.
The AstBuilder is one of the largest files in the codebase — over 7,000 lines — because it must handle every SQL construct Spark supports. Each visit method creates unresolved plan nodes: UnresolvedRelation for table names, UnresolvedAttribute for column references, UnresolvedFunction for function calls.
```mermaid
flowchart LR
    SQL["SELECT t.name<br/>FROM users t<br/>WHERE t.age > 25"] --> ANTLR[ANTLR4 Parser]
    ANTLR --> PT[Parse Tree]
    PT --> AST[AstBuilder Visit]
    AST --> LP[Unresolved LogicalPlan]
    LP --> PROJ["Project\n[UnresolvedAttribute('t.name')]"]
    PROJ --> FILT["Filter\n[UnresolvedAttribute('t.age') > 25]"]
    FILT --> REL["UnresolvedRelation\n['users'] AS t"]
```
The key insight is that parsing produces a plan full of "unresolved" references — names that haven't been matched to actual tables or columns yet. That's the job of the next phase.
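A stripped-down sketch of the plan the parser emits — hypothetical stand-ins for Spark's node classes — makes the "unresolved" state concrete: `resolved` is simply a recursive property of the tree.

```scala
// The parser's output for the query in the diagram: names captured as
// strings, nothing checked against any catalog yet.
sealed trait Plan { def resolved: Boolean }
case class UnresolvedRelation(name: String, alias: String) extends Plan {
  val resolved = false  // the table has not been looked up yet
}
case class Filter(condition: String, child: Plan) extends Plan {
  def resolved = child.resolved  // resolved only if everything below is
}
case class Project(columns: Seq[String], child: Plan) extends Plan {
  def resolved = child.resolved
}

// SELECT t.name FROM users t WHERE t.age > 25
val parsed: Plan =
  Project(Seq("t.name"),
    Filter("t.age > 25",
      UnresolvedRelation("users", alias = "t")))

assert(!parsed.resolved)  // the whole plan stays unresolved until analysis
```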
Analysis: Resolving Names, Types, and Functions
The Analyzer is a RuleExecutor[LogicalPlan] with 50+ rules organized in batches:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L291-L295
```scala
class Analyzer(
    override val catalogManager: CatalogManager,
    private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
  extends RuleExecutor[LogicalPlan]
  with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper
```
Analysis runs with FixedPoint strategy because resolving one reference can enable resolving another. The process iterates until every node in the plan reports resolved = true. Typical resolution steps include:
- Table resolution: `UnresolvedRelation("users")` → look up the table in the catalog → `LogicalRelation(parquetFiles, schema)`
- Attribute resolution: `UnresolvedAttribute("t.age")` → match against the schema of the resolved relation → `AttributeReference("age", IntegerType)`
- Type coercion: `IntegerType > StringType("25")` → insert implicit cast → `IntegerType > Cast(StringType("25"), IntegerType)`
- Function resolution: `UnresolvedFunction("count")` → look up in FunctionRegistry → `AggregateExpression(Count(...))`
The Analyzer also delegates to a newer HybridAnalyzer system that can handle single-pass resolution for better performance, falling back to the legacy iterative approach when needed.
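One resolution step can be sketched in isolation — hypothetical types, a plain `Map` standing in for the resolved relation's schema — in the spirit of what `ResolveReferences` does:

```scala
// Match an unresolved column name against a known schema. If the name
// cannot be resolved yet, leave it alone: another rule in a later
// fixed-point iteration may make it resolvable.
sealed trait Expr
case class UnresolvedAttribute(name: String) extends Expr
case class AttributeReference(name: String, dataType: String) extends Expr

// Schema of an already-resolved relation (resolution is bottom-up:
// the table must resolve before its columns can).
val schema = Map("name" -> "string", "age" -> "int")

def resolveAttribute(e: Expr): Expr = e match {
  case UnresolvedAttribute(n) =>
    val col = n.split('.').last  // strip a qualifier like "t."
    schema.get(col)
      .map(dt => AttributeReference(col, dt))
      .getOrElse(e)              // not resolvable yet; keep as-is
  case other => other
}

assert(resolveAttribute(UnresolvedAttribute("t.age")) ==
  AttributeReference("age", "int"))
assert(resolveAttribute(UnresolvedAttribute("t.salary")) ==
  UnresolvedAttribute("t.salary"))
```

Leaving unmatched names untouched rather than failing immediately is what makes the fixed-point iteration work: the real Analyzer only raises errors in `CheckAnalysis`, after resolution has converged.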
Optimization: 50+ Rules for Query Improvement
Once the plan is fully resolved, the Optimizer rewrites it for better performance. The Optimizer extends RuleExecutor[LogicalPlan] and defines defaultBatches containing over 50 rules:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L100-L150
The rules are grouped logically:
| Category | Example Rules | What They Do |
|---|---|---|
| Operator Push Down | PushDownPredicates, ColumnPruning | Move filters closer to data sources, drop unused columns |
| Operator Combine | CollapseProject, CombineUnions, CollapseRepartition | Merge adjacent operators |
| Constant Folding | ConstantFolding, ConstantPropagation, NullPropagation | Pre-compute constant expressions |
| Boolean Simplification | BooleanSimplification, SimplifyConditionals | Simplify boolean logic |
| Join Optimization | ReorderJoin, EliminateOuterJoin | Reorder and simplify joins |
| Subquery | RewriteCorrelatedScalarSubquery | Rewrite subqueries for efficiency |
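To make the push-down category concrete, here is a sketch in the spirit of `PushDownPredicates` — hypothetical plan types, and deliberately ignoring the alias-safety checks the real rule performs:

```scala
// Move a Filter beneath a Project so the predicate runs closer to the
// source. Safe in this toy because the predicate only names base columns.
sealed trait Plan
case class Relation(name: String) extends Plan
case class Project(cols: Seq[String], child: Plan) extends Plan
case class Filter(predicate: String, child: Plan) extends Plan

def pushDown(plan: Plan): Plan = plan match {
  case Filter(pred, Project(cols, child)) => Project(cols, Filter(pred, child))
  case other => other
}

val before = Filter("age > 25", Project(Seq("name", "age"), Relation("users")))
val after  = pushDown(before)
assert(after ==
  Project(Seq("name", "age"), Filter("age > 25", Relation("users"))))
```

In the real optimizer this rewrite compounds with others: once the filter sits directly above the relation, a data source that supports predicate push-down can evaluate it during the scan itself.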
```mermaid
flowchart TD
    INPUT["Analyzed Plan"] --> B1["Batch: Operator Push Down"]
    B1 --> B2["Batch: Constant Folding"]
    B2 --> B3["Batch: Join Reorder"]
    B3 --> B4["Batch: Subquery Optimization"]
    B4 --> B5["Batch: Column Pruning"]
    B5 --> CHECK{Converged?}
    CHECK -->|No, iterate| B1
    CHECK -->|Yes| OUTPUT["Optimized Plan"]
```
The optimizer validates every plan change — after each rule, it checks that the plan remains resolved and that expression IDs remain unique. This catches optimizer bugs early, which is invaluable given the number of interacting rules.
QueryExecution: The Pipeline Orchestrator
QueryExecution is the class that chains all phases together. It uses LazyTry (Spark's enhanced lazy evaluation) to form a pipeline where each phase is computed on demand:
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L65-L72
```mermaid
sequenceDiagram
    participant QE as QueryExecution
    participant Parser
    participant Analyzer
    participant Optimizer
    participant Planner
    QE->>QE: logical (input plan)
    QE->>Analyzer: lazyAnalyzed
    Analyzer-->>QE: analyzed
    QE->>QE: commandExecuted (eager command handling)
    QE->>QE: withCachedData (cache lookup)
    QE->>Optimizer: lazyOptimizedPlan
    Optimizer-->>QE: optimizedPlan
    QE->>Planner: lazySparkPlan
    Planner-->>QE: sparkPlan
    QE->>QE: lazyExecutedPlan (prep rules + AQE)
    QE-->>QE: executedPlan
```
The pipeline stages exposed by QueryExecution are:
- `logical` → the input LogicalPlan (from parsing or DataFrame API)
- `analyzed` → resolved by the Analyzer (line 155)
- `optimizedPlan` → transformed by the Optimizer (line 259)
- `sparkPlan` → converted to physical plan by `SparkPlanner` (line 274)
- `executedPlan` → final plan with preparation rules applied and AQE wrapping
Each stage is lazy — analyzed isn't computed until someone accesses it. This means you can call df.explain() without executing the query. The LazyTry wrapper captures exceptions, allowing them to be surfaced later with full context.
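The chaining pattern is easy to demonstrate with plain `lazy val`s — a hypothetical `MiniQueryExecution` with strings standing in for plans, and without the failure-capture that `LazyTry` adds:

```scala
// Each stage is computed only when first accessed, and forcing a later
// stage transitively forces the earlier ones -- but nothing beyond it.
class MiniQueryExecution(sql: String) {
  var phasesRun: List[String] = Nil  // instrumentation for the example

  lazy val analyzed: String = {
    phasesRun :+= "analyze"; s"Analyzed($sql)"
  }
  lazy val optimizedPlan: String = {
    val in = analyzed            // forces analysis first
    phasesRun :+= "optimize"; s"Optimized($in)"
  }
  lazy val sparkPlan: String = {
    val in = optimizedPlan       // forces optimization first
    phasesRun :+= "plan"; s"Physical($in)"
  }
}

val qe = new MiniQueryExecution("SELECT 1")
assert(qe.phasesRun.isEmpty)     // nothing computed at construction time
qe.optimizedPlan                 // like calling df.explain(): no execution
assert(qe.phasesRun == List("analyze", "optimize"))  // no physical planning yet
```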
Tip: Use `df.queryExecution.analyzed.treeString` (or `.optimizedPlan`, `.sparkPlan`) to inspect intermediate plans. This is the most powerful debugging tool for understanding why Spark generates a particular execution plan.
Physical Planning: LogicalPlan to SparkPlan
The final transformation converts a logical plan into a physical plan — a tree of SparkPlan nodes that know how to produce RDD[InternalRow]:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L60-L65
```scala
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  // ...
  final def execute(): RDD[InternalRow] = executeQuery { executeRDD.get }
  protected def doExecute(): RDD[InternalRow]
}
```
The planner uses SparkStrategies — a collection of Strategy implementations that pattern-match on logical operators and produce corresponding physical operators. For example, a logical Filter becomes a FilterExec, a logical Join might become SortMergeJoinExec, BroadcastHashJoinExec, or ShuffledHashJoinExec depending on join size estimates and configuration.
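The strategy shape can be sketched with hypothetical logical and physical types — the size-based join choice mirrors what Spark's JoinSelection does with its broadcast threshold, heavily simplified:

```scala
// A planning strategy: pattern-match logical operators, emit physical ones.
sealed trait Logical
case class LFilter(pred: String, child: Logical) extends Logical
case class LJoin(left: Logical, right: Logical, rightSizeBytes: Long) extends Logical
case class LScan(table: String) extends Logical

sealed trait Physical
case class FilterExec(pred: String, child: Physical) extends Physical
case class BroadcastHashJoinExec(l: Physical, r: Physical) extends Physical
case class SortMergeJoinExec(l: Physical, r: Physical) extends Physical
case class ScanExec(table: String) extends Physical

// Stand-in for spark.sql.autoBroadcastJoinThreshold (10 MB default).
val broadcastThreshold = 10L * 1024 * 1024

def plan(logical: Logical): Physical = logical match {
  case LFilter(p, c) => FilterExec(p, plan(c))
  case LJoin(l, r, size) if size <= broadcastThreshold =>
    BroadcastHashJoinExec(plan(l), plan(r))  // small side fits in memory
  case LJoin(l, r, _) =>
    SortMergeJoinExec(plan(l), plan(r))      // fall back to shuffle + sort
  case LScan(t) => ScanExec(t)
}

val physical = plan(LJoin(LScan("orders"), LScan("dim_region"), 1024))
assert(physical ==
  BroadcastHashJoinExec(ScanExec("orders"), ScanExec("dim_region")))
```

Because each strategy is just a partial mapping over logical operators, new physical operators can be added without touching the rest of the planner.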
The physical plan's execute() method is the bridge between the SQL world and the core engine. When called, it invokes doExecute() which returns an RDD[InternalRow] — and from that point, the core scheduling stack takes over.
What's Next
We've followed a query from SQL text through Catalyst's analysis and optimization phases to a physical plan. But what happens when that plan actually executes? In the next article, we'll trace how SparkPlan.execute() produces RDDs, how the DAGScheduler breaks those RDDs into stages, how the shuffle system exchanges data between stages, and how Adaptive Query Execution re-optimizes at runtime.