Read OSS

守护 54,000 行代码的质量:RxJava 的测试与验证体系

中级

前置知识

  • 第 1 篇:架构与代码库导航
  • 第 4 篇:调度器系统与线程模型(TestScheduler 相关背景)
  • JUnit 5 基础知识

守护 54,000 行代码的质量:RxJava 的测试与验证体系

一个拥有 500 余个 operator 实现、无锁并发模式和三种订阅协议的代码库,天生就脆弱。drain 循环中一个细小的错误可能导致数据丢失,mapper 函数缺少空值检查可能让生产环境崩溃,内部类命名不当则会破坏其他 500 个文件所遵循的导航约定。RxJava 抵御这些风险的方式,是一套远超普通单元测试的多层测试架构——它涵盖了流式响应式断言、虚拟时间控制、24 个用于验证代码库规范的元测试,以及优先执行验证器的 CI 流水线。

TestSubscriber 与 TestObserver:流式响应式断言

测试响应式流本身就充满挑战——信号是异步的,顺序至关重要,还需要同时验证终止事件和数据内容。RxJava 为此提供了 TestSubscriber(用于 Flowable)和 TestObserver(用于 Observable)。

TestSubscriber 实现了 FlowableSubscriber 接口,能够捕获每一个信号:

public class TestSubscriber<T>
    extends BaseTestConsumer<T, TestSubscriber<T>>
    implements FlowableSubscriber<T>, Subscription {
    
    private final Subscriber<? super T> downstream;
    private volatile boolean cancelled;
    private final AtomicReference<Subscription> upstream;
    private final AtomicLong missedRequested;

流式断言 API 让测试代码既简洁又富有表达力:

Flowable.range(1, 5)
    .map(i -> i * 2)
    .test()
    .assertValues(2, 4, 6, 8, 10)
    .assertComplete()
    .assertNoErrors();

.test() 方法是一个便捷入口,它会创建 TestSubscriber、完成订阅并将其返回——一步到位。如果需要测试背压行为,还可以精确控制请求数量:

Flowable.range(1, 100)
    .test(0)                    // Initial request of 0
    .assertEmpty()              // No items received
    .requestMore(5)             // Request 5
    .assertValueCount(5)        // Got exactly 5
    .requestMore(Long.MAX_VALUE) // Unbounded
    .assertValueCount(100)
    .assertComplete();
classDiagram
    class BaseTestConsumer~T_U~ {
        #values: List~T~
        #errors: List~Throwable~
        #completions: long
        +assertValues(T...): U
        +assertError(Class): U
        +assertComplete(): U
        +assertNotComplete(): U
        +awaitDone(long, TimeUnit): U
        +assertValueCount(int): U
    }
    class TestSubscriber~T~ {
        -upstream: AtomicReference~Subscription~
        -missedRequested: AtomicLong
        +requestMore(long)
        +cancel()
    }
    class TestObserver~T~ {
        -upstream: AtomicReference~Disposable~
        +dispose()
    }
    
    BaseTestConsumer <|-- TestSubscriber
    BaseTestConsumer <|-- TestObserver

awaitDone() 方法在测试异步 operator 时不可或缺——它会阻塞测试线程,直到订阅者收到终止事件或超时为止:

Flowable.just(1)
    .delay(100, TimeUnit.MILLISECONDS)
    .test()
    .awaitDone(1, TimeUnit.SECONDS)
    .assertValue(1);

建议: 测试基于时间的 operator 时,应优先使用下文介绍的 TestScheduler,而非依赖真实延迟的 awaitDone()。真实延迟会让测试变慢且不稳定。awaitDone 适合那些确实需要等待异步任务完成的场景,例如测试使用真实调度器的 subscribeOn

TestScheduler:用虚拟时间实现确定性测试

TestScheduler 提供了对时间的手动控制能力,让你无需任何真实等待,就能对 delay()interval()timeout()debounce() 等 operator 进行确定性测试:

public final class TestScheduler extends Scheduler {
    final Queue<TimedRunnable> queue = new PriorityBlockingQueue<>(11);
    volatile long time;  // Stored in nanoseconds

通过 TestScheduler 调度的任务会按照预定时间放入优先队列,虚拟时钟只在你主动推进时才会前进:

  • advanceTimeBy(amount, unit) —— 将时钟向前推进指定的相对时长
  • advanceTimeTo(time, unit) —— 将时钟推进到指定的绝对时间点
  • triggerActions() —— 执行当前时间点所有到期的任务
sequenceDiagram
    participant Test
    participant TS as TestScheduler
    participant Op as delay operator
    
    Test->>TS: Create TestScheduler (time=0)
    Test->>Op: Flowable.just(1).delay(5, SECONDS, testScheduler)
    Test->>Op: .test()
    
    Note over Test: No time has passed
    Test->>TS: ts.assertEmpty()
    
    Test->>TS: advanceTimeBy(3, SECONDS)
    Note over TS: time=3s, delay=5s — not yet
    Test->>TS: ts.assertEmpty()
    
    Test->>TS: advanceTimeBy(2, SECONDS)
    Note over TS: time=5s, delay=5s — fire!
    TS->>Op: Execute delayed onNext
    Test->>TS: ts.assertValue(1).assertComplete()

这对于测试复杂的时序场景极为强大:

TestScheduler scheduler = new TestScheduler();

TestSubscriber<Long> ts = Flowable.interval(1, TimeUnit.SECONDS, scheduler)
    .test();

ts.assertEmpty();

scheduler.advanceTimeBy(3, TimeUnit.SECONDS);
ts.assertValues(0L, 1L, 2L);

scheduler.advanceTimeBy(2, TimeUnit.SECONDS);
ts.assertValues(0L, 1L, 2L, 3L, 4L);

TestScheduler 创建的 TestWorker 实例会遵循虚拟时钟的节奏。延迟任务在入队时会记录目标时间,advanceTimeBy 会按顺序触发所有目标时间 ≤ 新时钟值的任务。

24 个验证器元测试

这正是 RxJava 测试体系最与众不同之处。src/test/java/io/reactivex/rxjava4/validators/ 目录下有 24 个测试类,它们测试的不是 operator 的行为,而是代码库本身

验证器 检查内容
OperatorsAreFinal internal/operators/ 中的所有 operator 类是否声明为 final
ParamValidationCheckerTest 公开 API 方法是否对参数进行了校验(如空值检查)
JavadocWording Javadoc 的措辞和风格是否一致
SourceAnnotationCheck 每个 operator 是否都标注了 @BackpressureSupport@SchedulerSupport
InternalWrongNaming 内部类是否遵循 {Type}{Operator} 命名规范
OperatorsUseInterfaces operator 是否使用函数式接口而非具体类
PublicFinalMethods 核心类型上的公开方法是否声明为 final
TextualAorAn Javadoc 中冠词("a" 与 "an")的使用是否正确
CatchThrowIfFatalCheck 所有 catch 块是否调用了 Exceptions.throwIfFatal()
NoAnonymousInnerClassesTest 是否存在匿名内部类(应使用具名类以便调试)
TooManyEmptyNewLines 是否存在过多空行
NewLinesBeforeAnnotation 注解前的格式是否统一
JavadocCodesAndLinks Javadoc 中 @code@link 标签的使用是否正确
ParameterNamesInClassesTest -parameters 编译器标志是否生效

这些验证器通过反射和源文件解析,在整个代码库范围内强制执行规范。以 OperatorsAreFinal 为例,它会扫描 internal.operators 包下的每一个类,只要发现未标记为 final 的,测试就会失败。这从根本上杜绝了 operator 类被意外继承的可能,而这种继承可能破坏内部对类标识的假设。

CatchThrowIfFatalCheck 则是安全性最为关键的一个——它扫描源文件中的 catch (Throwable 块,验证 catch 之后的第一行是否为 Exceptions.throwIfFatal(ex)。这确保了 VirtualMachineErrorLinkageError 永远不会被静默吞掉,正如我们在第 2 篇中所讨论的那样。

这些元测试在 CI 中最先运行,早于完整测试套件。这意味着规范违规能在几秒内被发现,而不必等待长达 20 分钟的完整测试跑完。

CI 流水线与质量门禁

位于 .github/workflows/gradle_branch.yml 的 GitHub Actions 工作流实现了两阶段流水线:

- name: Run Validity Tests Upfront
  run: ./gradlew test --tests "io.reactivex.rxjava4.validators.*" --stacktrace --no-daemon
- name: Build RxJava
  run: ./gradlew build --stacktrace

第一阶段只运行 24 个验证器,通常不到一分钟就能完成。一旦发现规范违规,构建立即失败,不会在完整测试套件上浪费任何时间。

第二阶段执行完整的 build 任务,包括:

  • 通过 JUnit Platform 运行 JUnit 测试
  • 通过独立的 testNG 任务运行 TestNG 测试
  • 生成 JaCoCo 覆盖率报告
  • 执行 Checkstyle 检查
flowchart TD
    PUSH[Push to branch] --> SETUP[Setup JDK 26]
    SETUP --> VAL[Run Validators<br>24 meta-tests ~1min]
    VAL -->|Fail| STOP[❌ Fast fail]
    VAL -->|Pass| BUILD[Full build]
    BUILD --> JUNIT[JUnit tests<br>~15min]
    BUILD --> TESTNG[TestNG tests<br>~10min]
    BUILD --> JACOCO[JaCoCo coverage]
    BUILD --> STYLE[Checkstyle]
    JUNIT --> CODECOV[Upload to Codecov]
    TESTNG --> CODECOV
    CODECOV --> JAVADOC[Generate Javadoc]

build.gradle 中的测试配置值得关注,尤其是并行化的处理方式:

test {
    maxHeapSize = "1200m"
    if (System.getenv("CI") != null) {
        maxParallelForks = Runtime.runtime.availableProcessors()
    } else {
        maxParallelForks = Runtime.runtime.availableProcessors().intdiv(2) ?: 1
    }
    useJUnitPlatform()
}

CI 环境使用全部可用处理器并行运行测试,本地开发则使用一半。1200m 的堆内存是必要的,因为许多测试会演练背压场景,需要在内存中缓冲大量数据项。

Reactive Streams TCK 与 JMH 基准测试

RxJava 内置了针对 Reactive Streams 技术兼容性套件(TCK)的合规测试。这些测试基于 TestNG(TCK 本身使用 TestNG),用于验证 Flowable 及其 processor 是否正确实现了 Reactive Streams 规范——包括订阅管理、取消操作和需求信号等出了名难以正确实现的规则。

TCK 依赖以 test 作用域引入:

testImplementation "org.reactivestreams:reactive-streams-tck:$reactiveStreamsVersion"
testImplementation "org.reactivestreams:reactive-streams-tck-flow:$reactiveStreamsVersion"

-flow 变体针对 java.util.concurrent.Flow.Publisher 进行测试,与 RxJava 4.x 的实现相对应。

JMH(Java Microbenchmark Harness)基准测试位于 src/jmh/ 目录,用于衡量关键 operator 模式的吞吐量:

jmh {
    jmhVersion = jmhLibVersion
    humanOutputFile = null
    includeTests = false
}

可以通过 ./gradlew jmh -Pjmh=PatternName 运行针对性的性能测试。这些基准测试不纳入常规 CI 构建(共享 CI runner 上的基准测试结果本身就有噪声),但当开发者需要优化 drain 循环、队列操作、fusion 等性能敏感路径时,可以在本地运行。

更大的视角:测试即架构

RxJava 的测试基础设施体现了一种核心理念:规范即代码。与其把命名规则、注解要求和错误处理规范写进一份无人阅读的 wiki,不如将它们编码为可执行的测试,让构建失败来说话。这对于一个横跨 500 余个 operator 文件、由众多贡献者共同维护的项目而言尤为重要——无论代码出自何人之手,验证器都能确保一致性。

三层测试方法——规范验证器、功能测试和规格合规测试——分别覆盖了不同维度的正确性:

  1. 规范验证器:「代码结构是否正确?」(速度快,捕获结构性错误)
  2. 功能测试:「operator 的行为是否正确?」(覆盖全面,捕获逻辑错误)
  3. TCK 合规测试:「库是否符合规范?」(权威可靠,捕获协议错误)

正是这套架构,让 RxJava 在历经十年开发、多个主要版本迭代、数百位开发者贡献之后,依然保持着高质量。那 24 个验证器在很多意义上是整个代码库的无名英雄——正是它们,让 54,000 行无锁并发代码没有滑入混乱的深渊。


至此,我们对 RxJava 内部机制的七篇深度解析画上了句号。我们从公开 API 层出发,一路追踪了订阅链、drain 循环、fusion 协议、调度器系统、背压机制、虚拟线程集成,直到测试基础设施。这个代码库值得深入研读——你在这里看到的模式(WIP drain 循环、原子哨兵、prefetch/limit、封闭的文档接口)远不止适用于响应式编程。它们是在 Java 中构建高性能、可维护并发系统的通用经验。