Read OSS

RxJava 4.x: Streamable、仮想スレッド、そしてリアクティブ Java の未来

上級

前提知識

  • 第2回: subscribe-chain-and-operator-anatomy
  • 第4回: scheduler-system-and-threading-model
  • 第5回: backpressure-from-theory-to-implementation
  • Java 21 以降の仮想スレッドと構造化並行性の概念

RxJava 4.x: Streamable、仮想スレッド、そしてリアクティブ Java の未来

RxJava 4.x は単なるバージョンアップではありません。「スレッドが安価になった世界で、リアクティブプログラミングはどうあるべきか」という問いへの、根本的な再考です。Java 26 を基盤とすることで仮想スレッドが利用可能になり、リアクティブの非ブロッキングモデルが解決しようとしていた本質的な課題、すなわち OS スレッドの希少性がほぼ解消されます。とはいえ RxJava 4.x はリアクティブを捨てるのではなく、両方のパラダイムを取り込む道を選びました。高スループットのパイプラインには既存のプッシュ型 Flowable を、仮想スレッド上で自由にブロックできる命令型コードには新しいプル型 Streamable を、それぞれ使い分けられる設計になっています。

4.x の変更点: Java 26 ベースラインと Flow Publisher

もっとも目に見える変化は、外部依存の Reactive Streams ライブラリがなくなったことです。RxJava 3.x では Flowableorg.reactivestreams.Publisher を実装していましたが、4.x では java.util.concurrent.Flow.Publisher を直接実装するようになりました。

public abstract non-sealed class Flowable<@NonNull T> implements Publisher<T>,
    FlowableDocBasic<T> {

(Flowable.java#L159-L161) ここでの Publisher は Java 9 以降 JDK に組み込まれている java.util.concurrent.Flow.Publisher です。ランタイム依存がゼロになりました。

non-sealed キーワードも新しい要素です。Flowable は sealed インターフェース FlowableDocBasic を通じて拡張を許可しています。module-info.java が require しているのはスケジューラープールの管理で ThreadMXBean を使うための java.management のみです。

コードベース全体に施された Java モダナイズの主な内容は以下の通りです。

  • Records: FlatMapConfigStreamer.HiddenStreamerStreamer.StreamerFinishViaDisposableContainerCanceller
  • Sealed interfaces: FlowableDocBasic sealed permits Flowable
  • パターンマッチング: Streamable 内の if (crash instanceof RuntimeException ex)
  • var: 4.x コード全体でローカル型推論に積極活用
  • AutoCloseable: Disposable extends AutoCloseableStreamer extends AutoCloseable

Streamable<T>: 非同期列挙パターン

Streamable.java#L35-L48 の Javadoc には「Java 世界のホログラフィック的な IAsyncEnumerable」と書かれています。C# の IAsyncEnumerable<T> に相当するもので、仮想スレッド向けのプル型非同期イテレーションパターンを RxJava として実現したものです。

public interface Streamable<@NonNull T> {
    Streamer<T> stream(@NonNull DisposableContainer cancellation);
    
    default Streamer<T> stream() {
        return stream(new CompositeDisposable());
    }
}

Flowable がアイテムを Subscriber にプッシュするのに対して、Streamable はコンシューマーが自らプルする Streamer を生成します。この方向性の逆転がプログラミングモデルを根本から変えています。

Streamer.java#L34-L64Streamer インターフェースには、3 つのコア操作が定義されています。

public interface Streamer<@NonNull T> extends AutoCloseable {
    CompletionStage<Boolean> next(@NonNull DisposableContainer cancellation);
    T current();
    CompletionStage<Void> finish(@NonNull DisposableContainer cancellation);
}

プロトコルの流れは次の通りです。next() を呼ぶと CompletionStage<Boolean> が返ります。true で完了すれば current() で値を取得します。false の場合はストリームの終端です。後片付けには finish() を呼びます。

sequenceDiagram
    participant Consumer
    participant Streamer
    participant Source
    
    Consumer->>Streamer: next(cancellation)
    Streamer->>Source: Pull next item
    Source-->>Streamer: Item available
    Streamer-->>Consumer: CompletionStage → true
    Consumer->>Streamer: current()
    Streamer-->>Consumer: item value
    
    Consumer->>Streamer: next(cancellation)
    Streamer->>Source: Pull next item
    Source-->>Streamer: Source exhausted
    Streamer-->>Consumer: CompletionStage → false
    
    Consumer->>Streamer: finish(cancellation)
    Streamer-->>Consumer: CompletionStage → void

ただし真価は便利メソッドに現れます。Streamer.java#L169-L171awaitNext() は、次のアイテムが利用可能になるまで仮想スレッドをブロックします。

default boolean awaitNext() {
    return await(next());
}

static メソッドの await は async/await のブリッジを提供します。

static <T> T await(CompletionStage<T> stage, DisposableContainer canceller) {
    var f = stage.toCompletableFuture();
    var d = Disposable.fromFuture(f, true);
    try (var _ = canceller.subscribe(d)) {
        return f.join();  // Blocks the virtual thread, not the carrier
    }
}

f.join() はブロックしますが、仮想スレッド上ではコストがほぼゼロです。待機中にキャリアスレッドは解放され、他の仮想スレッドを実行し続けます。これが Streamable を実用的にしている理由です。ブロックする命令型コードをそのまま書けば、あとはランタイムが並行性を面倒みてくれます。

ヒント: Streamable が特に活きるのは、ファイルの行読み込み、カーソルページネーションを使ったデータベース結果のイテレーション、あるいはプッシュ型の Flowable ではコールバック思考を強いられるシーケンシャルな処理など、ロジックが本質的に順次・命令型の場面です。CPU バウンドなストリーム処理には、オペレーターフュージョンが効く Flowable のほうが高速です。

VirtualGenerator と VirtualEmitter: 命令型とリアクティブの架け橋

命令型コードとリアクティブストリームをつなぐのが VirtualGenerator です。VirtualGenerator.java#L25-L33 は次のように定義されています。

@FunctionalInterface
public interface VirtualGenerator<T> {
    void generate(VirtualEmitter<T> emitter) throws Throwable;
}

そして VirtualEmitter.java#L23-L37:

public interface VirtualEmitter<T> {
    void emit(T item) throws Throwable;
    DisposableContainer canceller();
}

使い方は驚くほどシンプルで、ブロッキングループをそのまま書くだけです。

Flowable<String> lines = Flowable.virtualCreate(emitter -> {
    try (var reader = new BufferedReader(new FileReader("data.txt"))) {
        String line;
        while ((line = reader.readLine()) != null) {
            emitter.emit(line);  // Blocks if downstream isn't ready
        }
    }
});

emit() の呼び出しがこの設計の核心です。ダウンストリームのサブスクライバーがリクエスト容量を持つまで仮想スレッドをブロックします。つまり request(n) プロトコルを意識しなくても、ブロッキング呼び出し自体がバックプレッシャーの仕組みとして機能するわけです。

flowchart LR
    subgraph VirtualThread ["Virtual Thread"]
        GEN[VirtualGenerator] --> EMIT[emitter.emit item]
        EMIT -->|"blocks until demand"| EMIT
    end
    subgraph ReactiveWorld ["Reactive Pipeline"]
        FV[FlowableVirtualCreateExecutor] --> OPS[.map .filter .observeOn]
        OPS --> SUB[Subscriber]
    end
    EMIT -->|"bridges to"| FV
    SUB -->|"request n"| FV
    FV -->|"unblocks emit"| EMIT

Streamable.java#L131-L135Streamable.create() は、同じ VirtualGenerator をプル型モデルに接続します。

static <T> Streamable<T> create(VirtualGenerator<T> generator) {
    return Flowable.virtualCreate(generator).toStreamable();
}

現状は Flowable 経由でブリッジしており、// FIXME native implementation というコメントから、直接実装が計画されていることがわかります。

API に取り込まれたモダン Java の機能

ドキュメント分割のための sealed インターフェース

Flowable クラスは 21,000 行を超えます。この規模を管理するため、4.x では sealed インターフェースを使ってドキュメントを複数ファイルに分散させつつ、API は単一クラスに集約する手法を採っています。

FlowableDocBasic.java#L24:

public sealed interface FlowableDocBasic<T> permits Flowable {

このインターフェースには map()filter() といったメソッドが完全な Javadoc とともに宣言されています。Flowable はこれを実装して実際の処理を提供します。sealed permits Flowable という制約により他のクラスがこのインターフェースを実装することはできず、拡張ポイントではなくコード整理のためだけの仕組みです。

classDiagram
    class FlowableDocBasic~T~ {
        <<sealed>>
        +map(Function): Flowable
        +filter(Predicate): Flowable
        Full Javadoc here
    }
    class Flowable~T~ {
        <<non-sealed>>
        Implements all methods
        21000+ lines
    }
    FlowableDocBasic <|.. Flowable : permits

設定を record で表現する

FlatMapConfig は Java の record を使ってオペレーターの設定をまとめています。

public record FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
    public FlatMapConfig() {
        this(false, Flowable.bufferSize(), Flowable.bufferSize());
    }
}

複数のコンストラクターがデフォルト設定からフルカスタマイズまでの利便性を提供しています。record のコンパクトコンストラクターではパラメーターのバリデーションも行われています。

public FlatMapConfig(boolean delayErrors, int maxConcurrency, int bufferSize) {
    ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    // ...
}

Disposable extends AutoCloseable

Disposable.java#L28-L46:

public interface Disposable extends AutoCloseable {
    void dispose();
    boolean isDisposed();
    
    default void close() {
        dispose();
    }
}

AutoCloseable を継承することで、サブスクリプション管理に try-with-resources が使えるようになりました。Streamer のライフサイクルがプル型で明示的な Streamable との組み合わせでは特に重宝します。

try (var streamer = streamable.stream()) {
    while (streamer.awaitNext()) {
        process(streamer.current());
    }
}  // finish() called automatically via close()

プッシュとプルの融合

RxJava 4.x は、2 つのリアクティブプログラミングパラダイムの交差点に位置しています。

  • プッシュ型 (Flowable): プロデューサーがペースを主導し、バックプレッシャーが安全弁として機能します。オペレーターフュージョンとロックフリーキューが効果を発揮する、高スループットパイプラインに最適です。

  • プル型 (Streamable): コンシューマーがペースを主導し、仮想スレッドを自然にブロックします。非同期ソースとやり取りする必要がある命令型コードに最適です。

Streamable.toFlowable()Flowable.toStreamable() というブリッジメソッドにより、2 つのモデル間をシームレスに変換できます。仮想スレッド上の命令型プルコードで書き始め、オペレーターフュージョンやドレインループのパフォーマンスが必要となった段階でプッシュ型パイプラインへ切り替える、という使い方が可能です。

「仮想スレッドさえあればリアクティブライブラリなど不要では」という問いに対して、RxJava 4.x が出した答えがこのデュアルモデルです。仮想スレッドがスレッド希少性を解決する一方、コンポジション、エラーハンドリング、ストリーム処理といった課題は別次元の話です。2 つのモデルが競合するのではなく、互いを補い合う設計になっています。

次回予告

ここまで RxJava 4.x の全機能を一通り見てきました。subscribe チェーンからドレインループ、スケジューラー、バックプレッシャー、新しい仮想スレッド統合まで網羅しました。第 7 回(最終回)では、これらすべてを正確に検証するテスト基盤に焦点を当てます。21,000 行超のクラスを検証するテストインフラ、コードベース規約を強制する 24 本のメタテスト、決定論的な時間テストのための TestScheduler、マージ前に問題を捕捉する CI パイプラインを解説します。