Read OSS

Schedulers Unmasked: Thread Pools, Virtual Threads, and Time Control

Advanced

Prerequisites

  • Article 3: drain-loops-and-operator-fusion
  • Java thread pools and ExecutorService
  • Basic understanding of virtual threads (Java 21+)

Threading in reactive programming is fundamentally different from traditional thread-per-request models. In RxJava, you don't pick which thread runs your code — you pick a Scheduler, and operators like subscribeOn and observeOn inject it into the pipeline. The Scheduler abstraction decouples operator logic from thread management, and in RxJava 4.x, that abstraction now spans platform threads, elastic pools, and Java's virtual threads.

Scheduler and Worker Abstract Base

Everything starts with the Scheduler abstract class at Scheduler.java#L93. Its contract is deceptively simple:

public abstract class Scheduler {
    // The core abstract method: a factory for sequential execution contexts
    public abstract Worker createWorker();

    // One-shot scheduling that bypasses explicit Worker management
    public Disposable scheduleDirect(Runnable run) { ... }
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { ... }

    // Lifecycle control and the scheduler's notion of time
    public void start() { }
    public void shutdown() { }
    public long now(TimeUnit unit) { ... }
}

The key abstraction is the Worker. A Worker is a sequential execution context — tasks scheduled on the same Worker are guaranteed to execute serially and non-overlapping. This is what makes observeOn safe: it creates a single Worker and schedules all drain loop iterations on it, ensuring that onNext, onError, and onComplete are never called concurrently.
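To make that guarantee concrete, here is a minimal, self-contained sketch (plain JDK, not RxJava code) of a Worker-like construct: one single-threaded executor per instance yields exactly the serial, non-overlapping execution that observeOn relies on.

```java
import java.util.*;
import java.util.concurrent.*;

// Minimal stand-in for a Worker: one single-threaded executor per instance,
// so tasks submitted to the same instance run serially and never overlap.
class MiniWorker {
    private final ExecutorService exec = Executors.newSingleThreadExecutor();

    Future<?> schedule(Runnable task) { return exec.submit(task); }

    void dispose() { exec.shutdownNow(); }
}

class MiniWorkerDemo {
    // Schedules 100 tasks on one worker and records the order they ran in.
    // Because the worker is a FIFO single-threaded executor, the observed
    // order matches submission order and no two tasks ever run concurrently.
    static List<Integer> run() {
        try {
            MiniWorker worker = new MiniWorker();
            List<Integer> order = Collections.synchronizedList(new ArrayList<>());
            List<Future<?>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                int id = i;
                futures.add(worker.schedule(() -> order.add(id)));
            }
            for (Future<?> f : futures) f.get(); // wait for every task
            worker.dispose();
            return order;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

The real Worker adds delayed scheduling and mass disposal on top of this, but the serialization property is the essence.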

classDiagram
    class Scheduler {
        <<abstract>>
        +createWorker(): Worker
        +scheduleDirect(Runnable): Disposable
        +shutdown()
        +now(TimeUnit): long
    }
    class Worker {
        <<abstract>>
        +schedule(Runnable): Disposable
        +schedule(Runnable, long, TimeUnit): Disposable
        +dispose()
        +isDisposed(): boolean
    }
    class ComputationScheduler {
        FixedSchedulerPool pool
        Round-robin assignment
    }
    class CachedScheduler {
        CachedWorkerPool pool
        Elastic grow/shrink
    }
    class DeferredExecutorScheduler {
        Supplier~Executor~ supplier
        Virtual thread support
    }
    class TrampolineScheduler {
        Current thread execution
        Queue-based ordering
    }
    
    Scheduler <|-- ComputationScheduler
    Scheduler <|-- CachedScheduler
    Scheduler <|-- DeferredExecutorScheduler
    Scheduler <|-- TrampolineScheduler
    Scheduler *-- Worker

The scheduleDirect method is an optimization — instead of creating a Worker for a single task, it schedules directly on the pool. The default implementation creates a temporary Worker under the hood, but concrete implementations like ComputationScheduler override it with more efficient pool-level scheduling.
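A plausible sketch of what that default looks like (hypothetical names, not the actual RxJava source): create a throwaway worker, run the task on it, and release the worker once the task finishes.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of a default scheduleDirect: spin up a one-shot
// worker, run the task, and dispose the worker afterwards. Concrete
// schedulers can skip this and hand the task straight to their pool.
class DirectScheduleSketch {
    static Future<?> scheduleDirect(Runnable run) {
        ExecutorService oneShotWorker = Executors.newSingleThreadExecutor();
        return oneShotWorker.submit(() -> {
            try {
                run.run();
            } finally {
                oneShotWorker.shutdown(); // dispose the temporary worker
            }
        });
    }
}
```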

Lazy Initialization and the Schedulers Factory

The Schedulers factory class at Schedulers.java#L49-L100 uses the holder pattern for lazy singleton initialization:

static final class ComputationHolder {
    static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
    static final Scheduler DEFAULT = new CachedScheduler();
}
static final class VirtualHolder {
    static final Scheduler DEFAULT = new DeferredExecutorScheduler(
        () -> Executors.newVirtualThreadPerTaskExecutor(), true, true);
}
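The holder pattern works because the JVM initializes a nested class only on first use, and class initialization is thread-safe by specification. A self-contained demonstration (independent of RxJava):

```java
import java.util.concurrent.atomic.AtomicInteger;

// The nested Holder class is initialized only when DEFAULT is first
// touched, so construction cost is paid lazily and exactly once, with
// thread safety guaranteed by JVM class-initialization semantics.
class LazySingleton {
    static final AtomicInteger constructions = new AtomicInteger();

    static final class Holder {
        static final LazySingleton DEFAULT = new LazySingleton();
    }

    LazySingleton() { constructions.incrementAndGet(); }

    static LazySingleton get() { return Holder.DEFAULT; }
}
```

Loading LazySingleton itself constructs nothing; only the first get() call triggers Holder's initialization, and every subsequent call returns the same instance.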

The static initializer wires these through RxJavaPlugins:

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    CACHED = RxJavaPlugins.initCachedScheduler(new IOTask());
    VIRTUAL = RxJavaPlugins.initVirtualScheduler(new VirtualTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

The init* hooks allow complete replacement of the default schedulers at initialization time — before any scheduler instance is created. The runtime set*SchedulerHandler hooks wrap the already-created instance. This two-level scheme means you can replace Schedulers.io() with a virtual-thread-backed scheduler for your entire application with a single hook call.
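The two levels can be mimicked in a few lines (a schematic with illustrative names, not RxJava's actual plugin machinery): an init hook that may substitute the instance before the default is ever built, and a runtime handler that wraps each lookup.

```java
import java.util.function.Function;
import java.util.function.Supplier;

// Schematic of the two-level hook scheme. The init hook runs once, before
// the default instance is built (and may skip building it entirely); the
// runtime handler wraps the already-created instance on every lookup.
// Strings stand in for Scheduler instances to keep the model tiny.
class MiniPlugins {
    static Function<Supplier<String>, String> initHook = Supplier::get;
    static Function<String, String> runtimeHandler = Function.identity();

    static String initScheduler(Supplier<String> defaultFactory) {
        return initHook.apply(defaultFactory); // may never call the factory
    }

    static String onScheduler(String created) {
        return runtimeHandler.apply(created); // wraps the existing instance
    }
}

class MiniSchedulers {
    // Wired once at class initialization, like the static block above
    static final String IO = MiniPlugins.initScheduler(() -> "cached-pool");

    static String io() { return MiniPlugins.onScheduler(IO); }
}
```

Setting the init hook before MiniSchedulers is first touched replaces the default wholesale; setting the runtime handler afterwards decorates every subsequent io() call.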

flowchart LR
    subgraph Initialization ["Init time (once)"]
        IT[initComputationScheduler] --> CH[ComputationHolder.DEFAULT]
    end
    subgraph Runtime ["Runtime (per call)"]
        SC[Schedulers.computation] --> RH[onComputationHandler]
        RH --> RETURN[Return Scheduler]
    end
    IT -.->|creates| SC

Tip: RxJava's Schedulers.virtual() is available as a first-class scheduler in 4.x. If you're migrating from Schedulers.io() for blocking I/O work, consider switching to Schedulers.virtual() — it eliminates thread pool sizing concerns entirely since virtual threads are cheap to create.

ComputationScheduler: Fixed Pool with Round-Robin Assignment

The computation scheduler at ComputationScheduler.java#L29-L117 manages a fixed pool of PoolWorker instances, each backed by a single-threaded ScheduledExecutorService.

static {
    MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), 
                      Integer.getInteger(KEY_MAX_THREADS, 0));
}

The pool size defaults to availableProcessors() and can be overridden with rx4.computation-threads. The cap method ensures the override never exceeds the CPU count.

Worker assignment uses simple round-robin:

public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) { return SHUTDOWN_WORKER; }
    return eventLoops[(int)(n++ % c)];
}

This isn't createWorker() — this is for scheduleDirect, where tasks don't need Worker-level sequential guarantees. createWorker() wraps a PoolWorker in an EventLoopWorker that tracks scheduled tasks and enables disposal of all tasks at once.
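The round-robin arithmetic is easy to verify in isolation. A standalone model (not the actual PoolWorker code): a monotonically increasing counter modulo the pool size cycles through every worker evenly.

```java
// Standalone model of round-robin event-loop selection: a monotonically
// increasing counter modulo the pool size visits all workers in rotation.
// Strings stand in for the PoolWorker instances.
class RoundRobinPool {
    final String[] eventLoops;
    long n; // grows without bound; the modulo keeps the index in range

    RoundRobinPool(int size) {
        eventLoops = new String[size];
        for (int i = 0; i < size; i++) {
            eventLoops[i] = "worker-" + i;
        }
    }

    String getEventLoop() {
        if (eventLoops.length == 0) {
            return "SHUTDOWN"; // mirrors the SHUTDOWN_WORKER fallback
        }
        return eventLoops[(int) (n++ % eventLoops.length)];
    }
}
```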

flowchart TD
    CS[ComputationScheduler] --> FSP[FixedSchedulerPool]
    FSP --> PW1[PoolWorker 0<br>ScheduledExecutorService]
    FSP --> PW2[PoolWorker 1<br>ScheduledExecutorService]
    FSP --> PW3[PoolWorker 2<br>ScheduledExecutorService]
    FSP --> PWN[PoolWorker N-1<br>ScheduledExecutorService]
    
    SD[scheduleDirect] -->|"round-robin"| FSP
    CW[createWorker] -->|"wraps one PoolWorker"| ELW[EventLoopWorker]
    ELW -->|"tracks tasks"| PW2

CachedScheduler (IO) and ScheduledRunnable Lifecycle

The IO scheduler (CachedScheduler) at CachedScheduler.java#L27-L100 operates fundamentally differently — its pool grows and shrinks based on demand.

When createWorker() is called, it first asks the CachedWorkerPool for a cached worker that hasn't yet expired:

  1. Poll the expiringWorkerQueue (a ConcurrentLinkedQueue<ThreadWorker>)
  2. If a non-expired worker is found, reuse it
  3. If none available, create a new ThreadWorker with a fresh thread

Workers that are disposed go back into the expiring queue with a timestamp. A background evictor thread runs periodically (configured by rx4.cached-keep-alive-time, default 60 seconds) to purge expired workers.

The ScheduledRunnable class is the cancellable task wrapper used across all schedulers. It manages a state machine via AtomicReferenceArray with slots for the parent DisposableContainer, the Future from submission, and the runner thread. State transitions (READY → RUNNING → FINISHED, or READY → DISPOSED) are handled via CAS operations, enabling safe concurrent cancellation even while the task is executing.
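The CAS-based lifecycle can be sketched with a single AtomicReference (a deliberate simplification of the AtomicReferenceArray slots described above; names are illustrative):

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified model of a cancellable task's state machine. The real
// ScheduledRunnable tracks several slots in an AtomicReferenceArray; here
// one AtomicReference<State> is enough to show the CAS transitions.
class CancellableTask implements Runnable {
    enum State { READY, RUNNING, FINISHED, DISPOSED }

    final AtomicReference<State> state = new AtomicReference<>(State.READY);
    final Runnable actual;

    CancellableTask(Runnable actual) { this.actual = actual; }

    @Override
    public void run() {
        // Only a READY task may start; if a concurrent dispose() won the
        // CAS race first, the task body is silently skipped.
        if (!state.compareAndSet(State.READY, State.RUNNING)) return;
        try {
            actual.run();
        } finally {
            state.compareAndSet(State.RUNNING, State.FINISHED);
        }
    }

    void dispose() {
        // READY -> DISPOSED prevents a not-yet-started task from running;
        // disposing after completion is a harmless no-op.
        state.compareAndSet(State.READY, State.DISPOSED);
    }

    boolean isDisposed() { return state.get() == State.DISPOSED; }
}
```

Because every transition is a single CAS, dispose() and run() can race from different threads without locks and still reach a consistent terminal state.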

sequenceDiagram
    participant Client
    participant CachedScheduler
    participant Pool as CachedWorkerPool
    participant Worker as ThreadWorker
    
    Client->>CachedScheduler: createWorker()
    CachedScheduler->>Pool: get()
    
    alt Worker available in queue
        Pool->>Pool: Poll expiringWorkerQueue
        Pool-->>CachedScheduler: Reused ThreadWorker
    else No worker available
        Pool->>Worker: new ThreadWorker(threadFactory)
        Pool-->>CachedScheduler: New ThreadWorker
    end
    
    Client->>Worker: schedule(runnable)
    Worker->>Worker: submit to ScheduledExecutorService
    
    Client->>Worker: dispose()
    Worker->>Pool: release(worker) with timestamp
    
    Note over Pool: Evictor runs every 60s
    Pool->>Pool: Remove expired workers

Virtual Thread Scheduler (New in 4.x)

The DeferredExecutorScheduler wraps a Supplier<Executor> and creates a new executor for each Worker:

public final class DeferredExecutorScheduler extends Scheduler {
    final Supplier<? extends Executor> executorSupplier;
    final boolean interruptibleWorker;
    final boolean fair;

    public Worker createWorker() {
        return new ExecutorWorker(executorSupplier.get(), interruptibleWorker, fair);
    }
}

For the virtual thread scheduler, the supplier is () -> Executors.newVirtualThreadPerTaskExecutor(). Each Worker gets its own virtual-thread executor. This is a paradigm shift from the pooled model: instead of reusing a fixed set of platform threads, each task gets its own virtual thread.

The ExecutorWorker uses the same WIP drain loop pattern we saw in Part 3 — tasks are queued in an MpscLinkedQueue, and only one virtual thread at a time drains the queue. This preserves the Worker's sequential execution guarantee even though virtual threads are cheap to spawn.

The scheduler offers two drain modes controlled by the fair flag:

  • Eager mode (fair=false): drain all queued tasks in one loop iteration
  • Fair mode (fair=true): drain one task, then re-submit to the executor — allowing other virtual threads to interleave

The virtual thread scheduler is constructed with both flags set (the true, true in the VirtualHolder initialization): the first enables interruptible workers and the second selects fair mode, giving better fairness when many Workers compete for CPU time.
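Both modes can be modeled with a small WIP-guarded drain loop (a sketch over a plain Executor and queue, not the actual ExecutorWorker):

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of a WIP-guarded drain loop with eager and fair modes. In eager
// mode one drain pass empties the whole queue; in fair mode each pass runs
// exactly one task and re-submits itself, letting other workers interleave.
class DrainWorker {
    final Queue<Runnable> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger wip = new AtomicInteger();
    final Executor executor;
    final boolean fair;

    DrainWorker(Executor executor, boolean fair) {
        this.executor = executor;
        this.fair = fair;
    }

    void schedule(Runnable task) {
        queue.offer(task);
        if (wip.getAndIncrement() == 0) {
            executor.execute(this::drain); // only one drain active at a time
        }
    }

    void drain() {
        if (fair) {
            // offer always precedes the wip increment, so wip > 0 implies
            // the queue holds at least one task and poll() cannot be null
            queue.poll().run();
            if (wip.decrementAndGet() != 0) {
                executor.execute(this::drain); // yield between tasks
            }
        } else {
            int missed = 1;
            for (;;) {
                Runnable t;
                while ((t = queue.poll()) != null) t.run();
                missed = wip.addAndGet(-missed);
                if (missed == 0) break; // no tasks arrived during the pass
            }
        }
    }
}
```

With a direct executor both modes deliver tasks in order; the difference only shows under contention, where fair mode returns to the executor between tasks instead of monopolizing it.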

subscribeOn vs observeOn: Injecting Schedulers into the Chain

These two operators are the primary way schedulers enter the operator chain, and they work very differently.

subscribeOn at FlowableSubscribeOn.java#L42-L49 controls which thread performs the subscription (and thus the source emission):

public void subscribeActual(final Subscriber<? super T> s) {
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
    s.onSubscribe(sos);
    w.schedule(sos);  // The actual subscription happens on the scheduler's thread
}

When w.schedule(sos) executes, the run() method subscribes to the upstream source on the scheduler's thread. This means the source's emission logic runs on that thread. Importantly, subscribeOn only affects the subscription path — once items are flowing, they continue on whatever thread the source emits from.

observeOn (which we examined in Part 3) controls which thread receives downstream signals. It queues items as they arrive from upstream and drains them on the scheduler's Worker thread.

sequenceDiagram
    participant MainThread
    participant IOThread as IO Scheduler
    participant CompThread as Computation Scheduler
    participant Sub as Subscriber
    
    Note over MainThread: source.subscribeOn(io).observeOn(computation).subscribe(sub)
    
    MainThread->>IOThread: subscribeOn schedules subscription
    IOThread->>IOThread: Source emits onNext items
    IOThread->>CompThread: observeOn queues items
    CompThread->>Sub: Drain delivers onNext on comp thread

The position of subscribeOn in the chain doesn't matter (it affects the source), but the position of observeOn does (it affects everything downstream of it). Multiple observeOn calls create multiple thread hops; multiple subscribeOn calls are redundant — only the one closest to the source takes effect.

What's Next

We've seen how schedulers provide threads and how operators use them. But we've deferred the most nuanced aspect: what happens when a fast producer on one scheduler floods a slow consumer on another? In Part 5, we'll tackle backpressure — the request(n) protocol, BackpressureHelper's atomic CAS arithmetic, the five BackpressureStrategy options, and how FlowableFlatMap juggles request accounting across dozens of concurrent inner sources.