Read OSS

深入工作流引擎:图执行、节点工厂与层系统

高级

前置知识

  • 第 1-2 篇:架构概览与请求流程
  • 理解有向无环图(DAG)
  • 熟悉 Python 中的事件驱动 generator 模式

深入工作流引擎:图执行、节点工厂与层系统

Dify 的可视化工作流构建器允许用户将 LLM 调用、知识检索、代码执行、HTTP 请求、条件分支等功能节点连接成一张有向无环图,从而组合出完整的 LLM 应用。在这张可视化 DAG 的背后,GraphEngine 负责将其转化为真正运行的计算过程——这是一个通用图执行运行时,Dify 在此基础上叠加了应用层的业务逻辑。本文将深入探讨该引擎的组装方式、节点的注册与实例化机制,以及层系统如何以 AOP 风格的 middleware 处理配额管理和可观测性等横切关注点。

WorkflowEntry:构建并运行图引擎

WorkflowEntry 类是 Dify 应用层与通用 graphon 图引擎库之间的桥梁。它的构造函数负责组装一个完整配置的 GraphEngine(包括各个层),run() 方法则以 generator 的形式依次产出 GraphEngineEvent

flowchart TD
    WE[WorkflowEntry.__init__] --> DepthCheck[Check call depth limit]
    DepthCheck --> CmdChannel[Create CommandChannel]
    CmdChannel --> ChildBuilder[Create _WorkflowChildEngineBuilder]
    ChildBuilder --> GE[GraphEngine<br/>graph + runtime_state + config]
    GE --> DebugLayer{DEBUG mode?}
    DebugLayer -->|Yes| AddDebug[Add DebugLoggingLayer]
    DebugLayer -->|No| LimitsLayer
    AddDebug --> LimitsLayer[Add ExecutionLimitsLayer<br/>max_steps + max_time]
    LimitsLayer --> QuotaLayer[Add LLMQuotaLayer]
    QuotaLayer --> OtelCheck{OTEL enabled?}
    OtelCheck -->|Yes| AddOtel[Add ObservabilityLayer]
    OtelCheck -->|No| Ready[Engine ready]
    AddOtel --> Ready

workflow_entry.py#L111-L191 中的构造函数主要完成以下几件事:

  1. 调用深度限制 — 防止工作流调用子工作流时产生无限递归(WORKFLOW_CALL_MAX_DEPTH
  2. 执行上下文捕获 — 对当前 Flask/threading 上下文打快照,供后台执行使用
  3. GraphEngine 配置 — 设置 worker 池参数(min_workersmax_workersscale_up_threshold
  4. 层的堆叠 — 按顺序添加各个横切关注点层

run() 方法的实现出人意料地简洁——它将执行委托给 graph_engine.run() 并转发产出的事件,同时捕获 GenerateTaskStoppedError(用户取消),并将未知异常包装为 GraphRunFailedEvent

graphon(可复用的图执行原语)与 Dify 特有代码的分离是有意为之的。graphon 库只负责图、节点、变量池和执行逻辑,对 LLM 配额、可观测性或持久化一无所知。这些关注点全部由 Dify 的层实现来承载。

Layer 模式:图执行的 AOP 风格 Middleware

GraphEngineLayer 是一个抽象类,提供了节点执行生命周期的各个钩子。每个层都可以在不修改节点实现的前提下,监听并响应图级和节点级事件。这是经典的面向切面编程(AOP)思想。

classDiagram
    class GraphEngineLayer {
        <<abstract>>
        +on_graph_start()
        +on_graph_end(error)
        +on_event(event)
        +on_node_run_start(node)
        +on_node_run_end(node, error, result_event)
    }

    class LLMQuotaLayer {
        -_abort_sent: bool
        +on_node_run_start() check quota
        +on_node_run_end() deduct quota
    }

    class ExecutionLimitsLayer {
        +max_steps: int
        +max_time: float
    }

    class ObservabilityLayer {
        +on_node_run_start() create span
        +on_node_run_end() close span
    }

    class DebugLoggingLayer {
        +level: str
        +include_inputs: bool
    }

    class PauseStatePersistenceLayer {
        +on_event() persist pause state
    }

    GraphEngineLayer <|-- LLMQuotaLayer
    GraphEngineLayer <|-- ExecutionLimitsLayer
    GraphEngineLayer <|-- ObservabilityLayer
    GraphEngineLayer <|-- DebugLoggingLayer
    GraphEngineLayer <|-- PauseStatePersistenceLayer

LLMQuotaLayer 是最具参考价值的示例,它挂载了两个生命周期钩子:

  • on_node_run_start() — 针对 LLM、ParameterExtractor 和 QuestionClassifier 节点,在昂贵的模型调用发生之前检查租户是否有足够的配额
  • on_node_run_end() — 节点成功执行后,从租户配额中扣除实际消耗的 token 用量

当配额耗尽时,该层并不会直接抛出异常,而是通过引擎的命令通道发送 AbortCommand 并触发图的停止事件——这是一种优雅停机方式,允许正在执行中的节点完成后再退出:

def _send_abort_command(self, *, reason: str) -> None:
    if not self.command_channel or self._abort_sent:
        return
    self.command_channel.send_command(
        AbortCommand(command_type=CommandType.ABORT, reason=reason)
    )
    self._abort_sent = True

提示: LLMQuotaLayer 中的 _extract_model_instance() 方法处理了一个有趣的边界情况——当 model instance 被 DifyPreparedLLM 包装时,它会通过检查 _model_instance 属性来解包。这是节点工厂中装饰器模式所带来的副作用。

节点注册:自动导入与自注册

Dify 采用自注册模式——节点类只需被导入,就会自动完成注册。node_factory.py 模块负责统筹这一过程:

flowchart TD
    Boot[register_nodes called] --> ImportGraphon[pkgutil.walk_packages<br/>graphon.nodes.*]
    Boot --> ImportDify[pkgutil.walk_packages<br/>core.workflow.nodes.*]
    ImportGraphon --> NodeClass[Node subclass imported]
    ImportDify --> NodeClass
    NodeClass --> SelfRegister["Node.__init_subclass__()<br/>registers in class-level registry"]
    SelfRegister --> Registry["Node.get_node_type_classes_mapping()<br/>{NodeType → {version → class}}"]

register_nodes() 函数(通过 @lru_cache(maxsize=1) 缓存)会导入两个包树:

@lru_cache(maxsize=1)
def register_nodes() -> None:
    _import_node_package("graphon.nodes")
    _import_node_package("core.workflow.nodes")

_import_node_package() 辅助函数使用 pkgutil.walk_packages 递归导入包内的每个模块。每个 Node 子类在类级别声明自己的类型和版本,基类 Node 则通过 __init_subclass__() 将它们汇总到一张映射表中。

resolve_workflow_node_class() 函数采用带回退策略的版本解析:先尝试精确匹配 node_version 字符串,若找不到则回退到 latest 版本。这一机制支持节点的多版本演进——历史工作流可以继续使用原有的节点实现。

node_factory.py#L145-L191 中的 _LazyNodeTypeClassesMapping 包装器在注册表之上提供了类似可变字典的接口,并在注册表版本变更时自动失效缓存。这使得测试和插件可以覆盖节点实现,而不影响核心注册表。

DifyNodeFactory:依赖注入容器

DifyNodeFactory 是 Dify 应用层与通用图引擎的对接点。它的 create_node() 方法充当手动依赖注入容器,为每种节点类型组装不同的 kwargs。

工厂构造函数(node_factory.py#L240-L303)会预先构建所有依赖项:

依赖项 使用方 用途
_code_executor Code 节点 沙箱化代码执行
_http_request_http_client HTTP、LLM、Doc extractor 防 SSRF 的 HTTP 客户端
_llm_credentials_provider LLM、Question Classifier、Parameter Extractor 模型凭证解析
_tool_runtime Tool 节点 携带租户上下文的工具执行
_human_input_runtime Human Input 节点 表单管理与状态
_agent_strategy_resolver Agent 节点 基于插件的 agent 策略
_file_reference_factory 多个节点 文件引用创建

create_node() 方法使用一张将 NodeType 映射到工厂 lambda 的调度表:

node_init_kwargs_factories: Mapping[NodeType, Callable[[], dict[str, object]]] = {
    BuiltinNodeTypes.CODE: lambda: {
        "code_executor": self._code_executor,
        "code_limits": self._code_limits,
    },
    BuiltinNodeTypes.HTTP_REQUEST: lambda: {
        "http_request_config": self._http_request_config,
        "http_client": self._http_request_http_client,
        ...
    },
    BuiltinNodeTypes.LLM: lambda: self._build_llm_compatible_node_init_kwargs(...),
    ...
}

这种设计使节点本身保持框架无关性——LLM 节点无需了解 Dify 的凭证系统或 SSRF 代理,这些接线工作全部由工厂负责。

提示: 添加新节点类型时,需要:(1) 在 core/workflow/nodes/graphon/nodes/ 下创建节点类;(2) 如果节点需要 Dify 特有的依赖,在 DifyNodeFactory.create_node() 中添加对应的工厂 lambda;(3) 自动导入机制会处理节点注册,无需额外操作。

暂停、恢复与子图执行

Dify 通过暂停/恢复机制支持**人在回路(human-in-the-loop)**工作流。当工作流执行到 Human Input 节点时,整个图执行会暂停、序列化状态,并等待用户提交表单响应。

sequenceDiagram
    participant Runner
    participant GraphEngine
    participant HumanInputNode
    participant PauseLayer
    participant DB
    participant User
    participant ResumeRunner

    Runner->>GraphEngine: run()
    GraphEngine->>HumanInputNode: execute()
    HumanInputNode-->>GraphEngine: GraphRunPausedEvent
    GraphEngine-->>Runner: yield PausedEvent
    Runner->>PauseLayer: on_event(PausedEvent)
    PauseLayer->>PauseLayer: Serialize GraphRuntimeState
    PauseLayer->>DB: Store WorkflowResumptionContext

    User->>ResumeRunner: Submit form data
    ResumeRunner->>DB: Load WorkflowResumptionContext
    ResumeRunner->>ResumeRunner: Deserialize state
    ResumeRunner->>GraphEngine: resume from paused state

PauseStatePersistenceLayer 拦截 GraphRunPausedEvent 事件,并将 WorkflowResumptionContext 持久化到数据库——这是一个 Pydantic model,包含序列化后的图运行时状态和 generate entity。

WorkflowResumptionContext 使用 discriminated union 同时处理 WorkflowAppGenerateEntityAdvancedChatAppGenerateEntity 两种情况:

class WorkflowResumptionContext(BaseModel):
    version: Literal["1"] = "1"
    generate_entity: _GenerateEntityUnion
    serialized_graph_runtime_state: str

对于子图执行(子工作流),_WorkflowChildEngineBuilder 会创建一个新的 GraphEngine,使用全新的运行时状态,但共享父引擎的执行上下文。子引擎只会加载"子引擎安全"的层(如 LLMQuotaLayer),不会自行添加可观测性或持久化层——这些关注点由父引擎统一承担。

系统变量与 VariablePool

VariablePool 是工作流执行过程中所有节点共享的数据总线。系统变量在执行开始前注入,每个节点从中读取输入并将输出写回其中。

system_variables.py 模块定义了系统变量的键名:

变量键 类型 来源
query string 用户输入文本
files list[File] 上传的文件
conversation_id string 当前会话
user_id string 执行用户
dialogue_count int 会话中的消息数量
app_id string 应用标识符
workflow_id string 工作流标识符
workflow_run_id string 当前执行运行标识
timestamp int 执行开始时间

变量通过选择器(selector)寻址——即 (node_id, variable_name) 形式的元组。系统变量使用特殊的 sys 前缀作为 node ID,环境变量使用 env 前缀,跨执行持久化的会话变量则使用 conversation 前缀。

VariablePool 支持作用域访问——图引擎通过执行顺序约束,确保每个节点只能读取其上游依赖节点的变量。这从根本上避免了数据竞争,保证了执行结果的确定性。

下一步

掌握了工作流引擎的内部机制之后,我们即将深入 Dify 最复杂的子系统之一:RAG pipeline。在第 4 篇中,我们将追踪文档从上传到抽取、切分、embedding 和向量存储的完整流程,并了解多数据集检索如何借助 LLM 路由找到最相关的知识。