RxJava 4.x:Streamable、虚拟线程与响应式 Java 的未来
前置知识
- ›第 2 篇:subscribe-chain-and-operator-anatomy
- ›第 4 篇:scheduler-system-and-threading-model
- ›第 5 篇:backpressure-from-theory-to-implementation
- ›Java 21+ 虚拟线程与结构化并发概念
RxJava 4.x:Streamable、虚拟线程与响应式 Java 的未来
RxJava 4.x 不只是一次版本升级,它是对响应式编程在"线程已不再昂贵"这一新时代中的重新思考。以 Java 26 为基线,虚拟线程得以全面支持,而这恰恰动摇了响应式编程非阻塞模型的根本动机——节省稀缺的 OS 线程。但 RxJava 4.x 并没有就此抛弃响应式,而是选择同时拥抱两种范式:面向高吞吐量 pipeline 的推模式 Flowable,以及全新的拉模式 Streamable——专为可以在虚拟线程上自由阻塞的命令式代码而设计。
4.x 的核心变化:Java 26 基线与 Flow Publisher
最显眼的改动是移除了对外部 Reactive Streams 依赖的引用。在 RxJava 3.x 中,Flowable 实现的是 org.reactivestreams.Publisher;到了 4.x,它直接实现 java.util.concurrent.Flow.Publisher:
public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
FlowableDocBasic<T> {
摘自 Flowable.java#L159-L161。这里的 Publisher 是 java.util.concurrent.Flow.Publisher,自 Java 9 起便已内置于 JDK,因此运行时零外部依赖。
non-sealed 关键字是新面孔——Flowable 现在通过密封接口 FlowableDocBasic 允许扩展。module-info.java 中仅声明依赖 java.management(用于调度器线程池管理中的 ThreadMXBean 访问)。
4.x 在代码库中还引入了诸多现代 Java 特性:
- Records:
FlatMapConfig、Streamer.HiddenStreamer、Streamer.StreamerFinishViaDisposableContainerCanceller - 密封接口:
FlowableDocBasic sealed permits Flowable - 模式匹配:Streamable 中的
if (crash instanceof RuntimeException ex) var:在 4.x 代码中广泛用于局部类型推断AutoCloseable:Disposable extends AutoCloseable,Streamer extends AutoCloseable
Streamable<T>:异步枚举模式
Streamable.java#L35-L48 的 Javadoc 将 Streamable 描述为"Java 世界中全息涌现的 IAsyncEnumerable"。它是 RxJava 对 C# IAsyncEnumerable<T> 的回应——一种专为虚拟线程设计的拉模式异步迭代模式。
public interface Streamable<@NonNull T> {
Streamer<T> stream(@NonNull DisposableContainer cancellation);
default Streamer<T> stream() {
return stream(new CompositeDisposable());
}
}
Flowable 是将数据推送给 Subscriber,而 Streamable 则创建一个由消费者主动拉取的 Streamer。这种反转从根本上改变了编程模型。
Streamer.java#L34-L64 定义了三个核心操作:
public interface Streamer<@NonNull T> extends AutoCloseable {
CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation);
T current();
CompletionStage<Void> finish(@NonNull DisposableContainer cancellation);
}
使用协议如下:调用 next(),它返回一个 CompletionStage<Boolean>。若结果为 true,调用 current() 获取当前值;若为 false,则流已结束。最后调用 finish() 进行清理。
sequenceDiagram
participant Consumer
participant Streamer
participant Source
Consumer->>Streamer: next(cancellation)
Streamer->>Source: Pull next item
Source-->>Streamer: Item available
Streamer-->>Consumer: CompletionStage → true
Consumer->>Streamer: current()
Streamer-->>Consumer: item value
Consumer->>Streamer: next(cancellation)
Streamer->>Source: Pull next item
Source-->>Streamer: Source exhausted
Streamer-->>Consumer: CompletionStage → false
Consumer->>Streamer: finish(cancellation)
Streamer-->>Consumer: CompletionStage → void
真正的强大之处体现在便捷方法上。Streamer.java#L169-L171 中的 awaitNext() 会阻塞虚拟线程,直到下一个元素就绪:
default boolean awaitNext() {
return await(next());
}
而静态方法 await 则充当 async/await 的桥梁:
static <T> T await(CompletionStage<T> stage, DisposableContainer canceller) {
var f = stage.toCompletableFuture();
var d = Disposable.fromFuture(f, true);
try (var _ = canceller.subscribe(d)) {
return f.join(); // Blocks the virtual thread, not the carrier
}
}
f.join() 会产生阻塞,但在虚拟线程上这几乎是零成本的——等待期间,承载线程会被释放去运行其他虚拟线程。这正是 Streamable 能够落地的关键:你写的是命令式阻塞代码,并发问题由运行时负责处理。
提示: 当你的处理逻辑本质上是顺序命令式的时候,
Streamable最能发挥优势——比如逐行读取文件、使用游标分页迭代数据库结果,或者任何用推模式Flowable会迫使你转而思考回调的场景。如果你的工作负载是对流进行 CPU 密集型计算,带有算子融合的Flowable会更快。
VirtualGenerator 与 VirtualEmitter:命令式与响应式的融合
连接命令式代码与响应式流的桥梁是 VirtualGenerator。来自 VirtualGenerator.java#L25-L33:
@FunctionalInterface
public interface VirtualGenerator<T> {
void generate(VirtualEmitter<T> emitter) throws Throwable;
}
以及 VirtualEmitter.java#L23-L37:
public interface VirtualEmitter<T> {
void emit(T item) throws Throwable;
DisposableContainer canceller();
}
用法出人意料地简洁——直接写一个阻塞循环:
Flowable<String> lines = Flowable.virtualCreate(emitter -> {
try (var reader = new BufferedReader(new FileReader("data.txt"))) {
String line;
while ((line = reader.readLine()) != null) {
emitter.emit(line); // Blocks if downstream isn't ready
}
}
});
emit() 调用是其中的关键创新。它会阻塞虚拟线程,直到下游订阅者有足够的请求容量。这以自然的方式实现了背压,开发者无需关心 request(n) 协议——阻塞调用本身就是背压机制。
flowchart LR
subgraph VirtualThread ["Virtual Thread"]
GEN[VirtualGenerator] --> EMIT[emitter.emit item]
EMIT -->|"blocks until demand"| EMIT
end
subgraph ReactiveWorld ["Reactive Pipeline"]
FV[FlowableVirtualCreateExecutor] --> OPS[.map .filter .observeOn]
OPS --> SUB[Subscriber]
end
EMIT -->|"bridges to"| FV
SUB -->|"request n"| FV
FV -->|"unblocks emit"| EMIT
Streamable.java#L131-L135 中的 Streamable.create() 将同一个 VirtualGenerator 接入拉模式:
static <T> Streamable<T> create(VirtualGenerator<T> generator) {
return Flowable.virtualCreate(generator).toStreamable();
}
目前这一实现仍通过 Flowable 进行桥接——代码中的 // FIXME native implementation 注释表明,未来计划提供直接实现。
API 中的现代 Java 特性
用密封接口拆分文档
Flowable 类超过 21,000 行。为了便于管理,4.x 使用密封接口将文档分散到多个文件,同时保持 API 集中在单一类上。
public sealed interface FlowableDocBasic<T> permits Flowable {
这个接口声明了 map()、filter() 等方法及其完整的 Javadoc。Flowable 实现 FlowableDocBasic 并提供实际逻辑。sealed permits Flowable 约束确保没有其他类能实现该接口——这纯粹是一种代码组织技巧,而非扩展点。
classDiagram
class FlowableDocBasic~T~ {
<<sealed>>
+map(Function): Flowable
+filter(Predicate): Flowable
Full Javadoc here
}
class Flowable~T~ {
<<non-sealed>>
Implements all methods
21000+ lines
}
FlowableDocBasic <|.. Flowable : permits
用 Record 封装配置
FlatMapConfig 使用 Java record 来打包算子配置:
public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
public FlatMapConfig() {
this(false, Flowable.bufferSize(), Flowable.bufferSize());
}
}
多个构造函数提供了从默认配置到完全自定义的便捷重载。record 的紧凑构造函数负责参数校验:
public FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
// ...
}
Disposable 继承 AutoCloseable
public interface Disposable extends AutoCloseable {
void dispose();
boolean isDisposed();
default void close() {
dispose();
}
}
这使得订阅管理可以使用 try-with-resources——在 Streamable 场景下尤为实用,因为 Streamer 的生命周期是拉模式驱动且显式管理的:
try (var streamer = streamable.stream()) {
while (streamer.awaitNext()) {
process(streamer.current());
}
} // finish() called automatically via close()
推模式与拉模式的融合
RxJava 4.x 将自己定位于两种响应式编程范式的交汇处:
-
推模式(
Flowable):由生产者主导节奏,以背压作为安全阀。适用于算子融合和无锁队列至关重要的高吞吐量 pipeline。 -
拉模式(
Streamable):由消费者主导节奏,自然地在虚拟线程上阻塞。适用于需要与异步数据源交互的命令式代码。
桥接方法 Streamable.toFlowable() 和 Flowable.toStreamable() 支持两种模型之间的无缝转换。这意味着你可以先用虚拟线程上的命令式拉模式代码开始,在需要算子融合和排空循环性能时再切换到优化过的推模式 pipeline。
这种双模型正是 RxJava 4.x 对"既然有了虚拟线程,我们还需要响应式库吗"这一问题的回答。答案并非非此即彼——虚拟线程解决了线程稀缺问题,但并不解决组合、错误处理和流处理问题,而这些正是 RxJava 算子库的核心价值所在。两种模型是互补的,而非竞争关系。
下一篇
至此,我们已经探索了 RxJava 4.x 的全貌——从订阅链到排空循环、调度器、背压,再到全新的虚拟线程集成。第 7 篇,也是本系列的最终篇,我们将聚焦于这一切背后的质量保障体系:验证 21,000 行代码类的测试基础设施、强制执行代码库规范的 24 个元测试、用于确定性时间测试的 TestScheduler,以及在代码合并前拦截问题的 CI pipeline。