Read OSS

Node.js の I/O:ストリーム、ハンドル、イベントループ

上級

前提知識

  • 記事 1:architecture-overview
  • 記事 3:cpp-object-model-and-bindings(BaseObject/Wrap 階層)
  • libuv のイベントループモデルの理解(ハンドルとリクエスト、uv_run のフェーズ)
  • ユーザー視点での Node.js ストリーム API の基本的な知識

Node.js の I/O:ストリーム、ハンドル、イベントループ

Node.js の存在意義は、ノンブロッキング I/O にあります。ネットワーク接続、ファイル操作、タイマー、子プロセスなど、あらゆる処理は最終的に同じ仕組みを通ります。C++ 側の libuv ハンドルとリクエスト、JavaScript 側のストリームとイベントエミッター、そしてそれらをつなぐ記事 3 で解説した Wrap 階層です。本記事では、TCP 接続のライフサイクルから巧妙なタイマーシステム、process.nextTick() を支えるマイクロタスクキューまで、これらのピースが実際にどう組み合わさるかを見ていきましょう。

libuv との統合:ハンドルとリクエスト

記事 3 で確認したように、libuv には Node.js がラップする 2 つの基本的な抽象化があります。

ハンドルuv_handle_t)は長期間存続するオブジェクトで、ライフタイム中に複数のイベントを生成できます。TCP サーバー、TCP 接続、タイマー、ファイルシステムウォッチャー、シグナルハンドラーはすべてハンドルです。参照されている間はイベントループを生かし続けます。

リクエストuv_req_t)は一度きりの操作です。ファイルの読み取り、DNS ルックアップ、接続試行など、それぞれがリクエストを生成してディスパッチし、完了時に 1 回だけコールバックを受け取ります。

graph TD
    subgraph "Handles — Long-lived"
        TCP["uv_tcp_t<br/>TCP socket"]
        TIMER["uv_timer_t<br/>Timer"]
        PIPE["uv_pipe_t<br/>Unix pipe / Windows named pipe"]
        FSE["uv_fs_event_t<br/>File system watcher"]
        SIGNAL["uv_signal_t<br/>Signal handler"]
        UDP["uv_udp_t<br/>UDP socket"]
    end
    
    subgraph "Requests — One-shot"
        FSREQ["uv_fs_t<br/>File system operation"]
        CONN["uv_connect_t<br/>Connection attempt"]
        WRITE["uv_write_t<br/>Stream write"]
        DNS["uv_getaddrinfo_t<br/>DNS lookup"]
        WORK["uv_work_t<br/>Thread pool work"]
    end
    
    subgraph "Event Loop"
        LOOP["uv_run()<br/>Process events"]
    end
    
    TCP --> LOOP
    TIMER --> LOOP
    FSREQ --> LOOP
    CONN --> LOOP

SpinEventLoopInternal() 内のイベントループは uv_run(UV_RUN_DEFAULT) を呼び出し、保留中のすべての I/O イベントを処理します。UV_RUN_DEFAULT モードは、処理すべきイベントが来るか、アクティブなハンドル/リクエストがなくなるまでブロックします。uv_run() のイテレーション間では、platform->DrainTasks(isolate) が最適化されたコードのコンパイルやガベージコレクションのファイナライズといった V8 のバックグラウンドタスクを処理します。

実践における Wrap 階層:TCP 接続のライフサイクル

実際の TCP 接続を追いながら、記事 3 で解説した C++ の Wrap 階層がどう動くかを確認してみましょう。net.createServer() を呼び出してクライアントが接続してきたとき、内部では次のような流れが起きています。

sequenceDiagram
    participant NET as lib/net.js
    participant TW as TCPWrap (C++)
    participant CW as ConnectionWrap
    participant LSW as LibuvStreamWrap
    participant HW as HandleWrap
    participant UV as libuv

    Note over NET: server.listen(port)
    NET->>TW: new TCP(TCPConstants.SERVER)
    TW->>HW: HandleWrap(env, object, &handle_)
    HW->>UV: uv_tcp_init(loop, &handle_)
    NET->>TW: bind(address, port)
    NET->>TW: listen(backlog)
    TW->>UV: uv_listen(&handle_, backlog, OnConnection)
    
    Note over UV: Client connects
    UV->>CW: OnConnection(handle, status)
    CW->>TW: TCPWrap::Instantiate(env, parent, SOCKET)
    CW->>UV: uv_accept(server_handle, &client_handle)
    CW->>NET: MakeCallback(onconnection, client_wrap)
    
    Note over NET: Data flows
    NET->>LSW: ReadStart()
    LSW->>UV: uv_read_start(handle, OnAlloc, OnRead)
    UV->>LSW: OnRead(handle, nread, buf)
    LSW->>NET: MakeCallback(onread, buffer)

TCPWrapConnectionWrap<TCPWrap, uv_tcp_t> を継承し、さらに LibuvStreamWrapHandleWrapAsyncWrapBaseObject と続く継承チェーンを持ちます。各層はそれぞれの機能を担っています。

  • BaseObject:C++ オブジェクトと JavaScript のソケットオブジェクトを結びつける
  • AsyncWrapasync_hooks トラッキング用の async_id を提供する
  • HandleWrap:libuv ハンドルのライフサイクル(ref/unref/close)を管理する
  • LibuvStreamWrapReadStart()/ReadStop() と書き込み操作を実装する
  • ConnectionWrapOnConnection()AfterConnect() コールバックを処理する
  • TCPWrapbind()listen()connect() など TCP 固有のメソッドを持つ

StreamBase も重要な存在です。LibuvStreamWrap が実装する抽象インターフェースで、JavaScript から呼び出せる統一されたストリーム API を提供します。libuv ストリームと TLS ストリームの両方が StreamBase を実装しているため、tls.TLSSocketnet.Socket を透過的に置き換えられるのはこのためです。

JavaScript ストリームのアーキテクチャ

JavaScript 側では、Node.js のストリームは EventEmitter を基盤としたステートマシンとして実装されています。Readable、Writable、Duplex、Transform の 4 種類のストリームは lib/internal/streams/ に収められています。

stateDiagram-v2
    [*] --> Flowing: pipe() or resume()
    [*] --> Paused: Initial state
    Paused --> Flowing: resume() / pipe() / 'data' listener
    Flowing --> Paused: pause()
    Flowing --> Ended: push(null)
    Paused --> Ended: push(null) + drain
    Ended --> [*]
    
    state Flowing {
        [*] --> Reading
        Reading --> Buffering: _read() returns data
        Buffering --> Reading: Data consumed below hwm
        Buffering --> Backpressure: Buffer > highWaterMark
        Backpressure --> Reading: Data consumed
    }

Readable ストリームには 2 つのモードがあります。flowing モード(データが自動的にコンシューマーへ送り出される)と paused モード(read() で明示的に取り出す必要がある)です。バッファリングの制御には highWaterMark が使われ、内部バッファがこのしきい値を超えると push()false を返してバックプレッシャーを通知します。

Writable ストリームはこれと対をなすステートマシンを持ちます。重要なのは write() メソッドで、内部バッファが満杯になると false を返します。この場合、呼び出し元は 'drain' イベントを待ってから再度書き込みを行う必要があります。

lib/internal/streams/pipeline.js にある pipeline() ユーティリティは、ストリームをチェーンするときの複雑なエラー伝播とクリーンアップを自動的に処理します。.pipe() よりも pipeline() を使うのが推奨されているのはこのためです。

ヒント: プロダクションコードでは .pipe() ではなく pipeline() を使いましょう。pipeline() はチェーン全体にわたってエラー処理とクリーンアップを適切に行います。一方 .pipe() はエラー発生時にクリーンアップしないことで知られており、リソースリークの原因になります。

タイマーシステム:連結リストとひとつの libuv タイマー

lib/internal/timers.js のタイマー実装には、コードベース随一と言えるほど丁寧な ASCII アートコメントが添えられています。設計そのものも非常に巧みです。setTimeout() の呼び出しごとに libuv タイマーを生成するのではなく(数千のタイマーが動いている場合、これは非常にコストが高くなります)、Node.js は同じ待機時間のタイマーをグループにまとめて管理します。

graph TD
    subgraph "Timer Architecture"
        MAP["PriorityQueue + Object Map<br/>Keys: durations in ms"]
        MAP --> L40["TimersList {duration: 40ms}"]
        MAP --> L320["TimersList {duration: 320ms}"]
        MAP --> L1000["TimersList {duration: 1000ms}"]
        
        L40 --> T1["Timer A<br/>_onTimeout: cb1"]
        T1 --> T2["Timer B<br/>_onTimeout: cb2"]
        T2 --> T3["Timer C<br/>_onTimeout: cb3"]
        
        L1000 --> T4["Timer D<br/>_onTimeout: cb4"]
        T4 --> T5["Timer E<br/>_onTimeout: cb5"]
    end
    
    UV_TIMER["Single libuv timer<br/>Set to earliest expiry"] --> MAP

待機時間のバケットごとに双方向連結リスト(TimersList)が作られています。タイマーの追加は O(1) で、該当する待機時間のリストに末尾追記するだけです。削除も O(1) で、双方向連結リストからリンクを外すだけです。同じ待機時間のタイマーは挿入順(時系列順)に並んでいるため、タイマーが発火したとき確認が必要なのはリストの先頭だけです。

どの待機時間バケットが次に期限切れになるかは PriorityQueue(二分ヒープ)で追跡されます。単一の libuv uv_timer_t が最も近い期限にセットされ、発火すると Node.js はすべてのバケットにわたって期限切れのタイマーを処理し、その後 libuv タイマーを次の期限にリセットします。

この設計により、Node.js は数十万もの有効なタイマーを効率的に管理できます。これは、すべての接続がアイドルタイムアウトを持つ HTTP サーバーでは非常によくあるシナリオです。

マイクロタスク、nextTick、setImmediate

process.nextTick()、V8 マイクロタスク(Promise)、setImmediate() の関係は、Node.js についてもっともよく質問される事柄のひとつです。これらはイベントループの異なるタイミングで実行されます。

flowchart TD
    UV["uv_run() iteration"] --> TIMERS["1. Timers phase<br/>setTimeout / setInterval"]
    TIMERS --> NT1["⚡ nextTick queue + microtasks"]
    NT1 --> PENDING["2. Pending I/O callbacks"]
    PENDING --> NT2["⚡ nextTick queue + microtasks"]
    NT2 --> POLL["3. Poll phase<br/>I/O events"]
    POLL --> NT3["⚡ nextTick queue + microtasks"]
    NT3 --> CHECK["4. Check phase<br/>setImmediate callbacks"]
    CHECK --> NT4["⚡ nextTick queue + microtasks"]
    NT4 --> CLOSE["5. Close callbacks"]
    CLOSE --> NT5["⚡ nextTick queue + microtasks"]
    NT5 --> UV

process.nextTick()FixedQueueTickInfo 共有状態(env.hAliasedFloat64Array で、境界をまたがずに C++ と JavaScript の両側からアクセスできます)を使います。kHasTickScheduled フラグは、nextTick キューを処理する必要があることを C++ レイヤーに伝えます。

重要なのは、nextTick とマイクロタスクはイテレーション単位ではなく、イベントループの各フェーズの間に実行されるという点です。つまり process.nextTick() のコールバックはあらゆる I/O より先に実行されます。不用意に使うと I/O を枯渇させる可能性がある、鋭いツールと言えます。

setImmediate() は「poll」フェーズの後にある「check」フェーズで実行されます。つまり setImmediate() のコールバックは I/O イベントが処理されてから実行されるため、I/O を枯渇させずに処理を遅延させたいときに適した選択肢です。

async_hooks と AsyncWrap によるトラッキング

Node.js のすべての非同期操作は(記事 3 で解説した)AsyncWrap を通り、これが async_hooks API を実現します。トラッキングは 4 つのライフサイクルイベントを通じて機能します。

sequenceDiagram
    participant AH as async_hooks
    participant AW as AsyncWrap (C++)
    participant UV as libuv
    
    Note over AW: Creating a new TCP connection
    AW->>AH: init(asyncId, type, triggerAsyncId, resource)
    Note over AH: Track: "TCPWrap #7 triggered by #3"
    
    Note over UV: Connection callback fires
    AW->>AH: before(asyncId)
    Note over AH: Set execution context to #7
    AW->>AW: MakeCallback(onconnection)
    AW->>AH: after(asyncId)
    Note over AH: Restore previous context
    
    Note over AW: Socket closed
    AW->>AH: destroy(asyncId)
    Note over AH: Cleanup tracking for #7

プロバイダータイプは src/async_wrap.h でマクロを使って定義されており、TCPWRAPFSREQCALLBACKGETADDRINFOREQWRAPHTTP2SESSION など、あらゆる非同期リソース型が列挙されています。各型には一意の enum 値が割り振られ、async_hooks のコンシューマーはこれを使ってイベントをフィルタリングできます。

executionAsyncId()triggerAsyncId() は非同期コンテキストのチェーンを公開し、AsyncLocalStorage のようなツールがリクエストスコープのデータを明示的な引数渡しなしに非同期境界を越えて伝播できるようにします。これも同じ AsyncWrap 基盤の上に構築されています。AsyncLocalStorageasync_id をキーとしてデータを保存し、init フックの triggerAsyncId チェーンを通じてデータを伝播します。

ヒント: async_hooks には測定可能なオーバーヘッドがあります。プロダクション環境では、生の async_hooks ではなく、一般的なユースケースに最適化された AsyncLocalStorage を優先して使いましょう。診断用のフックが必要な場合は、pub/sub スタイルの計装においてオーバーヘッドが少ない diagnostics_channel API も検討してみてください。

次のステップ

ここまでで、libuv のイベントから C++ の Wrap を経由して JavaScript のストリームへ、戻ってくるまでの完全な I/O パスを網羅できました。このシリーズの最終記事では、Node.js の横断的な関心事を解説します。パーミッションモデル、エラーシステム、Web Platform API の統合、V8 スナップショット、シングル実行ファイル(SEA)、組み込みテストランナーが対象です。