Read OSS

RxJava 4.x:Streamable、虚拟线程与响应式 Java 的未来

高级

前置知识

  • 第 2 篇:subscribe-chain-and-operator-anatomy
  • 第 4 篇:scheduler-system-and-threading-model
  • 第 5 篇:backpressure-from-theory-to-implementation
  • Java 21+ 虚拟线程与结构化并发概念

RxJava 4.x:Streamable、虚拟线程与响应式 Java 的未来

RxJava 4.x 不只是一次版本升级,它是对响应式编程在"线程已不再昂贵"这一新时代中的重新思考。以 Java 26 为基线,虚拟线程得以全面支持,而这恰恰动摇了响应式编程非阻塞模型的根本动机——节省稀缺的 OS 线程。但 RxJava 4.x 并没有就此抛弃响应式,而是选择同时拥抱两种范式:面向高吞吐量 pipeline 的推模式 Flowable,以及全新的拉模式 Streamable——专为可以在虚拟线程上自由阻塞的命令式代码而设计。

4.x 的核心变化:Java 26 基线与 Flow Publisher

最显眼的改动是移除了对外部 Reactive Streams 依赖的引用。在 RxJava 3.x 中,Flowable 实现的是 org.reactivestreams.Publisher;到了 4.x,它直接实现 java.util.concurrent.Flow.Publisher

public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
    FlowableDocBasic<T> {

摘自 Flowable.java#L159-L161。这里的 Publisherjava.util.concurrent.Flow.Publisher,自 Java 9 起便已内置于 JDK,因此运行时零外部依赖。

non-sealed 关键字是新面孔——Flowable 现在通过密封接口 FlowableDocBasic 允许扩展。module-info.java 中仅声明依赖 java.management(用于调度器线程池管理中的 ThreadMXBean 访问)。

4.x 在代码库中还引入了诸多现代 Java 特性:

  • RecordsFlatMapConfigStreamer.HiddenStreamerStreamer.StreamerFinishViaDisposableContainerCanceller
  • 密封接口FlowableDocBasic sealed permits Flowable
  • 模式匹配:Streamable 中的 if (crash instanceof RuntimeException ex)
  • var:在 4.x 代码中广泛用于局部类型推断
  • AutoCloseableDisposable extends AutoCloseableStreamer extends AutoCloseable

Streamable<T>:异步枚举模式

Streamable.java#L35-L48 的 Javadoc 将 Streamable 描述为"Java 世界中全息涌现的 IAsyncEnumerable"。它是 RxJava 对 C# IAsyncEnumerable<T> 的回应——一种专为虚拟线程设计的拉模式异步迭代模式。

public interface Streamable<@NonNull T> {
    Streamer<T> stream(@NonNull DisposableContainer cancellation);
    
    default Streamer<T> stream() {
        return stream(new CompositeDisposable());
    }
}

Flowable 是将数据推送给 Subscriber,而 Streamable 则创建一个由消费者主动拉取Streamer。这种反转从根本上改变了编程模型。

Streamer.java#L34-L64 定义了三个核心操作:

public interface Streamer<@NonNull T> extends AutoCloseable {
    CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation);
    T current();
    CompletionStage<Void> finish(@NonNull DisposableContainer cancellation);
}

使用协议如下:调用 next(),它返回一个 CompletionStage<Boolean>。若结果为 true,调用 current() 获取当前值;若为 false,则流已结束。最后调用 finish() 进行清理。

sequenceDiagram
    participant Consumer
    participant Streamer
    participant Source
    
    Consumer->>Streamer: next(cancellation)
    Streamer->>Source: Pull next item
    Source-->>Streamer: Item available
    Streamer-->>Consumer: CompletionStage → true
    Consumer->>Streamer: current()
    Streamer-->>Consumer: item value
    
    Consumer->>Streamer: next(cancellation)
    Streamer->>Source: Pull next item
    Source-->>Streamer: Source exhausted
    Streamer-->>Consumer: CompletionStage → false
    
    Consumer->>Streamer: finish(cancellation)
    Streamer-->>Consumer: CompletionStage → void

真正的强大之处体现在便捷方法上。Streamer.java#L169-L171 中的 awaitNext() 会阻塞虚拟线程,直到下一个元素就绪:

default boolean awaitNext() {
    return await(next());
}

而静态方法 await 则充当 async/await 的桥梁:

static <T> T await(CompletionStage<T> stage, DisposableContainer canceller) {
    var f = stage.toCompletableFuture();
    var d = Disposable.fromFuture(f, true);
    try (var _ = canceller.subscribe(d)) {
        return f.join();  // Blocks the virtual thread, not the carrier
    }
}

f.join() 会产生阻塞,但在虚拟线程上这几乎是零成本的——等待期间,承载线程会被释放去运行其他虚拟线程。这正是 Streamable 能够落地的关键:你写的是命令式阻塞代码,并发问题由运行时负责处理。

提示: 当你的处理逻辑本质上是顺序命令式的时候,Streamable 最能发挥优势——比如逐行读取文件、使用游标分页迭代数据库结果,或者任何用推模式 Flowable 会迫使你转而思考回调的场景。如果你的工作负载是对流进行 CPU 密集型计算,带有算子融合的 Flowable 会更快。

VirtualGenerator 与 VirtualEmitter:命令式与响应式的融合

连接命令式代码与响应式流的桥梁是 VirtualGenerator。来自 VirtualGenerator.java#L25-L33

@FunctionalInterface
public interface VirtualGenerator<T> {
    void generate(VirtualEmitter<T> emitter) throws Throwable;
}

以及 VirtualEmitter.java#L23-L37

public interface VirtualEmitter<T> {
    void emit(T item) throws Throwable;
    DisposableContainer canceller();
}

用法出人意料地简洁——直接写一个阻塞循环:

Flowable<String> lines = Flowable.virtualCreate(emitter -> {
    try (var reader = new BufferedReader(new FileReader("data.txt"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            emitter.emit(line);  // Blocks if downstream isn't ready
        }
    }
});

emit() 调用是其中的关键创新。它会阻塞虚拟线程,直到下游订阅者有足够的请求容量。这以自然的方式实现了背压,开发者无需关心 request(n) 协议——阻塞调用本身就是背压机制。

flowchart LR
    subgraph VirtualThread ["Virtual Thread"]
        GEN[VirtualGenerator] --> EMIT[emitter.emit item]
        EMIT -->|"blocks until demand"| EMIT
    end
    subgraph ReactiveWorld ["Reactive Pipeline"]
        FV[FlowableVirtualCreateExecutor] --> OPS[.map .filter .observeOn]
        OPS --> SUB[Subscriber]
    end
    EMIT -->|"bridges to"| FV
    SUB -->|"request n"| FV
    FV -->|"unblocks emit"| EMIT

Streamable.java#L131-L135 中的 Streamable.create() 将同一个 VirtualGenerator 接入拉模式:

static <T> Streamable<T> create(VirtualGenerator<T> generator) {
    return Flowable.virtualCreate(generator).toStreamable();
}

目前这一实现仍通过 Flowable 进行桥接——代码中的 // FIXME native implementation 注释表明,未来计划提供直接实现。

API 中的现代 Java 特性

用密封接口拆分文档

Flowable 类超过 21,000 行。为了便于管理,4.x 使用密封接口将文档分散到多个文件,同时保持 API 集中在单一类上。

FlowableDocBasic.java#L24

public sealed interface FlowableDocBasic<T> permits Flowable {

这个接口声明了 map()filter() 等方法及其完整的 Javadoc。Flowable 实现 FlowableDocBasic 并提供实际逻辑。sealed permits Flowable 约束确保没有其他类能实现该接口——这纯粹是一种代码组织技巧,而非扩展点。

classDiagram
    class FlowableDocBasic~T~ {
        <<sealed>>
        +map(Function): Flowable
        +filter(Predicate): Flowable
        Full Javadoc here
    }
    class Flowable~T~ {
        <<non-sealed>>
        Implements all methods
        21000+ lines
    }
    FlowableDocBasic <|.. Flowable : permits

用 Record 封装配置

FlatMapConfig 使用 Java record 来打包算子配置:

public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
    public FlatMapConfig() {
        this(false, Flowable.bufferSize(), Flowable.bufferSize());
    }
}

多个构造函数提供了从默认配置到完全自定义的便捷重载。record 的紧凑构造函数负责参数校验:

public FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // ...
}

Disposable 继承 AutoCloseable

来自 Disposable.java#L28-L46

public interface Disposable extends AutoCloseable {
    void dispose();
    boolean isDisposed();
    
    default void close() {
        dispose();
    }
}

这使得订阅管理可以使用 try-with-resources——在 Streamable 场景下尤为实用,因为 Streamer 的生命周期是拉模式驱动且显式管理的:

try (var streamer = streamable.stream()) {
    while (streamer.awaitNext()) {
        process(streamer.current());
    }
}  // finish() called automatically via close()

推模式与拉模式的融合

RxJava 4.x 将自己定位于两种响应式编程范式的交汇处:

  • 推模式Flowable):由生产者主导节奏,以背压作为安全阀。适用于算子融合和无锁队列至关重要的高吞吐量 pipeline。

  • 拉模式Streamable):由消费者主导节奏,自然地在虚拟线程上阻塞。适用于需要与异步数据源交互的命令式代码。

桥接方法 Streamable.toFlowable()Flowable.toStreamable() 支持两种模型之间的无缝转换。这意味着你可以先用虚拟线程上的命令式拉模式代码开始,在需要算子融合和排空循环性能时再切换到优化过的推模式 pipeline。

这种双模型正是 RxJava 4.x 对"既然有了虚拟线程,我们还需要响应式库吗"这一问题的回答。答案并非非此即彼——虚拟线程解决了线程稀缺问题,但并不解决组合、错误处理和流处理问题,而这些正是 RxJava 算子库的核心价值所在。两种模型是互补的,而非竞争关系。

下一篇

至此,我们已经探索了 RxJava 4.x 的全貌——从订阅链到排空循环、调度器、背压,再到全新的虚拟线程集成。第 7 篇,也是本系列的最终篇,我们将聚焦于这一切背后的质量保障体系:验证 21,000 行代码类的测试基础设施、强制执行代码库规范的 24 个元测试、用于确定性时间测试的 TestScheduler,以及在代码合并前拦截问题的 CI pipeline。