Catalyst 查询管道:从 SQL 文本到优化执行计划
前置知识
- ›第 1-2 篇文章(架构与启动流程)
- ›了解树形数据结构与递归变换
- ›熟悉 Scala 模式匹配与偏函数
- ›具备基本的 SQL 查询处理概念(解析、逻辑计划/物理计划)
Catalyst 查询管道:从 SQL 文本到优化执行计划
Catalyst 是 Spark SQL 的核心所在。它的名字取自化学中的"催化剂"——加速反应本身却不被消耗——Catalyst 优化器将原始 SQL 或 DataFrame 操作转化为高效的分布式执行计划。正因为有了它,Spark SQL 才能把一条朴素的查询重写为执行效率高出数个数量级的版本。
Catalyst 的基础由两个核心抽象构成:不可变树(TreeNode)与基于规则的变换(RuleExecutor)。解析、分析、优化、物理规划——一切都建立在这两个原语之上。理解 Catalyst,就从理解这两块基石开始。
TreeNode 抽象
Spark 中的所有执行计划——无论是逻辑计划还是物理计划——都是由节点构成的树。TreeNode 抽象类提供了以不可变方式操作这些树所需的全部机制:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L70-L75
abstract class TreeNode[BaseType <: TreeNode[BaseType]]
extends Product
with TreePatternBits
with WithOrigin {
self: BaseType =>
classDiagram
class TreeNode~BaseType~ {
<<abstract>>
+children: Seq[BaseType]
+transformDown(rule): BaseType
+transformUp(rule): BaseType
+transformDownWithPruning(cond, rule): BaseType
+withNewChildren(newChildren): BaseType
+fastEquals(other): Boolean
+find(f): Option
+foreach(f): Unit
+map(f): Seq
}
class LogicalPlan {
+output: Seq[Attribute]
+resolved: Boolean
}
class SparkPlan {
+execute(): RDD[InternalRow]
+doExecute(): RDD[InternalRow]
}
class Expression {
+dataType: DataType
+eval(input): Any
}
TreeNode <|-- LogicalPlan
TreeNode <|-- SparkPlan
TreeNode <|-- Expression
TreeNode 的核心是一系列 transform 方法。它们接受一个 PartialFunction,对每个节点依次应用,并返回一棵新的树,其中匹配的节点已被替换——这正是写时复制(copy-on-write)语义的体现:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L470-L488
transformDown 按前序(父节点先于子节点)应用规则,transformUp 则按后序应用。两者的选择很关键:当规则需要看到已经变换过的子节点时,自底向上更为合适。
TreePatternBits mixin 是一项性能优化。每个树节点都维护一个位集,记录其子树中存在哪些 TreePattern 枚举值。规则通过 transformDownWithPruning 声明自己关心哪些模式,对于不包含相关模式的子树,可以直接跳过。面对 50 余条优化规则、每条都需要遍历整棵树的场景,这种剪枝机制对性能至关重要。
提示: 编写自定义优化规则时,始终应使用
transformDownWithPruning并指定相应的TreePattern,而不是直接使用transformDown。对于复杂查询计划,跳过无关子树可以将优化耗时降低 10 到 50 倍。
RuleExecutor:批次与不动点迭代
单条规则的能力有限,关键在于如何编排它们。RuleExecutor 框架将规则组织成若干命名的 Batch,每个批次配有独立的执行策略:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala#L125-L165
abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
case object Once extends Strategy { val maxIterations = 1 }
case class FixedPoint(override val maxIterations: Int, ...) extends Strategy
protected[catalyst] case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
protected def batches: Seq[Batch]
}
flowchart TD
START[Input Plan] --> B1
subgraph B1["Batch: Resolution (FixedPoint 100)"]
R1[Rule 1] --> R2[Rule 2] --> R3[Rule N]
R3 -->|"Plan changed?"| CHECK1{Converged?}
CHECK1 -->|No| R1
CHECK1 -->|Yes| B1OUT[Batch Output]
end
B1OUT --> B2
subgraph B2["Batch: Optimization (Once)"]
R4[Rule A] --> R5[Rule B]
end
B2 --> VALIDATE[Validate Plan Changes]
VALIDATE --> END[Output Plan]
框架提供两种执行策略:
Once:批次仅执行一次。适用于天然幂等、或不应重复执行的规则(如子查询优化)。FixedPoint(n):批次反复执行,直到计划不再发生变化(收敛),或达到n次迭代上限。适用于规则之间存在相互激活关系的解析与优化阶段。
每个 Once 批次执行结束后,执行器可选地进行幂等性检查——再次应用该批次,验证结果是否保持不变。这有助于尽早发现规则未能真正满足幂等性的问题。
解析:从 SQL 文本到未解析计划
管道的第一阶段将 SQL 文本转换为未解析的逻辑计划。Spark 使用 ANTLR4 根据语法定义生成解析器,再由 AstBuilder 遍历解析树,构建 Catalyst 节点。
AstBuilder 是代码库中最大的文件之一,超过 7000 行——因为它需要处理 Spark 支持的每一种 SQL 语法结构。每个 visit 方法都会创建未解析的计划节点:UnresolvedRelation 对应表名,UnresolvedAttribute 对应列引用,UnresolvedFunction 对应函数调用。
flowchart LR
SQL["SELECT t.name<br/>FROM users t<br/>WHERE t.age > 25"] --> ANTLR[ANTLR4 Parser]
ANTLR --> PT[Parse Tree]
PT --> AST[AstBuilder Visit]
AST --> LP[Unresolved LogicalPlan]
LP --> PROJ["Project\n[UnresolvedAttribute('t.name')]"]
PROJ --> FILT["Filter\n[UnresolvedAttribute('t.age') > 25]"]
FILT --> REL["UnresolvedRelation\n['users'] AS t"]
这里有一个关键点:解析阶段产生的计划充满了"未解析"引用——这些名称尚未与实际的表或列绑定。将它们逐一绑定,正是下一阶段的任务。
分析:解析名称、类型与函数
Analyzer 是一个 RuleExecutor[LogicalPlan],其中约 50 余条规则按批次组织:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L291-L295
class Analyzer(
override val catalogManager: CatalogManager,
private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
extends RuleExecutor[LogicalPlan]
with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper
分析阶段采用 FixedPoint 策略,原因在于:解析一个引用往往能激活另一个引用的解析,过程需要反复迭代,直到计划中的每个节点都报告 resolved = true。典型的解析步骤包括:
- 表解析:
UnresolvedRelation("users")→ 在 catalog 中查找表 →LogicalRelation(parquetFiles, schema) - 属性解析:
UnresolvedAttribute("t.age")→ 对照已解析关系的 schema 进行匹配 →AttributeReference("age", IntegerType) - 类型强制转换:
IntegerType > StringType("25")→ 插入隐式类型转换 →IntegerType > Cast(StringType("25"), IntegerType) - 函数解析:
UnresolvedFunction("count")→ 在 FunctionRegistry 中查找 →AggregateExpression(Count(...))
Analyzer 还引入了新的 HybridAnalyzer 机制,支持单趟解析以提升性能,在无法处理的情况下自动回退到原有的迭代式解析流程。
优化:50 余条规则提升查询性能
计划完全解析后,Optimizer 负责对其进行重写以提升执行效率。Optimizer 继承自 RuleExecutor[LogicalPlan],在 defaultBatches 中定义了超过 50 条规则:
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L100-L150
这些规则按功能分类如下:
| 类别 | 示例规则 | 作用 |
|---|---|---|
| 算子下推 | PushDownPredicates、ColumnPruning |
将过滤条件下推至数据源,删除未使用的列 |
| 算子合并 | CollapseProject、CombineUnions、CollapseRepartition |
合并相邻算子 |
| 常量折叠 | ConstantFolding、ConstantPropagation、NullPropagation |
预先计算常量表达式 |
| 布尔简化 | BooleanSimplification、SimplifyConditionals |
化简布尔逻辑 |
| Join 优化 | ReorderJoin、EliminateOuterJoin |
重排并简化 join |
| 子查询 | RewriteCorrelatedScalarSubquery |
将子查询改写为更高效的形式 |
flowchart TD
INPUT["Analyzed Plan"] --> B1["Batch: Operator Push Down"]
B1 --> B2["Batch: Constant Folding"]
B2 --> B3["Batch: Join Reorder"]
B3 --> B4["Batch: Subquery Optimization"]
B4 --> B5["Batch: Column Pruning"]
B5 --> CHECK{Converged?}
CHECK -->|No, iterate| B1
CHECK -->|Yes| OUTPUT["Optimized Plan"]
优化器会对每次计划变更进行校验——每条规则执行后,都会检查计划是否仍处于已解析状态,以及表达式 ID 是否保持唯一。考虑到规则之间复杂的交互关系,这一机制能在第一时间发现优化器中潜藏的错误。
QueryExecution:管道的调度中心
QueryExecution 负责将所有阶段串联起来。它借助 LazyTry(Spark 增强版的惰性求值机制)构建管道,每个阶段均按需计算:
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L65-L72
sequenceDiagram
participant QE as QueryExecution
participant Parser
participant Analyzer
participant Optimizer
participant Planner
QE->>QE: logical (input plan)
QE->>Analyzer: lazyAnalyzed
Analyzer-->>QE: analyzed
QE->>QE: commandExecuted (eager command handling)
QE->>QE: withCachedData (cache lookup)
QE->>Optimizer: lazyOptimizedPlan
Optimizer-->>QE: optimizedPlan
QE->>Planner: lazySparkPlan
Planner-->>QE: sparkPlan
QE->>QE: lazyExecutedPlan (prep rules + AQE)
QE-->>QE: executedPlan
QueryExecution 对外暴露以下管道阶段:
logical→ 输入的 LogicalPlan(来自解析器或 DataFrame API)analyzed→ 经 Analyzer 解析后的计划(第 155 行)optimizedPlan→ 经 Optimizer 变换后的计划(第 259 行)sparkPlan→ 由SparkPlanner转换为物理计划(第 274 行)executedPlan→ 应用准备规则并包装 AQE 后的最终计划
每个阶段都是惰性的——只有在被访问时才会触发计算。因此,调用 df.explain() 不会真正执行查询。LazyTry 包装器会捕获异常,待后续访问时携带完整上下文一并抛出。
提示: 使用
df.queryExecution.analyzed.treeString(或.optimizedPlan、.sparkPlan)可以查看各中间阶段的计划。这是理解 Spark 为何生成特定执行计划时最有力的调试手段。
物理规划:从 LogicalPlan 到 SparkPlan
最后一步变换,是将逻辑计划转换为物理计划——一棵由 SparkPlan 节点构成的树,每个节点都知道如何生成 RDD[InternalRow]:
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L60-L65
abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
// ...
final def execute(): RDD[InternalRow] = executeQuery { executeRDD.get }
protected def doExecute(): RDD[InternalRow]
}
规划器使用 SparkStrategies——一组 Strategy 实现,通过模式匹配作用于逻辑算子,并生成对应的物理算子。例如,逻辑层的 Filter 会变成 FilterExec;逻辑层的 Join 则根据 join 规模估算和配置,可能变成 SortMergeJoinExec、BroadcastHashJoinExec 或 ShuffledHashJoinExec。
物理计划的 execute() 方法是 SQL 世界与底层引擎之间的桥梁。调用它时,会触发 doExecute() 并返回一个 RDD[InternalRow]——从这一刻起,核心调度栈接管后续的一切。
下篇预告
我们已经跟随一条查询,从 SQL 文本经过 Catalyst 的分析与优化阶段,一路走到了物理计划。那么,当计划真正开始执行时,又会发生什么?下一篇文章,我们将追踪 SparkPlan.execute() 如何生成 RDD、DAGScheduler 如何将 RDD 拆分为 stage、shuffle 系统如何在 stage 之间交换数据,以及自适应查询执行(AQE)如何在运行时进行二次优化。