Read OSS

ワークフローエンジンの内部構造:グラフ実行、Node Factory、そしてLayerシステム

上級

前提知識

  • 第1・2回:アーキテクチャ概要とリクエストフロー
  • 有向非巡回グラフ(DAG)の基本的な理解
  • PythonにおけるイベントドリブンなGeneratorパターンへの慣れ

ワークフローエンジンの内部構造:グラフ実行、Node Factory、そしてLayerシステム

Difyのビジュアルワークフロービルダーでは、LLM呼び出し・ナレッジ検索・コード実行・HTTPリクエスト・条件分岐といった多様なノードを、有向非巡回グラフ(DAG)として組み合わせてLLMアプリケーションを構築できます。このビジュアルなDAGを実際の計算として動かすのが GraphEngine です。GraphEngineは汎用のグラフ実行ランタイムであり、Difyはその上にアプリケーション固有の関心事をラップしています。本記事では、このエンジンがどのように組み立てられているか、ノードの登録と生成の仕組み、そしてクォータ管理やオブザーバビリティといった横断的な関心事にAOP風のミドルウェアを提供するLayerシステムの詳細を探っていきます。

WorkflowEntry:GraphEngineの構築と実行

WorkflowEntry クラスは、DifyのアプリケーションレイヤーとGraphEngineライブラリ graphon をつなぐ橋渡し役です。コンストラクタでLayerを含む完全な GraphEngine を組み立て、run() メソッドが GraphEngineEvent を順次yieldします。

flowchart TD
    WE[WorkflowEntry.__init__] --> DepthCheck[Check call depth limit]
    DepthCheck --> CmdChannel[Create CommandChannel]
    CmdChannel --> ChildBuilder[Create _WorkflowChildEngineBuilder]
    ChildBuilder --> GE[GraphEngine<br/>graph + runtime_state + config]
    GE --> DebugLayer{DEBUG mode?}
    DebugLayer -->|Yes| AddDebug[Add DebugLoggingLayer]
    DebugLayer -->|No| LimitsLayer
    AddDebug --> LimitsLayer[Add ExecutionLimitsLayer<br/>max_steps + max_time]
    LimitsLayer --> QuotaLayer[Add LLMQuotaLayer]
    QuotaLayer --> OtelCheck{OTEL enabled?}
    OtelCheck -->|Yes| AddOtel[Add ObservabilityLayer]
    OtelCheck -->|No| Ready[Engine ready]
    AddOtel --> Ready

workflow_entry.py#L111-L191 のコンストラクタは、大きく4つのことを行います。

  1. 呼び出し深度の制限 — ワークフローがサブワークフローを呼び出す際の無限再帰を防ぐ(WORKFLOW_CALL_MAX_DEPTH
  2. 実行コンテキストのキャプチャ — バックグラウンド実行で利用するために、現在のFlask/スレッディングのコンテキストをスナップショットとして保存する
  3. GraphEngineの設定 — ワーカープールのパラメータ(min_workersmax_workersscale_up_threshold)を指定する
  4. Layerの積み重ね — 横断的な関心事を担うLayerを順番に追加する

run() メソッド自体はシンプルで、graph_engine.run() に処理を委譲してイベントをyieldします。ユーザーキャンセルを示す GenerateTaskStoppedError を捕捉し、未知の例外は GraphRunFailedEvent にラップします。

graphon(汎用グラフ実行プリミティブ)とDify固有コードをあえて分離しているのは意図的な設計です。graphon ライブラリはグラフ・ノード・変数プール・実行について知っていますが、LLMクォータ・オブザーバビリティ・永続化については関知しません。それらの関心事はDifyのLayer実装に集約されています。

Layerパターン:グラフ実行のためのAOP風ミドルウェア

GraphEngineLayer は抽象クラスであり、ノード実行のライフサイクルにフックを提供します。各Layerはノード実装を変更することなく、グラフレベル・ノードレベルのイベントを監視して処理を加えることができます。まさにアスペクト指向プログラミング(AOP)の考え方です。

classDiagram
    class GraphEngineLayer {
        <<abstract>>
        +on_graph_start()
        +on_graph_end(error)
        +on_event(event)
        +on_node_run_start(node)
        +on_node_run_end(node, error, result_event)
    }

    class LLMQuotaLayer {
        -_abort_sent: bool
        +on_node_run_start() check quota
        +on_node_run_end() deduct quota
    }

    class ExecutionLimitsLayer {
        +max_steps: int
        +max_time: float
    }

    class ObservabilityLayer {
        +on_node_run_start() create span
        +on_node_run_end() close span
    }

    class DebugLoggingLayer {
        +level: str
        +include_inputs: bool
    }

    class PauseStatePersistenceLayer {
        +on_event() persist pause state
    }

    GraphEngineLayer <|-- LLMQuotaLayer
    GraphEngineLayer <|-- ExecutionLimitsLayer
    GraphEngineLayer <|-- ObservabilityLayer
    GraphEngineLayer <|-- DebugLoggingLayer
    GraphEngineLayer <|-- PauseStatePersistenceLayer

仕組みを理解するうえで最もわかりやすい例が LLMQuotaLayer です。このLayerは2つのライフサイクルイベントにフックします。

  • on_node_run_start() — LLM・ParameterExtractor・QuestionClassifier ノードの実行前に、コストのかかるモデル呼び出しが始まるにテナントのクォータが十分かチェックする
  • on_node_run_end() — ノードの実行が成功した後、実際のトークン使用量をテナントのクォータから差し引く

クォータが超過した場合、このLayerは単に例外を投げるわけではありません。エンジンのコマンドチャネルを通じて AbortCommand を送信し、グラフの停止イベントをセットします。これにより、実行中のノードが処理を完了できるグレースフルシャットダウンが実現されます。

def _send_abort_command(self, *, reason: str) -> None:
    if not self.command_channel or self._abort_sent:
        return
    self.command_channel.send_command(
        AbortCommand(command_type=CommandType.ABORT, reason=reason)
    )
    self._abort_sent = True

ヒント: LLMQuotaLayer_extract_model_instance() メソッドには興味深いエッジケースへの対応があります。モデルインスタンスが DifyPreparedLLM でラップされている場合、_model_instance 属性の有無を確認してアンラップします。これは Node Factory で使われているDecoratorパターンの副産物です。

ノードの登録:自動インポートと自己登録

Difyは自己登録パターンを採用しています。ノードクラスはインポートされるだけで自動的に登録されます。node_factory.py モジュールがこの仕組みを取りまとめています。

flowchart TD
    Boot[register_nodes called] --> ImportGraphon[pkgutil.walk_packages<br/>graphon.nodes.*]
    Boot --> ImportDify[pkgutil.walk_packages<br/>core.workflow.nodes.*]
    ImportGraphon --> NodeClass[Node subclass imported]
    ImportDify --> NodeClass
    NodeClass --> SelfRegister["Node.__init_subclass__()<br/>registers in class-level registry"]
    SelfRegister --> Registry["Node.get_node_type_classes_mapping()<br/>{NodeType → {version → class}}"]

register_nodes() 関数(@lru_cache(maxsize=1) でキャッシュ済み)は2つのパッケージツリーをインポートします。

@lru_cache(maxsize=1)
def register_nodes() -> None:
    _import_node_package("graphon.nodes")
    _import_node_package("core.workflow.nodes")

_import_node_package() ヘルパーは pkgutil.walk_packages を使ってパッケージ内のすべてのモジュールを再帰的にインポートします。各 Node サブクラスはクラスレベルで自身のタイプとバージョンを宣言しており、基底クラス Node__init_subclass__() を使ってそれらをマッピングに収集します。

resolve_workflow_node_class() 関数はフォールバック戦略付きのバージョン解決を担います。まず node_version 文字列に完全一致するクラスを探し、見つからなければ latest バージョンにフォールバックします。これにより複数バージョンのノード進化がサポートされ、古いワークフローは元のノード実装を使い続けることができます。

node_factory.py#L145-L191 にある _LazyNodeTypeClassesMapping ラッパーは、レジストリに対してミュータブルなdictライクなインターフェースを提供し、レジストリのバージョンが変わるとキャッシュを無効化します。これにより、テストやプラグインがコアレジストリに影響を与えずにノード実装をオーバーライドできます。

DifyNodeFactory:依存性注入コンテナ

DifyNodeFactory は、Difyのアプリケーションレイヤーと汎用グラフエンジンが出会う場所です。その create_node() メソッドは手動の依存性注入コンテナとして機能し、ノードタイプごとに異なるkwargsを組み立てます。

node_factory.py#L240-L303 のコンストラクタは、必要な依存関係をすべて事前に構築します。

依存関係 利用ノード 用途
_code_executor Code ノード サンドボックス化されたコード実行
_http_request_http_client HTTP・LLM・Doc extractor SSRFプロテクト済みHTTPクライアント
_llm_credentials_provider LLM・Question Classifier・Parameter Extractor モデルクレデンシャルの解決
_tool_runtime Tool ノード テナントコンテキストを持つツール実行
_human_input_runtime Human Input ノード フォーム管理と状態管理
_agent_strategy_resolver Agent ノード プラグインベースのエージェント戦略
_file_reference_factory 複数ノード ファイル参照の作成

create_node() メソッドは NodeType からファクトリーlambdaへのディスパッチテーブルを使います。

node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
    BuiltinNodeTypes.CODE: lambda: {
        "code_executor": self._code_executor,
        "code_limits": self._code_limits,
    },
    BuiltinNodeTypes.HTTP_REQUEST: lambda: {
        "http_request_config": self._http_request_config,
        "http_client": self._http_request_http_client,
        ...
    },
    BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(...),
    ...
}

この設計のおかげで、ノード自体はフレームワーク非依存のまま保たれます。LLMノードはDifyのクレデンシャルシステムやSSRFプロキシについて何も知る必要がなく、その配線はすべてFactoryが担います。

ヒント: 新しいノードタイプを追加するには、(1)core/workflow/nodes/ または graphon/nodes/ 以下にノードクラスを作成し、(2)Dify固有の依存関係が必要なら DifyNodeFactory.create_node() にファクトリーlambdaを追加し、(3)あとは自動インポートが登録を引き受けます。

Pause・Resume、そしち子グラフの実行

Difyは human-in-the-loop ワークフローをpause/resumeメカニズムで実現しています。ワークフローがHuman Inputノードに到達すると、グラフ実行全体がいったん停止して状態をシリアライズし、ユーザーがフォームに回答するまで待機します。

sequenceDiagram
    participant Runner
    participant GraphEngine
    participant HumanInputNode
    participant PauseLayer
    participant DB
    participant User
    participant ResumeRunner

    Runner->>GraphEngine: run()
    GraphEngine->>HumanInputNode: execute()
    HumanInputNode-->>GraphEngine: GraphRunPausedEvent
    GraphEngine-->>Runner: yield PausedEvent
    Runner->>PauseLayer: on_event(PausedEvent)
    PauseLayer->>PauseLayer: Serialize GraphRuntimeState
    PauseLayer->>DB: Store WorkflowResumptionContext

    User->>ResumeRunner: Submit form data
    ResumeRunner->>DB: Load WorkflowResumptionContext
    ResumeRunner->>ResumeRunner: Deserialize state
    ResumeRunner->>GraphEngine: resume from paused state

PauseStatePersistenceLayerGraphRunPausedEvent を捕捉し、WorkflowResumptionContext を永続化します。これはシリアライズされたグラフランタイム状態とgenerate entityを含むPydanticモデルです。

WorkflowResumptionContextWorkflowAppGenerateEntityAdvancedChatAppGenerateEntity の両方を扱うために、discriminated unionを使用しています。

class WorkflowResumptionContext(BaseModel):
    version: Literal["1"] = "1"
    generate_entity: _GenerateEntityUnion
    serialized_graph_runtime_state: str

子グラフ実行(サブワークフロー)については、_WorkflowChildEngineBuilder が新しい GraphEngine を作成します。このとき、ランタイム状態は新規に作られますが、親の実行コンテキストは共有されます。子エンジンには「子向けに安全な」Layer(LLMQuotaLayer など)のみが付与され、オブザーバビリティや永続化のLayerは追加されません。それらの関心事は親から引き継がれます。

システム変数とVariablePool

VariablePool は、ワークフロー実行中のすべてのノードが使う共有データバスです。実行開始前にシステム変数が注入され、各ノードはこのプールから入力を読み取り、出力を書き込みます。

system_variables.py モジュールはシステム変数のキーを定義しています。

変数キー 供給元
query string ユーザーの入力テキスト
files list[File] アップロードされたファイル
conversation_id string アクティブな会話
user_id string 実行ユーザー
dialogue_count int 会話内のメッセージ数
app_id string アプリケーション識別子
workflow_id string ワークフロー識別子
workflow_run_id string 現在の実行ラン
timestamp int 実行開始時刻

変数にはセレクタでアクセスします。セレクタは (node_id, variable_name) のタプルです。システム変数には特別な sys プレフィックスのノードIDが使われ、環境変数には env、実行をまたいで保持される会話変数には conversation プレフィックスが使われます。

VariablePool はスコープ付きアクセスをサポートしており、各ノードはグラフ内で自身の上流にある依存ノードの変数しか読み取れません。これはグラフエンジンの実行順序によって強制され、データ競合を防ぎ、決定論的な実行を保証します。

次回予告

ワークフローエンジンの内部構造を一通り押さえたところで、次はDifyの中でも特に複雑なサブシステム、RAGパイプラインへと進みましょう。第4回では、ドキュメントがアップロードから抽出・分割・埋め込み・ベクターストレージへと流れる道筋を追い、マルチデータセット検索がLLM powered ルーティングを使って最も関連性の高いナレッジを見つける仕組みを解説します。