Read OSS

从 HTTP 请求到 LLM 响应:应用执行管道全解析

高级

前置知识

  • 第 1 篇:架构概览
  • 熟悉 server-sent events(SSE)
  • 了解 Python 多线程与异步模式

从 HTTP 请求到 LLM 响应:应用执行管道全解析

无论是聊天机器人回复用户消息、调用 completion API,还是触发 workflow 执行,每一次与 Dify 应用的交互都遵循同一套核心执行管道。理解这套管道,是读懂 Dify 如何将一个 HTTP 请求转化为流式 LLM 响应的关键所在。本文将从 controller 到 SSE 流,完整追踪请求的生命周期,揭示贯穿整个代码库的四阶段架构模式。

Controller 层与请求验证

请求通过第 1 篇介绍的某个 blueprint 组进入系统。我们以 Service API 路径为例,从 controllers/service_api/app/completion.py 开始追踪:

sequenceDiagram
    participant Client
    participant Controller
    participant Pydantic
    participant AppGenerateService

    Client->>Controller: POST /v1/chat-messages
    Controller->>Pydantic: ChatRequestPayload.model_validate()
    Pydantic-->>Controller: validated payload
    Controller->>AppGenerateService: generate(app_model, user, args, invoke_from)
    AppGenerateService-->>Controller: Generator[SSE events]
    Controller-->>Client: streaming response

Controller 使用 ChatRequestPayload 等 Pydantic 模型对请求进行验证,包括类型检查、conversation ID 规范化以及 UUID 格式校验。@validate_app_token 装饰器负责处理认证逻辑,将 API key 解析为对应的 app model 和终端用户。

有一个细节值得关注:InvokeFrom 枚举,定义于 app_invoke_entities.py,用于标识请求的来源:

来源 行为
SERVICE_API 外部开发者 API 使用已发布的 workflow
WEB_APP 终端用户 Web 界面 使用已发布的 workflow
DEBUGGER 控制台调试面板 使用草稿 workflow
EXPLORE 控制台探索/预览 使用已发布的 workflow
TRIGGER Webhook 触发器 使用已发布的 workflow
PUBLISHED_PIPELINE 已发布的 pipeline 调用 使用已发布的 workflow
VALIDATION 校验上下文 使用已发布的 workflow

这个枚举会贯穿整个执行管道,影响 workflow 版本的获取方式、用户身份的解析逻辑,以及适用的限流规则。

AppGenerateService:模式分发中枢

所有请求最终都会汇聚到 AppGenerateService.generate()。这是整个系统的核心分发点,根据应用的模式将请求路由到对应的 generator。在分发之前,它会统一处理两个横切关注点:计费配额检查和限流

flowchart TD
    Generate[AppGenerateService.generate] --> Quota{Billing enabled?}
    Quota -->|Yes| CheckQuota[QuotaType.WORKFLOW.consume]
    Quota -->|No| RateLimit
    CheckQuota --> RateLimit[RateLimit.enter]
    RateLimit --> ModeCheck{app_model.mode?}
    ModeCheck -->|COMPLETION| CompGen[CompletionAppGenerator]
    ModeCheck -->|CHAT| ChatGen[ChatAppGenerator]
    ModeCheck -->|AGENT_CHAT| AgentGen[AgentChatAppGenerator]
    ModeCheck -->|ADVANCED_CHAT| AdvGen[AdvancedChatAppGenerator]
    ModeCheck -->|WORKFLOW| WfGen[WorkflowAppGenerator]

限流器基于 Redis 在应用级别工作,追踪每个应用的并发活跃请求数。_get_max_active_requests() 方法返回应用级限制与全局配置限制中的较小值,设为零表示不限制。

提示: 排查请求被拒绝的问题时,建议按以下顺序检查三个环节:计费配额(QuotaType.WORKFLOW.consume)、应用级限流(RateLimit.enter),以及管道更深层的单模型限流。

五种应用模式及其类族

Dify 支持五种不同的应用模式,每种模式都有专属的 Generator 类来负责搭建执行上下文:

classDiagram
    class MessageBasedAppGenerator {
        +convert_to_event_stream()
        +retrieve_events()
    }

    class CompletionAppGenerator {
        +generate()
    }
    class ChatAppGenerator {
        +generate()
    }
    class AgentChatAppGenerator {
        +generate()
    }
    class AdvancedChatAppGenerator {
        +generate()
        +retrieve_events()
        +single_iteration_generate()
    }
    class WorkflowAppGenerator {
        +generate()
    }

    MessageBasedAppGenerator <|-- CompletionAppGenerator
    MessageBasedAppGenerator <|-- ChatAppGenerator
    MessageBasedAppGenerator <|-- AgentChatAppGenerator
    MessageBasedAppGenerator <|-- AdvancedChatAppGenerator
    MessageBasedAppGenerator <|-- WorkflowAppGenerator

这五种模式分为两种执行策略:

基于 EasyUI 的模式(Completion、Chat、AgentChat)采用提示词模板 + 单次 LLM 调用的模式。它们创建一个带有 ModelConfigWithCredentialsEntityEasyUIBasedAppGenerateEntity,并在后台线程中同步运行。

基于 Workflow 的模式(AdvancedChat、Workflow)通过图引擎执行一个由节点组成的 DAG。这两种模式在架构上有一个重要区分:流式模式下,执行任务会被派发给 Celery task,并通过 Redis pub/sub 将结果流式传回;阻塞模式下则同步运行。

这一区分在 generate 方法中清晰可见——对于 ADVANCED_CHAT 流式模式,app_generate_service.py#L150-L178 处的代码会将 AppExecutionParams 序列化,并通过 workflow_based_app_execution_task.delay() 进行派发。

Generator → Runner → QueueManager → TaskPipeline

Dify 的核心执行模式遵循一套四阶段管道,各阶段职责清晰、相互分离:

sequenceDiagram
    participant Client
    participant Generator
    participant Runner/CeleryTask
    participant QueueManager
    participant TaskPipeline

    Client->>Generator: generate()
    Generator->>Generator: Build entity, validate config
    Generator->>QueueManager: Create queue
    Generator->>Runner/CeleryTask: Start execution (thread or Celery)

    loop Execution
        Runner/CeleryTask->>QueueManager: publish(QueueEvent)
        QueueManager->>QueueManager: Put in queue
    end

    Generator->>TaskPipeline: Create pipeline(queue_manager)

    loop Streaming
        TaskPipeline->>QueueManager: listen()
        QueueManager-->>TaskPipeline: QueueEvent
        TaskPipeline->>TaskPipeline: Transform to SSE
        TaskPipeline-->>Client: SSE event
    end

第一阶段:Generator — 负责搭建执行上下文。AdvancedChatAppGenerator 会校验应用配置、解析文件上传设置、创建会话记录,并构建生成实体,最终产出一个完整填充的上下文对象。

第二阶段:Runner — 执行实际的计算逻辑。对于基于 workflow 的应用,WorkflowBasedAppRunner 将 workflow 引擎产生的 GraphEngineEvent 映射为 QueueEvent。Runner 运行在后台线程或 Celery task 中,随着计算的推进持续产生事件。

第三阶段:QueueManagerAppQueueManager 是连接生产者(Runner)和消费者(TaskPipeline)的桥梁。它使用 Python 的 queue.Queue 实现进程内通信,并借助 Redis 缓存实现分布式停止信号的传递。listen() 方法在可配置的超时时间内持续产出消息,当超过 APP_MAX_EXECUTION_TIME 时自动发送停止信号。

第四阶段:TaskPipelineBasedGenerateTaskPipeline 消费队列事件并将其转化为 SSE 响应对象。它负责错误映射(例如将 InvokeAuthorizationError 转换为用户友好的提示)、输出内容审核,以及消息状态追踪。

订阅门控流式传输与 Redis Pub/Sub vs. Streams

流式架构中存在一个微妙但重要的协调机制。对于通过 Celery 运行的基于 workflow 的应用,执行任务在远程 worker 上启动。但 SSE 响应流需要在任务开始产生事件之前就做好接收准备。

_build_streaming_task_on_subscribe() 方法根据 Redis channel 类型采用不同的协调策略:

flowchart TD
    Start[Build on_subscribe callback] --> ChannelType{PUBSUB_REDIS_CHANNEL_TYPE?}
    ChannelType -->|streams| StartImmediate[Start task immediately<br/>consumers can replay past events]
    ChannelType -->|pubsub/sharded| StartGated[Start on first SSE subscriber<br/>with 200ms fallback timer]
    StartGated --> Timer[threading.Timer 200ms]
    StartGated --> Subscribe[Client connects to SSE]
    Subscribe --> Cancel[Cancel timer, start task]
    Timer --> FallbackStart[Start task if no subscriber]

使用 Redis Streams 时,事件具有持久性——消费者可以读取到它订阅之前发布的事件。因此 Celery task 可以立即启动。

使用 Redis Pub/Sub(默认模式)时,事件是即发即弃的。如果任务在 SSE 端点订阅之前就开始发布,那些事件将会丢失。解决方案是订阅门控启动:Celery task 仅在第一个 SSE 客户端连接时才被派发。200ms 的兜底定时器确保即使客户端始终未连接,任务也能正常启动,从而防止资源泄漏。

这一机制通过 threading.Lockstarted 标志实现,简洁且线程安全:

channel_type = dify_config.PUBSUB_REDIS_CHANNEL_TYPE
if channel_type == "streams":
    _try_start()
    return _on_subscribe_streams

timer = threading.Timer(SSE_TASK_START_FALLBACK_MS / 1000.0, _try_start)
timer.daemon = True
timer.start()

提示: 如果在流式模式下遇到事件丢失的问题,请检查 PUBSUB_REDIS_CHANNEL_TYPE 的配置。切换为 streams 可以彻底消除竞态条件,代价是 Redis 内存占用略有增加。

事件类型全览

队列系统使用丰富的事件类型层次结构来传递执行状态。每个事件都携带足够的信息,供 TaskPipeline 构建相应的 SSE payload:

Queue Event 触发时机 SSE 输出
QueueWorkflowStartedEvent 图引擎开始执行 workflow_started
QueueNodeStartedEvent 节点开始执行 node_started
QueueTextChunkEvent LLM 流式输出 token text_chunk
QueueNodeSucceededEvent 节点执行完成 node_finished
QueueRetrieverResourcesEvent RAG 检索结果返回 retriever_resources
QueueWorkflowSucceededEvent 图执行完成 workflow_finished
QueueErrorEvent 任意错误发生 error
QueueStopEvent 用户取消 stop
QueueWorkflowPausedEvent 需要人工输入 workflow_paused

WorkflowBasedAppRunner 负责将 workflow 引擎产生的 GraphEngineEvent 映射为 QueueEvent。这一映射层正是 Dify 特有的关注点被注入事件流的地方,例如引用元数据、检索资源以及 agent 日志。

下一步

我们已经完整追踪了从 HTTP 到 SSE 流的请求生命周期,揭示了四阶段管道模式。在第 3 篇中,我们将深入 workflow 引擎本身——包括 GraphEngine 的内部机制、用于处理横切关注点的层级系统、节点工厂的依赖注入,以及支持人在回路(human-in-the-loop)工作流的暂停/恢复机制。