Read OSS

スケジューラの全貌:スレッドプール、仮想スレッド、時間制御

上級

前提知識

  • 第3回:drain-loops-and-operator-fusion
  • Java のスレッドプールと ExecutorService
  • 仮想スレッド(Java 21+)の基礎知識

スケジューラの全貌:スレッドプール、仮想スレッド、時間制御

リアクティブプログラミングにおけるスレッド管理は、従来のスレッドごとのリクエストモデルとは根本的に異なります。RxJava では「どのスレッドでコードを実行するか」を直接指定するのではなく、Scheduler を選択し、subscribeOnobserveOn といった operator がそれをパイプラインに注入します。この Scheduler 抽象化によって operator のロジックはスレッド管理から切り離されており、RxJava 4.x ではプラットフォームスレッド、エラスティックプール、Java の仮想スレッドまでをひとつの抽象で扱えるようになっています。

Scheduler と Worker の抽象基底クラス

すべての出発点は Scheduler.java#L93 にある Scheduler 抽象クラスです。そのコントラクトは見た目よりずっとシンプルです。

public abstract class Scheduler {
    public abstract Worker createWorker();
    
    public Disposable scheduleDirect(Runnable run) { ... }
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { ... }
    
    public void start() { }
    public void shutdown() { }
    public long now(TimeUnit unit) { ... }
}

中核となる抽象化は Worker です。Worker は逐次実行コンテキストであり、同じ Worker にスケジュールされたタスクは直列かつ重複なしに実行されることが保証されています。observeOn が安全に動作するのもこの仕組みのおかげです。observeOn は単一の Worker を生成し、すべての drain ループのイテレーションをそこにスケジュールすることで、onNextonErroronComplete が同時に呼び出されないことを保証しています。

classDiagram
    class Scheduler {
        <<abstract>>
        +createWorker(): Worker
        +scheduleDirect(Runnable): Disposable
        +shutdown()
        +now(TimeUnit): long
    }
    class Worker {
        <<abstract>>
        +schedule(Runnable): Disposable
        +schedule(Runnable, long, TimeUnit): Disposable
        +dispose()
        +isDisposed(): boolean
    }
    class ComputationScheduler {
        FixedSchedulerPool pool
        Round-robin assignment
    }
    class CachedScheduler {
        CachedWorkerPool pool
        Elastic grow/shrink
    }
    class DeferredExecutorScheduler {
        Supplier~Executor~ supplier
        Virtual thread support
    }
    class TrampolineScheduler {
        Current thread execution
        Queue-based ordering
    }
    
    Scheduler <|-- ComputationScheduler
    Scheduler <|-- CachedScheduler
    Scheduler <|-- DeferredExecutorScheduler
    Scheduler <|-- TrampolineScheduler
    Scheduler *-- Worker

scheduleDirect メソッドは最適化のための仕組みです。単一タスクのために Worker を生成する代わりに、プール上に直接スケジュールします。デフォルト実装では内部的に一時的な Worker を作成しますが、ComputationScheduler のような具象実装ではプールレベルのより効率的なスケジューリングでこれをオーバーライドしています。

遅延初期化と Schedulers ファクトリ

Schedulers.java#L49-L100 にある Schedulers ファクトリクラスは、遅延シングルトン初期化にホルダーパターンを採用しています。

static final class ComputationHolder {
    static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
    static final Scheduler DEFAULT = new CachedScheduler();
}
static final class VirtualHolder {
    static final Scheduler DEFAULT = new DeferredExecutorScheduler(
        () -> Executors.newVirtualThreadPerTaskExecutor(), true, true);
}

static イニシャライザがこれらを RxJavaPlugins 経由でつなぎ合わせます。

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    CACHED = RxJavaPlugins.initCachedScheduler(new IOTask());
    VIRTUAL = RxJavaPlugins.initVirtualScheduler(new VirtualTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

init* フックを使うと、スケジューラインスタンスが生成される前の初期化タイミングでデフォルト実装をまるごと差し替えられます。一方、実行時の set*SchedulerHandler フックは生成済みのインスタンスをラップします。この二段構えの仕組みにより、たとえば Schedulers.io() 全体を仮想スレッドバックのスケジューラに切り替えるのも、フック呼び出し一回で済みます。

flowchart LR
    subgraph Initialization ["Init time (once)"]
        IT[initComputationScheduler] --> CH[ComputationHolder.DEFAULT]
    end
    subgraph Runtime ["Runtime (per call)"]
        SC[Schedulers.computation] --> RH[onComputationHandler]
        RH --> RETURN[Return Scheduler]
    end
    IT -.->|creates| SC

Tip: RxJava 4.x では Schedulers.virtual() がファーストクラスのスケジューラとして利用できます。ブロッキング I/O 処理で Schedulers.io() を使っているなら、Schedulers.virtual() への切り替えを検討してみましょう。仮想スレッドは生成コストが低いため、スレッドプールのサイジング問題を根本から解消できます。

ComputationScheduler:ラウンドロビン割り当てによる固定プール

ComputationScheduler.java#L29-L117 にある computation スケジューラは、それぞれが単一スレッドの ScheduledExecutorService で動く PoolWorker インスタンスの固定プールを管理しています。

static {
    MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), 
                      Integer.getInteger(KEY_MAX_THREADS, 0));
}

プールサイズはデフォルトで availableProcessors() の値になり、rx4.computation-threads で上書き可能です。cap メソッドにより、上書き値が CPU 数を超えないよう制限されます。

Worker の割り当てはシンプルなラウンドロビンです。

public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) { return SHUTDOWN_WORKER; }
    return eventLoops[(int)(n++ % c)];
}

これは createWorker() ではなく scheduleDirect 向けの処理です。Worker レベルの逐次実行保証が不要な単発タスクに使われます。createWorker() では PoolWorkerEventLoopWorker でラップし、スケジュールされたタスクを追跡して一括 dispose できるようにしています。

flowchart TD
    CS[ComputationScheduler] --> FSP[FixedSchedulerPool]
    FSP --> PW1[PoolWorker 0<br>ScheduledExecutorService]
    FSP --> PW2[PoolWorker 1<br>ScheduledExecutorService]
    FSP --> PW3[PoolWorker 2<br>ScheduledExecutorService]
    FSP --> PWN[PoolWorker N-1<br>ScheduledExecutorService]
    
    SD[scheduleDirect] -->|"round-robin"| FSP
    CW[createWorker] -->|"wraps one PoolWorker"| ELW[EventLoopWorker]
    ELW -->|"tracks tasks"| PW2

CachedScheduler(IO)と ScheduledRunnable のライフサイクル

CachedScheduler.java#L27-L100 にある IO スケジューラ(CachedScheduler)は、需要に応じてプールが伸縮するという点で根本的に異なる動作をします。

createWorker() が呼ばれると、まず CachedWorkerPool で有効期限切れでない Worker を探します。

  1. expiringWorkerQueueConcurrentLinkedQueue<ThreadWorker>)をポーリング
  2. 有効な Worker が見つかればそれを再利用
  3. なければ新しいスレッドを持つ ThreadWorker を生成

dispose された Worker はタイムスタンプ付きで expiring キューに戻されます。バックグラウンドのエビクタースレッドが定期的に(rx4.cached-keep-alive-time で設定、デフォルト 60 秒)実行され、期限切れの Worker を除去します。

ScheduledRunnable クラスは、すべてのスケジューラで共通して使われるキャンセル可能なタスクラッパーです。AtomicReferenceArray によるステートマシンを持ち、親の DisposableContainerFuture・実行スレッドをスロットで管理します。状態遷移(READY → RUNNING → FINISHED、または READY → DISPOSED)は CAS 操作で行われるため、タスク実行中であっても安全に並行キャンセルが可能です。

sequenceDiagram
    participant Client
    participant CachedScheduler
    participant Pool as CachedWorkerPool
    participant Worker as ThreadWorker
    
    Client->>CachedScheduler: createWorker()
    CachedScheduler->>Pool: get()
    
    alt Worker available in queue
        Pool->>Pool: Poll expiringWorkerQueue
        Pool-->>CachedScheduler: Reused ThreadWorker
    else No worker available
        Pool->>Worker: new ThreadWorker(threadFactory)
        Pool-->>CachedScheduler: New ThreadWorker
    end
    
    Client->>Worker: schedule(runnable)
    Worker->>Worker: submit to ScheduledExecutorService
    
    Client->>Worker: dispose()
    Worker->>Pool: release(worker) with timestamp
    
    Note over Pool: Evictor runs every 60s
    Pool->>Pool: Remove expired workers

仮想スレッドスケジューラ(4.x の新機能)

DeferredExecutorSchedulerSupplier<Executor> をラップし、Worker ごとに新しい executor を生成します。

public final class DeferredExecutorScheduler extends Scheduler {
    final Supplier<? extends Executor> executorSupplier;
    final boolean interruptibleWorker;
    final boolean fair;

    public Worker createWorker() {
        return new ExecutorWorker(executorSupplier.get(), interruptibleWorker, fair);
    }
}

仮想スレッドスケジューラでは、supplier は () -> Executors.newVirtualThreadPerTaskExecutor() です。各 Worker が独自の仮想スレッド executor を持ちます。これはプールモデルからのパラダイムシフトと言えます。固定数のプラットフォームスレッドを使い回すのではなく、タスクごとに専用の仮想スレッドが割り当てられます。

ExecutorWorker は第3回で解説した WIP drain ループパターンを採用しています。タスクは MpscLinkedQueue にキューイングされ、一度にひとつの仮想スレッドだけがキューをドレインします。仮想スレッドは安価に生成できますが、この仕組みによって Worker の逐次実行保証はきちんと維持されます。

スケジューラは fair フラグで二つのドレインモードを切り替えられます。

  • Eager モードfair=false):キューに積まれたタスクをひとつのループで一気に処理
  • Fair モードfair=true):タスクをひとつ処理したら executor に再サブミット——他の仮想スレッドが割り込める余地を作る

仮想スレッドスケジューラは VirtualHolder の初期化(true, true)でフェアモードが設定されており、多くの Worker が CPU 時間を奪い合う状況でもバランスよく処理できます。

subscribeOn vs observeOn:スケジューラをチェーンに注入する

この二つの operator はスケジューラを operator チェーンに組み込む主な手段ですが、動作の仕方はまったく異なります。

subscribeOnFlowableSubscribeOn.java#L42-L49 で定義されており、購読(そして上流のアイテム生成)を行うスレッドを制御します。

public void subscribeActual(final Subscriber<? super T> s) {
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
    s.onSubscribe(sos);
    w.schedule(sos);  // The actual subscription happens on the scheduler's thread
}

w.schedule(sos) が実行されると、run() メソッドがスケジューラのスレッド上で上流ソースを購読します。つまり、ソースのアイテム生成ロジックはそのスレッドで動きます。ただし subscribeOn が影響するのは購読パスのみです。アイテムが流れ始めると、ソースが emit したスレッドをそのまま引き継ぎます。

observeOn(第3回で解説)は、下流へのシグナル配信に使うスレッドを制御します。上流から届いたアイテムをキューに積み、スケジューラの Worker スレッド上でドレインします。

sequenceDiagram
    participant MainThread
    participant IOThread as IO Scheduler
    participant CompThread as Computation Scheduler
    participant Sub as Subscriber
    
    Note over MainThread: source.subscribeOn(io).observeOn(computation).subscribe(sub)
    
    MainThread->>IOThread: subscribeOn schedules subscription
    IOThread->>IOThread: Source emits onNext items
    IOThread->>CompThread: observeOn queues items
    CompThread->>Sub: Drain delivers onNext on comp thread

subscribeOn はチェーン内の位置に関係なく上流ソースに作用しますが、observeOn は配置した場所より下流のすべてに影響します。observeOn を複数置けばスレッドホップが複数発生し、subscribeOn を複数置いても意味はありません——ソースに最も近いひとつだけが有効です。

次回予告

スケジューラがどのようにスレッドを提供し、operator がそれをどう活用するかを見てきました。ただし、もっとも繊細な局面についてはまだ触れていません。あるスケジューラ上の速い producer が別のスケジューラ上の遅い consumer を圧倒してしまう場合、何が起きるのでしょうか。第5回ではバックプレッシャーを取り上げます。request(n) プロトコル、BackpressureHelper のアトミックな CAS 演算、五つの BackpressureStrategy オプション、そして FlowableFlatMap が数十の並行内部ソースをまたいでリクエスト数を管理する仕組みを深掘りします。