スケジューラの全貌:スレッドプール、仮想スレッド、時間制御
前提知識
- ›第3回:drain-loops-and-operator-fusion
- ›Java のスレッドプールと ExecutorService
- ›仮想スレッド(Java 21+)の基礎知識
スケジューラの全貌:スレッドプール、仮想スレッド、時間制御
リアクティブプログラミングにおけるスレッド管理は、従来のスレッドごとのリクエストモデルとは根本的に異なります。RxJava では「どのスレッドでコードを実行するか」を直接指定するのではなく、Scheduler を選択し、subscribeOn や observeOn といった operator がそれをパイプラインに注入します。この Scheduler 抽象化によって operator のロジックはスレッド管理から切り離されており、RxJava 4.x ではプラットフォームスレッド、エラスティックプール、Java の仮想スレッドまでをひとつの抽象で扱えるようになっています。
Scheduler と Worker の抽象基底クラス
すべての出発点は Scheduler.java#L93 にある Scheduler 抽象クラスです。そのコントラクトは見た目よりずっとシンプルです。
public abstract class Scheduler {
public abstract Worker createWorker();
public Disposable scheduleDirect(Runnable run) { ... }
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { ... }
public void start() { }
public void shutdown() { }
public long now(TimeUnit unit) { ... }
}
中核となる抽象化は Worker です。Worker は逐次実行コンテキストであり、同じ Worker にスケジュールされたタスクは直列かつ重複なしに実行されることが保証されています。observeOn が安全に動作するのもこの仕組みのおかげです。observeOn は単一の Worker を生成し、すべての drain ループのイテレーションをそこにスケジュールすることで、onNext・onError・onComplete が同時に呼び出されないことを保証しています。
classDiagram
class Scheduler {
<<abstract>>
+createWorker(): Worker
+scheduleDirect(Runnable): Disposable
+shutdown()
+now(TimeUnit): long
}
class Worker {
<<abstract>>
+schedule(Runnable): Disposable
+schedule(Runnable, long, TimeUnit): Disposable
+dispose()
+isDisposed(): boolean
}
class ComputationScheduler {
FixedSchedulerPool pool
Round-robin assignment
}
class CachedScheduler {
CachedWorkerPool pool
Elastic grow/shrink
}
class DeferredExecutorScheduler {
Supplier~Executor~ supplier
Virtual thread support
}
class TrampolineScheduler {
Current thread execution
Queue-based ordering
}
Scheduler <|-- ComputationScheduler
Scheduler <|-- CachedScheduler
Scheduler <|-- DeferredExecutorScheduler
Scheduler <|-- TrampolineScheduler
Scheduler *-- Worker
scheduleDirect メソッドは最適化のための仕組みです。単一タスクのために Worker を生成する代わりに、プール上に直接スケジュールします。デフォルト実装では内部的に一時的な Worker を作成しますが、ComputationScheduler のような具象実装ではプールレベルのより効率的なスケジューリングでこれをオーバーライドしています。
遅延初期化と Schedulers ファクトリ
Schedulers.java#L49-L100 にある Schedulers ファクトリクラスは、遅延シングルトン初期化にホルダーパターンを採用しています。
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new CachedScheduler();
}
static final class VirtualHolder {
static final Scheduler DEFAULT = new DeferredExecutorScheduler(
() -> Executors.newVirtualThreadPerTaskExecutor(), true, true);
}
static イニシャライザがこれらを RxJavaPlugins 経由でつなぎ合わせます。
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
CACHED = RxJavaPlugins.initCachedScheduler(new IOTask());
VIRTUAL = RxJavaPlugins.initVirtualScheduler(new VirtualTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
init* フックを使うと、スケジューラインスタンスが生成される前の初期化タイミングでデフォルト実装をまるごと差し替えられます。一方、実行時の set*SchedulerHandler フックは生成済みのインスタンスをラップします。この二段構えの仕組みにより、たとえば Schedulers.io() 全体を仮想スレッドバックのスケジューラに切り替えるのも、フック呼び出し一回で済みます。
flowchart LR
subgraph Initialization ["Init time (once)"]
IT[initComputationScheduler] --> CH[ComputationHolder.DEFAULT]
end
subgraph Runtime ["Runtime (per call)"]
SC[Schedulers.computation] --> RH[onComputationHandler]
RH --> RETURN[Return Scheduler]
end
IT -.->|creates| SC
Tip: RxJava 4.x では
Schedulers.virtual()がファーストクラスのスケジューラとして利用できます。ブロッキング I/O 処理でSchedulers.io()を使っているなら、Schedulers.virtual()への切り替えを検討してみましょう。仮想スレッドは生成コストが低いため、スレッドプールのサイジング問題を根本から解消できます。
ComputationScheduler:ラウンドロビン割り当てによる固定プール
ComputationScheduler.java#L29-L117 にある computation スケジューラは、それぞれが単一スレッドの ScheduledExecutorService で動く PoolWorker インスタンスの固定プールを管理しています。
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(),
Integer.getInteger(KEY_MAX_THREADS, 0));
}
プールサイズはデフォルトで availableProcessors() の値になり、rx4.computation-threads で上書き可能です。cap メソッドにより、上書き値が CPU 数を超えないよう制限されます。
Worker の割り当てはシンプルなラウンドロビンです。
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) { return SHUTDOWN_WORKER; }
return eventLoops[(int)(n++ % c)];
}
これは createWorker() ではなく scheduleDirect 向けの処理です。Worker レベルの逐次実行保証が不要な単発タスクに使われます。createWorker() では PoolWorker を EventLoopWorker でラップし、スケジュールされたタスクを追跡して一括 dispose できるようにしています。
flowchart TD
CS[ComputationScheduler] --> FSP[FixedSchedulerPool]
FSP --> PW1[PoolWorker 0<br>ScheduledExecutorService]
FSP --> PW2[PoolWorker 1<br>ScheduledExecutorService]
FSP --> PW3[PoolWorker 2<br>ScheduledExecutorService]
FSP --> PWN[PoolWorker N-1<br>ScheduledExecutorService]
SD[scheduleDirect] -->|"round-robin"| FSP
CW[createWorker] -->|"wraps one PoolWorker"| ELW[EventLoopWorker]
ELW -->|"tracks tasks"| PW2
CachedScheduler(IO)と ScheduledRunnable のライフサイクル
CachedScheduler.java#L27-L100 にある IO スケジューラ(CachedScheduler)は、需要に応じてプールが伸縮するという点で根本的に異なる動作をします。
createWorker() が呼ばれると、まず CachedWorkerPool で有効期限切れでない Worker を探します。
expiringWorkerQueue(ConcurrentLinkedQueue<ThreadWorker>)をポーリング- 有効な Worker が見つかればそれを再利用
- なければ新しいスレッドを持つ
ThreadWorkerを生成
dispose された Worker はタイムスタンプ付きで expiring キューに戻されます。バックグラウンドのエビクタースレッドが定期的に(rx4.cached-keep-alive-time で設定、デフォルト 60 秒)実行され、期限切れの Worker を除去します。
ScheduledRunnable クラスは、すべてのスケジューラで共通して使われるキャンセル可能なタスクラッパーです。AtomicReferenceArray によるステートマシンを持ち、親の DisposableContainer・Future・実行スレッドをスロットで管理します。状態遷移(READY → RUNNING → FINISHED、または READY → DISPOSED)は CAS 操作で行われるため、タスク実行中であっても安全に並行キャンセルが可能です。
sequenceDiagram
participant Client
participant CachedScheduler
participant Pool as CachedWorkerPool
participant Worker as ThreadWorker
Client->>CachedScheduler: createWorker()
CachedScheduler->>Pool: get()
alt Worker available in queue
Pool->>Pool: Poll expiringWorkerQueue
Pool-->>CachedScheduler: Reused ThreadWorker
else No worker available
Pool->>Worker: new ThreadWorker(threadFactory)
Pool-->>CachedScheduler: New ThreadWorker
end
Client->>Worker: schedule(runnable)
Worker->>Worker: submit to ScheduledExecutorService
Client->>Worker: dispose()
Worker->>Pool: release(worker) with timestamp
Note over Pool: Evictor runs every 60s
Pool->>Pool: Remove expired workers
仮想スレッドスケジューラ(4.x の新機能)
DeferredExecutorScheduler は Supplier<Executor> をラップし、Worker ごとに新しい executor を生成します。
public final class DeferredExecutorScheduler extends Scheduler {
final Supplier<? extends Executor> executorSupplier;
final boolean interruptibleWorker;
final boolean fair;
public Worker createWorker() {
return new ExecutorWorker(executorSupplier.get(), interruptibleWorker, fair);
}
}
仮想スレッドスケジューラでは、supplier は () -> Executors.newVirtualThreadPerTaskExecutor() です。各 Worker が独自の仮想スレッド executor を持ちます。これはプールモデルからのパラダイムシフトと言えます。固定数のプラットフォームスレッドを使い回すのではなく、タスクごとに専用の仮想スレッドが割り当てられます。
ExecutorWorker は第3回で解説した WIP drain ループパターンを採用しています。タスクは MpscLinkedQueue にキューイングされ、一度にひとつの仮想スレッドだけがキューをドレインします。仮想スレッドは安価に生成できますが、この仕組みによって Worker の逐次実行保証はきちんと維持されます。
スケジューラは fair フラグで二つのドレインモードを切り替えられます。
- Eager モード(
fair=false):キューに積まれたタスクをひとつのループで一気に処理 - Fair モード(
fair=true):タスクをひとつ処理したら executor に再サブミット——他の仮想スレッドが割り込める余地を作る
仮想スレッドスケジューラは VirtualHolder の初期化(true, true)でフェアモードが設定されており、多くの Worker が CPU 時間を奪い合う状況でもバランスよく処理できます。
subscribeOn vs observeOn:スケジューラをチェーンに注入する
この二つの operator はスケジューラを operator チェーンに組み込む主な手段ですが、動作の仕方はまったく異なります。
subscribeOn は FlowableSubscribeOn.java#L42-L49 で定義されており、購読(そして上流のアイテム生成)を行うスレッドを制御します。
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
s.onSubscribe(sos);
w.schedule(sos); // The actual subscription happens on the scheduler's thread
}
w.schedule(sos) が実行されると、run() メソッドがスケジューラのスレッド上で上流ソースを購読します。つまり、ソースのアイテム生成ロジックはそのスレッドで動きます。ただし subscribeOn が影響するのは購読パスのみです。アイテムが流れ始めると、ソースが emit したスレッドをそのまま引き継ぎます。
observeOn(第3回で解説)は、下流へのシグナル配信に使うスレッドを制御します。上流から届いたアイテムをキューに積み、スケジューラの Worker スレッド上でドレインします。
sequenceDiagram
participant MainThread
participant IOThread as IO Scheduler
participant CompThread as Computation Scheduler
participant Sub as Subscriber
Note over MainThread: source.subscribeOn(io).observeOn(computation).subscribe(sub)
MainThread->>IOThread: subscribeOn schedules subscription
IOThread->>IOThread: Source emits onNext items
IOThread->>CompThread: observeOn queues items
CompThread->>Sub: Drain delivers onNext on comp thread
subscribeOn はチェーン内の位置に関係なく上流ソースに作用しますが、observeOn は配置した場所より下流のすべてに影響します。observeOn を複数置けばスレッドホップが複数発生し、subscribeOn を複数置いても意味はありません——ソースに最も近いひとつだけが有効です。
次回予告
スケジューラがどのようにスレッドを提供し、operator がそれをどう活用するかを見てきました。ただし、もっとも繊細な局面についてはまだ触れていません。あるスケジューラ上の速い producer が別のスケジューラ上の遅い consumer を圧倒してしまう場合、何が起きるのでしょうか。第5回ではバックプレッシャーを取り上げます。request(n) プロトコル、BackpressureHelper のアトミックな CAS 演算、五つの BackpressureStrategy オプション、そして FlowableFlatMap が数十の並行内部ソースをまたいでリクエスト数を管理する仕組みを深掘りします。