Read OSS

バックプレッシャー:Flowable が高速なプロデューサーから低速なコンシューマーを守る仕組み

上級

前提知識

  • 第 2 回:subscribe-chain-and-operator-anatomy
  • 第 3 回:drain-loops-and-operator-fusion
  • Reactive Streams 仕様(Publisher/Subscriber/Subscription コントラクト)

バックプレッシャー:Flowable が高速なプロデューサーから低速なコンシューマーを守る仕組み

非同期システムは、いずれ必ずプロデューサーとコンシューマーの速度差という問題に直面します。データベースカーソルは、ネットワークソケットが送信できる速度をはるかに超えてレコードを生成できます。フロー制御がなければ、中間バッファは際限なく膨れ上がり、JVM がメモリ不足に陥ります。バックプレッシャーは、Reactive Streams によるこの問題への解決策です。コンシューマーがアイテムを 要求 し、プロデューサーはその上限を守ります。RxJava の Flowable はこのプロトコルを実装していますが、実装の詳細は概念が示唆する以上に微妙です。

request(n) プロトコルとバックプレッシャーが必要な理由

Reactive Streams のコントラクトは、プルとプッシュを組み合わせたハイブリッドプロトコルを定義しています。

  1. Publisher.subscribe(Subscriber) — 接続を確立する
  2. Subscriber.onSubscribe(Subscription) — 制御ハンドルを届ける
  3. Subscription.request(n) — サブスクライバーが最大 n 件のアイテムを要求する
  4. Publisher は最大 n 件の onNext シグナルを送出する
  5. 準備ができたら、サブスクライバーがさらに要求する
sequenceDiagram
    participant Producer as Flowable (Publisher)
    participant Sub as Subscriber
    
    Producer->>Sub: onSubscribe(subscription)
    Sub->>Producer: request(128)
    
    loop Up to 128 items
        Producer->>Sub: onNext(item)
    end
    
    Sub->>Producer: request(96)
    Note over Sub: Requests more after consuming ~75%
    
    loop Up to 96 items
        Producer->>Sub: onNext(item)
    end
    
    Producer->>Sub: onComplete()

Observable は意図的にこのプロトコルを実装していません。第 2 回で説明したとおり、Subscription の代わりに Disposable を使用するため、request() もフロー制御も存在しません。この型の分離が設けられている理由はここにあります。Observable は本質的に有界であるか、UI 駆動のソースを対象としており、バックプレッシャーのオーバーヘッドは不要です。

ヒント: FlowableObservable のどちらを使うべきか迷ったときは、「このソースはコンシューマーが処理できる速度より速くデータを生成できるか?」と自問してみましょう。答えが Yes なら Flowable、クリックや一定間隔のセンサー読み取りといったイベント駆動ソースなら Observable を選びましょう。

BackpressureHelper:アトミックなリクエスト管理

ダウンストリームからの request(n) 呼び出しは、どのスレッドからでも届く可能性があるため、アトミックに蓄積する必要があります。BackpressureHelper は、そのための CAS ループユーティリティを提供しています。

public static long add(AtomicLong requested, long n) {
    for (;;) {
        long r = requested.get();
        if (r == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long u = addCap(r, n);
        if (requested.compareAndSet(r, u)) {
            return r;
        }
    }
}

なぜ単純に AtomicLong.addAndGet(n) を使わないのでしょうか。理由は 2 つあります。

  1. Long.MAX_VALUE の上限処理:Reactive Streams 仕様では、request(Long.MAX_VALUE) は「無制限」を意味します。カウンターが Long.MAX_VALUE に達したら、そこに留まり続けなければなりません。追加のリクエストによって負の値にオーバーフローしてはいけないのです。addCap メソッドがこれを処理します。

  2. 以前の値の返却:オペレーターは、ドレインを開始するためにこれが最初のリクエストかどうか(以前の値が 0 だったか)を知る必要があります。addAndGet は新しい値を返すため、このパターンではあまり役に立ちません。

addCap メソッドそのものもエレガントな実装です。

public static long addCap(long a, long b) {
    long u = a + b;
    if (u < 0L) {
        return Long.MAX_VALUE;
    }
    return u;
}

符号チェックによるオーバーフロー検出です。2 つの正の long を加算して負の値が得られたなら、オーバーフローが発生しています。

produced() メソッドは、消費されたアイテム数をリクエスト済みカウントから減算します。こちらも Long.MAX_VALUE の保持を考慮しています。

public static long produced(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current - n;
        if (update < 0L) {
            RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
            update = 0L;
        }
        if (requested.compareAndSet(current, update)) {
            return update;
        }
    }
}
flowchart TD
    REQ[request n arrives] --> GET[r = requested.get]
    GET --> MAX{r == MAX_VALUE?}
    MAX -->|Yes| SKIP[Return MAX_VALUE - unbounded mode]
    MAX -->|No| ADD[u = addCap r n]
    ADD --> CAS{compareAndSet r u}
    CAS -->|Success| DONE[Return r - previous value]
    CAS -->|Fail: concurrent update| GET

SubscriptionHelper:バリデーションと CANCELLED センチネル

SubscriptionHelper は単一の値 CANCELLED を持つ enum で、アトミックセンチネルとして機能します。

public enum SubscriptionHelper implements Subscription {
    CANCELLED;
    
    @Override public void request(long n) { /* deliberately ignored */ }
    @Override public void cancel() { /* deliberately ignored */ }
}

このセンチネルにより、安全なアトミックキャンセル処理が可能になります。オペレーターはアップストリームの SubscriptionAtomicReference<Subscription> に保持します。キャンセルする際は次のようになります。

public static boolean cancel(AtomicReference<Subscription> field) {
    Subscription current = field.get();
    if (current != CANCELLED) {
        current = field.getAndSet(CANCELLED);
        if (current != CANCELLED) {
            if (current != null) { current.cancel(); }
            return true;
        }
    }
    return false;
}

getAndSet(CANCELLED) はアトミックにセンチネルへスワップし、以前の値を返します。すでに CANCELLED であれば、別のスレッドが先にキャンセルを完了しています。本物の subscription であれば、それをキャンセルします。このパターンはロックなしでスレッドセーフです。

setOnce メソッドは、subscription が正確に 1 回だけ設定されるべき一般的なケースを処理します。

public static boolean setOnce(AtomicReference<Subscription> field, Subscription s) {
    if (!field.compareAndSet(null, s)) {
        s.cancel();
        if (field.get() != CANCELLED) {
            reportSubscriptionSet();  // Protocol violation
        }
        return false;
    }
    return true;
}

BackpressureStrategy と FlowableCreate のエミッター

Flowable.create() を使って命令型コードから Flowable を生成する際は、BackpressureStrategy を選択する必要があります。BackpressureStrategy.java の enum には 5 つのオプションが定義されています。

Strategy 挙動 リスク
MISSING 戦略なし — ダウンストリームに委譲 未処理の場合 MissingBackpressureException
ERROR 需要を超えた場合にエラーを通知 安全だが処理が中断する
BUFFER 無制限バッファ プロデューサーが大幅に速い場合 OOM
DROP 需要がない場合はアイテムを破棄 データロス
LATEST 最新のアイテムのみ保持 データロスが発生するが常に「最新」を保持

FlowableCreate.java#L40-L74FlowableCreate は、ストラテジーごとに異なる Emitter サブクラスを生成します。

public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;
    switch (backpressure) {
        case MISSING:  emitter = new MissingEmitter<>(t); break;
        case ERROR:    emitter = new ErrorAsyncEmitter<>(t); break;
        case DROP:     emitter = new DropAsyncEmitter<>(t); break;
        case LATEST:   emitter = new LatestAsyncEmitter<>(t); break;
        default:       emitter = new BufferAsyncEmitter<>(t, bufferSize()); break;
    }
    t.onSubscribe(emitter);
    source.subscribe(emitter);
}
flowchart TD
    FC[FlowableCreate] --> SW{BackpressureStrategy?}
    SW -->|MISSING| ME[MissingEmitter<br>No backpressure handling]
    SW -->|ERROR| EE[ErrorAsyncEmitter<br>Throws MissingBackpressureException]
    SW -->|DROP| DE[DropAsyncEmitter<br>Discards when no demand]
    SW -->|LATEST| LE[LatestAsyncEmitter<br>Keeps newest in AtomicReference]
    SW -->|BUFFER| BE[BufferAsyncEmitter<br>Unbounded SpscLinkedArrayQueue]
    
    ME --> DS[downstream]
    EE --> DS
    DE --> DS
    LE --> DS
    BE --> DS

LatestAsyncEmitter は特に巧みな設計です。AtomicReference を使って最新の値を保持し、ダウンストリームがまだ要求していない状態で新しい onNext が届くと、格納された値をアトミックに置き換えます。需要が発生したとき、ドレインループはその時点で格納されている値を取り出します。

FlowableObserveOn における Prefetch と Limit

prefetch/limit パターンは、observeOn のようにスレッド境界をまたぐオペレーターが、アイテムごとのリクエスト/レスポンスのオーバーヘッドなしにバックプレッシャーを管理する手法です。

FlowableObserveOn.java#L96-L101 を見てみましょう。

this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);  // 75% refill threshold

デフォルトの prefetch が 128 の場合、limit128 - 32 = 96 になります。処理の流れは次のとおりです。

  1. 最初にアップストリームから prefetch(128)件のアイテムを要求する
  2. キューからアイテムを処理しながら、消費件数をカウントする
  3. 消費件数が limit(96)に達したら、アップストリームにさらに limit 件を要求する
  4. 消費件数カウンターをリセットする

これにより 75% 補充しきい値 が生まれます。バッファの 75% を消費した時点で、消費した量と同じだけ新しいバッチを要求します。これは次の要素のバランスを取るための設計です。

  • スループット:大きなバッチサイズにより request() 呼び出しのオーバーヘッドを削減する
  • メモリ:バッファが prefetch 容量を超えることがない
  • レイテンシ:75% 時点で補充を開始することで、新しいアイテムが届くまでの間、残り 25% のバッファを処理し続けられる
sequenceDiagram
    participant Upstream
    participant ObserveOn
    participant Downstream
    
    ObserveOn->>Upstream: request(128)
    Upstream->>ObserveOn: 128 items
    
    loop Drain items 1-96
        ObserveOn->>Downstream: onNext(item)
    end
    
    Note over ObserveOn: consumed=96 ≥ limit=96
    ObserveOn->>Upstream: request(96)
    
    loop Drain items 97-128 + new items
        ObserveOn->>Downstream: onNext(item)
    end

FlowableFlatMap:N 個の内部ソースにまたがるリクエスト管理

FlowableFlatMap は RxJava の中でも最も複雑なオペレーターと言っても過言ではなく、その理由の大部分はバックプレッシャーの処理にあります。MergeSubscriber は、動的な内部ソースの集合にまたがるリクエスト管理を調整する必要があります。

static final class MergeSubscriber<T, U> extends AtomicInteger 
    implements FlowableSubscriber<T>, Subscription {
    
    final int maxConcurrency;
    final int bufferSize;
    volatile SimplePlainQueue<U> queue;
    final AtomicThrowable errors = new AtomicThrowable();
    final AtomicReference<InnerSubscriber<?, ?>[]> subscribers;
    final AtomicLong requested = new AtomicLong();
    long uniqueId;
    int scalarEmitted;
    final int scalarLimit;

複雑さは複数の並行フローに起因しています。

  1. 外部ソースがアイテムを送出して内部サブスクリプションをトリガーする
  2. maxConcurrency が同時にアクティブにできる内部ソースの数を制限する
  3. 各内部ソースのアイテムが単一の出力ストリームにマージされる
  4. ダウンストリームの request(n) をすべてのアクティブな内部ソースに分配する必要がある
  5. 内部ソースが完了したら、バッファリングされた外部アイテムから新しいソースをサブスクライブできる

FlowableFlatMap.java#L107-L119onSubscribe メソッドから、初期リクエストの戦略が読み取れます。

public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.validate(this.upstream, s)) {
        this.upstream = s;
        downstream.onSubscribe(this);
        if (!cancelled) {
            if (maxConcurrency == Integer.MAX_VALUE) {
                s.request(Long.MAX_VALUE);  // Unbounded: get everything
            } else {
                s.request(maxConcurrency);  // Bounded: get up to maxConcurrency items
            }
        }
    }
}

第 3 回で取り上げたスカラー最適化がここで重要な役割を担います。Flowable.just() の内部ソースに対して、flatMap はサブスクリプション機構全体をスキップします。スカラー送出を蓄積し、補充リクエストをバッチ処理します。

int scalarEmitted;
final int scalarLimit;
// scalarLimit = Math.max(1, maxConcurrency >> 1)

scalarEmittedscalarLimit に達すると、外部アイテムを scalarLimit 件追加で要求するバッチリクエストが発行されます。これにより、複数のスカラー送出にわたって request() の CAS コストを分散できます。

flowchart TD
    OUTER[Outer Flowable] -->|"request(maxConcurrency)"| MS[MergeSubscriber]
    MS -->|"onNext(T)"| MAP[mapper.apply T]
    MAP -->|"Scalar?"| SC{ScalarSupplier?}
    SC -->|Yes| EMIT[Emit directly + scalarEmitted++]
    SC -->|No| INNER[Subscribe InnerSubscriber]
    INNER --> IS1[InnerSubscriber 1]
    INNER --> IS2[InnerSubscriber 2]
    INNER --> ISN[InnerSubscriber N]
    IS1 -->|"items"| DRAIN[Drain loop merges all]
    IS2 -->|"items"| DRAIN
    ISN -->|"items"| DRAIN
    DRAIN -->|"onNext"| DOWN[Downstream]
    DOWN -->|"request n"| DRAIN
    EMIT --> DOWN

MergeSubscriber のドレインループはコードベース全体で最も複雑な部分です。アクティブな内部サブスクライバー(アトミック配列で管理)をラウンドロビンし、各キューから読み取り、グローバルなエラーアキュムレーター(AtomicThrowable)を確認します。さらにキャンセル処理と requested カウンターの管理も、すべてロックフリーのドレインループで実現しています。このクラスを深く理解することは、RxJava の内部実装を学ぶ上での通過儀礼と言えるでしょう。

次回の予告

ここまでで RxJava の内部実装を一通りカバーしました。オペレーターパターン、ドレインループ、フュージョン、スケジューラー、バックプレッシャーです。第 6 回では、4.x の新機能に焦点を当てます。Java にプルベースの非同期列挙をもたらす Streamable 型、バーチャルスレッド上の命令型コード向けの VirtualGeneratorVirtualEmitter を解説します。さらに sealed types やレコードといったモダンな Java 機能が API をどのように変革しているかも探ります。