Read OSS

Catalyst 查询管道:从 SQL 文本到优化执行计划

高级

前置知识

  • 第 1-2 篇文章(架构与启动流程)
  • 了解树形数据结构与递归变换
  • 熟悉 Scala 模式匹配与偏函数
  • 具备基本的 SQL 查询处理概念(解析、逻辑计划/物理计划)

Catalyst 查询管道:从 SQL 文本到优化执行计划

Catalyst 是 Spark SQL 的核心所在。它的名字取自化学中的"催化剂"——加速反应本身却不被消耗——Catalyst 优化器将原始 SQL 或 DataFrame 操作转化为高效的分布式执行计划。正因为有了它,Spark SQL 才能把一条朴素的查询重写为执行效率高出数个数量级的版本。

Catalyst 的基础由两个核心抽象构成:不可变树(TreeNode)与基于规则的变换(RuleExecutor)。解析、分析、优化、物理规划——一切都建立在这两个原语之上。理解 Catalyst,就从理解这两块基石开始。

TreeNode 抽象

Spark 中的所有执行计划——无论是逻辑计划还是物理计划——都是由节点构成的树。TreeNode 抽象类提供了以不可变方式操作这些树所需的全部机制:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L70-L75

abstract class TreeNode[BaseType <: TreeNode[BaseType]]
  extends Product
  with TreePatternBits
  with WithOrigin {
  self: BaseType =>
classDiagram
    class TreeNode~BaseType~ {
        <<abstract>>
        +children: Seq[BaseType]
        +transformDown(rule): BaseType
        +transformUp(rule): BaseType
        +transformDownWithPruning(cond, rule): BaseType
        +withNewChildren(newChildren): BaseType
        +fastEquals(other): Boolean
        +find(f): Option
        +foreach(f): Unit
        +map(f): Seq
    }

    class LogicalPlan {
        +output: Seq[Attribute]
        +resolved: Boolean
    }

    class SparkPlan {
        +execute(): RDD[InternalRow]
        +doExecute(): RDD[InternalRow]
    }

    class Expression {
        +dataType: DataType
        +eval(input): Any
    }

    TreeNode <|-- LogicalPlan
    TreeNode <|-- SparkPlan
    TreeNode <|-- Expression

TreeNode 的核心是一系列 transform 方法。它们接受一个 PartialFunction,对每个节点依次应用,并返回一棵新的树,其中匹配的节点已被替换——这正是写时复制(copy-on-write)语义的体现:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L470-L488

transformDown 按前序(父节点先于子节点)应用规则,transformUp 则按后序应用。两者的选择很关键:当规则需要看到已经变换过的子节点时,自底向上更为合适。

TreePatternBits mixin 是一项性能优化。每个树节点都维护一个位集,记录其子树中存在哪些 TreePattern 枚举值。规则通过 transformDownWithPruning 声明自己关心哪些模式,对于不包含相关模式的子树,可以直接跳过。面对 50 余条优化规则、每条都需要遍历整棵树的场景,这种剪枝机制对性能至关重要。

提示: 编写自定义优化规则时,始终应使用 transformDownWithPruning 并指定相应的 TreePattern,而不是直接使用 transformDown。对于复杂查询计划,跳过无关子树可以将优化耗时降低 10 到 50 倍。

RuleExecutor:批次与不动点迭代

单条规则的能力有限,关键在于如何编排它们。RuleExecutor 框架将规则组织成若干命名的 Batch,每个批次配有独立的执行策略:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala#L125-L165

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(override val maxIterations: Int, ...) extends Strategy
  
  protected[catalyst] case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  protected def batches: Seq[Batch]
}
flowchart TD
    START[Input Plan] --> B1

    subgraph B1["Batch: Resolution (FixedPoint 100)"]
        R1[Rule 1] --> R2[Rule 2] --> R3[Rule N]
        R3 -->|"Plan changed?"| CHECK1{Converged?}
        CHECK1 -->|No| R1
        CHECK1 -->|Yes| B1OUT[Batch Output]
    end

    B1OUT --> B2

    subgraph B2["Batch: Optimization (Once)"]
        R4[Rule A] --> R5[Rule B]
    end

    B2 --> VALIDATE[Validate Plan Changes]
    VALIDATE --> END[Output Plan]

框架提供两种执行策略:

  • Once:批次仅执行一次。适用于天然幂等、或不应重复执行的规则(如子查询优化)。
  • FixedPoint(n):批次反复执行,直到计划不再发生变化(收敛),或达到 n 次迭代上限。适用于规则之间存在相互激活关系的解析与优化阶段。

每个 Once 批次执行结束后,执行器可选地进行幂等性检查——再次应用该批次,验证结果是否保持不变。这有助于尽早发现规则未能真正满足幂等性的问题。

解析:从 SQL 文本到未解析计划

管道的第一阶段将 SQL 文本转换为未解析的逻辑计划。Spark 使用 ANTLR4 根据语法定义生成解析器,再由 AstBuilder 遍历解析树,构建 Catalyst 节点。

AstBuilder 是代码库中最大的文件之一,超过 7000 行——因为它需要处理 Spark 支持的每一种 SQL 语法结构。每个 visit 方法都会创建未解析的计划节点:UnresolvedRelation 对应表名,UnresolvedAttribute 对应列引用,UnresolvedFunction 对应函数调用。

flowchart LR
    SQL["SELECT t.name<br/>FROM users t<br/>WHERE t.age > 25"] --> ANTLR[ANTLR4 Parser]
    ANTLR --> PT[Parse Tree]
    PT --> AST[AstBuilder Visit]
    AST --> LP[Unresolved LogicalPlan]

    LP --> PROJ["Project\n[UnresolvedAttribute('t.name')]"]
    PROJ --> FILT["Filter\n[UnresolvedAttribute('t.age') > 25]"]
    FILT --> REL["UnresolvedRelation\n['users'] AS t"]

这里有一个关键点:解析阶段产生的计划充满了"未解析"引用——这些名称尚未与实际的表或列绑定。将它们逐一绑定,正是下一阶段的任务。

分析:解析名称、类型与函数

Analyzer 是一个 RuleExecutor[LogicalPlan],其中约 50 余条规则按批次组织:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L291-L295

class Analyzer(
    override val catalogManager: CatalogManager,
    private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
  extends RuleExecutor[LogicalPlan]
  with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper

分析阶段采用 FixedPoint 策略,原因在于:解析一个引用往往能激活另一个引用的解析,过程需要反复迭代,直到计划中的每个节点都报告 resolved = true。典型的解析步骤包括:

  1. 表解析UnresolvedRelation("users") → 在 catalog 中查找表 → LogicalRelation(parquetFiles, schema)
  2. 属性解析UnresolvedAttribute("t.age") → 对照已解析关系的 schema 进行匹配 → AttributeReference("age", IntegerType)
  3. 类型强制转换IntegerType > StringType("25") → 插入隐式类型转换 → IntegerType > Cast(StringType("25"), IntegerType)
  4. 函数解析UnresolvedFunction("count") → 在 FunctionRegistry 中查找 → AggregateExpression(Count(...))

Analyzer 还引入了新的 HybridAnalyzer 机制,支持单趟解析以提升性能,在无法处理的情况下自动回退到原有的迭代式解析流程。

优化:50 余条规则提升查询性能

计划完全解析后,Optimizer 负责对其进行重写以提升执行效率。Optimizer 继承自 RuleExecutor[LogicalPlan],在 defaultBatches 中定义了超过 50 条规则:

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L100-L150

这些规则按功能分类如下:

类别 示例规则 作用
算子下推 PushDownPredicatesColumnPruning 将过滤条件下推至数据源,删除未使用的列
算子合并 CollapseProjectCombineUnionsCollapseRepartition 合并相邻算子
常量折叠 ConstantFoldingConstantPropagationNullPropagation 预先计算常量表达式
布尔简化 BooleanSimplificationSimplifyConditionals 化简布尔逻辑
Join 优化 ReorderJoinEliminateOuterJoin 重排并简化 join
子查询 RewriteCorrelatedScalarSubquery 将子查询改写为更高效的形式
flowchart TD
    INPUT["Analyzed Plan"] --> B1["Batch: Operator Push Down"]
    B1 --> B2["Batch: Constant Folding"]
    B2 --> B3["Batch: Join Reorder"]
    B3 --> B4["Batch: Subquery Optimization"]
    B4 --> B5["Batch: Column Pruning"]
    B5 --> CHECK{Converged?}
    CHECK -->|No, iterate| B1
    CHECK -->|Yes| OUTPUT["Optimized Plan"]

优化器会对每次计划变更进行校验——每条规则执行后,都会检查计划是否仍处于已解析状态,以及表达式 ID 是否保持唯一。考虑到规则之间复杂的交互关系,这一机制能在第一时间发现优化器中潜藏的错误。

QueryExecution:管道的调度中心

QueryExecution 负责将所有阶段串联起来。它借助 LazyTry(Spark 增强版的惰性求值机制)构建管道,每个阶段均按需计算:

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L65-L72

sequenceDiagram
    participant QE as QueryExecution
    participant Parser
    participant Analyzer
    participant Optimizer
    participant Planner

    QE->>QE: logical (input plan)
    QE->>Analyzer: lazyAnalyzed
    Analyzer-->>QE: analyzed
    QE->>QE: commandExecuted (eager command handling)
    QE->>QE: withCachedData (cache lookup)
    QE->>Optimizer: lazyOptimizedPlan
    Optimizer-->>QE: optimizedPlan
    QE->>Planner: lazySparkPlan
    Planner-->>QE: sparkPlan
    QE->>QE: lazyExecutedPlan (prep rules + AQE)
    QE-->>QE: executedPlan

QueryExecution 对外暴露以下管道阶段:

  1. logical → 输入的 LogicalPlan(来自解析器或 DataFrame API)
  2. analyzed → 经 Analyzer 解析后的计划(第 155 行
  3. optimizedPlan → 经 Optimizer 变换后的计划(第 259 行
  4. sparkPlan → 由 SparkPlanner 转换为物理计划(第 274 行
  5. executedPlan → 应用准备规则并包装 AQE 后的最终计划

每个阶段都是惰性的——只有在被访问时才会触发计算。因此,调用 df.explain() 不会真正执行查询。LazyTry 包装器会捕获异常,待后续访问时携带完整上下文一并抛出。

提示: 使用 df.queryExecution.analyzed.treeString(或 .optimizedPlan.sparkPlan)可以查看各中间阶段的计划。这是理解 Spark 为何生成特定执行计划时最有力的调试手段。

物理规划:从 LogicalPlan 到 SparkPlan

最后一步变换,是将逻辑计划转换为物理计划——一棵由 SparkPlan 节点构成的树,每个节点都知道如何生成 RDD[InternalRow]

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L60-L65

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  // ...
  final def execute(): RDD[InternalRow] = executeQuery { executeRDD.get }
  protected def doExecute(): RDD[InternalRow]
}

规划器使用 SparkStrategies——一组 Strategy 实现,通过模式匹配作用于逻辑算子,并生成对应的物理算子。例如,逻辑层的 Filter 会变成 FilterExec;逻辑层的 Join 则根据 join 规模估算和配置,可能变成 SortMergeJoinExecBroadcastHashJoinExecShuffledHashJoinExec

物理计划的 execute() 方法是 SQL 世界与底层引擎之间的桥梁。调用它时,会触发 doExecute() 并返回一个 RDD[InternalRow]——从这一刻起,核心调度栈接管后续的一切。

下篇预告

我们已经跟随一条查询,从 SQL 文本经过 Catalyst 的分析与优化阶段,一路走到了物理计划。那么,当计划真正开始执行时,又会发生什么?下一篇文章,我们将追踪 SparkPlan.execute() 如何生成 RDD、DAGScheduler 如何将 RDD 拆分为 stage、shuffle 系统如何在 stage 之间交换数据,以及自适应查询执行(AQE)如何在运行时进行二次优化。