Read OSS

エンジンルームの深層:ドレインループ、Queue Fusion、ロックフリー並行処理

上級

前提知識

  • 第2回:subscribe-chain-and-operator-anatomy
  • Java の並行処理プリミティブ(AtomicInteger、CAS 操作、volatile セマンティクス、happens-before 保証)
  • キューデータ構造の基礎知識

エンジンルームの深層:ドレインループ、Queue Fusion、ロックフリー並行処理

RxJava のオペレーターは、複数のスレッドからシグナルを受け取ることが珍しくありません。onNext を呼び出すプロデューサースレッド、request を呼び出すコンシューマースレッド、さらにキャンセルを投げる第三のスレッドが同時に動くこともあります。従来の synchronized ブロックでも対処できますが、数十段にわたるオペレーターチェーンでそのコストが積み重なれば、パフォーマンスへの影響は無視できません。RxJava がこの問題を解決しているのが、ロックなしでアクセスをシリアライズする アトミック WIP ドレインループ と、中間キューを丸ごと排除する オペレーター融合(operator fusion) という2つのアイデアです。この2つがあってこそ、RxJava は1秒あたり数億件というスループットを実現できています。

アトミック WIP ドレインループパターン

RxJava における並行処理の核心は、work-in-progress(WIP)ドレインループです。スレッド境界をまたぐほぼすべてのオペレーターに登場します。FlowableObserveOn.java#L164-L168 のエントリーポイントは、一見するととてもシンプルです。

final void trySchedule() {
    if (getAndIncrement() != 0) {
        return;
    }
    worker.schedule(this);
}

このクラスは AtomicInteger を継承しており、カウンターが WIP インジケーターとして機能します。パターンの仕組みは次のとおりです。

  1. エントリーgetAndIncrement() を呼び出します。前の値が 0 であれば、このスレッドが「権利を獲得」してドレインループに入ります。0 でなければ、別のスレッドがすでにドレイン中です。カウンターをインクリメントしてそのまま戻ります。
  2. ドレイン:キューからアイテムを処理しながら、完了やキャンセルを確認します。
  3. 終了addAndGet(-missed) を呼び出します。結果が 0 であれば、ドレイン中に新たな作業は届いていないため、安全に終了できます。0 でなければ、処理すべき作業が届いているのでループを続けます。
flowchart TD
    ENTER[trySchedule called] --> INC{getAndIncrement != 0?}
    INC -->|"Yes: other thread draining"| RETURN[Return immediately]
    INC -->|"No: we won the race"| SCHEDULE[Schedule drain on worker]
    SCHEDULE --> DRAIN[Process queued items]
    DRAIN --> EXIT{addAndGet -missed == 0?}
    EXIT -->|"Yes: no new work"| DONE[Exit loop]
    EXIT -->|"No: work arrived during drain"| DRAIN

このパターンの特長は ロックフリー であることです。どのスレッドも他のスレッドの完了を待って止まることがありません。また 競合安全 でもあります。インクリメント操作が「作業あり」のシグナルと「他のスレッドに処理させない」ロックの両方を兼ねているからです。終了チェック(addAndGet(-missed))は、ドレイナーが終了しようとした瞬間にプロデューサーがカウンターをインクリメントするというギリギリの競合も確実に処理します。ドレイナーは非ゼロの結果を見てループに戻るため、作業の取りこぼしが起きません。

サーバーで何ができるかを知るには: RxJava のオペレーターコードを読むとき、AtomicInteger を継承しているクラスを探しましょう。それがドレインループの目印です。ドレインメソッドは通常 drain() と名付けられているか、クラスが Runnable を実装している場合は run() になっています。

FlowableObserveOn:ドレインループの教科書

FlowableObserveOn は、ドレインループの全体像を学ぶのに最適なオペレーターです。スレッドの切り替え、キュー管理、バックプレッシャー処理、fusion ネゴシエーションと、あらゆる要素が詰め込まれています。

FlowableObserveOn.java#L62-L101BaseObserveOnSubscriber がインフラを構築します。

abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
    
    final Worker worker;
    final int prefetch;
    final int limit;
    // ...
    
    BaseObserveOnSubscriber(Worker worker, boolean delayError, int prefetch) {
        this.worker = worker;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);  // 75% refill threshold
    }

run() メソッドは、状況に応じて三つのドレインモードに処理を振り分けます。

public final void run() {
    if (outputFused) {
        runBackfused();
    } else if (sourceMode == SYNC) {
        runSync();
    } else {
        runAsync();
    }
}

本来の複雑さが現れるのは、FlowableObserveOn.java#L362-L380 の非同期ドレインです。キューからの読み取り、バックプレッシャーの遵守(requested.get())、終了イベントの確認、そして消費済みアイテム数に基づいてアップストリームへの新規リクエストを発行するリフィルパターンが、すべてここに集約されています。

sequenceDiagram
    participant Upstream
    participant ObserveOn as ObserveOnSubscriber
    participant Queue as SpscArrayQueue
    participant Worker as Scheduler.Worker
    participant Downstream
    
    Upstream->>ObserveOn: onNext(item)
    ObserveOn->>Queue: offer(item)
    ObserveOn->>ObserveOn: trySchedule() [WIP++]
    
    Note over Worker: On scheduler thread
    Worker->>ObserveOn: run()
    ObserveOn->>Queue: poll()
    Queue-->>ObserveOn: item
    ObserveOn->>Downstream: onNext(item)
    ObserveOn->>ObserveOn: consumed++ (replenish at limit)

ロックフリー SPSC キュー

オペレーターがバッファリングに使うキューは java.util.concurrent の実装ではありません。JCTools プロジェクトをベースにカスタム実装された SPSC(Single-Producer-Single-Consumer)キューです。

SpscArrayQueue は有界(bounded)バリアントで、AtomicReferenceArray を内部に持つリングバッファです。

public final class SpscArrayQueue<E> extends AtomicReferenceArray<E> 
    implements SimplePlainQueue<E> {
    
    final int mask;
    final AtomicLong producerIndex;
    final AtomicLong consumerIndex;
    final int lookAheadStep;

    public SpscArrayQueue(int capacity) {
        super(Pow2.roundToPowerOfTwo(capacity));
        this.mask = length() - 1;
        this.producerIndex = new AtomicLong();
        this.consumerIndex = new AtomicLong();
        lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
    }

設計上の重要な決定事項をまとめます。

  • 2の累乗サイズ:容量を2の累乗に切り上げることで、インデックス計算をモジュロ除算から index & mask のビット演算に置き換えられます。地味ながら効果的なマイクロ最適化です。
  • プロデューサー/コンシューマーインデックスの分離:それぞれを独立した AtomicLong オブジェクトとして確保することで、フォールスシェアリングを回避しています。
  • 先読み最適化(look-ahead):プロデューサーは実際のオファースロットを確認する前に、複数先のスロットをチェックします。コンシューマーインデックスへの volatile 読み取り頻度を下げる効果があります。

容量をあらかじめ確定できない場合は、固定サイズの配列セグメントをリンクリスト状につなぐ SpscLinkedArrayQueue を使います。こちらは無制限(unbounded)に成長できます。

flatMap が複数のインナーソースをマージするようなマルチプロデューサーシナリオでは、Michael-Scott スタイルのロックフリーキューを MPSC(Multiple-Producer-Single-Consumer)向けに改良した MpscLinkedQueue が使われます。

classDiagram
    class SimplePlainQueue~E~ {
        <<interface>>
        +offer(E): boolean
        +poll(): E
        +isEmpty(): boolean
        +clear()
    }
    class SpscArrayQueue~E~ {
        Bounded ring buffer
        Power-of-2 size
        Wait-free offer/poll
    }
    class SpscLinkedArrayQueue~E~ {
        Unbounded linked arrays
        Grows on demand
    }
    class MpscLinkedQueue~E~ {
        Multi-producer safe
        Lock-free CAS offer
    }
    
    SimplePlainQueue <|.. SpscArrayQueue
    SimplePlainQueue <|.. SpscLinkedArrayQueue
    SimplePlainQueue <|.. MpscLinkedQueue

QueueFuseable:オペレーター融合プロトコル

オペレーター融合(operator fusion)とは、隣接するオペレーター間の中間キューを丸ごと排除する仕組みです。通常、map はアイテムをキューに入れ、observeOn がそこから読み出します。融合が有効になると、observeOnmap のアップストリームから直接 poll() を呼び出し、その中でマッピング関数をインラインで実行するようになります。

このプロトコルは QueueFuseable で定義されています。

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;      // No fusion
    int SYNC = 1;      // Synchronous: poll() is blocking and complete
    int ASYNC = 2;     // Asynchronous: poll() may return null, use onNext as signal
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4;  // Don't fuse across thread boundaries
    
    int requestFusion(int mode);
}

融合のネゴシエーションは onSubscribe() の中で行われます。ダウンストリームのオペレーターがアップストリームの Subscription を受け取った時点で、それが QueueSubscription も実装しているかどうかを確認します。実装していれば requestFusion(mode) を呼び出します。

sequenceDiagram
    participant Upstream as map (upstream)
    participant Downstream as observeOn (downstream)
    
    Upstream->>Downstream: onSubscribe(mapSubscriber)
    Downstream->>Downstream: Is upstream QueueSubscription?
    Downstream->>Upstream: requestFusion(ANY | BOUNDARY)
    
    alt SYNC fusion granted
        Upstream-->>Downstream: SYNC
        Note over Downstream: Will call poll() directly
        Note over Downstream: null from poll() = complete
    else ASYNC fusion granted
        Upstream-->>Downstream: ASYNC
        Note over Downstream: onNext(null) = "poll me now"
        Note over Downstream: poll() in drain loop
    else No fusion
        Upstream-->>Downstream: NONE
        Note over Downstream: Normal onNext path
    end

BOUNDARY フラグの役割は特に重要です。observeOn が融合をリクエストする際に ANY | BOUNDARY を渡すのは、「スレッド境界をまたぐ」ことをアップストリームに伝えるためです。これにより、融合によって計算が意図せず別スレッドへ移動してしまうのを防ぎます。たとえば map().observeOn() が完全に融合してしまうと、map の処理が本来の購読スレッドではなく observeOn のスケジューラースレッドで実行されることになり、ユーザーの期待する動作が壊れる可能性があります。

FlowableMap では、FlowableMap.java#L79-L82poll() で融合がサポートされています。

public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}

融合が有効な状態で map の subscriber に poll() を呼ぶと、アップストリームのキューから poll() が呼ばれ、マッピング関数が適用されて結果が返ります。中間キューも onNext 呼び出しも request() の簿記処理も一切不要です。融合時の map のコストは、関数呼び出し一回分とほぼ変わりません。

ConditionalSubscriber と ScalarSupplier による最適化

大規模なオペレーターチェーン全体に効いてくる、二つのマイクロ最適化を紹介します。

ConditionalSubscriber は、filter 系オペレーター特有の問題を解決します。filter().map() のようなチェーンでは、フィルタープレディケートが false を返すたびに、次のアイテムを要求するために upstream.request(1) を呼び出さなければなりません。これでは requestonNext の往復が際限なく発生します。

ConditionalSubscriber.tryOnNext(T t) はこれを解消します。アイテムが消費されたかどうかを boolean で返すシグネチャです。FlowableFilter.java#L59-L78 を見てみましょう。

public boolean tryOnNext(T t) {
    if (done) { return false; }
    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    if (b) {
        downstream.onNext(t);
    }
    return b;
}

ConditionalSubscriber 対応のアップストリームが tryOnNext でアイテムを届けたとき、false が返れば「次をくれ」という意味になり、request() 呼び出しは不要です。フィルタリングで弾かれたアイテムに対して、リクエストカウンターへの CAS 操作がアイテム単位で発生するコストを丸ごと排除できます。

ScalarSupplier は、Flowable.just(1) のように同期的にちょうど一つの値を返すソースを識別するマーカーインターフェースです。flatMap がインナーソースとして ScalarSupplier を受け取ると、FlowableFlatMap.java#L47-L52 でサブスクライブの仕組み全体をスキップできます。subscriber の生成も onSubscribe/onNext/onComplete プロトコルもインナーサブスクリプションの追跡も不要です。値を取り出してそのまま emit するだけです。

flowchart TD
    FM[flatMap receives inner source] --> CHECK{ScalarSupplier?}
    CHECK -->|Yes| FAST[Get value directly, emit]
    CHECK -->|No| FULL[Full subscribe protocol]
    FULL --> INNER[Create InnerSubscriber]
    INNER --> TRACK[Add to active subscribers]
    TRACK --> SUB[Subscribe to inner source]
    SUB --> DRAIN[Drain loop merges items]

これらの最適化は個別に見れば小さな改善です。しかし Flowable.range(1, 1_000_000).flatMap(...) のような長いパイプラインでは、1秒あたり数百万回ものアトミック操作を削減する効果があります。

次回予告

ドレインループパターンと融合プロトコルは土台となる仕組みですが、実際に動かすにはスレッドが必要です。第4回では RxJava のスケジューラーシステムを掘り下げます。Schedulers.computation() がラウンドロビン方式で固定スレッドプールを管理する仕組み、IO スケジューラーが負荷に応じて伸縮する仕組み、新しい仮想スレッドスケジューラーとの統合を解説します。さらに subscribeOnobserveOn がこれらのスケジューラーをどのように異なる方法で活用しているかも詳しく見ていきます。