エンジンルームの深層:ドレインループ、Queue Fusion、ロックフリー並行処理
前提知識
- ›第2回:subscribe-chain-and-operator-anatomy
- ›Java の並行処理プリミティブ(AtomicInteger、CAS 操作、volatile セマンティクス、happens-before 保証)
- ›キューデータ構造の基礎知識
エンジンルームの深層:ドレインループ、Queue Fusion、ロックフリー並行処理
RxJava のオペレーターは、複数のスレッドからシグナルを受け取ることが珍しくありません。onNext を呼び出すプロデューサースレッド、request を呼び出すコンシューマースレッド、さらにキャンセルを投げる第三のスレッドが同時に動くこともあります。従来の synchronized ブロックでも対処できますが、数十段にわたるオペレーターチェーンでそのコストが積み重なれば、パフォーマンスへの影響は無視できません。RxJava がこの問題を解決しているのが、ロックなしでアクセスをシリアライズする アトミック WIP ドレインループ と、中間キューを丸ごと排除する オペレーター融合(operator fusion) という2つのアイデアです。この2つがあってこそ、RxJava は1秒あたり数億件というスループットを実現できています。
アトミック WIP ドレインループパターン
RxJava における並行処理の核心は、work-in-progress(WIP)ドレインループです。スレッド境界をまたぐほぼすべてのオペレーターに登場します。FlowableObserveOn.java#L164-L168 のエントリーポイントは、一見するととてもシンプルです。
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this);
}
このクラスは AtomicInteger を継承しており、カウンターが WIP インジケーターとして機能します。パターンの仕組みは次のとおりです。
- エントリー:
getAndIncrement()を呼び出します。前の値が 0 であれば、このスレッドが「権利を獲得」してドレインループに入ります。0 でなければ、別のスレッドがすでにドレイン中です。カウンターをインクリメントしてそのまま戻ります。 - ドレイン:キューからアイテムを処理しながら、完了やキャンセルを確認します。
- 終了:
addAndGet(-missed)を呼び出します。結果が 0 であれば、ドレイン中に新たな作業は届いていないため、安全に終了できます。0 でなければ、処理すべき作業が届いているのでループを続けます。
flowchart TD
ENTER[trySchedule called] --> INC{getAndIncrement != 0?}
INC -->|"Yes: other thread draining"| RETURN[Return immediately]
INC -->|"No: we won the race"| SCHEDULE[Schedule drain on worker]
SCHEDULE --> DRAIN[Process queued items]
DRAIN --> EXIT{addAndGet -missed == 0?}
EXIT -->|"Yes: no new work"| DONE[Exit loop]
EXIT -->|"No: work arrived during drain"| DRAIN
このパターンの特長は ロックフリー であることです。どのスレッドも他のスレッドの完了を待って止まることがありません。また 競合安全 でもあります。インクリメント操作が「作業あり」のシグナルと「他のスレッドに処理させない」ロックの両方を兼ねているからです。終了チェック(addAndGet(-missed))は、ドレイナーが終了しようとした瞬間にプロデューサーがカウンターをインクリメントするというギリギリの競合も確実に処理します。ドレイナーは非ゼロの結果を見てループに戻るため、作業の取りこぼしが起きません。
サーバーで何ができるかを知るには: RxJava のオペレーターコードを読むとき、
AtomicIntegerを継承しているクラスを探しましょう。それがドレインループの目印です。ドレインメソッドは通常drain()と名付けられているか、クラスがRunnableを実装している場合はrun()になっています。
FlowableObserveOn:ドレインループの教科書
FlowableObserveOn は、ドレインループの全体像を学ぶのに最適なオペレーターです。スレッドの切り替え、キュー管理、バックプレッシャー処理、fusion ネゴシエーションと、あらゆる要素が詰め込まれています。
FlowableObserveOn.java#L62-L101 の BaseObserveOnSubscriber がインフラを構築します。
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
final Worker worker;
final int prefetch;
final int limit;
// ...
BaseObserveOnSubscriber(Worker worker, boolean delayError, int prefetch) {
this.worker = worker;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2); // 75% refill threshold
}
run() メソッドは、状況に応じて三つのドレインモードに処理を振り分けます。
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
本来の複雑さが現れるのは、FlowableObserveOn.java#L362-L380 の非同期ドレインです。キューからの読み取り、バックプレッシャーの遵守(requested.get())、終了イベントの確認、そして消費済みアイテム数に基づいてアップストリームへの新規リクエストを発行するリフィルパターンが、すべてここに集約されています。
sequenceDiagram
participant Upstream
participant ObserveOn as ObserveOnSubscriber
participant Queue as SpscArrayQueue
participant Worker as Scheduler.Worker
participant Downstream
Upstream->>ObserveOn: onNext(item)
ObserveOn->>Queue: offer(item)
ObserveOn->>ObserveOn: trySchedule() [WIP++]
Note over Worker: On scheduler thread
Worker->>ObserveOn: run()
ObserveOn->>Queue: poll()
Queue-->>ObserveOn: item
ObserveOn->>Downstream: onNext(item)
ObserveOn->>ObserveOn: consumed++ (replenish at limit)
ロックフリー SPSC キュー
オペレーターがバッファリングに使うキューは java.util.concurrent の実装ではありません。JCTools プロジェクトをベースにカスタム実装された SPSC(Single-Producer-Single-Consumer)キューです。
SpscArrayQueue は有界(bounded)バリアントで、AtomicReferenceArray を内部に持つリングバッファです。
public final class SpscArrayQueue<E> extends AtomicReferenceArray<E>
implements SimplePlainQueue<E> {
final int mask;
final AtomicLong producerIndex;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
super(Pow2.roundToPowerOfTwo(capacity));
this.mask = length() - 1;
this.producerIndex = new AtomicLong();
this.consumerIndex = new AtomicLong();
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
}
設計上の重要な決定事項をまとめます。
- 2の累乗サイズ:容量を2の累乗に切り上げることで、インデックス計算をモジュロ除算から
index & maskのビット演算に置き換えられます。地味ながら効果的なマイクロ最適化です。 - プロデューサー/コンシューマーインデックスの分離:それぞれを独立した
AtomicLongオブジェクトとして確保することで、フォールスシェアリングを回避しています。 - 先読み最適化(look-ahead):プロデューサーは実際のオファースロットを確認する前に、複数先のスロットをチェックします。コンシューマーインデックスへの volatile 読み取り頻度を下げる効果があります。
容量をあらかじめ確定できない場合は、固定サイズの配列セグメントをリンクリスト状につなぐ SpscLinkedArrayQueue を使います。こちらは無制限(unbounded)に成長できます。
flatMap が複数のインナーソースをマージするようなマルチプロデューサーシナリオでは、Michael-Scott スタイルのロックフリーキューを MPSC(Multiple-Producer-Single-Consumer)向けに改良した MpscLinkedQueue が使われます。
classDiagram
class SimplePlainQueue~E~ {
<<interface>>
+offer(E): boolean
+poll(): E
+isEmpty(): boolean
+clear()
}
class SpscArrayQueue~E~ {
Bounded ring buffer
Power-of-2 size
Wait-free offer/poll
}
class SpscLinkedArrayQueue~E~ {
Unbounded linked arrays
Grows on demand
}
class MpscLinkedQueue~E~ {
Multi-producer safe
Lock-free CAS offer
}
SimplePlainQueue <|.. SpscArrayQueue
SimplePlainQueue <|.. SpscLinkedArrayQueue
SimplePlainQueue <|.. MpscLinkedQueue
QueueFuseable:オペレーター融合プロトコル
オペレーター融合(operator fusion)とは、隣接するオペレーター間の中間キューを丸ごと排除する仕組みです。通常、map はアイテムをキューに入れ、observeOn がそこから読み出します。融合が有効になると、observeOn は map のアップストリームから直接 poll() を呼び出し、その中でマッピング関数をインラインで実行するようになります。
このプロトコルは QueueFuseable で定義されています。
public interface QueueFuseable<T> extends SimpleQueue<T> {
int NONE = 0; // No fusion
int SYNC = 1; // Synchronous: poll() is blocking and complete
int ASYNC = 2; // Asynchronous: poll() may return null, use onNext as signal
int ANY = SYNC | ASYNC;
int BOUNDARY = 4; // Don't fuse across thread boundaries
int requestFusion(int mode);
}
融合のネゴシエーションは onSubscribe() の中で行われます。ダウンストリームのオペレーターがアップストリームの Subscription を受け取った時点で、それが QueueSubscription も実装しているかどうかを確認します。実装していれば requestFusion(mode) を呼び出します。
sequenceDiagram
participant Upstream as map (upstream)
participant Downstream as observeOn (downstream)
Upstream->>Downstream: onSubscribe(mapSubscriber)
Downstream->>Downstream: Is upstream QueueSubscription?
Downstream->>Upstream: requestFusion(ANY | BOUNDARY)
alt SYNC fusion granted
Upstream-->>Downstream: SYNC
Note over Downstream: Will call poll() directly
Note over Downstream: null from poll() = complete
else ASYNC fusion granted
Upstream-->>Downstream: ASYNC
Note over Downstream: onNext(null) = "poll me now"
Note over Downstream: poll() in drain loop
else No fusion
Upstream-->>Downstream: NONE
Note over Downstream: Normal onNext path
end
BOUNDARY フラグの役割は特に重要です。observeOn が融合をリクエストする際に ANY | BOUNDARY を渡すのは、「スレッド境界をまたぐ」ことをアップストリームに伝えるためです。これにより、融合によって計算が意図せず別スレッドへ移動してしまうのを防ぎます。たとえば map().observeOn() が完全に融合してしまうと、map の処理が本来の購読スレッドではなく observeOn のスケジューラースレッドで実行されることになり、ユーザーの期待する動作が壊れる可能性があります。
FlowableMap では、FlowableMap.java#L79-L82 の poll() で融合がサポートされています。
public U poll() throws Throwable {
T t = qs.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}
融合が有効な状態で map の subscriber に poll() を呼ぶと、アップストリームのキューから poll() が呼ばれ、マッピング関数が適用されて結果が返ります。中間キューも onNext 呼び出しも request() の簿記処理も一切不要です。融合時の map のコストは、関数呼び出し一回分とほぼ変わりません。
ConditionalSubscriber と ScalarSupplier による最適化
大規模なオペレーターチェーン全体に効いてくる、二つのマイクロ最適化を紹介します。
ConditionalSubscriber は、filter 系オペレーター特有の問題を解決します。filter().map() のようなチェーンでは、フィルタープレディケートが false を返すたびに、次のアイテムを要求するために upstream.request(1) を呼び出さなければなりません。これでは request と onNext の往復が際限なく発生します。
ConditionalSubscriber.tryOnNext(T t) はこれを解消します。アイテムが消費されたかどうかを boolean で返すシグネチャです。FlowableFilter.java#L59-L78 を見てみましょう。
public boolean tryOnNext(T t) {
if (done) { return false; }
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
if (b) {
downstream.onNext(t);
}
return b;
}
ConditionalSubscriber 対応のアップストリームが tryOnNext でアイテムを届けたとき、false が返れば「次をくれ」という意味になり、request() 呼び出しは不要です。フィルタリングで弾かれたアイテムに対して、リクエストカウンターへの CAS 操作がアイテム単位で発生するコストを丸ごと排除できます。
ScalarSupplier は、Flowable.just(1) のように同期的にちょうど一つの値を返すソースを識別するマーカーインターフェースです。flatMap がインナーソースとして ScalarSupplier を受け取ると、FlowableFlatMap.java#L47-L52 でサブスクライブの仕組み全体をスキップできます。subscriber の生成も onSubscribe/onNext/onComplete プロトコルもインナーサブスクリプションの追跡も不要です。値を取り出してそのまま emit するだけです。
flowchart TD
FM[flatMap receives inner source] --> CHECK{ScalarSupplier?}
CHECK -->|Yes| FAST[Get value directly, emit]
CHECK -->|No| FULL[Full subscribe protocol]
FULL --> INNER[Create InnerSubscriber]
INNER --> TRACK[Add to active subscribers]
TRACK --> SUB[Subscribe to inner source]
SUB --> DRAIN[Drain loop merges items]
これらの最適化は個別に見れば小さな改善です。しかし Flowable.range(1, 1_000_000).flatMap(...) のような長いパイプラインでは、1秒あたり数百万回ものアトミック操作を削減する効果があります。
次回予告
ドレインループパターンと融合プロトコルは土台となる仕組みですが、実際に動かすにはスレッドが必要です。第4回では RxJava のスケジューラーシステムを掘り下げます。Schedulers.computation() がラウンドロビン方式で固定スレッドプールを管理する仕組み、IO スケジューラーが負荷に応じて伸縮する仕組み、新しい仮想スレッドスケジューラーとの統合を解説します。さらに subscribeOn と observeOn がこれらのスケジューラーをどのように異なる方法で活用しているかも詳しく見ていきます。