Read OSS

サブスクライブの連鎖:RxJava のすべてのオペレーターが実際にどう動くか

中級

前提知識

  • 第1回:アーキテクチャとコードベースのナビゲーション
  • Reactive Streams 仕様の基礎(Publisher/Subscriber/Subscription の契約)
  • デコレーターデザインパターン

サブスクライブの連鎖:RxJava のすべてのオペレーターが実際にどう動くか

これまで RxJava で組んできたパイプラインは、すべて同じライフサイクルをたどっています。まずオペレーターのチェーンを組み立て、次に subscribe を呼び出してデータの流れを起動する、という流れです。一見シンプルな subscribe() の呼び出しですが、その裏ではプラグインフックを経由し、subscriber を安全なレイヤーで包み込み、オペレーターチェーンを逆向きに遡るという処理が行われています。このメカニズムを理解することが、RxJava のあらゆる仕組みを理解するための鍵となります。

subscribe() のエントリーポイント

Flowable に対して subscribe() を呼び出すと、Flowable.java#L16009-L16031 のメソッドが次の3ステップを実行します。

public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
    Objects.requireNonNull(subscriber, "subscriber is null");
    try {
        Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null...");
        subscribeActual(flowableSubscriber);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        // ...
    }
}
  1. null チェック — subscriber が null でないことを確認する
  2. subscribe フックの実行RxJavaPlugins.onSubscribe() が subscriber を差し替えたり、ラップしたりできる
  3. subscribeActual() への委譲 — すべてのオペレーターが実装する abstract メソッドを呼び出す

これは Template Method パターンの典型例です。subscribe() メソッドは final であるため、オペレーターはこれをオーバーライドできません。代わりに subscribeActual() を実装することで、プラグインフックが必ず実行されることが保証されます。

sequenceDiagram
    participant User
    participant Flowable
    participant RxJavaPlugins
    participant Operator as subscribeActual()
    
    User->>Flowable: subscribe(subscriber)
    Flowable->>Flowable: Objects.requireNonNull(subscriber)
    Flowable->>RxJavaPlugins: onSubscribe(this, subscriber)
    RxJavaPlugins-->>Flowable: possibly wrapped subscriber
    Flowable->>Operator: subscribeActual(wrappedSubscriber)

エラーハンドリングには微妙な事情があります。subscribeActual が例外をスローした場合、RxJava は onError を呼べません(subscriber がまだ onSubscribe を受け取っていない可能性があるためです)。かといって onSubscribe も呼べません(すでに呼ばれている可能性があるためです)。そこで、エラーは未配信エラーとして RxJavaPlugins.onError() に転送され、その後 NullPointerException でラップしてスローされます。これは Reactive Streams 仕様を満たすための実用的な回避策です。

AbstractFlowableWithUpstream:オペレーターの基底クラス

source.map(fn).filter(pred).observeOn(scheduler) のようにオペレーターをチェーンすると、デコレーターオブジェクトの連結リストが構築されます。各オペレータークラスは Flowable を継承し、上流ソースへの参照を保持します。

このパターンの基底クラスが AbstractFlowableWithUpstream です。

abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R> 
    implements HasUpstreamPublisher<T> {
    
    protected final Flowable<T> source;

    AbstractFlowableWithUpstream(Flowable<T> source) {
        this.source = Objects.requireNonNull(source, "source is null");
    }
}

フィールドとコンストラクターだけというシンプルな構成ですが、デコレーターチェーンの骨格を確立しています。

classDiagram
    class Flowable~T~ {
        <<abstract>>
        +subscribe(Subscriber)
        #subscribeActual(Subscriber)*
    }
    class AbstractFlowableWithUpstream~T_R~ {
        <<abstract>>
        #source: Flowable~T~
    }
    class FlowableMap~T_U~ {
        -mapper: Function
        #subscribeActual(Subscriber)
    }
    class FlowableFilter~T~ {
        -predicate: Predicate
        #subscribeActual(Subscriber)
    }
    class FlowableObserveOn~T~ {
        -scheduler: Scheduler
        #subscribeActual(Subscriber)
    }
    
    Flowable <|-- AbstractFlowableWithUpstream
    AbstractFlowableWithUpstream <|-- FlowableMap
    AbstractFlowableWithUpstream <|-- FlowableFilter
    AbstractFlowableWithUpstream <|-- FlowableObserveOn

source.map(fn).filter(pred) と書いたとき、組み立て時に生成されるのは FlowableFilter(source=FlowableMap(source=originalSource)) という構造です。subscribe すると、呼び出しは逆向きに伝播します。FlowableFilter.subscribeActualFlowableMap をサブスクライブし、FlowableMaporiginalSource をサブスクライブするという順序になります。

詳解:FlowableMap — オペレーターの典型例

FlowableMap.java#L26-L83FlowableMap は、最もシンプルな完全実装のオペレーターでありながら、すべてのパターンを示しています。

public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;
    
    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<>(s, mapper));
        }
    }

subscribeActual は、下流の subscriber を MapSubscriber でラップし、それを上流ソースにサブスクライブします。これがデコレーターパターンの実装そのものです。MapSubscriberonNext シグナルを横取りし、関数を適用した上で結果を転送します。

MapSubscriber.onNext メソッドを見ると、その全体像がよくわかります。

public void onNext(T t) {
    if (done) { return; }
    if (sourceMode != NONE) {
        downstream.onNext(null);  // fusion mode: signal "poll me"
        return;
    }
    U v;
    try {
        v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}

ここには3つの重要なパターンが見て取れます。

  1. done ガード — まず done フラグを確認し、遅延シグナルを無音で捨てる
  2. fusion ショートサーキット — fusion モードで動作している場合(sourceMode != NONE)、マップ済みの値ではなく null センチネルを転送し、下流が直接キューを poll() できるようにする
  3. fail() を使ったトライキャッチ — ユーザーコード(mapper.apply(t))は常にラップされる。fail() ヘルパーは上流をキャンセルし、下流に onError を通知する

fusion のサポートは poll() でも実装されています。

public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}

fusion モードでは、map 関数は onNext() ではなく poll() 内で適用されます。値は上流のキューからマッピング関数を通り、中間キューへの蓄積なしに直接流れていきます。このメカニズムは第3回で詳しく掘り下げます。

sequenceDiagram
    participant Source
    participant MapSubscriber
    participant Downstream
    
    Source->>MapSubscriber: onSubscribe(subscription)
    MapSubscriber->>Downstream: onSubscribe(mapSubscriber)
    
    Note over Downstream: request(n)
    Downstream->>MapSubscriber: request(n)
    MapSubscriber->>Source: request(n)
    
    Source->>MapSubscriber: onNext(t)
    MapSubscriber->>MapSubscriber: v = mapper.apply(t)
    MapSubscriber->>Downstream: onNext(v)
    
    Source->>MapSubscriber: onComplete()
    MapSubscriber->>Downstream: onComplete()

ポイント: RxJava のすべてのオペレーターはこの構造に従っています。AbstractFlowableWithUpstream を継承し、subscribeActual で下流をラップし、変換を加えながらシグナルを転送する、という流れです。FlowableMap を理解すれば、500 を超える他のオペレーターファイルを読み解くための共通テンプレートが手に入ります。

Flowable と Observable:2つのサブスクリプションプロトコル

Flowable と Observable は、根本的に異なるサブスクリプションの仕組みを採用しています。この設計上の違いは、すべてのオペレーター実装に影響します。

FlowableSubscription を使った Reactive Streams プロトコルに従います。

  • Subscriber.onSubscribe(Subscription s) — サブスクリプションを届ける
  • Subscription.request(long n) — 下流が N 件のアイテムをリクエストする(バックプレッシャー)
  • Subscription.cancel() — 下流がキャンセルする

ObservableDisposable を使います。

  • Observer.onSubscribe(Disposable d) — Disposable を届ける
  • Disposable.dispose() — 下流がキャンセルする
  • request() なし — バックプレッシャーなし

この結果、両方のタイプに存在するオペレーターはそれぞれ2つの実装を持つことになります。FlowableMapObservableMapFlowableFilterObservableFilter、という具合です。Flowable 版は request(n) の転送を処理しなければなりませんが、Observable 版にその必要はありません。

classDiagram
    class Subscription {
        <<interface>>
        +request(long n)
        +cancel()
    }
    class Disposable {
        <<interface>>
        +dispose()
        +isDisposed(): boolean
        +close()
    }
    
    class FlowableMap {
        Uses Subscription
        Handles request(n)
    }
    class ObservableMap {
        Uses Disposable
        No backpressure
    }
    
    Subscription <.. FlowableMap : uses
    Disposable <.. ObservableMap : uses

RxJava 4.x では DisposableAutoCloseable を継承しており、try-with-resources でサブスクリプションのライフサイクルを管理できるようになりました。Disposable.java#L28-L46 による、地味ながら実用的な使い勝手の向上です。

オペレーターにおけるエラーハンドリング

RxJava オペレーターのエラーハンドリングは厳格なプロトコルに従っており、先ほどの FlowableMap の例でも確認できます。

まず、致命的なエラーはただちに再スローされます。Exceptions.java#L66-L73Exceptions.throwIfFatal()VirtualMachineErrorLinkageError を検査します。

public static void throwIfFatal(@NonNull Throwable t) {
    if (t instanceof VirtualMachineError) {
        throw (VirtualMachineError) t;
    } else if (t instanceof LinkageError) {
        throw (LinkageError) t;
    }
}

それ以外の例外はキャッチ可能として扱われ、リアクティブなエラーチャンネルを通じて転送されます。BasicFuseableSubscriberfail() ヘルパーメソッドが標準的な処理シーケンスを担い、上流のキャンセル、done フラグの設定、downstream.onError(ex) の呼び出しを順に行います。

エラーの届け先がない場合 — たとえば onComplete の後に onError が呼ばれた場合や、キャンセル中にエラーが発生した場合など — RxJava はそのエラーを RxJavaPlugins.onError() に転送します。エラーハンドラーが設定されていない場合は、stderr に出力した上でカレントスレッドでスローされます。この「未配信エラー」の仕組みが、例外がサイレントに握り潰されることを防ぐセーフティネットとなっています。

flowchart TD
    EX[Exception in operator] --> TIF{throwIfFatal?}
    TIF -->|VirtualMachineError/LinkageError| RETHROW[Rethrow immediately]
    TIF -->|Other| DELIVER{Can deliver onError?}
    DELIVER -->|Yes| CANCEL[Cancel upstream] --> ONERROR[downstream.onError]
    DELIVER -->|No: already terminal| PLUGIN[RxJavaPlugins.onError]
    PLUGIN --> HANDLER{Handler set?}
    HANDLER -->|Yes| CUSTOM[Custom handler]
    HANDLER -->|No| STDERR[Print + throw on thread]

Assembly フックと Subscribe フック

RxJavaPlugins はすべてのリアクティブタイプに対して2つのインターセプトポイントを提供しています。assembly フックsubscribe フックです。

assembly フックは、サブスクリプション時ではなく、オペレーターが生成されるチェーン構築時に発火します。RxJavaPlugins.java#L72-L73onFlowableAssembly フックは、構築中の Flowable をインターセプトして差し替えることができます。

static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;

subscribe フックは subscribe() が呼ばれたとき、subscribeActual() の実行前に発火します。RxJavaPlugins.java#L102-L104onFlowableSubscribe フックは Flowable と Subscriber の両方を受け取り、後者を差し替えることができます。

static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;

これらのフックはデバッグツールの基盤となっています。assembly フックを使ってオペレーター生成時にスタックトレースをキャプチャすれば、「このオペレーターはどこで組み立てられたのか?」というデバッグの定番の悩みを解決できます。また、subscribe フックですべての subscriber をインストルメンテーションでラップすれば、シグナルをログに記録することも可能です。

sequenceDiagram
    participant Dev as Developer
    participant Assembly as Assembly Hook
    participant Chain as Operator Chain
    participant Subscribe as Subscribe Hook
    participant Actual as subscribeActual
    
    Dev->>Chain: source.map(fn).filter(pred)
    Chain->>Assembly: onFlowableAssembly(FlowableMap)
    Assembly-->>Chain: possibly wrapped Flowable
    Chain->>Assembly: onFlowableAssembly(FlowableFilter)
    
    Dev->>Chain: .subscribe(subscriber)
    Chain->>Subscribe: onFlowableSubscribe(flowable, subscriber)
    Subscribe-->>Chain: possibly wrapped subscriber
    Chain->>Actual: subscribeActual(wrappedSubscriber)

どちらのフックもすべてのリアクティブタイプに対応しており、Observable、Single、Maybe、Completable、ParallelFlowable にも並行してフックが用意されています。

次回予告

オペレーターの組み立て方と、サブスクリプションがチェーンを伝播する仕組みを確認しました。ただし、最もパフォーマンスに直結する部分、つまりオペレーターが並行シグナルを安全に扱う方法と、fusion が中間キューをどう排除するかについては、まだ触れていません。第3回では内部の核心部分に踏み込み、atomic なドレインループパターン、ロックフリーな SPSC キュー、そして RxJava を JVM 最速クラスのリアクティブライブラリたらしめる QueueFuseable プロトコルを解説します。