Read OSS

54,000行を正直に保つ:RxJavaのテストとバリデーションアーキテクチャ

中級

前提知識

  • 第1回:アーキテクチャとコードベースのナビゲーション
  • 第4回:スケジューラーシステムとスレッドモデル(TestSchedulerの文脈として)
  • JUnit 5の基礎知識

54,000行を正直に保つ:RxJavaのテストとバリデーションアーキテクチャ

500を超えるオペレーター実装、ロックフリーな並行処理パターン、3つのサブスクリプションプロトコル。これだけの要素が絡み合うコードベースは、本質的に壊れやすいものです。ドレインループのちょっとしたミスがデータロストを引き起こし、マッパー関数のnullチェック漏れが本番環境をクラッシュさせ、内部クラスの命名ミスが500もの他のファイルが依拠するナビゲーション規約を壊してしまいます。RxJavaがこうした障害に対抗するのが、単体テストをはるかに超えた多層的なテストアーキテクチャです。流暢なリアクティブアサーション、仮想時間の制御、コードベース規約を検証する24のメタテスト、そしてバリデーターを最初に実行するCIパイプライン。これらが一体となって品質を守っています。

TestSubscriberとTestObserver:流暢なリアクティブアサーション

リアクティブストリームのテストは本質的に難しいものです。シグナルは非同期で、順序が重要で、データと終端イベントを同時に検証しなければなりません。RxJavaはこの問題をTestSubscriber(Flowable向け)とTestObserver(Observable向け)で解決しています。

TestSubscriberFlowableSubscriberを実装し、あらゆるシグナルをキャプチャします。

public class TestSubscriber<T>
    extends BaseTestConsumer<T, TestSubscriber<T>>
    implements FlowableSubscriber<T>, Subscription {
    
    private final Subscriber<? super T> downstream;
    private volatile boolean cancelled;
    private final AtomicReference<Subscription> upstream;
    private final AtomicLong missedRequested;

流暢なアサーション APIを使うと、テストコードは非常に読みやすくなります。

Flowable.range(1, 5)
    .map(i -> i * 2)
    .test()
    .assertValues(2, 4, 6, 8, 10)
    .assertComplete()
    .assertNoErrors();

.test()メソッドはTestSubscriberの生成・サブスクライブ・返却をまとめて行う便利なショートカットです。バックプレッシャーのテストでは、リクエスト量を細かく制御できます。

Flowable.range(1, 100)
    .test(0)                    // Initial request of 0
    .assertEmpty()              // No items received
    .requestMore(5)             // Request 5
    .assertValueCount(5)        // Got exactly 5
    .requestMore(Long.MAX_VALUE) // Unbounded
    .assertValueCount(100)
    .assertComplete();
classDiagram
    class BaseTestConsumer~T_U~ {
        #values: List~T~
        #errors: List~Throwable~
        #completions: long
        +assertValues(T...): U
        +assertError(Class): U
        +assertComplete(): U
        +assertNotComplete(): U
        +awaitDone(long, TimeUnit): U
        +assertValueCount(int): U
    }
    class TestSubscriber~T~ {
        -upstream: AtomicReference~Subscription~
        -missedRequested: AtomicLong
        +requestMore(long)
        +cancel()
    }
    class TestObserver~T~ {
        -upstream: AtomicReference~Disposable~
        +dispose()
    }
    
    BaseTestConsumer <|-- TestSubscriber
    BaseTestConsumer <|-- TestObserver

awaitDone()メソッドは非同期オペレーターのテストに欠かせません。サブスクライバーが終端イベントを受け取るか、タイムアウトが切れるまでテストスレッドをブロックします。

Flowable.just(1)
    .delay(100, TimeUnit.MILLISECONDS)
    .test()
    .awaitDone(1, TimeUnit.SECONDS)
    .assertValue(1);

ヒント: 時間ベースのオペレーターをテストするときは、実際の遅延を伴うawaitDone()よりも次に紹介するTestSchedulerを優先しましょう。実際の待機はテストを遅く不安定にします。awaitDoneが本領を発揮するのは、実際のスケジューラーでsubscribeOnをテストするなど、真に非同期な処理を待つ必要がある場面です。

TestScheduler:決定論的テストのための仮想時間

TestSchedulerは時間を手動で制御できる仕組みを提供します。実際に待つことなく、delay()interval()timeout()debounce()といったオペレーターを決定論的にテストできます。

public final class TestScheduler extends Scheduler {
    final Queue<TimedRunnable> queue = new PriorityBlockingQueue<>(11);
    volatile long time;  // Stored in nanoseconds

TestSchedulerでスケジュールされたタスクは、実行予定時刻順に並べられた優先度付きキューに入れられます。仮想クロックが進むのは、明示的に指示したときだけです。

  • advanceTimeBy(amount, unit) — クロックを相対的に進める
  • advanceTimeTo(time, unit) — クロックを絶対的な時点に合わせる
  • triggerActions() — 現在時刻で実行すべきタスクをすべて実行する
sequenceDiagram
    participant Test
    participant TS as TestScheduler
    participant Op as delay operator
    
    Test->>TS: Create TestScheduler (time=0)
    Test->>Op: Flowable.just(1).delay(5, SECONDS, testScheduler)
    Test->>Op: .test()
    
    Note over Test: No time has passed
    Test->>TS: ts.assertEmpty()
    
    Test->>TS: advanceTimeBy(3, SECONDS)
    Note over TS: time=3s, delay=5s — not yet
    Test->>TS: ts.assertEmpty()
    
    Test->>TS: advanceTimeBy(2, SECONDS)
    Note over TS: time=5s, delay=5s — fire!
    TS->>Op: Execute delayed onNext
    Test->>TS: ts.assertValue(1).assertComplete()

複雑なタイミングシナリオのテストにも、この仕組みは絶大な威力を発揮します。

TestScheduler scheduler = new TestScheduler();

TestSubscriber<Long> ts = Flowable.interval(1, TimeUnit.SECONDS, scheduler)
    .test();

ts.assertEmpty();

scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
ts.assertValues(0L, 1L, 2L);

scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
ts.assertValues(0L, 1L, 2L, 3L, 4L);

TestSchedulerは仮想クロックに従うTestWorkerインスタンスを生成します。遅延タスクは実行予定時刻とともにキューに入れられ、advanceTimeByを呼ぶと予定時刻が新しいクロック値以下のタスクが順番に実行されます。

24のバリデーターメタテスト

ここからがRxJavaのテスト戦略で最もユニークな部分です。src/test/java/io/reactivex/rxjava4/validators/ディレクトリには24のテストクラスが存在しますが、これらはオペレーターの動作をテストするものではありません。コードベースそのものを検証するものです。

バリデーター 検証内容
OperatorsAreFinal internal/operators/内のすべてのオペレータークラスがfinalであること
ParamValidationCheckerTest パブリックAPIメソッドがパラメーターを検証していること(nullチェック)
JavadocWording Javadocの表現とスタイルが一貫していること
SourceAnnotationCheck すべてのオペレーターに@BackpressureSupport@SchedulerSupportがあること
InternalWrongNaming 内部クラスが{Type}{Operator}の命名規則に従っていること
OperatorsUseInterfaces オペレーターが具体クラスではなく関数型インターフェースを使用していること
PublicFinalMethods コアタイプのパブリックメソッドがfinalであること
TextualAorAn Javadoc内で冠詞("a"と"an")が正しく使われていること
CatchThrowIfFatalCheck すべてのcatchブロックでExceptions.throwIfFatal()が呼ばれていること
NoAnonymousInnerClassesTest 匿名内部クラスが使われていないこと(デバッグのために名前付きクラスを使う)
TooManyEmptyNewLines 過剰な空行がないこと
NewLinesBeforeAnnotation アノテーション前のフォーマットが一貫していること
JavadocCodesAndLinks Javadocの@codeタグと@linkタグが正しいこと
ParameterNamesInClassesTest -parametersコンパイラーフラグが機能していること

これらのバリデーターはリフレクションとソースファイルの解析を使い、コードベース全体に規約を強制します。たとえばOperatorsAreFinalinternal.operatorsパッケージのすべてのクラスをスキャンし、finalでないものがあればテストを失敗させます。これによりオペレータークラスの意図しない継承を防ぎ、クラスの同一性に関する内部前提が壊れるのを防いでいます。

最もセーフティクリティカルなのがCatchThrowIfFatalCheckです。catch (Throwableブロックを持つソースファイルを検索し、catchの直後の行がExceptions.throwIfFatal(ex)であることを確認します。第2回で触れたように、VirtualMachineErrorLinkageErrorが握りつぶされないよう保証するためです。

メタテストはCIで最初に実行されます。フルテストスイートの前に動くため、規約違反は20分のテスト実行を待つことなく、数秒以内に検出されます。

CIパイプラインと品質ゲート

.github/workflows/gradle_branch.ymlのGitHub Actionsワークフローは、2段階のパイプラインを実装しています。

- name: Run Validity Tests Upfront
  run: ./gradlew test --tests "io.reactivex.rxjava4.validators.*" --stacktrace --no-daemon
- name: Build RxJava
  run: ./gradlew build --stacktrace

第1段階は24のバリデーターだけを実行します。通常1分以内に完了し、規約違反があればフルテストスイートを待たず即座にビルドを失敗させます。

第2段階はフルのbuildタスクを実行します。内容は以下のとおりです。

  • JUnit Platformを通じたJUnitテスト
  • 別途testNGタスクによるTestNGテスト
  • JaCoCoカバレッジレポート
  • Checkstyle強制
flowchart TD
    PUSH[Push to branch] --> SETUP[Setup JDK 26]
    SETUP --> VAL[Run Validators<br>24 meta-tests ~1min]
    VAL -->|Fail| STOP[❌ Fast fail]
    VAL -->|Pass| BUILD[Full build]
    BUILD --> JUNIT[JUnit tests<br>~15min]
    BUILD --> TESTNG[TestNG tests<br>~10min]
    BUILD --> JACOCO[JaCoCo coverage]
    BUILD --> STYLE[Checkstyle]
    JUNIT --> CODECOV[Upload to Codecov]
    TESTNG --> CODECOV
    CODECOV --> JAVADOC[Generate Javadoc]

build.gradleのテスト設定で注目すべきは並列実行の扱いです。

test {
    maxHeapSize = "1200m"
    if (System.getenv("CI") != null) {
        maxParallelForks = Runtime.runtime.availableProcessors()
    } else {
        maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
    }
    useJUnitPlatform()
}

CIでは全プロセッサーをフル活用し、ローカル開発環境では半分に抑えています。1200mのヒープが必要なのは、バックプレッシャーシナリオのテストで大量のアイテムをバッファリングするケースが多いためです。

Reactive Streams TCKとJMHベンチマーク

RxJavaにはReactive Streams Technology Compatibility Kit(TCK)に対するコンプライアンステストが含まれています。TCK自体がTestNGをベースにしているため、これらのテストもTestNGで記述されており、FlowableとそのプロセッサーがReactive Streams仕様——特にサブスクリプション管理、キャンセル、デマンドシグナリングに関する厄介なルール群——を正しく実装していることを検証します。

TCKの依存関係はテストスコープで宣言されます。

testImplementation "org.reactivestreams:reactive-streams-tck:$reactiveStreamsVersion"
testImplementation "org.reactivestreams:reactive-streams-tck-flow:$reactiveStreamsVersion"

-flowバリアントはjava.util.concurrent.Flow.Publisherに対してテストを行い、RxJava 4.xの実装に対応しています。

JMH(Java Microbenchmark Harness)ベンチマークはsrc/jmh/に置かれ、主要なオペレーターパターンのスループットを計測します。

jmh {
    jmhVersion = jmhLibVersion
    humanOutputFile = null
    includeTests = false
}

./gradlew jmh -Pjmh=PatternNameで特定のベンチマークを実行できます。共有CIランナーではノイズが多いためベンチマークは通常のCIビルドには含まれませんが、ドレインループ、キュー操作、フュージョンといったパフォーマンス重要な処理を触るときにローカルで実行できるよう用意されています。

大局的な視点:アーキテクチャとしてのテスト

RxJavaのテストインフラが示す哲学は明快です。規約はコードである、ということです。命名規則やアノテーション要件、エラーハンドリングパターンを誰も読まないwikiに書き残すのではなく、ビルドを失敗させる実行可能なテストとして記述する。この考え方が全体を貫いています。500を超えるオペレーターファイルに多くのコントリビューターが関わるプロジェクトでは特に重要です。バリデーターがあることで誰が書いたコードでも一貫性が保たれます。

3層のアプローチ(規約バリデーター、機能テスト、仕様準拠テスト)はそれぞれ異なる種類の正しさをカバーしています。

  1. 規約バリデーター:「コードは正しく構造化されているか?」(高速で構造的なエラーを検出)
  2. 機能テスト:「オペレーターは正しく動作するか?」(網羅的でロジックのエラーを検出)
  3. TCKコンプライアンス:「ライブラリは仕様を満たしているか?」(権威あるプロトコルエラーの検出)

この構造があったからこそ、RxJavaは10年の開発、複数のメジャーバージョン、数百人のコントリビューターを経ても品質を維持し続けることができました。24のバリデーターは、コードベースの縁の下の力持ちです。54,000行ものロックフリーな並行処理コードが混沌へ陥らずに済んでいるのは、まさにこれらの存在のおかげです。


以上で、RxJavaの内部構造を7回にわたって深掘りするシリーズは完結です。パブリックAPIの表面からサブスクライブチェーン、ドレインループ、フュージョンプロトコル、スケジューラーシステム、バックプレッシャーの仕組み、仮想スレッドとの統合、そしてテストインフラまで、その全体像を追いかけてきました。このコードベースは丁寧に読み解くほど多くを教えてくれます。ここで見てきたパターン(WIPドレインループ、アトミックセンチネル、プリフェッチ/リミット、シールドされたドキュメントインターフェース)は、リアクティブプログラミングの枠を超えて応用できるものばかりです。Javaで高性能かつ保守性の高い並行システムを構築するための、普遍的な教訓と言えるでしょう。