Read OSS

深入 RxJava:架构概览与 877 个源文件的导航指南

中级

前置知识

  • 对响应式编程基本概念有所了解(Observable、subscribe、operators)
  • 了解 Java 模块系统基础(module-info.java)

深入 RxJava:架构概览与 877 个源文件的导航指南

RxJava 的公共 API 用起来行云流水——链式调用几个 operator,subscribe,搞定。但在这份流畅的背后,是一个密度极高的代码库:数百个 operator 实现、无锁队列、三种截然不同的订阅协议,以及一套如今已横跨平台线程与虚拟线程的 scheduler 系统。本系列将带你拆开这台机器。我们从这里开始,先画一张地图。

项目基本信息与构建系统

RxJava 4.x 是一个单模块的 Gradle 项目,以 Java 26 为基准版本。这相比支持 Java 8 的 RxJava 3.x 是一次重大跨越。升级带来了虚拟线程、sealed 类型、record 以及模式匹配——4.x 代码库对这些特性均有所运用。

构建配置位于 build.gradle#L64-L70

java {
    toolchain {
        languageVersion = JavaLanguageVersion.of(toolchainJdk)
    }
    sourceCompatibility = JavaVersion.VERSION_26
    targetCompatibility = JavaVersion.VERSION_26
}

有一个细节值得特别关注:RxJava 4.x 零运行时依赖。外部的 org.reactivestreams 库已经移除——Flowable 现在直接实现 JDK 自带的 java.util.concurrent.Flow.Publisher。唯一的外部依赖全部限定在测试范围:JUnit、TestNG、Mockito、Guava、Reactive Streams TCK 以及 JMH。

项目身份信息定义在 gradle.properties 中:

group=io.reactivex.rxjava4
version=4.0.0-SNAPSHOT
POM_ARTIFACT_ID=rxjava

提示: 系统属性中的 rx4.* 前缀(如 rx4.buffer-sizerx4.computation-threads)是 RxJava 4.x 的命名空间,与 3.x 的 rx3.* 互不干扰。这意味着两个版本可以并行运行,不会出现属性冲突。

目录结构一览

源码树的目录嵌套较深,但结构规律清晰。以下是核心包的分布情况:

包路径 用途 大致文件数
core/ 5+2 响应式类型、Scheduler、注解 ~20
core/docs/ 用于拆分 Javadoc 的 sealed 接口 ~2
core/config/ 配置 record(如 FlatMapConfig) ~1
internal/operators/flowable/ Flowable operator 实现 ~180
internal/operators/observable/ Observable operator 实现 ~150
internal/operators/single/ Single operator 实现 ~50
internal/operators/maybe/ Maybe operator 实现 ~50
internal/operators/completable/ Completable operator 实现 ~45
internal/operators/mixed/ 跨类型 operator(如 Flowable→Single) ~15
internal/operators/streamable/ Streamable operator 实现 ~5
internal/schedulers/ Scheduler 实现 ~15
internal/subscribers/ operator 内部使用的基础 subscriber 类 ~10
internal/subscriptions/ Subscription 辅助工具 ~5
internal/util/ BackpressureHelper、AtomicThrowable 等 ~10
internal/virtual/ 虚拟线程桥接实现 ~3
schedulers/ 公共 scheduler API(Schedulers、TestScheduler) ~5
plugins/ RxJavaPlugins 全局钩子 1
operators/ 公共队列与 fusion 接口 ~8
subjects/processors/ 热源(Hot sources) ~10
parallel/ ParallelFlowable ~25

五个核心响应式类型的文件体积都相当可观。光是 Flowable.java 就超过 21,000 行。这些文件定义了完整的流式 API 接口——你调用过的每一个 map()filter()flatMap() 都是这些类上的方法。但每个 operator 的具体实现,则分散在 internal/operators/ 下的独立类中。

公共/内部 API 边界

RxJava 通过 Java 模块系统强制划定了 API 边界。src/main/module/module-info.java#L14-L34 中的模块定义明确声明了哪些包对外暴露:

module io.reactivex.rxjava4 {
    exports io.reactivex.rxjava4.annotations;
    exports io.reactivex.rxjava4.core;
    exports io.reactivex.rxjava4.core.docs;
    exports io.reactivex.rxjava4.core.config;
    exports io.reactivex.rxjava4.disposables;
    exports io.reactivex.rxjava4.exceptions;
    // ... 另外 10 个导出包
    requires java.management;
}

注意没有出现在列表中的部分:所有 io.reactivex.rxjava4.internal 下的内容。而绝大多数代码恰恰住在这里——所有 operator 实现、队列数据结构、scheduler 内部逻辑以及工具类。在模块路径下运行时,库的使用者根本无法访问这些类型。

flowchart TD
    subgraph Public API ["Public API (exported)"]
        A[core/ - Flowable, Observable, etc.]
        B[schedulers/ - Schedulers, TestScheduler]
        C[plugins/ - RxJavaPlugins]
        D[operators/ - QueueFuseable, SpscArrayQueue]
        E[disposables/ - Disposable]
    end
    subgraph Internal ["Internal (not exported)"]
        F[internal/operators/ ~500 files]
        G[internal/schedulers/]
        H[internal/subscribers/]
        I[internal/util/]
        J[internal/virtual/]
    end
    A -->|"delegates to"| F
    B -->|"creates"| G
    A -->|"uses"| H
    F -->|"uses"| I

这种设计使得公共 API 接口面相对于实现体量来说极为精简。五个核心类充当门面(facade)——它们的方法只是薄薄的委托层,负责构造内部 operator 对象。

5+2 核心响应式类型

RxJava 定义了五种主要响应式类型和两种专用类型,各自适用于不同场景:

classDiagram
    class Flowable~T~ {
        <<non-sealed>>
        +subscribe(Subscriber)
        +subscribeActual(Subscriber)*
        Backpressure: Yes
        Items: 0..N
    }
    class Observable~T~ {
        <<abstract>>
        +subscribe(Observer)
        +subscribeActual(Observer)*
        Backpressure: No
        Items: 0..N
    }
    class Single~T~ {
        <<abstract>>
        +subscribe(SingleObserver)
        Items: exactly 1
    }
    class Maybe~T~ {
        <<abstract>>
        +subscribe(MaybeObserver)
        Items: 0 or 1
    }
    class Completable {
        <<abstract>>
        +subscribe(CompletableObserver)
        Items: 0
    }
    class Streamable~T~ {
        <<interface>>
        +stream(): Streamer~T~
        Pull-based async enumerable
    }
    class ParallelFlowable~T~ {
        <<abstract>>
        +subscribe(Subscriber[])
        Splits across rails
    }

Flowable 是主力类型——支持背压(backpressure)、实现 Flow.Publisher,适用于生产者可能超过消费者处理速度的场景。Observable 是它的轻量版:没有背压开销,非常适合 UI 事件以及本身就有上界的数据源。

SingleMaybeCompletable 是基数受限类型。它们存在的意义在于:发射恰好一个值的流(比如一次 HTTP 响应)与无界流在语义上有本质区别,这些类型将该约束直接编码进了类型系统。

4.x 新增的两种类型是 Streamable(一种面向虚拟线程设计的拉取式异步枚举)和 ParallelFlowable(将数据流拆分到多个并行"轨道"上处理 CPU 密集型任务)。Streamable 的详细内容将在第 6 部分深入讲解。

提示: 选择类型时,优先考虑基数:0 个元素用 Completable,0 或 1 个用 Maybe,恰好 1 个用 Single,0 到 N 个用 FlowableObservable。然后再考虑背压:网络/IO 数据源选 Flowable,UI 事件选 Observable

Operator 导航:命名规范

面对 500 多个 operator 文件,你需要一套系统来快速定位。RxJava 提供了一套严格的命名规范来解决这个问题:

规律: {ReactiveType}{OperatorName}.java,位于 internal/operators/{type}/

  • Flowable.map()FlowableMap.java,位于 internal/operators/flowable/
  • Observable.map()ObservableMap.java,位于 internal/operators/observable/
  • Flowable.flatMap()FlowableFlatMap.java
  • Single.map()SingleMap.java

这一规范由第 7 部分将介绍的验证元测试强制执行,InternalWrongNaming 校验器会自动捕捉违规情况。

部分 operator 还带有后缀,用于表示跨类型转换或重载变体:

  • FlowableCollectSingle.java —— 返回 Single 的 Flowable operator
  • FlowableElementAtMaybe.java —— 返回 Maybe
  • FlowableConcatMapScheduler.java —— 带 scheduler 参数的变体

一旦掌握这个规律,从任意 API 调用定位到对应实现只需几秒钟。

配置:系统属性与 RxJavaPlugins

RxJava 不使用配置文件,而是提供两种配置机制:系统属性和插件钩子系统。

系统属性(在类加载前设置)用于控制缓冲区大小和线程池,全部使用 rx4. 前缀:

属性 默认值 作用
rx4.buffer-size 128 默认 operator 缓冲区大小
rx4.computation-threads CPU 核心数 computation scheduler 线程池大小
rx4.cached-keep-alive-time 60(秒) IO scheduler worker 过期时间
rx4.computation-priority NORM_PRIORITY computation 线程优先级
rx4.scheduler.use-nanotime false 使用 nanoTime 替代 currentTimeMillis

RxJavaPlugins 提供运行时钩子,用于错误处理、scheduler 替换以及埋点监控。钩子字段定义在 RxJavaPlugins.java#L34-L135 中:

flowchart LR
    subgraph RxJavaPlugins
        EH[errorHandler]
        SA[onFlowableAssembly]
        SS[onFlowableSubscribe]
        SCH[onScheduleHandler]
        SI[onInitComputationHandler]
        LD[lockdown]
    end
    SA -->|"intercepts"| OP[Operator creation]
    SS -->|"intercepts"| SUB[subscribe call]
    EH -->|"catches"| UE[Undeliverable errors]
    SCH -->|"wraps"| RUN[Scheduled Runnables]
    LD -->|"freezes"| SA
    LD -->|"freezes"| SS

lockdown() 方法在生产环境中尤为重要——一旦调用,所有插件钩子都无法再修改,从而防止各库之间相互干扰彼此的配置。

下一步

有了这张地图,我们就可以深入探索内部机制了。第 2 部分将完整追踪 subscribe 链路——从调用 flowable.subscribe(subscriber) 开始,经过插件钩子,向下穿过抽象的 subscribeActual(),直到使整个流式 API 得以运转的 operator 装饰器模式。我们将以 FlowableMap 为典型示例,解析 RxJava 中每一个 operator 的构建方式。