Read OSS

RxJavaの内部構造:アーキテクチャの概要と877ファイルの歩き方

中級

前提知識

  • リアクティブプログラミングの基本概念(Observable、subscribe、オペレーター)への理解
  • Javaモジュールシステムの基礎知識(module-info.java)

RxJavaの内部構造:アーキテクチャの概要と877ファイルの歩き方

RxJavaのpublic APIは驚くほどシンプルです。いくつかのオペレーターをチェーンしてsubscribeするだけで、処理が完結します。しかしその流暢さの裏側には、膨大なコードが隠れています。数百のオペレーター実装、ロックフリーキュー、3種類のサブスクリプションプロトコル、そしてプラットフォームスレッドと仮想スレッドの両方に対応したスケジューラーシステム。これらが緻密に絡み合っています。本シリーズでは、その内部をひとつずつ解き明かしていきます。まずはここから、全体の地図を描くことから始めましょう。

プロジェクトの概要とビルドシステム

RxJava 4.xは、Java 26をベースラインとするシングルモジュールのGradleプロジェクトです。Java 8をサポートしていたRxJava 3.xからの大きな進化です。このバージョンアップにより、仮想スレッド、sealed型、レコード、パターンマッチングが使えるようになり、4.xのコードベースではこれらをフル活用しています。

ビルド設定はbuild.gradle#L64-L70に記述されています。

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(toolchainJdk)
    }
    sourceCompatibility = JavaVersion.VERSION_26
    targetCompatibility = JavaVersion.VERSION_26
}

注目すべき点として、RxJava 4.xはランタイム依存関係がゼロです。外部ライブラリorg.reactivestreamsは廃止され、FlowableはJDKに同梱されたjava.util.concurrent.Flow.Publisherを直接実装するようになりました。外部依存関係はJUnit、TestNG、Mockito、Guava、Reactive Streams TCK、JMHといったテストスコープのみに限定されています。

プロジェクトの識別情報はgradle.propertiesで定義されています。

group=io.reactivex.rxjava4
version=4.0.0-SNAPSHOT
POM_ARTIFACT_ID=rxjava

ヒント: システムプロパティのrx4.*プレフィックス(rx4.buffer-sizerx4.computation-threadsなど)はRxJava 4.x専用の名前空間で、3.xのrx3.*とは別系統です。これにより、両バージョンを同一環境で並行稼働させても、プロパティが衝突することはありません。

ディレクトリ構成

ソースツリーは深い階層構造を持っていますが、一貫したパターンで整理されています。主要なパッケージの内訳は以下のとおりです。

パッケージ 用途 概算ファイル数
core/ 5+2のリアクティブ型、Scheduler、アノテーション ~20
core/docs/ Javadoc分割用のsealed interface ~2
core/config/ 設定用レコード(例:FlatMapConfig) ~1
internal/operators/flowable/ Flowableオペレーターの実装 ~180
internal/operators/observable/ Observableオペレーターの実装 ~150
internal/operators/single/ Singleオペレーターの実装 ~50
internal/operators/maybe/ Maybeオペレーターの実装 ~50
internal/operators/completable/ Completableオペレーターの実装 ~45
internal/operators/mixed/ 型変換オペレーター(例:Flowable→Single) ~15
internal/operators/streamable/ Streamableオペレーターの実装 ~5
internal/schedulers/ Schedulerの実装 ~15
internal/subscribers/ オペレーター内部で使用するSubscriberの基底クラス ~10
internal/subscriptions/ Subscriptionヘルパー ~5
internal/util/ BackpressureHelper、AtomicThrowableなど ~10
internal/virtual/ 仮想スレッドのブリッジ実装 ~3
schedulers/ publicなScheduler API(Schedulers、TestScheduler) ~5
plugins/ RxJavaPluginsのグローバルフック 1
operators/ publicなキューおよびfusion interface ~8
subjects/, processors/ ホットソース ~10
parallel/ ParallelFlowable ~25

コアとなる5つのリアクティブ型のファイルは非常に大きく、Flowable.javaだけで21,000行を超えます。これらのファイルがfluent APIの全インターフェースを定義しており、map()filter()flatMap()といったおなじみのメソッドはすべてこれらのクラスに定義されています。ただし、各オペレーターの実装はinternal/operators/以下の独立したクラスに分離されています。

Public APIとInternal APIの境界

RxJavaはJavaモジュールシステムを使って、APIの境界を厳密に管理しています。src/main/module/module-info.java#L14-L34のモジュール定義では、公開するパッケージが明示されています。

module io.reactivex.rxjava4 {
    exports io.reactivex.rxjava4.annotations;
    exports io.reactivex.rxjava4.core;
    exports io.reactivex.rxjava4.core.docs;
    exports io.reactivex.rxjava4.core.config;
    exports io.reactivex.rxjava4.disposables;
    exports io.reactivex.rxjava4.exceptions;
    // ... 10 more exported packages
    requires java.management;
}

注目すべきは、io.reactivex.rxjava4.internal以下が一切含まれていない点です。コードの大部分はここに集約されています——オペレーターの実装、キューデータ構造、Schedulerの内部実装、ユーティリティクラスなど。モジュールパスで動作する環境では、ライブラリの利用者はこれらの型に直接アクセスできません。

flowchart TD
    subgraph Public API ["Public API (exported)"]
        A[core/ - Flowable, Observable, etc.]
        B[schedulers/ - Schedulers, TestScheduler]
        C[plugins/ - RxJavaPlugins]
        D[operators/ - QueueFuseable, SpscArrayQueue]
        E[disposables/ - Disposable]
    end
    subgraph Internal ["Internal (not exported)"]
        F[internal/operators/ ~500 files]
        G[internal/schedulers/]
        H[internal/subscribers/]
        I[internal/util/]
        J[internal/virtual/]
    end
    A -->|"delegates to"| F
    B -->|"creates"| G
    A -->|"uses"| H
    F -->|"uses"| I

この設計により、public APIの表面は実装の規模に比べて驚くほどコンパクトに保たれています。5つのコアクラスはファサードとして機能し、各メソッドは内部のオペレーターオブジェクトを生成するための薄いラッパーに過ぎません。

5+2のコアリアクティブ型

RxJavaには5つの主要なリアクティブ型と、2つの特殊な型があります。それぞれが明確に異なるユースケースに対応しています。

classDiagram
    class Flowable~T~ {
        <<non-sealed>>
        +subscribe(Subscriber)
        +subscribeActual(Subscriber)*
        Backpressure: Yes
        Items: 0..N
    }
    class Observable~T~ {
        <<abstract>>
        +subscribe(Observer)
        +subscribeActual(Observer)*
        Backpressure: No
        Items: 0..N
    }
    class Single~T~ {
        <<abstract>>
        +subscribe(SingleObserver)
        Items: exactly 1
    }
    class Maybe~T~ {
        <<abstract>>
        +subscribe(MaybeObserver)
        Items: 0 or 1
    }
    class Completable {
        <<abstract>>
        +subscribe(CompletableObserver)
        Items: 0
    }
    class Streamable~T~ {
        <<interface>>
        +stream(): Streamer~T~
        Pull-based async enumerable
    }
    class ParallelFlowable~T~ {
        <<abstract>>
        +subscribe(Subscriber[])
        Splits across rails
    }

Flowableはバックプレッシャーに対応したメインの型で、Flow.Publisherを実装しています。プロデューサーがコンシューマーを追い越す可能性があるストリームには、これが最適な選択です。Observableはその軽量版で、バックプレッシャーのオーバーヘッドがなく、UIイベントや本質的に有界なソースに適しています。

SingleMaybeCompletableはカーディナリティを制限した型です。HTTPレスポンスのようにちょうど1つの値を返すストリームは、無制限のストリームとは根本的に異なるセマンティクスを持ちます。これらの型はその制約を型システムに落とし込んでいます。

4.xで新たに加わった2つの型がStreamableParallelFlowableです。Streamableは仮想スレッド向けに設計されたプルベースの非同期列挙型、ParallelFlowableはCPUバウンドな処理のためにストリームを複数の「レール」に分割して並列処理する型です。Streamableの詳細はPart 6で掘り下げます。

ヒント: 型を選ぶときはまずカーディナリティで考えましょう。0件ならCompletable、0件または1件ならMaybe、厳密に1件ならSingle、0件以上ならFlowableまたはObservableです。次にバックプレッシャーの有無で絞り込みます。ネットワーク・IOソースにはFlowable、UIイベントにはObservableが適しています。

オペレーターの探し方:命名規則

500を超えるオペレーターファイルを効率よく探すには、体系的なアプローチが必要です。RxJavaは厳格な命名規則によってこれを解決しています。

パターン: {ReactiveType}{OperatorName}.java、配置先はinternal/operators/{type}/

  • Flowable.map()FlowableMap.javainternal/operators/flowable/
  • Observable.map()ObservableMap.javainternal/operators/observable/
  • Flowable.flatMap()FlowableFlatMap.java
  • Single.map()SingleMap.java

この命名規則はPart 7で解説するバリデーターのメタテストによって強制されています。InternalWrongNamingバリデーターが違反を自動的に検出します。

型変換やオーバーロードに対応するために、サフィックス付きのバリアントも存在します。

  • FlowableCollectSingle.java — Singleを返すFlowableオペレーター
  • FlowableElementAtMaybe.java — Maybeを返す
  • FlowableConcatMapScheduler.java — Schedulerをパラメーターに取るバリアント

この命名パターンを身につければ、どのAPIコールからでも実装にたどり着くまで数秒とかかりません。

設定:システムプロパティとRxJavaPlugins

RxJavaは設定ファイルを使いません。代わりに、システムプロパティとプラグインフックという2つの設定メカニズムを提供しています。

システムプロパティ(クラスロード前に設定)はバッファサイズとスレッドプールを制御します。すべてrx4.プレフィックスを使用します。

プロパティ デフォルト値 効果
rx4.buffer-size 128 オペレーターのデフォルトバッファサイズ
rx4.computation-threads CPU count Computationスケジューラーのプールサイズ
rx4.cached-keep-alive-time 60 (seconds) IOスケジューラーのワーカー有効期限
rx4.computation-priority NORM_PRIORITY Computationスレッドの優先度
rx4.scheduler.use-nanotime false currentTimeMillisの代わりにnanoTimeを使用

RxJavaPluginsはエラーハンドリング、Schedulerの置き換え、インストルメンテーション用のランタイムフックを提供します。フックフィールドはRxJavaPlugins.java#L34-L135で定義されています。

flowchart LR
    subgraph RxJavaPlugins
        EH[errorHandler]
        SA[onFlowableAssembly]
        SS[onFlowableSubscribe]
        SCH[onScheduleHandler]
        SI[onInitComputationHandler]
        LD[lockdown]
    end
    SA -->|"intercepts"| OP[Operator creation]
    SS -->|"intercepts"| SUB[subscribe call]
    EH -->|"catches"| UE[Undeliverable errors]
    SCH -->|"wraps"| RUN[Scheduled Runnables]
    LD -->|"freezes"| SA
    LD -->|"freezes"| SS

lockdown()メソッドは本番環境において特に重要です。一度呼び出すと、プラグインフックへの変更が一切できなくなり、ライブラリ同士が互いの設定に干渉するのを防ぎます。

次回予告

全体像が掴めたところで、いよいよ内部の仕組みへと踏み込んでいきましょう。Part 2では、subscribeの全体フローを追っていきます。flowable.subscribe(subscriber)の呼び出しから始まり、プラグインフックを経て、抽象メソッドsubscribeActual()を下り、fluent API全体を支えるオペレーターデコレーターパターンへと辿り着きます。FlowableMapをRxJavaのすべてのオペレーターに共通する典型例として、詳しく解説していきます。