Read OSS

Catalyst クエリパイプライン:SQL テキストから最適化プランへ

上級

前提知識

  • 第 1・2 回(アーキテクチャとブートシーケンス)を読んでいること
  • 木構造データと再帰的変換の基礎知識
  • Scala のパターンマッチングと partial function への理解
  • SQL クエリ処理の基本概念(パース、論理/物理プラン)の把握

Catalyst クエリパイプライン:SQL テキストから最適化プランへ

Catalyst は Spark SQL の核心をなすコンポーネントです。その名は、反応を加速しながら自身は消費されない化学的プロセス(触媒)に由来します。Catalyst オプティマイザは、生の SQL や DataFrame 操作を効率的な分散実行プランへと変換します。素朴なクエリを桁違いに高速なものへと書き換えられるのは、Catalyst があるからこそです。

Catalyst の根幹を支えるのは、不変ツリー(TreeNode)とルールベースの変換(RuleExecutor)という 2 つの基礎的な抽象化です。パース、解析、最適化、物理プランニングといったすべての処理は、これらのプリミティブの上に成り立っています。Catalyst を理解するということは、この 2 つの仕組みを理解することです。

TreeNode 抽象化

Spark のすべてのプラン(論理・物理を問わず)は、ノードのツリーとして表現されます。抽象クラス TreeNode は、このツリーを不変的に操作するための仕組みを提供します。

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L70-L75

abstract class TreeNode[BaseType <: TreeNode[BaseType]]
  extends Product
  with TreePatternBits
  with WithOrigin {
  self: BaseType =>
classDiagram
    class TreeNode~BaseType~ {
        <<abstract>>
        +children: Seq[BaseType]
        +transformDown(rule): BaseType
        +transformUp(rule): BaseType
        +transformDownWithPruning(cond, rule): BaseType
        +withNewChildren(newChildren): BaseType
        +fastEquals(other): Boolean
        +find(f): Option
        +foreach(f): Unit
        +map(f): Seq
    }

    class LogicalPlan {
        +output: Seq[Attribute]
        +resolved: Boolean
    }

    class SparkPlan {
        +execute(): RDD[InternalRow]
        +doExecute(): RDD[InternalRow]
    }

    class Expression {
        +dataType: DataType
        +eval(input): Any
    }

    TreeNode <|-- LogicalPlan
    TreeNode <|-- SparkPlan
    TreeNode <|-- Expression

TreeNode の中心的な API は transform 系のメソッドです。これらは PartialFunction を各ノードに適用し、マッチしたノードを置き換えた新しいツリーを返します。いわゆるコピーオンライトのセマンティクスです。

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala#L470-L488

transformDown はルールを前順(親から子の順)で適用し、transformUp は後順(子から親の順)で適用します。この違いは重要です。変換済みの子ノードを参照する必要があるルールは、ボトムアップ方式が適しています。

TreePatternBits mixin はパフォーマンスのための最適化です。各ツリーノードはサブツリー内に存在する TreePattern 列挙値をビットセットとして保持しています。ルールは transformDownWithPruning を通じて対象パターンを宣言し、該当するパターンを含まないサブツリー全体をスキップできます。50 以上のオプティマイザルールがそれぞれツリー全体を走査する状況では、この枝刈りがパフォーマンスに直結します。

ヒント: カスタムのオプティマイザルールを実装する際は、素の transformDown ではなく、適切な TreePattern を指定した transformDownWithPruning を必ず使いましょう。関係のないサブツリーをスキップするだけで、複雑なクエリプランでは最適化時間を 10〜50 倍短縮できます。

RuleExecutor:バッチと不動点反復

個々のルールは有用ですが、それらを束ねる仕組みが必要です。RuleExecutor フレームワークは、ルールを名前付きの Batch にまとめ、それぞれに実行戦略を割り当てます。

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala#L125-L165

abstract class RuleExecutor[TreeType <: TreeNode[_]] extends Logging {
  case object Once extends Strategy { val maxIterations = 1 }
  case class FixedPoint(override val maxIterations: Int, ...) extends Strategy
  
  protected[catalyst] case class Batch(name: String, strategy: Strategy, rules: Rule[TreeType]*)
  protected def batches: Seq[Batch]
}
flowchart TD
    START[Input Plan] --> B1

    subgraph B1["Batch: Resolution (FixedPoint 100)"]
        R1[Rule 1] --> R2[Rule 2] --> R3[Rule N]
        R3 -->|"Plan changed?"| CHECK1{Converged?}
        CHECK1 -->|No| R1
        CHECK1 -->|Yes| B1OUT[Batch Output]
    end

    B1OUT --> B2

    subgraph B2["Batch: Optimization (Once)"]
        R4[Rule A] --> R5[Rule B]
    end

    B2 --> VALIDATE[Validate Plan Changes]
    VALIDATE --> END[Output Plan]

戦略は 2 種類あります。

  • Once:バッチを 1 回だけ実行します。本質的に冪等なルールや、繰り返すべきでないルール(サブクエリの最適化など)に使います。
  • FixedPoint(n):プランが変化しなくなる(収束する)か、n 回の反復に達するまでバッチを繰り返します。あるルールが別のルールの適用を可能にするケースがある、解析や最適化のフェーズで使います。

Once バッチの実行後、エグゼキュータはオプションで冪等性チェックを行います。バッチを再適用して結果が変わらないことを確認するもので、ルールが真に冪等でないバグを早期に検出できます。

パース:SQL テキストから未解決プランへ

パイプラインの最初のフェーズは、SQL テキストを未解決の論理プランに変換することです。Spark は ANTLR4 を使ってグラマー定義からパーサーを生成し、AstBuilder がパースツリーをたどって Catalyst ノードを構築します。

AstBuilder は Spark がサポートするあらゆる SQL 構文を処理しなければならないため、コードベースの中でも最大級のファイルの一つ(7,000 行超)です。各ビジットメソッドは未解決のプランノードを生成します。テーブル名には UnresolvedRelation、カラム参照には UnresolvedAttribute、関数呼び出しには UnresolvedFunction を使います。

flowchart LR
    SQL["SELECT t.name<br/>FROM users t<br/>WHERE t.age > 25"] --> ANTLR[ANTLR4 Parser]
    ANTLR --> PT[Parse Tree]
    PT --> AST[AstBuilder Visit]
    AST --> LP[Unresolved LogicalPlan]

    LP --> PROJ["Project\n[UnresolvedAttribute('t.name')]"]
    PROJ --> FILT["Filter\n[UnresolvedAttribute('t.age') > 25]"]
    FILT --> REL["UnresolvedRelation\n['users'] AS t"]

重要なのは、パースの結果は「未解決」な参照を含んだプランだという点です。テーブルやカラムとの対応づけはまだ行われていません。それを担うのが次のフェーズです。

解析:名前、型、関数の解決

Analyzer は RuleExecutor[LogicalPlan] を継承しており、50 以上のルールがバッチに整理されています。

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L291-L295

class Analyzer(
    override val catalogManager: CatalogManager,
    private[sql] val sharedRelationCache: RelationCache = RelationCache.empty)
  extends RuleExecutor[LogicalPlan]
  with CheckAnalysis with AliasHelper with SQLConfHelper with ColumnResolutionHelper

ある参照の解決が別の参照の解決を可能にすることがあるため、解析は FixedPoint 戦略で実行されます。プラン内のすべてのノードが resolved = true を返すまで反復が続きます。典型的な解決ステップは次のとおりです。

  1. テーブル解決UnresolvedRelation("users") → カタログでテーブルを検索 → LogicalRelation(parquetFiles, schema)
  2. 属性解決UnresolvedAttribute("t.age") → 解決済みリレーションのスキーマと照合 → AttributeReference("age", IntegerType)
  3. 型強制IntegerType > StringType("25") → 暗黙のキャストを挿入 → IntegerType > Cast(StringType("25"), IntegerType)
  4. 関数解決UnresolvedFunction("count") → FunctionRegistry で検索 → AggregateExpression(Count(...))

また Analyzer は、より新しい HybridAnalyzer システムへの委譲もサポートしています。これはパフォーマンス向上のためのシングルパス解決を行い、必要な場合は従来の反復的アプローチにフォールバックします。

最適化:クエリ改善のための 50 以上のルール

プランが完全に解決されると、次はオプティマイザがパフォーマンス向上のために書き換えを行います。OptimizerRuleExecutor[LogicalPlan] を継承し、50 以上のルールを含む defaultBatches を定義しています。

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L100-L150

ルールは論理的にグループ化されています。

カテゴリ ルール例 概要
オペレータのプッシュダウン PushDownPredicatesColumnPruning フィルタをデータソースに近づけ、不要なカラムを除去する
オペレータの結合 CollapseProjectCombineUnionsCollapseRepartition 隣接するオペレータをまとめる
定数畳み込み ConstantFoldingConstantPropagationNullPropagation 定数式を事前に計算する
ブール式の簡略化 BooleanSimplificationSimplifyConditionals ブール論理を単純化する
結合の最適化 ReorderJoinEliminateOuterJoin 結合の順序変更と単純化を行う
サブクエリ RewriteCorrelatedScalarSubquery サブクエリを効率的な形式に書き換える
flowchart TD
    INPUT["Analyzed Plan"] --> B1["Batch: Operator Push Down"]
    B1 --> B2["Batch: Constant Folding"]
    B2 --> B3["Batch: Join Reorder"]
    B3 --> B4["Batch: Subquery Optimization"]
    B4 --> B5["Batch: Column Pruning"]
    B5 --> CHECK{Converged?}
    CHECK -->|No, iterate| B1
    CHECK -->|Yes| OUTPUT["Optimized Plan"]

オプティマイザはすべてのプラン変更を検証します。各ルールの適用後に、プランが解決済みのままであること、式の ID が一意であることを確認します。相互に影響し合う多数のルールが存在する中で、オプティマイザのバグを早期に発見するうえで非常に重要な仕組みです。

QueryExecution:パイプラインのオーケストレーター

QueryExecution は、すべてのフェーズを連結するクラスです。Spark の拡張された遅延評価の仕組みである LazyTry を活用し、各フェーズをオンデマンドで計算するパイプラインを形成します。

sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala#L65-L72

sequenceDiagram
    participant QE as QueryExecution
    participant Parser
    participant Analyzer
    participant Optimizer
    participant Planner

    QE->>QE: logical (input plan)
    QE->>Analyzer: lazyAnalyzed
    Analyzer-->>QE: analyzed
    QE->>QE: commandExecuted (eager command handling)
    QE->>QE: withCachedData (cache lookup)
    QE->>Optimizer: lazyOptimizedPlan
    Optimizer-->>QE: optimizedPlan
    QE->>Planner: lazySparkPlan
    Planner-->>QE: sparkPlan
    QE->>QE: lazyExecutedPlan (prep rules + AQE)
    QE-->>QE: executedPlan

QueryExecution が公開するパイプラインのステージは以下のとおりです。

  1. logical → 入力となる LogicalPlan(パースまたは DataFrame API から生成)
  2. analyzed → Analyzer によって解決済み(155 行目
  3. optimizedPlan → Optimizer によって変換済み(259 行目
  4. sparkPlanSparkPlanner によって物理プランに変換済み(274 行目
  5. executedPlan → 準備ルールの適用と AQE ラッピングが完了した最終プラン

各ステージは遅延評価です。analyzed はアクセスされるまで計算されません。そのため df.explain() を呼び出してもクエリは実行されません。LazyTry ラッパーは例外をキャプチャし、後から完全なコンテキストとともに表示できるようにします。

ヒント: df.queryExecution.analyzed.treeString(あるいは .optimizedPlan.sparkPlan)を使うと、中間プランを確認できます。Spark がなぜそのような実行プランを生成するのかを理解するための、最も強力なデバッグ手段です。

物理プランニング:LogicalPlan から SparkPlan へ

最後の変換フェーズでは、論理プランを物理プランへと変換します。物理プランは、RDD[InternalRow] を生成する方法を知っている SparkPlan ノードのツリーです。

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala#L60-L65

abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializable {
  // ...
  final def execute(): RDD[InternalRow] = executeQuery { executeRDD.get }
  protected def doExecute(): RDD[InternalRow]
}

プランナーは SparkStrategies を使います。これは Strategy 実装の集合で、論理オペレータをパターンマッチして対応する物理オペレータを生成します。たとえば、論理的な FilterFilterExec になり、論理的な Join は、結合サイズの推定値や設定に応じて SortMergeJoinExecBroadcastHashJoinExecShuffledHashJoinExec のいずれかになります。

物理プランの execute() メソッドは、SQL の世界とコアエンジンをつなぐ架け橋です。呼び出されると doExecute() が実行され、RDD[InternalRow] が返されます。そこからはコアのスケジューリングスタックが処理を引き継ぎます。

次回予告

ここまで、SQL テキストから Catalyst の解析・最適化フェーズを経て物理プランに至るまでの流れを追ってきました。では、そのプランが実際に実行されるとき何が起きるのでしょうか?次回は SparkPlan.execute() が RDD を生成する仕組みや、DAGScheduler がステージに分割する方法を解説します。さらに、シャッフルの仕組みと Adaptive Query Execution による実行時の再最適化も掘り下げます。