HTTPリクエストからLLMレスポンスまで:アプリ実行パイプラインの全貌
前提知識
- ›第1回:アーキテクチャ概要
- ›Server-Sent Events(SSE)の基礎知識
- ›PythonのスレッドおよびAsync処理パターンへの理解
HTTPリクエストからLLMレスポンスまで:アプリ実行パイプラインの全貌
チャットボットへのメッセージ送信、Completion APIの呼び出し、ワークフローの実行。Difyアプリへのあらゆるリクエストは、同じ実行パイプラインをたどります。このパイプラインを理解することが、DifyがどのようにHTTPリクエストをストリーミングのLLMレスポンスへと変換するかを理解する鍵となります。本記事では、コントローラーからSSEストリームまでのライフサイクル全体を追いながら、コードベース全体に貫かれた4段階のアーキテクチャパターンを明らかにします。
コントローラー層とリクエストバリデーション
リクエストは、第1回で取り上げたblueprintグループのいずれかを経由して入ってきます。ここではService APIのパスをたどり、controllers/service_api/app/completion.pyから始めましょう。
sequenceDiagram
participant Client
participant Controller
participant Pydantic
participant AppGenerateService
Client->>Controller: POST /v1/chat-messages
Controller->>Pydantic: ChatRequestPayload.model_validate()
Pydantic-->>Controller: validated payload
Controller->>AppGenerateService: generate(app_model, user, args, invoke_from)
AppGenerateService-->>Controller: Generator[SSE events]
Controller-->>Client: streaming response
コントローラーはChatRequestPayloadなどのPydanticモデルを使ってリクエストを検証し、型の強制適用、会話IDの正規化、UUIDフォーマットの検証を行います。認証は@validate_app_tokenデコレーターが担い、APIキーをappモデルおよびエンドユーザーに解決します。
重要な要素として、app_invoke_entities.pyで定義されているInvokeFromenumがあります。これはリクエストの発生元を識別するものです。
| 値 | 発生元 | 挙動 |
|---|---|---|
SERVICE_API |
外部開発者API | 公開済みワークフローを使用 |
WEB_APP |
エンドユーザー向けWebインターフェース | 公開済みワークフローを使用 |
DEBUGGER |
コンソールのデバッグパネル | ドラフトワークフローを使用 |
EXPLORE |
コンソールのプレビュー | 公開済みワークフローを使用 |
TRIGGER |
Webhookトリガー | 公開済みワークフローを使用 |
PUBLISHED_PIPELINE |
公開パイプラインの呼び出し | 公開済みワークフローを使用 |
VALIDATION |
バリデーションコンテキスト | 公開済みワークフローを使用 |
このenumは実行パイプライン全体を流れ、どのバージョンのワークフローを取得するか、ユーザーIDをどう解決するか、どのレート制限を適用するかに影響を与えます。
AppGenerateService:モードディスパッチャー
すべてのリクエストは最終的にAppGenerateService.generate()へ集約されます。ここはアプリのモードに応じて適切なGeneratorへとリクエストをルーティングする、中央のディスパッチポイントです。ディスパッチの前に、課金クォータチェックとレート制限という2つの横断的処理が適用されます。
flowchart TD
Generate[AppGenerateService.generate] --> Quota{Billing enabled?}
Quota -->|Yes| CheckQuota[QuotaType.WORKFLOW.consume]
Quota -->|No| RateLimit
CheckQuota --> RateLimit[RateLimit.enter]
RateLimit --> ModeCheck{app_model.mode?}
ModeCheck -->|COMPLETION| CompGen[CompletionAppGenerator]
ModeCheck -->|CHAT| ChatGen[ChatAppGenerator]
ModeCheck -->|AGENT_CHAT| AgentGen[AgentChatAppGenerator]
ModeCheck -->|ADVANCED_CHAT| AdvGen[AdvancedChatAppGenerator]
ModeCheck -->|WORKFLOW| WfGen[WorkflowAppGenerator]
レートリミッターはRedisを使ってアプリ単位で動作し、アプリケーションごとのアクティブな同時リクエスト数を追跡します。_get_max_active_requests()メソッドは、アプリ固有の上限とグローバル設定上限のうち小さい方を返し、ゼロは無制限を意味します。
ヒント: リクエストが拒否される原因を調べるときは、次の順番で確認しましょう。①課金クォータ(
QuotaType.WORKFLOW.consume)、②アプリレベルのレート制限(RateLimit.enter)、③パイプラインのより深い部分で適用されるモデルごとのレート制限。
5つのアプリモードとそのクラス構成
Difyは5種類のアプリモードをサポートしており、それぞれ実行コンテキストのセットアップ方法を知っている専用のGeneratorクラスを持ちます。
classDiagram
class MessageBasedAppGenerator {
+convert_to_event_stream()
+retrieve_events()
}
class CompletionAppGenerator {
+generate()
}
class ChatAppGenerator {
+generate()
}
class AgentChatAppGenerator {
+generate()
}
class AdvancedChatAppGenerator {
+generate()
+retrieve_events()
+single_iteration_generate()
}
class WorkflowAppGenerator {
+generate()
}
MessageBasedAppGenerator <|-- CompletionAppGenerator
MessageBasedAppGenerator <|-- ChatAppGenerator
MessageBasedAppGenerator <|-- AgentChatAppGenerator
MessageBasedAppGenerator <|-- AdvancedChatAppGenerator
MessageBasedAppGenerator <|-- WorkflowAppGenerator
モードは2つの実行戦略に分かれます。
EasyUIベースのモード(Completion、Chat、AgentChat)は、プロンプトテンプレート+単一LLM呼び出しというパターンを使います。EasyUIBasedAppGenerateEntityと直接的なModelConfigWithCredentialsEntityを組み合わせ、バックグラウンドスレッドで同期的に実行されます。
ワークフローベースのモード(AdvancedChat、Workflow)は、グラフエンジンを通じてDAGのノードを実行します。これらのモードには重要なアーキテクチャ上の分岐があります。ストリーミングモードでは、実行をCeleryタスクにディスパッチし、Redis pub/subを経由して結果をストリームとして返します。ブロッキングモードでは同期的に実行されます。
この分岐はgenerateメソッドにはっきりと表れています。ADVANCED_CHATのストリーミングではapp_generate_service.py#L150-L178のコードがAppExecutionParamsペイロードをシリアライズし、workflow_based_app_execution_task.delay()でディスパッチします。
Generator → Runner → QueueManager → TaskPipeline
Difyのコアとなる実行パターンは、関心事を明確に分離した4段階のパイプラインです。
sequenceDiagram
participant Client
participant Generator
participant Runner/CeleryTask
participant QueueManager
participant TaskPipeline
Client->>Generator: generate()
Generator->>Generator: Build entity, validate config
Generator->>QueueManager: Create queue
Generator->>Runner/CeleryTask: Start execution (thread or Celery)
loop Execution
Runner/CeleryTask->>QueueManager: publish(QueueEvent)
QueueManager->>QueueManager: Put in queue
end
Generator->>TaskPipeline: Create pipeline(queue_manager)
loop Streaming
TaskPipeline->>QueueManager: listen()
QueueManager-->>TaskPipeline: QueueEvent
TaskPipeline->>TaskPipeline: Transform to SSE
TaskPipeline-->>Client: SSE event
end
Stage 1:Generator — 実行コンテキストのセットアップを担います。AdvancedChatAppGeneratorはアプリ設定の検証、ファイルアップロード設定の解決、会話レコードの作成、generateエンティティの構築を行い、すべての情報が揃ったコンテキストオブジェクトを生成します。
Stage 2:Runner — 実際の計算処理を実行します。ワークフローベースのアプリでは、WorkflowBasedAppRunnerがワークフローエンジンから発行されるGraphEngineEventをQueueEventにマッピングします。RunnerはバックグラウンドスレッドまたはCeleryタスクとして動作し、計算が進むにつれてイベントを発行していきます。
Stage 3:QueueManager — AppQueueManagerはプロデューサー(Runner)とコンシューマー(TaskPipeline)の橋渡し役です。プロセス内通信にはPythonのqueue.Queueを使い、分散環境での停止シグナルにはRedisバックのオーナーシップキャッシュを利用します。listen()メソッドは設定可能なタイムアウト付きでメッセージを返し、APP_MAX_EXECUTION_TIMEを超えると自動的に停止シグナルを送信します。
Stage 4:TaskPipeline — BasedGenerateTaskPipelineはキューイベントを消費し、SSEレスポンスオブジェクトへと変換します。エラーのマッピング(InvokeAuthorizationErrorをユーザーフレンドリーなメッセージへ変換するなど)、出力のモデレーション、メッセージステータスの追跡も担います。
サブスクライブ・ゲート型ストリーミングとRedis Pub/Sub vs. Streams
ストリーミングアーキテクチャには、微妙ながら重要な調整メカニズムが存在します。Celery経由でワークフローベースのアプリが実行される場合、実行タスクはリモートワーカー上で起動します。しかしSSEレスポンスストリームは、タスクがイベントを発行し始める前に受信できる状態でなければなりません。
_build_streaming_task_on_subscribe()メソッドは、Redisチャンネルの種類に応じてこの調整を異なる方法で処理します。
flowchart TD
Start[Build on_subscribe callback] --> ChannelType{PUBSUB_REDIS_CHANNEL_TYPE?}
ChannelType -->|streams| StartImmediate[Start task immediately<br/>consumers can replay past events]
ChannelType -->|pubsub/sharded| StartGated[Start on first SSE subscriber<br/>with 200ms fallback timer]
StartGated --> Timer[threading.Timer 200ms]
StartGated --> Subscribe[Client connects to SSE]
Subscribe --> Cancel[Cancel timer, start task]
Timer --> FallbackStart[Start task if no subscriber]
Redis Streamsを使う場合、イベントは永続化されます。コンシューマーはサブスクライブより前に発行されたイベントも読み取れるため、Celeryタスクは即座に起動できます。
Redis Pub/Sub(デフォルト)を使う場合、イベントは一過性です。SSEエンドポイントがサブスクライブする前にタスクが発行を始めると、それらのイベントは失われてしまいます。この問題への解決策がサブスクライブ・ゲート型の起動です。Celeryタスクは最初のSSEクライアントが接続したときに初めてディスパッチされます。また、クライアントが一向に接続しない場合(リソースリークを防ぐため)でも、200msのフォールバックタイマーによってタスクが起動されます。
この仕組みはthreading.Lockとstartedフラグを組み合わせたシンプルかつスレッドセーフな実装で実現されています。
channel_type = dify_config.PUBSUB_REDIS_CHANNEL_TYPE
if channel_type == "streams":
_try_start()
return _on_subscribe_streams
timer = threading.Timer(SSE_TASK_START_FALLBACK_MS / 1000.0, _try_start)
timer.daemon = True
timer.start()
ヒント: ストリーミングモードでイベントが欠落する場合は、
PUBSUB_REDIS_CHANNEL_TYPEの設定を確認しましょう。streamsに切り替えることで競合状態を解消できます。Redisのメモリ使用量が若干増える点はトレードオフとして認識しておきましょう。
イベント型の全体像
キューシステムは、実行状態を伝えるための豊富なイベント型の階層を持っています。各イベントは、TaskPipelineが適切なSSEペイロードを構築するために必要な情報を含んでいます。
| キューイベント | トリガー | SSE出力 |
|---|---|---|
QueueWorkflowStartedEvent |
グラフエンジンの開始 | workflow_started |
QueueNodeStartedEvent |
ノード実行の開始 | node_started |
QueueTextChunkEvent |
LLMによるトークンのストリーミング | text_chunk |
QueueNodeSucceededEvent |
ノードの完了 | node_finished |
QueueRetrieverResourcesEvent |
RAG検索結果 | retriever_resources |
QueueWorkflowSucceededEvent |
グラフの完了 | workflow_finished |
QueueErrorEvent |
任意のエラー | error |
QueueStopEvent |
ユーザーによるキャンセル | stop |
QueueWorkflowPausedEvent |
人間の入力待ち | workflow_paused |
WorkflowBasedAppRunnerはワークフローエンジンが発行するGraphEngineEventをQueueEventにマッピングします。このマッピング層こそが、引用メタデータ、検索リソース、エージェントログといったDify固有の情報がイベントストリームに付加される場所です。
次回予告
HTTPリクエストからSSEストリームに至るまでの全リクエストライフサイクルを追い、4段階のパイプラインパターンを明らかにしてきました。第3回では、ワークフローエンジンそのものに深く潜り込みます。GraphEngineの内部構造、横断的関心事を処理するレイヤーシステム、ノードファクトリーの依存性注入、そしてヒューマン・イン・ザ・ループのワークフローを実現するポーズ/レジュームメカニズムを解説します。