深入 RxJava:架构概览与 877 个源文件的导航指南
前置知识
- ›对响应式编程基本概念有所了解(Observable、subscribe、operators)
- ›了解 Java 模块系统基础(module-info.java)
深入 RxJava:架构概览与 877 个源文件的导航指南
RxJava 的公共 API 用起来行云流水——链式调用几个 operator,subscribe,搞定。但在这份流畅的背后,是一个密度极高的代码库:数百个 operator 实现、无锁队列、三种截然不同的订阅协议,以及一套如今已横跨平台线程与虚拟线程的 scheduler 系统。本系列将带你拆开这台机器。我们从这里开始,先画一张地图。
项目基本信息与构建系统
RxJava 4.x 是一个单模块的 Gradle 项目,以 Java 26 为基准版本。这相比支持 Java 8 的 RxJava 3.x 是一次重大跨越。升级带来了虚拟线程、sealed 类型、record 以及模式匹配——4.x 代码库对这些特性均有所运用。
构建配置位于 build.gradle#L64-L70:
java {
toolchain {
languageVersion = JavaLanguageVersion.of(toolchainJdk)
}
sourceCompatibility = JavaVersion.VERSION_26
targetCompatibility = JavaVersion.VERSION_26
}
有一个细节值得特别关注:RxJava 4.x 零运行时依赖。外部的 org.reactivestreams 库已经移除——Flowable 现在直接实现 JDK 自带的 java.util.concurrent.Flow.Publisher。唯一的外部依赖全部限定在测试范围:JUnit、TestNG、Mockito、Guava、Reactive Streams TCK 以及 JMH。
项目身份信息定义在 gradle.properties 中:
group=io.reactivex.rxjava4
version=4.0.0-SNAPSHOT
POM_ARTIFACT_ID=rxjava
提示: 系统属性中的
rx4.*前缀(如rx4.buffer-size和rx4.computation-threads)是 RxJava 4.x 的命名空间,与 3.x 的rx3.*互不干扰。这意味着两个版本可以并行运行,不会出现属性冲突。
目录结构一览
源码树的目录嵌套较深,但结构规律清晰。以下是核心包的分布情况:
| 包路径 | 用途 | 大致文件数 |
|---|---|---|
core/ |
5+2 响应式类型、Scheduler、注解 | ~20 |
core/docs/ |
用于拆分 Javadoc 的 sealed 接口 | ~2 |
core/config/ |
配置 record(如 FlatMapConfig) | ~1 |
internal/operators/flowable/ |
Flowable operator 实现 | ~180 |
internal/operators/observable/ |
Observable operator 实现 | ~150 |
internal/operators/single/ |
Single operator 实现 | ~50 |
internal/operators/maybe/ |
Maybe operator 实现 | ~50 |
internal/operators/completable/ |
Completable operator 实现 | ~45 |
internal/operators/mixed/ |
跨类型 operator(如 Flowable→Single) | ~15 |
internal/operators/streamable/ |
Streamable operator 实现 | ~5 |
internal/schedulers/ |
Scheduler 实现 | ~15 |
internal/subscribers/ |
operator 内部使用的基础 subscriber 类 | ~10 |
internal/subscriptions/ |
Subscription 辅助工具 | ~5 |
internal/util/ |
BackpressureHelper、AtomicThrowable 等 | ~10 |
internal/virtual/ |
虚拟线程桥接实现 | ~3 |
schedulers/ |
公共 scheduler API(Schedulers、TestScheduler) | ~5 |
plugins/ |
RxJavaPlugins 全局钩子 | 1 |
operators/ |
公共队列与 fusion 接口 | ~8 |
subjects/、processors/ |
热源(Hot sources) | ~10 |
parallel/ |
ParallelFlowable | ~25 |
五个核心响应式类型的文件体积都相当可观。光是 Flowable.java 就超过 21,000 行。这些文件定义了完整的流式 API 接口——你调用过的每一个 map()、filter()、flatMap() 都是这些类上的方法。但每个 operator 的具体实现,则分散在 internal/operators/ 下的独立类中。
公共/内部 API 边界
RxJava 通过 Java 模块系统强制划定了 API 边界。src/main/module/module-info.java#L14-L34 中的模块定义明确声明了哪些包对外暴露:
module io.reactivex.rxjava4 {
exports io.reactivex.rxjava4.annotations;
exports io.reactivex.rxjava4.core;
exports io.reactivex.rxjava4.core.docs;
exports io.reactivex.rxjava4.core.config;
exports io.reactivex.rxjava4.disposables;
exports io.reactivex.rxjava4.exceptions;
// ... 另外 10 个导出包
requires java.management;
}
注意没有出现在列表中的部分:所有 io.reactivex.rxjava4.internal 下的内容。而绝大多数代码恰恰住在这里——所有 operator 实现、队列数据结构、scheduler 内部逻辑以及工具类。在模块路径下运行时,库的使用者根本无法访问这些类型。
flowchart TD
subgraph Public API ["Public API (exported)"]
A[core/ - Flowable, Observable, etc.]
B[schedulers/ - Schedulers, TestScheduler]
C[plugins/ - RxJavaPlugins]
D[operators/ - QueueFuseable, SpscArrayQueue]
E[disposables/ - Disposable]
end
subgraph Internal ["Internal (not exported)"]
F[internal/operators/ ~500 files]
G[internal/schedulers/]
H[internal/subscribers/]
I[internal/util/]
J[internal/virtual/]
end
A -->|"delegates to"| F
B -->|"creates"| G
A -->|"uses"| H
F -->|"uses"| I
这种设计使得公共 API 接口面相对于实现体量来说极为精简。五个核心类充当门面(facade)——它们的方法只是薄薄的委托层,负责构造内部 operator 对象。
5+2 核心响应式类型
RxJava 定义了五种主要响应式类型和两种专用类型,各自适用于不同场景:
classDiagram
class Flowable~T~ {
<<non-sealed>>
+subscribe(Subscriber)
+subscribeActual(Subscriber)*
Backpressure: Yes
Items: 0..N
}
class Observable~T~ {
<<abstract>>
+subscribe(Observer)
+subscribeActual(Observer)*
Backpressure: No
Items: 0..N
}
class Single~T~ {
<<abstract>>
+subscribe(SingleObserver)
Items: exactly 1
}
class Maybe~T~ {
<<abstract>>
+subscribe(MaybeObserver)
Items: 0 or 1
}
class Completable {
<<abstract>>
+subscribe(CompletableObserver)
Items: 0
}
class Streamable~T~ {
<<interface>>
+stream(): Streamer~T~
Pull-based async enumerable
}
class ParallelFlowable~T~ {
<<abstract>>
+subscribe(Subscriber[])
Splits across rails
}
Flowable 是主力类型——支持背压(backpressure)、实现 Flow.Publisher,适用于生产者可能超过消费者处理速度的场景。Observable 是它的轻量版:没有背压开销,非常适合 UI 事件以及本身就有上界的数据源。
Single、Maybe 和 Completable 是基数受限类型。它们存在的意义在于:发射恰好一个值的流(比如一次 HTTP 响应)与无界流在语义上有本质区别,这些类型将该约束直接编码进了类型系统。
4.x 新增的两种类型是 Streamable(一种面向虚拟线程设计的拉取式异步枚举)和 ParallelFlowable(将数据流拆分到多个并行"轨道"上处理 CPU 密集型任务)。Streamable 的详细内容将在第 6 部分深入讲解。
提示: 选择类型时,优先考虑基数:0 个元素用
Completable,0 或 1 个用Maybe,恰好 1 个用Single,0 到 N 个用Flowable或Observable。然后再考虑背压:网络/IO 数据源选Flowable,UI 事件选Observable。
Operator 导航:命名规范
面对 500 多个 operator 文件,你需要一套系统来快速定位。RxJava 提供了一套严格的命名规范来解决这个问题:
规律: {ReactiveType}{OperatorName}.java,位于 internal/operators/{type}/ 下
Flowable.map()→FlowableMap.java,位于internal/operators/flowable/Observable.map()→ObservableMap.java,位于internal/operators/observable/Flowable.flatMap()→FlowableFlatMap.javaSingle.map()→SingleMap.java
这一规范由第 7 部分将介绍的验证元测试强制执行,InternalWrongNaming 校验器会自动捕捉违规情况。
部分 operator 还带有后缀,用于表示跨类型转换或重载变体:
FlowableCollectSingle.java—— 返回 Single 的 Flowable operatorFlowableElementAtMaybe.java—— 返回 MaybeFlowableConcatMapScheduler.java—— 带 scheduler 参数的变体
一旦掌握这个规律,从任意 API 调用定位到对应实现只需几秒钟。
配置:系统属性与 RxJavaPlugins
RxJava 不使用配置文件,而是提供两种配置机制:系统属性和插件钩子系统。
系统属性(在类加载前设置)用于控制缓冲区大小和线程池,全部使用 rx4. 前缀:
| 属性 | 默认值 | 作用 |
|---|---|---|
rx4.buffer-size |
128 | 默认 operator 缓冲区大小 |
rx4.computation-threads |
CPU 核心数 | computation scheduler 线程池大小 |
rx4.cached-keep-alive-time |
60(秒) | IO scheduler worker 过期时间 |
rx4.computation-priority |
NORM_PRIORITY | computation 线程优先级 |
rx4.scheduler.use-nanotime |
false | 使用 nanoTime 替代 currentTimeMillis |
RxJavaPlugins 提供运行时钩子,用于错误处理、scheduler 替换以及埋点监控。钩子字段定义在 RxJavaPlugins.java#L34-L135 中:
flowchart LR
subgraph RxJavaPlugins
EH[errorHandler]
SA[onFlowableAssembly]
SS[onFlowableSubscribe]
SCH[onScheduleHandler]
SI[onInitComputationHandler]
LD[lockdown]
end
SA -->|"intercepts"| OP[Operator creation]
SS -->|"intercepts"| SUB[subscribe call]
EH -->|"catches"| UE[Undeliverable errors]
SCH -->|"wraps"| RUN[Scheduled Runnables]
LD -->|"freezes"| SA
LD -->|"freezes"| SS
lockdown() 方法在生产环境中尤为重要——一旦调用,所有插件钩子都无法再修改,从而防止各库之间相互干扰彼此的配置。
下一步
有了这张地图,我们就可以深入探索内部机制了。第 2 部分将完整追踪 subscribe 链路——从调用 flowable.subscribe(subscriber) 开始,经过插件钩子,向下穿过抽象的 subscribeActual(),直到使整个流式 API 得以运转的 operator 装饰器模式。我们将以 FlowableMap 为典型示例,解析 RxJava 中每一个 operator 的构建方式。