ワークフローエンジンの内部構造:グラフ実行、Node Factory、そしてLayerシステム
前提知識
- ›第1・2回:アーキテクチャ概要とリクエストフロー
- ›有向非巡回グラフ(DAG)の基本的な理解
- ›PythonにおけるイベントドリブンなGeneratorパターンへの慣れ
ワークフローエンジンの内部構造:グラフ実行、Node Factory、そしてLayerシステム
Difyのビジュアルワークフロービルダーでは、LLM呼び出し・ナレッジ検索・コード実行・HTTPリクエスト・条件分岐といった多様なノードを、有向非巡回グラフ(DAG)として組み合わせてLLMアプリケーションを構築できます。このビジュアルなDAGを実際の計算として動かすのが GraphEngine です。GraphEngineは汎用のグラフ実行ランタイムであり、Difyはその上にアプリケーション固有の関心事をラップしています。本記事では、このエンジンがどのように組み立てられているか、ノードの登録と生成の仕組み、そしてクォータ管理やオブザーバビリティといった横断的な関心事にAOP風のミドルウェアを提供するLayerシステムの詳細を探っていきます。
WorkflowEntry:GraphEngineの構築と実行
WorkflowEntry クラスは、DifyのアプリケーションレイヤーとGraphEngineライブラリ graphon をつなぐ橋渡し役です。コンストラクタでLayerを含む完全な GraphEngine を組み立て、run() メソッドが GraphEngineEvent を順次yieldします。
flowchart TD
WE[WorkflowEntry.__init__] --> DepthCheck[Check call depth limit]
DepthCheck --> CmdChannel[Create CommandChannel]
CmdChannel --> ChildBuilder[Create _WorkflowChildEngineBuilder]
ChildBuilder --> GE[GraphEngine<br/>graph + runtime_state + config]
GE --> DebugLayer{DEBUG mode?}
DebugLayer -->|Yes| AddDebug[Add DebugLoggingLayer]
DebugLayer -->|No| LimitsLayer
AddDebug --> LimitsLayer[Add ExecutionLimitsLayer<br/>max_steps + max_time]
LimitsLayer --> QuotaLayer[Add LLMQuotaLayer]
QuotaLayer --> OtelCheck{OTEL enabled?}
OtelCheck -->|Yes| AddOtel[Add ObservabilityLayer]
OtelCheck -->|No| Ready[Engine ready]
AddOtel --> Ready
workflow_entry.py#L111-L191 のコンストラクタは、大きく4つのことを行います。
- 呼び出し深度の制限 — ワークフローがサブワークフローを呼び出す際の無限再帰を防ぐ(
WORKFLOW_CALL_MAX_DEPTH) - 実行コンテキストのキャプチャ — バックグラウンド実行で利用するために、現在のFlask/スレッディングのコンテキストをスナップショットとして保存する
- GraphEngineの設定 — ワーカープールのパラメータ(
min_workers、max_workers、scale_up_threshold)を指定する - Layerの積み重ね — 横断的な関心事を担うLayerを順番に追加する
run() メソッド自体はシンプルで、graph_engine.run() に処理を委譲してイベントをyieldします。ユーザーキャンセルを示す GenerateTaskStoppedError を捕捉し、未知の例外は GraphRunFailedEvent にラップします。
graphon(汎用グラフ実行プリミティブ)とDify固有コードをあえて分離しているのは意図的な設計です。graphon ライブラリはグラフ・ノード・変数プール・実行について知っていますが、LLMクォータ・オブザーバビリティ・永続化については関知しません。それらの関心事はDifyのLayer実装に集約されています。
Layerパターン:グラフ実行のためのAOP風ミドルウェア
GraphEngineLayer は抽象クラスであり、ノード実行のライフサイクルにフックを提供します。各Layerはノード実装を変更することなく、グラフレベル・ノードレベルのイベントを監視して処理を加えることができます。まさにアスペクト指向プログラミング(AOP)の考え方です。
classDiagram
class GraphEngineLayer {
<<abstract>>
+on_graph_start()
+on_graph_end(error)
+on_event(event)
+on_node_run_start(node)
+on_node_run_end(node, error, result_event)
}
class LLMQuotaLayer {
-_abort_sent: bool
+on_node_run_start() check quota
+on_node_run_end() deduct quota
}
class ExecutionLimitsLayer {
+max_steps: int
+max_time: float
}
class ObservabilityLayer {
+on_node_run_start() create span
+on_node_run_end() close span
}
class DebugLoggingLayer {
+level: str
+include_inputs: bool
}
class PauseStatePersistenceLayer {
+on_event() persist pause state
}
GraphEngineLayer <|-- LLMQuotaLayer
GraphEngineLayer <|-- ExecutionLimitsLayer
GraphEngineLayer <|-- ObservabilityLayer
GraphEngineLayer <|-- DebugLoggingLayer
GraphEngineLayer <|-- PauseStatePersistenceLayer
仕組みを理解するうえで最もわかりやすい例が LLMQuotaLayer です。このLayerは2つのライフサイクルイベントにフックします。
on_node_run_start()— LLM・ParameterExtractor・QuestionClassifier ノードの実行前に、コストのかかるモデル呼び出しが始まる前にテナントのクォータが十分かチェックするon_node_run_end()— ノードの実行が成功した後、実際のトークン使用量をテナントのクォータから差し引く
クォータが超過した場合、このLayerは単に例外を投げるわけではありません。エンジンのコマンドチャネルを通じて AbortCommand を送信し、グラフの停止イベントをセットします。これにより、実行中のノードが処理を完了できるグレースフルシャットダウンが実現されます。
def _send_abort_command(self, *, reason: str) -> None:
if not self.command_channel or self._abort_sent:
return
self.command_channel.send_command(
AbortCommand(command_type=CommandType.ABORT, reason=reason)
)
self._abort_sent = True
ヒント:
LLMQuotaLayerの_extract_model_instance()メソッドには興味深いエッジケースへの対応があります。モデルインスタンスがDifyPreparedLLMでラップされている場合、_model_instance属性の有無を確認してアンラップします。これは Node Factory で使われているDecoratorパターンの副産物です。
ノードの登録:自動インポートと自己登録
Difyは自己登録パターンを採用しています。ノードクラスはインポートされるだけで自動的に登録されます。node_factory.py モジュールがこの仕組みを取りまとめています。
flowchart TD
Boot[register_nodes called] --> ImportGraphon[pkgutil.walk_packages<br/>graphon.nodes.*]
Boot --> ImportDify[pkgutil.walk_packages<br/>core.workflow.nodes.*]
ImportGraphon --> NodeClass[Node subclass imported]
ImportDify --> NodeClass
NodeClass --> SelfRegister["Node.__init_subclass__()<br/>registers in class-level registry"]
SelfRegister --> Registry["Node.get_node_type_classes_mapping()<br/>{NodeType → {version → class}}"]
register_nodes() 関数(@lru_cache(maxsize=1) でキャッシュ済み)は2つのパッケージツリーをインポートします。
@lru_cache(maxsize=1)
def register_nodes() -> None:
_import_node_package("graphon.nodes")
_import_node_package("core.workflow.nodes")
_import_node_package() ヘルパーは pkgutil.walk_packages を使ってパッケージ内のすべてのモジュールを再帰的にインポートします。各 Node サブクラスはクラスレベルで自身のタイプとバージョンを宣言しており、基底クラス Node が __init_subclass__() を使ってそれらをマッピングに収集します。
resolve_workflow_node_class() 関数はフォールバック戦略付きのバージョン解決を担います。まず node_version 文字列に完全一致するクラスを探し、見つからなければ latest バージョンにフォールバックします。これにより複数バージョンのノード進化がサポートされ、古いワークフローは元のノード実装を使い続けることができます。
node_factory.py#L145-L191 にある _LazyNodeTypeClassesMapping ラッパーは、レジストリに対してミュータブルなdictライクなインターフェースを提供し、レジストリのバージョンが変わるとキャッシュを無効化します。これにより、テストやプラグインがコアレジストリに影響を与えずにノード実装をオーバーライドできます。
DifyNodeFactory:依存性注入コンテナ
DifyNodeFactory は、Difyのアプリケーションレイヤーと汎用グラフエンジンが出会う場所です。その create_node() メソッドは手動の依存性注入コンテナとして機能し、ノードタイプごとに異なるkwargsを組み立てます。
node_factory.py#L240-L303 のコンストラクタは、必要な依存関係をすべて事前に構築します。
| 依存関係 | 利用ノード | 用途 |
|---|---|---|
_code_executor |
Code ノード | サンドボックス化されたコード実行 |
_http_request_http_client |
HTTP・LLM・Doc extractor | SSRFプロテクト済みHTTPクライアント |
_llm_credentials_provider |
LLM・Question Classifier・Parameter Extractor | モデルクレデンシャルの解決 |
_tool_runtime |
Tool ノード | テナントコンテキストを持つツール実行 |
_human_input_runtime |
Human Input ノード | フォーム管理と状態管理 |
_agent_strategy_resolver |
Agent ノード | プラグインベースのエージェント戦略 |
_file_reference_factory |
複数ノード | ファイル参照の作成 |
create_node() メソッドは NodeType からファクトリーlambdaへのディスパッチテーブルを使います。
node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
BuiltinNodeTypes.CODE: lambda: {
"code_executor": self._code_executor,
"code_limits": self._code_limits,
},
BuiltinNodeTypes.HTTP_REQUEST: lambda: {
"http_request_config": self._http_request_config,
"http_client": self._http_request_http_client,
...
},
BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(...),
...
}
この設計のおかげで、ノード自体はフレームワーク非依存のまま保たれます。LLMノードはDifyのクレデンシャルシステムやSSRFプロキシについて何も知る必要がなく、その配線はすべてFactoryが担います。
ヒント: 新しいノードタイプを追加するには、(1)
core/workflow/nodes/またはgraphon/nodes/以下にノードクラスを作成し、(2)Dify固有の依存関係が必要ならDifyNodeFactory.create_node()にファクトリーlambdaを追加し、(3)あとは自動インポートが登録を引き受けます。
Pause・Resume、そしち子グラフの実行
Difyは human-in-the-loop ワークフローをpause/resumeメカニズムで実現しています。ワークフローがHuman Inputノードに到達すると、グラフ実行全体がいったん停止して状態をシリアライズし、ユーザーがフォームに回答するまで待機します。
sequenceDiagram
participant Runner
participant GraphEngine
participant HumanInputNode
participant PauseLayer
participant DB
participant User
participant ResumeRunner
Runner->>GraphEngine: run()
GraphEngine->>HumanInputNode: execute()
HumanInputNode-->>GraphEngine: GraphRunPausedEvent
GraphEngine-->>Runner: yield PausedEvent
Runner->>PauseLayer: on_event(PausedEvent)
PauseLayer->>PauseLayer: Serialize GraphRuntimeState
PauseLayer->>DB: Store WorkflowResumptionContext
User->>ResumeRunner: Submit form data
ResumeRunner->>DB: Load WorkflowResumptionContext
ResumeRunner->>ResumeRunner: Deserialize state
ResumeRunner->>GraphEngine: resume from paused state
PauseStatePersistenceLayer は GraphRunPausedEvent を捕捉し、WorkflowResumptionContext を永続化します。これはシリアライズされたグラフランタイム状態とgenerate entityを含むPydanticモデルです。
WorkflowResumptionContext は WorkflowAppGenerateEntity と AdvancedChatAppGenerateEntity の両方を扱うために、discriminated unionを使用しています。
class WorkflowResumptionContext(BaseModel):
version: Literal["1"] = "1"
generate_entity: _GenerateEntityUnion
serialized_graph_runtime_state: str
子グラフ実行(サブワークフロー)については、_WorkflowChildEngineBuilder が新しい GraphEngine を作成します。このとき、ランタイム状態は新規に作られますが、親の実行コンテキストは共有されます。子エンジンには「子向けに安全な」Layer(LLMQuotaLayer など)のみが付与され、オブザーバビリティや永続化のLayerは追加されません。それらの関心事は親から引き継がれます。
システム変数とVariablePool
VariablePool は、ワークフロー実行中のすべてのノードが使う共有データバスです。実行開始前にシステム変数が注入され、各ノードはこのプールから入力を読み取り、出力を書き込みます。
system_variables.py モジュールはシステム変数のキーを定義しています。
| 変数キー | 型 | 供給元 |
|---|---|---|
query |
string | ユーザーの入力テキスト |
files |
list[File] | アップロードされたファイル |
conversation_id |
string | アクティブな会話 |
user_id |
string | 実行ユーザー |
dialogue_count |
int | 会話内のメッセージ数 |
app_id |
string | アプリケーション識別子 |
workflow_id |
string | ワークフロー識別子 |
workflow_run_id |
string | 現在の実行ラン |
timestamp |
int | 実行開始時刻 |
変数にはセレクタでアクセスします。セレクタは (node_id, variable_name) のタプルです。システム変数には特別な sys プレフィックスのノードIDが使われ、環境変数には env、実行をまたいで保持される会話変数には conversation プレフィックスが使われます。
VariablePool はスコープ付きアクセスをサポートしており、各ノードはグラフ内で自身の上流にある依存ノードの変数しか読み取れません。これはグラフエンジンの実行順序によって強制され、データ競合を防ぎ、決定論的な実行を保証します。
次回予告
ワークフローエンジンの内部構造を一通り押さえたところで、次はDifyの中でも特に複雑なサブシステム、RAGパイプラインへと進みましょう。第4回では、ドキュメントがアップロードから抽出・分割・埋め込み・ベクターストレージへと流れる道筋を追い、マルチデータセット検索がLLM powered ルーティングを使って最も関連性の高いナレッジを見つける仕組みを解説します。