SSE、JSONL、ストリーミング:FastAPI がレスポンスのライフサイクルを管理する仕組み
前提知識
- ›Python の非同期ジェネレーターと anyio タスクグループ
- ›ASGI ストリーミングレスポンス
- ›Server-Sent Events (SSE) プロトコルの基礎
- ›第3回:リクエスト処理と依存関係の解決
SSE、JSONL、ストリーミング:FastAPI がレスポンスのライフサイクルを管理する仕組み
ストリーミングレスポンスは、FastAPI のアーキテクチャが本領を発揮する場面です。通常のリクエスト/レスポンスサイクルはシンプルです。入力を解析し、関数を呼び出し、結果をシリアライズするだけです。しかしストリーミングでは、エンドポイントが HTTP 出力と並行して動作します。つまり、ジェネレーターがまだアイテムを生成し続けている最中に、レスポンスの送信が始まるのです。これは、依存関係のライフタイム、キャンセルのセマンティクス、そして Python ランタイムのジェネレーター終了処理と、見えにくい形で複雑に絡み合います。
FastAPI が扱うストリーミングパターンは3種類あります。プロデューサー/コンシューマー型のキープアライブアーキテクチャを持つ Server-Sent Events (SSE)、構造化ストリーミング向けの JSONL(改行区切り JSON)、そしてカスタムレスポンス型向けの生ストリーミングです。それぞれ異なるハンドラーロジックを持ち、ルート登録時に決定されます。
APIRoute.init() におけるストリーム判定
ルートを登録する際、APIRoute.__init__() はエンドポイントの戻り型と response_class を検査して、ストリーミングモードを決定します。
is_generator = (
self.dependant.is_async_gen_callable or self.dependant.is_gen_callable
)
self.is_sse_stream = is_generator and lenient_issubclass(
response_class, EventSourceResponse
)
self.is_json_stream = is_generator and isinstance(
response_class, DefaultPlaceholder
)
flowchart TD
A["Endpoint registered"] --> B{Is generator?}
B -->|No| C["Regular endpoint"]
B -->|Yes| D{response_class?}
D -->|EventSourceResponse| E["SSE stream"]
D -->|DefaultPlaceholder| F["JSONL stream"]
D -->|Custom StreamingResponse| G["Raw stream"]
ロジックはシンプルです。エンドポイントがジェネレーターで response_class が EventSourceResponse であれば SSE として扱います。ジェネレーターかつ DefaultPlaceholder(デフォルトのレスポンスクラス)であれば JSONL、ジェネレーターかつ明示的なカスタムレスポンスクラスであれば生ストリーミングとなります。そもそもジェネレーターでなければ通常のエンドポイントです。
EventSourceResponse クラス自体は軽量なマーカーとして機能します。
Starlette の StreamingResponse を継承し、media_type = "text/event-stream" を設定するだけです。実際のエンコーディングロジックはルーティング層に存在します。
SSE のプロデューサー/コンシューマーアーキテクチャ
SSE の実装では、anyio のメモリストリームを使ってジェネレーターの反復処理と HTTP 出力を分離し、タイムアウト時のキープアライブ ping 挿入を可能にしています。
sequenceDiagram
participant Gen as User Generator
participant Prod as _producer task
participant Send as send_stream
participant KA as _keepalive_inserter task
participant Recv as receive_stream
participant Out as HTTP Output
participant KARecv as send_keepalive
Gen->>Prod: yield item
Prod->>Prod: _serialize_sse_item(item)
Prod->>Send: send(serialized_bytes)
Send->>KA: receive() returns data
KA->>KARecv: send(data)
KARecv->>Out: yield bytes
Note over KA: If receive() times out after 15s:
KA->>KARecv: send(": ping\n\n")
KARecv->>Out: yield keepalive
このアーキテクチャは、anyio タスクグループ上で動作する3つの並行コンポーネントで構成されています。
_producer: ユーザーのジェネレーターを反復処理し、各アイテムをシリアライズしてメモリストリームに送信する_keepalive_inserter: プロデューサーのストリームから 15 秒のタイムアウト付きで読み取る。タイムアウトが発火した場合(ジェネレーターの処理が遅い場合)、データの代わりにキープアライブコメント(": ping\n\n")を送信する- 出力ストリーム:
StreamingResponseが最終的な receive ストリームから読み取る
2段階のメモリストリームパイプライン(プロデューサー → キープアライブインサーター → 出力)によって、キープアライブタイマーがユーザーのジェネレーターを直接ラップすることはありません。これは PEP 789 に関連した意図的な設計判断です。
ヒント: キープアライブの間隔は 15 秒(
_PING_INTERVAL = 15.0)に設定されており、典型的なプロキシ/ロードバランサーのタイムアウト値に合わせて調整されています。この値はインポート可能(プライベートではありますが)で、テスト時にモンキーパッチを当てることもできます。
ServerSentEvent と format_sse_event()
ServerSentEvent Pydantic モデルは、すべての標準フィールドを持つ単一の SSE イベントを表現します。
ユーザーは、プレーンなオブジェクト(自動的に data: フィールドとしてラップされる)と、イベント構造を細かく制御するための ServerSentEvent インスタンスのどちらでも yield できます。このモデルは、data(JSON シリアライズされる)と raw_data(そのまま送信される)の排他性を強制し、id フィールドに null 文字が含まれていないことを SSE 仕様に従い検証します。
format_sse_event() 関数は、仕様に準拠したワイヤーフォーマットを生成します。
def format_sse_event(...) -> bytes:
lines: list[str] = []
if comment is not None:
for line in comment.splitlines():
lines.append(f": {line}")
if event is not None:
lines.append(f"event: {event}")
if data_str is not None:
for line in data_str.splitlines():
lines.append(f"data: {line}")
...
return "\n".join(lines).encode("utf-8")
複数行のデータも正しく処理されます。各行に個別の data: プレフィックスが付与され、イベントはダブルニューライン(\n\n)で終端されます。
JSONL ストリーミング
JSONL(改行区切り JSON)ストリーミングは SSE よりもシンプルで、キープアライブもイベントフレーミングもありません。
flowchart TD
A["Generator yields item"] --> B["_serialize_data(item)"]
B --> C{stream_item_field set?}
C -->|Yes| D["Validate + serialize via ModelField"]
C -->|No| E["jsonable_encoder() + json.dumps()"]
D --> F["Append newline byte"]
E --> F
F --> G["yield to StreamingResponse"]
同期・非同期どちらのジェネレーターもサポートしています。非同期ジェネレーターの場合は、各 yield の後に anyio.sleep(0) を挿入してキャンセルチェックポイントを保証します。これにより、生産速度の速いジェネレーターがイベントループを占有し、キャンセルシグナルの到達を妨げることを防ぎます。
_serialize_data() ヘルパーは SSE と JSONL で共有されており、統一されたバリデーション/シリアライゼーションパスを提供します。stream_item_field が存在する場合(戻り型アノテーションから取得)は、各アイテムがそれに対してバリデーションされ、Pydantic の高速パスでシリアライズされます。存在しない場合は、jsonable_encoder() が任意のオブジェクトを処理します。
PEP 789 と GeneratorExit の回避
SSE ストリーミングにおけるプロデューサー/コンシューマーの分離は、コードコメントで説明されている具体的な技術的理由に基づいています。
# Use a memory stream to decouple generator iteration
# from the keepalive timer. A producer task pulls items
# from the generator independently, so
# `anyio.fail_after` never wraps the generator's
# `__anext__` directly - avoiding CancelledError that
# would finalize the generator
PEP 789 は、anyio.fail_after やタスクキャンセルによる CancelledError が非同期ジェネレーターに投入される問題を扱っています。CPython では、これによりジェネレーターの終了処理(GeneratorExit)がトリガーされますが、この処理は不可逆です。仮にキープアライブのタイムアウトをジェネレーターの __anext__ を fail_after でラップする形で実装した場合、タイムアウトが発生するとキープアライブを挿入するだけでなく、ジェネレーター全体が終了してしまいます。
メモリストリームのパターンはこの問題を回避します。fail_after がラップするのは receive_stream.receive() のみであり、ジェネレーター自体はラップされません。ジェネレーターは独立したタスク(_producer)内で動作し、タイムアウトによる影響を受けません。
ストリーミングと AsyncExitStack のライフサイクル
SSE のコンテキストマネージャーは、リクエストスコープの exit stack に登録されます。
sse_receive_stream = await async_exit_stack.enter_async_context(
_sse_producer_cm()
)
async_exit_stack.push_async_callback(sse_receive_stream.aclose)
sequenceDiagram
participant FAS as function_astack
participant IAS as inner_astack (request)
participant MAS as middleware_astack
participant Stream as StreamingResponse
Note over FAS: Function-scoped deps cleaned up
Note over IAS: SSE producer CM entered here
IAS->>Stream: response(scope, receive, send)
Note over Stream: Streaming bytes to client...
Stream-->>IAS: Response complete
Note over IAS: Exit stack unwinds
Note over IAS: SSE CM __aexit__ cancels task group
Note over IAS: Request-scoped deps cleaned up
Note over MAS: File cleanup
この順序は非常に重要です。
- 関数スコープの依存関係は、レスポンスのストリーミングが始まる前にクローズされます(
fastapi_function_astackに登録されているため) - SSE コンテキストマネージャーは
fastapi_inner_astack(リクエストスコープ)に登録されるため、タスクグループはストリーミングレスポンス全体を通じて動作し続けます - レスポンスが完了すると(クライアントの切断、ジェネレーターの枯渇、またはエラー)、リクエストの exit stack がアンワインドされ、
tg.cancel_scope.cancel()でタスクグループがキャンセルされます - リクエストスコープの依存関係(データベースセッションなど)は、ストリームが完了した後にクローズされます
生ストリーミングエンドポイント(SSE でも JSONL でもない、カスタムレスポンスクラスを持つジェネレーター)では、ジェネレーターはそのままレスポンスクラスに渡されます。
こちらも各 yield の後にキャンセルチェックポイント(anyio.sleep(0))が挿入されます。
ストリームアイテム型の抽出
SSE と JSONL の両方において、FastAPI は OpenAPI スキーマ生成とランタイムバリデーションのためにジェネレーターのアイテム型を知る必要があります。get_stream_item_type() 関数がこれを担当します。
fastapi/dependencies/utils.py#L266-L283
_STREAM_ORIGINS = {
AsyncIterable, AsyncIterator, AsyncGenerator,
Iterable, Iterator, Generator,
}
def get_stream_item_type(annotation: Any) -> Any | None:
origin = get_origin(annotation)
if origin is not None and origin in _STREAM_ORIGINS:
type_args = get_args(annotation)
if type_args:
return type_args[0]
return Any
return None
この関数は、ジェネリックなジェネレーターアノテーションから最初の型引数を抽出します。AsyncGenerator[Item, None] であれば Item を返し、型引数のない素の AsyncGenerator であれば Any を返します。抽出された型は APIRoute の stream_item_field となり、yield された各アイテムのランタイムバリデーションと、OpenAPI の itemSchema 生成の両方に使用されます。
APIRoute.__init__() には重要な除外条件があります。アイテム型が ServerSentEvent である場合、バリデーションやスキーマ生成には使用されません。ServerSentEvent はデータモデルではなくトランスポートのラッパーであり、それに対してバリデーションを行っても意味がないためです。
次回予告
最終回では、これらすべてを支える基盤となるパターンを俯瞰して解説します。Pydantic のブリッジとなる ModelField、カスケード設定パターンの DefaultPlaceholder、jsonable_encoder のレガシーパスとモダンな高速パスの比較を取り上げます。さらに拡張性のためのカスタムルートクラス、ソース位置付きのリッチなエラーフォーマット、include_router() のルートコピー設計についても掘り下げます。