サブスクライブの連鎖:RxJava のすべてのオペレーターが実際にどう動くか
前提知識
- ›第1回:アーキテクチャとコードベースのナビゲーション
- ›Reactive Streams 仕様の基礎(Publisher/Subscriber/Subscription の契約)
- ›デコレーターデザインパターン
サブスクライブの連鎖:RxJava のすべてのオペレーターが実際にどう動くか
これまで RxJava で組んできたパイプラインは、すべて同じライフサイクルをたどっています。まずオペレーターのチェーンを組み立て、次に subscribe を呼び出してデータの流れを起動する、という流れです。一見シンプルな subscribe() の呼び出しですが、その裏ではプラグインフックを経由し、subscriber を安全なレイヤーで包み込み、オペレーターチェーンを逆向きに遡るという処理が行われています。このメカニズムを理解することが、RxJava のあらゆる仕組みを理解するための鍵となります。
subscribe() のエントリーポイント
Flowable に対して subscribe() を呼び出すと、Flowable.java#L16009-L16031 のメソッドが次の3ステップを実行します。
public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
try {
Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null...");
subscribeActual(flowableSubscriber);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
// ...
}
}
- null チェック — subscriber が null でないことを確認する
- subscribe フックの実行 —
RxJavaPlugins.onSubscribe()が subscriber を差し替えたり、ラップしたりできる subscribeActual()への委譲 — すべてのオペレーターが実装する abstract メソッドを呼び出す
これは Template Method パターンの典型例です。subscribe() メソッドは final であるため、オペレーターはこれをオーバーライドできません。代わりに subscribeActual() を実装することで、プラグインフックが必ず実行されることが保証されます。
sequenceDiagram
participant User
participant Flowable
participant RxJavaPlugins
participant Operator as subscribeActual()
User->>Flowable: subscribe(subscriber)
Flowable->>Flowable: Objects.requireNonNull(subscriber)
Flowable->>RxJavaPlugins: onSubscribe(this, subscriber)
RxJavaPlugins-->>Flowable: possibly wrapped subscriber
Flowable->>Operator: subscribeActual(wrappedSubscriber)
エラーハンドリングには微妙な事情があります。subscribeActual が例外をスローした場合、RxJava は onError を呼べません(subscriber がまだ onSubscribe を受け取っていない可能性があるためです)。かといって onSubscribe も呼べません(すでに呼ばれている可能性があるためです)。そこで、エラーは未配信エラーとして RxJavaPlugins.onError() に転送され、その後 NullPointerException でラップしてスローされます。これは Reactive Streams 仕様を満たすための実用的な回避策です。
AbstractFlowableWithUpstream:オペレーターの基底クラス
source.map(fn).filter(pred).observeOn(scheduler) のようにオペレーターをチェーンすると、デコレーターオブジェクトの連結リストが構築されます。各オペレータークラスは Flowable を継承し、上流ソースへの参照を保持します。
このパターンの基底クラスが AbstractFlowableWithUpstream です。
abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R>
implements HasUpstreamPublisher<T> {
protected final Flowable<T> source;
AbstractFlowableWithUpstream(Flowable<T> source) {
this.source = Objects.requireNonNull(source, "source is null");
}
}
フィールドとコンストラクターだけというシンプルな構成ですが、デコレーターチェーンの骨格を確立しています。
classDiagram
class Flowable~T~ {
<<abstract>>
+subscribe(Subscriber)
#subscribeActual(Subscriber)*
}
class AbstractFlowableWithUpstream~T_R~ {
<<abstract>>
#source: Flowable~T~
}
class FlowableMap~T_U~ {
-mapper: Function
#subscribeActual(Subscriber)
}
class FlowableFilter~T~ {
-predicate: Predicate
#subscribeActual(Subscriber)
}
class FlowableObserveOn~T~ {
-scheduler: Scheduler
#subscribeActual(Subscriber)
}
Flowable <|-- AbstractFlowableWithUpstream
AbstractFlowableWithUpstream <|-- FlowableMap
AbstractFlowableWithUpstream <|-- FlowableFilter
AbstractFlowableWithUpstream <|-- FlowableObserveOn
source.map(fn).filter(pred) と書いたとき、組み立て時に生成されるのは FlowableFilter(source=FlowableMap(source=originalSource)) という構造です。subscribe すると、呼び出しは逆向きに伝播します。FlowableFilter.subscribeActual が FlowableMap をサブスクライブし、FlowableMap が originalSource をサブスクライブするという順序になります。
詳解:FlowableMap — オペレーターの典型例
FlowableMap.java#L26-L83 の FlowableMap は、最もシンプルな完全実装のオペレーターでありながら、すべてのパターンを示しています。
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends U> mapper;
@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new MapConditionalSubscriber<>((ConditionalSubscriber<? super U>)s, mapper));
} else {
source.subscribe(new MapSubscriber<>(s, mapper));
}
}
subscribeActual は、下流の subscriber を MapSubscriber でラップし、それを上流ソースにサブスクライブします。これがデコレーターパターンの実装そのものです。MapSubscriber は onNext シグナルを横取りし、関数を適用した上で結果を転送します。
MapSubscriber.onNext メソッドを見ると、その全体像がよくわかります。
public void onNext(T t) {
if (done) { return; }
if (sourceMode != NONE) {
downstream.onNext(null); // fusion mode: signal "poll me"
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
ここには3つの重要なパターンが見て取れます。
- done ガード — まず
doneフラグを確認し、遅延シグナルを無音で捨てる - fusion ショートサーキット — fusion モードで動作している場合(
sourceMode != NONE)、マップ済みの値ではなく null センチネルを転送し、下流が直接キューをpoll()できるようにする fail()を使ったトライキャッチ — ユーザーコード(mapper.apply(t))は常にラップされる。fail()ヘルパーは上流をキャンセルし、下流にonErrorを通知する
fusion のサポートは poll() でも実装されています。
public U poll() throws Throwable {
T t = qs.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}
fusion モードでは、map 関数は onNext() ではなく poll() 内で適用されます。値は上流のキューからマッピング関数を通り、中間キューへの蓄積なしに直接流れていきます。このメカニズムは第3回で詳しく掘り下げます。
sequenceDiagram
participant Source
participant MapSubscriber
participant Downstream
Source->>MapSubscriber: onSubscribe(subscription)
MapSubscriber->>Downstream: onSubscribe(mapSubscriber)
Note over Downstream: request(n)
Downstream->>MapSubscriber: request(n)
MapSubscriber->>Source: request(n)
Source->>MapSubscriber: onNext(t)
MapSubscriber->>MapSubscriber: v = mapper.apply(t)
MapSubscriber->>Downstream: onNext(v)
Source->>MapSubscriber: onComplete()
MapSubscriber->>Downstream: onComplete()
ポイント: RxJava のすべてのオペレーターはこの構造に従っています。
AbstractFlowableWithUpstreamを継承し、subscribeActualで下流をラップし、変換を加えながらシグナルを転送する、という流れです。FlowableMapを理解すれば、500 を超える他のオペレーターファイルを読み解くための共通テンプレートが手に入ります。
Flowable と Observable:2つのサブスクリプションプロトコル
Flowable と Observable は、根本的に異なるサブスクリプションの仕組みを採用しています。この設計上の違いは、すべてのオペレーター実装に影響します。
Flowable は Subscription を使った Reactive Streams プロトコルに従います。
Subscriber.onSubscribe(Subscription s)— サブスクリプションを届けるSubscription.request(long n)— 下流が N 件のアイテムをリクエストする(バックプレッシャー)Subscription.cancel()— 下流がキャンセルする
Observable は Disposable を使います。
Observer.onSubscribe(Disposable d)— Disposable を届けるDisposable.dispose()— 下流がキャンセルするrequest()なし — バックプレッシャーなし
この結果、両方のタイプに存在するオペレーターはそれぞれ2つの実装を持つことになります。FlowableMap と ObservableMap、FlowableFilter と ObservableFilter、という具合です。Flowable 版は request(n) の転送を処理しなければなりませんが、Observable 版にその必要はありません。
classDiagram
class Subscription {
<<interface>>
+request(long n)
+cancel()
}
class Disposable {
<<interface>>
+dispose()
+isDisposed(): boolean
+close()
}
class FlowableMap {
Uses Subscription
Handles request(n)
}
class ObservableMap {
Uses Disposable
No backpressure
}
Subscription <.. FlowableMap : uses
Disposable <.. ObservableMap : uses
RxJava 4.x では Disposable が AutoCloseable を継承しており、try-with-resources でサブスクリプションのライフサイクルを管理できるようになりました。Disposable.java#L28-L46 による、地味ながら実用的な使い勝手の向上です。
オペレーターにおけるエラーハンドリング
RxJava オペレーターのエラーハンドリングは厳格なプロトコルに従っており、先ほどの FlowableMap の例でも確認できます。
まず、致命的なエラーはただちに再スローされます。Exceptions.java#L66-L73 の Exceptions.throwIfFatal() は VirtualMachineError と LinkageError を検査します。
public static void throwIfFatal(@NonNull Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
それ以外の例外はキャッチ可能として扱われ、リアクティブなエラーチャンネルを通じて転送されます。BasicFuseableSubscriber の fail() ヘルパーメソッドが標準的な処理シーケンスを担い、上流のキャンセル、done フラグの設定、downstream.onError(ex) の呼び出しを順に行います。
エラーの届け先がない場合 — たとえば onComplete の後に onError が呼ばれた場合や、キャンセル中にエラーが発生した場合など — RxJava はそのエラーを RxJavaPlugins.onError() に転送します。エラーハンドラーが設定されていない場合は、stderr に出力した上でカレントスレッドでスローされます。この「未配信エラー」の仕組みが、例外がサイレントに握り潰されることを防ぐセーフティネットとなっています。
flowchart TD
EX[Exception in operator] --> TIF{throwIfFatal?}
TIF -->|VirtualMachineError/LinkageError| RETHROW[Rethrow immediately]
TIF -->|Other| DELIVER{Can deliver onError?}
DELIVER -->|Yes| CANCEL[Cancel upstream] --> ONERROR[downstream.onError]
DELIVER -->|No: already terminal| PLUGIN[RxJavaPlugins.onError]
PLUGIN --> HANDLER{Handler set?}
HANDLER -->|Yes| CUSTOM[Custom handler]
HANDLER -->|No| STDERR[Print + throw on thread]
Assembly フックと Subscribe フック
RxJavaPlugins はすべてのリアクティブタイプに対して2つのインターセプトポイントを提供しています。assembly フックとsubscribe フックです。
assembly フックは、サブスクリプション時ではなく、オペレーターが生成されるチェーン構築時に発火します。RxJavaPlugins.java#L72-L73 の onFlowableAssembly フックは、構築中の Flowable をインターセプトして差し替えることができます。
static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;
subscribe フックは subscribe() が呼ばれたとき、subscribeActual() の実行前に発火します。RxJavaPlugins.java#L102-L104 の onFlowableSubscribe フックは Flowable と Subscriber の両方を受け取り、後者を差し替えることができます。
static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;
これらのフックはデバッグツールの基盤となっています。assembly フックを使ってオペレーター生成時にスタックトレースをキャプチャすれば、「このオペレーターはどこで組み立てられたのか?」というデバッグの定番の悩みを解決できます。また、subscribe フックですべての subscriber をインストルメンテーションでラップすれば、シグナルをログに記録することも可能です。
sequenceDiagram
participant Dev as Developer
participant Assembly as Assembly Hook
participant Chain as Operator Chain
participant Subscribe as Subscribe Hook
participant Actual as subscribeActual
Dev->>Chain: source.map(fn).filter(pred)
Chain->>Assembly: onFlowableAssembly(FlowableMap)
Assembly-->>Chain: possibly wrapped Flowable
Chain->>Assembly: onFlowableAssembly(FlowableFilter)
Dev->>Chain: .subscribe(subscriber)
Chain->>Subscribe: onFlowableSubscribe(flowable, subscriber)
Subscribe-->>Chain: possibly wrapped subscriber
Chain->>Actual: subscribeActual(wrappedSubscriber)
どちらのフックもすべてのリアクティブタイプに対応しており、Observable、Single、Maybe、Completable、ParallelFlowable にも並行してフックが用意されています。
次回予告
オペレーターの組み立て方と、サブスクリプションがチェーンを伝播する仕組みを確認しました。ただし、最もパフォーマンスに直結する部分、つまりオペレーターが並行シグナルを安全に扱う方法と、fusion が中間キューをどう排除するかについては、まだ触れていません。第3回では内部の核心部分に踏み込み、atomic なドレインループパターン、ロックフリーな SPSC キュー、そして RxJava を JVM 最速クラスのリアクティブライブラリたらしめる QueueFuseable プロトコルを解説します。