Read OSS

HTTPリクエストからLLMレスポンスまで:アプリ実行パイプラインの全貌

上級

前提知識

  • 第1回:アーキテクチャ概要
  • Server-Sent Events(SSE)の基礎知識
  • PythonのスレッドおよびAsync処理パターンへの理解

HTTPリクエストからLLMレスポンスまで:アプリ実行パイプラインの全貌

チャットボットへのメッセージ送信、Completion APIの呼び出し、ワークフローの実行。Difyアプリへのあらゆるリクエストは、同じ実行パイプラインをたどります。このパイプラインを理解することが、DifyがどのようにHTTPリクエストをストリーミングのLLMレスポンスへと変換するかを理解する鍵となります。本記事では、コントローラーからSSEストリームまでのライフサイクル全体を追いながら、コードベース全体に貫かれた4段階のアーキテクチャパターンを明らかにします。

コントローラー層とリクエストバリデーション

リクエストは、第1回で取り上げたblueprintグループのいずれかを経由して入ってきます。ここではService APIのパスをたどり、controllers/service_api/app/completion.pyから始めましょう。

sequenceDiagram
    participant Client
    participant Controller
    participant Pydantic
    participant AppGenerateService

    Client->>Controller: POST /v1/chat-messages
    Controller->>Pydantic: ChatRequestPayload.model_validate()
    Pydantic-->>Controller: validated payload
    Controller->>AppGenerateService: generate(app_model, user, args, invoke_from)
    AppGenerateService-->>Controller: Generator[SSE events]
    Controller-->>Client: streaming response

コントローラーはChatRequestPayloadなどのPydanticモデルを使ってリクエストを検証し、型の強制適用、会話IDの正規化、UUIDフォーマットの検証を行います。認証は@validate_app_tokenデコレーターが担い、APIキーをappモデルおよびエンドユーザーに解決します。

重要な要素として、app_invoke_entities.pyで定義されているInvokeFromenumがあります。これはリクエストの発生元を識別するものです。

発生元 挙動
SERVICE_API 外部開発者API 公開済みワークフローを使用
WEB_APP エンドユーザー向けWebインターフェース 公開済みワークフローを使用
DEBUGGER コンソールのデバッグパネル ドラフトワークフローを使用
EXPLORE コンソールのプレビュー 公開済みワークフローを使用
TRIGGER Webhookトリガー 公開済みワークフローを使用
PUBLISHED_PIPELINE 公開パイプラインの呼び出し 公開済みワークフローを使用
VALIDATION バリデーションコンテキスト 公開済みワークフローを使用

このenumは実行パイプライン全体を流れ、どのバージョンのワークフローを取得するか、ユーザーIDをどう解決するか、どのレート制限を適用するかに影響を与えます。

AppGenerateService:モードディスパッチャー

すべてのリクエストは最終的にAppGenerateService.generate()へ集約されます。ここはアプリのモードに応じて適切なGeneratorへとリクエストをルーティングする、中央のディスパッチポイントです。ディスパッチの前に、課金クォータチェックとレート制限という2つの横断的処理が適用されます。

flowchart TD
    Generate[AppGenerateService.generate] --> Quota{Billing enabled?}
    Quota -->|Yes| CheckQuota[QuotaType.WORKFLOW.consume]
    Quota -->|No| RateLimit
    CheckQuota --> RateLimit[RateLimit.enter]
    RateLimit --> ModeCheck{app_model.mode?}
    ModeCheck -->|COMPLETION| CompGen[CompletionAppGenerator]
    ModeCheck -->|CHAT| ChatGen[ChatAppGenerator]
    ModeCheck -->|AGENT_CHAT| AgentGen[AgentChatAppGenerator]
    ModeCheck -->|ADVANCED_CHAT| AdvGen[AdvancedChatAppGenerator]
    ModeCheck -->|WORKFLOW| WfGen[WorkflowAppGenerator]

レートリミッターはRedisを使ってアプリ単位で動作し、アプリケーションごとのアクティブな同時リクエスト数を追跡します。_get_max_active_requests()メソッドは、アプリ固有の上限とグローバル設定上限のうち小さい方を返し、ゼロは無制限を意味します。

ヒント: リクエストが拒否される原因を調べるときは、次の順番で確認しましょう。①課金クォータ(QuotaType.WORKFLOW.consume)、②アプリレベルのレート制限(RateLimit.enter)、③パイプラインのより深い部分で適用されるモデルごとのレート制限。

5つのアプリモードとそのクラス構成

Difyは5種類のアプリモードをサポートしており、それぞれ実行コンテキストのセットアップ方法を知っている専用のGeneratorクラスを持ちます。

classDiagram
    class MessageBasedAppGenerator {
        +convert_to_event_stream()
        +retrieve_events()
    }

    class CompletionAppGenerator {
        +generate()
    }
    class ChatAppGenerator {
        +generate()
    }
    class AgentChatAppGenerator {
        +generate()
    }
    class AdvancedChatAppGenerator {
        +generate()
        +retrieve_events()
        +single_iteration_generate()
    }
    class WorkflowAppGenerator {
        +generate()
    }

    MessageBasedAppGenerator <|-- CompletionAppGenerator
    MessageBasedAppGenerator <|-- ChatAppGenerator
    MessageBasedAppGenerator <|-- AgentChatAppGenerator
    MessageBasedAppGenerator <|-- AdvancedChatAppGenerator
    MessageBasedAppGenerator <|-- WorkflowAppGenerator

モードは2つの実行戦略に分かれます。

EasyUIベースのモード(Completion、Chat、AgentChat)は、プロンプトテンプレート+単一LLM呼び出しというパターンを使います。EasyUIBasedAppGenerateEntityと直接的なModelConfigWithCredentialsEntityを組み合わせ、バックグラウンドスレッドで同期的に実行されます。

ワークフローベースのモード(AdvancedChat、Workflow)は、グラフエンジンを通じてDAGのノードを実行します。これらのモードには重要なアーキテクチャ上の分岐があります。ストリーミングモードでは、実行をCeleryタスクにディスパッチし、Redis pub/subを経由して結果をストリームとして返します。ブロッキングモードでは同期的に実行されます。

この分岐はgenerateメソッドにはっきりと表れています。ADVANCED_CHATのストリーミングではapp_generate_service.py#L150-L178のコードがAppExecutionParamsペイロードをシリアライズし、workflow_based_app_execution_task.delay()でディスパッチします。

Generator → Runner → QueueManager → TaskPipeline

Difyのコアとなる実行パターンは、関心事を明確に分離した4段階のパイプラインです。

sequenceDiagram
    participant Client
    participant Generator
    participant Runner/CeleryTask
    participant QueueManager
    participant TaskPipeline

    Client->>Generator: generate()
    Generator->>Generator: Build entity, validate config
    Generator->>QueueManager: Create queue
    Generator->>Runner/CeleryTask: Start execution (thread or Celery)

    loop Execution
        Runner/CeleryTask->>QueueManager: publish(QueueEvent)
        QueueManager->>QueueManager: Put in queue
    end

    Generator->>TaskPipeline: Create pipeline(queue_manager)

    loop Streaming
        TaskPipeline->>QueueManager: listen()
        QueueManager-->>TaskPipeline: QueueEvent
        TaskPipeline->>TaskPipeline: Transform to SSE
        TaskPipeline-->>Client: SSE event
    end

Stage 1:Generator — 実行コンテキストのセットアップを担います。AdvancedChatAppGeneratorはアプリ設定の検証、ファイルアップロード設定の解決、会話レコードの作成、generateエンティティの構築を行い、すべての情報が揃ったコンテキストオブジェクトを生成します。

Stage 2:Runner — 実際の計算処理を実行します。ワークフローベースのアプリでは、WorkflowBasedAppRunnerがワークフローエンジンから発行されるGraphEngineEventQueueEventにマッピングします。RunnerはバックグラウンドスレッドまたはCeleryタスクとして動作し、計算が進むにつれてイベントを発行していきます。

Stage 3:QueueManagerAppQueueManagerはプロデューサー(Runner)とコンシューマー(TaskPipeline)の橋渡し役です。プロセス内通信にはPythonのqueue.Queueを使い、分散環境での停止シグナルにはRedisバックのオーナーシップキャッシュを利用します。listen()メソッドは設定可能なタイムアウト付きでメッセージを返し、APP_MAX_EXECUTION_TIMEを超えると自動的に停止シグナルを送信します。

Stage 4:TaskPipelineBasedGenerateTaskPipelineはキューイベントを消費し、SSEレスポンスオブジェクトへと変換します。エラーのマッピング(InvokeAuthorizationErrorをユーザーフレンドリーなメッセージへ変換するなど)、出力のモデレーション、メッセージステータスの追跡も担います。

サブスクライブ・ゲート型ストリーミングとRedis Pub/Sub vs. Streams

ストリーミングアーキテクチャには、微妙ながら重要な調整メカニズムが存在します。Celery経由でワークフローベースのアプリが実行される場合、実行タスクはリモートワーカー上で起動します。しかしSSEレスポンスストリームは、タスクがイベントを発行し始める前に受信できる状態でなければなりません。

_build_streaming_task_on_subscribe()メソッドは、Redisチャンネルの種類に応じてこの調整を異なる方法で処理します。

flowchart TD
    Start[Build on_subscribe callback] --> ChannelType{PUBSUB_REDIS_CHANNEL_TYPE?}
    ChannelType -->|streams| StartImmediate[Start task immediately<br/>consumers can replay past events]
    ChannelType -->|pubsub/sharded| StartGated[Start on first SSE subscriber<br/>with 200ms fallback timer]
    StartGated --> Timer[threading.Timer 200ms]
    StartGated --> Subscribe[Client connects to SSE]
    Subscribe --> Cancel[Cancel timer, start task]
    Timer --> FallbackStart[Start task if no subscriber]

Redis Streamsを使う場合、イベントは永続化されます。コンシューマーはサブスクライブより前に発行されたイベントも読み取れるため、Celeryタスクは即座に起動できます。

Redis Pub/Sub(デフォルト)を使う場合、イベントは一過性です。SSEエンドポイントがサブスクライブする前にタスクが発行を始めると、それらのイベントは失われてしまいます。この問題への解決策がサブスクライブ・ゲート型の起動です。Celeryタスクは最初のSSEクライアントが接続したときに初めてディスパッチされます。また、クライアントが一向に接続しない場合(リソースリークを防ぐため)でも、200msのフォールバックタイマーによってタスクが起動されます。

この仕組みはthreading.Lockstartedフラグを組み合わせたシンプルかつスレッドセーフな実装で実現されています。

channel_type = dify_config.PUBSUB_REDIS_CHANNEL_TYPE
if channel_type == "streams":
    _try_start()
    return _on_subscribe_streams

timer = threading.Timer(SSE_TASK_START_FALLBACK_MS / 1000.0, _try_start)
timer.daemon = True
timer.start()

ヒント: ストリーミングモードでイベントが欠落する場合は、PUBSUB_REDIS_CHANNEL_TYPEの設定を確認しましょう。streamsに切り替えることで競合状態を解消できます。Redisのメモリ使用量が若干増える点はトレードオフとして認識しておきましょう。

イベント型の全体像

キューシステムは、実行状態を伝えるための豊富なイベント型の階層を持っています。各イベントは、TaskPipelineが適切なSSEペイロードを構築するために必要な情報を含んでいます。

キューイベント トリガー SSE出力
QueueWorkflowStartedEvent グラフエンジンの開始 workflow_started
QueueNodeStartedEvent ノード実行の開始 node_started
QueueTextChunkEvent LLMによるトークンのストリーミング text_chunk
QueueNodeSucceededEvent ノードの完了 node_finished
QueueRetrieverResourcesEvent RAG検索結果 retriever_resources
QueueWorkflowSucceededEvent グラフの完了 workflow_finished
QueueErrorEvent 任意のエラー error
QueueStopEvent ユーザーによるキャンセル stop
QueueWorkflowPausedEvent 人間の入力待ち workflow_paused

WorkflowBasedAppRunnerはワークフローエンジンが発行するGraphEngineEventQueueEventにマッピングします。このマッピング層こそが、引用メタデータ、検索リソース、エージェントログといったDify固有の情報がイベントストリームに付加される場所です。

次回予告

HTTPリクエストからSSEストリームに至るまでの全リクエストライフサイクルを追い、4段階のパイプラインパターンを明らかにしてきました。第3回では、ワークフローエンジンそのものに深く潜り込みます。GraphEngineの内部構造、横断的関心事を処理するレイヤーシステム、ノードファクトリーの依存性注入、そしてヒューマン・イン・ザ・ループのワークフローを実現するポーズ/レジュームメカニズムを解説します。