Read OSS

深入解析调度器:线程池、虚拟线程与时间控制

高级

前置知识

  • 第 3 篇:drain-loops-and-operator-fusion
  • Java 线程池与 ExecutorService
  • 虚拟线程基础知识(Java 21+)

深入解析调度器:线程池、虚拟线程与时间控制

响应式编程中的线程管理与传统的"一请求一线程"模型有着本质区别。在 RxJava 中,你不需要指定某段代码运行在哪个线程上——你只需选择一个 Scheduler,由 subscribeOnobserveOn 这类 operator 将它注入到数据管道中。Scheduler 抽象将 operator 逻辑与线程管理彻底解耦。在 RxJava 4.x 中,这一抽象已经延伸至平台线程、弹性线程池以及 Java 虚拟线程。

Scheduler 与 Worker 抽象基类

一切都从 Scheduler.java#L93Scheduler 抽象类开始。它的契约看似简单:

public abstract class Scheduler {
    public abstract Worker createWorker();
    
    public Disposable scheduleDirect(Runnable run) { ... }
    public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { ... }
    
    public void start() { }
    public void shutdown() { }
    public long now(TimeUnit unit) { ... }
}

其中最核心的抽象是 Worker。Worker 是一个顺序执行上下文——调度到同一个 Worker 上的任务保证串行、互不重叠地执行。这正是 observeOn 能保证线程安全的原因:它创建单个 Worker,并将所有 drain loop 的迭代调度到该 Worker 上,从而确保 onNextonErroronComplete 永远不会并发调用。

classDiagram
    class Scheduler {
        <<abstract>>
        +createWorker(): Worker
        +scheduleDirect(Runnable): Disposable
        +shutdown()
        +now(TimeUnit): long
    }
    class Worker {
        <<abstract>>
        +schedule(Runnable): Disposable
        +schedule(Runnable, long, TimeUnit): Disposable
        +dispose()
        +isDisposed(): boolean
    }
    class ComputationScheduler {
        FixedSchedulerPool pool
        Round-robin assignment
    }
    class CachedScheduler {
        CachedWorkerPool pool
        Elastic grow/shrink
    }
    class DeferredExecutorScheduler {
        Supplier~Executor~ supplier
        Virtual thread support
    }
    class TrampolineScheduler {
        Current thread execution
        Queue-based ordering
    }
    
    Scheduler <|-- ComputationScheduler
    Scheduler <|-- CachedScheduler
    Scheduler <|-- DeferredExecutorScheduler
    Scheduler <|-- TrampolineScheduler
    Scheduler *-- Worker

scheduleDirect 方法是一项优化——对于只需执行一次的任务,无需创建 Worker,可以直接提交到线程池。其默认实现会在内部临时创建一个 Worker,但像 ComputationScheduler 这样的具体实现会覆盖这一行为,直接在池层面进行更高效的调度。

延迟初始化与 Schedulers 工厂

Schedulers.java#L49-L100 中的 Schedulers 工厂类采用 holder 模式实现懒加载单例:

static final class ComputationHolder {
    static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
    static final Scheduler DEFAULT = new CachedScheduler();
}
static final class VirtualHolder {
    static final Scheduler DEFAULT = new DeferredExecutorScheduler(
        () -> Executors.newVirtualThreadPerTaskExecutor(), true, true);
}

静态初始化块通过 RxJavaPlugins 完成串联:

static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    CACHED = RxJavaPlugins.initCachedScheduler(new IOTask());
    VIRTUAL = RxJavaPlugins.initVirtualScheduler(new VirtualTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}

init* 钩子允许在初始化阶段(即任何调度器实例被创建之前)完全替换默认实现。运行时的 set*SchedulerHandler 钩子则是对已创建实例进行包装。这种两层机制意味着,只需一次钩子调用,就能将整个应用的 Schedulers.io() 替换为基于虚拟线程的调度器。

flowchart LR
    subgraph Initialization ["Init time (once)"]
        IT[initComputationScheduler] --> CH[ComputationHolder.DEFAULT]
    end
    subgraph Runtime ["Runtime (per call)"]
        SC[Schedulers.computation] --> RH[onComputationHandler]
        RH --> RETURN[Return Scheduler]
    end
    IT -.->|creates| SC

提示: Schedulers.virtual() 在 RxJava 4.x 中已成为正式的一等调度器。如果你目前使用 Schedulers.io() 处理阻塞 I/O,不妨考虑迁移到 Schedulers.virtual()——由于虚拟线程的创建成本极低,它可以完全消除线程池容量规划的烦恼。

ComputationScheduler:固定线程池与轮询分配

位于 ComputationScheduler.java#L29-L117 的计算调度器管理着一组固定数量的 PoolWorker 实例,每个实例背后都有一个单线程的 ScheduledExecutorService

static {
    MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), 
                      Integer.getInteger(KEY_MAX_THREADS, 0));
}

线程池大小默认等于 availableProcessors(),也可以通过 rx4.computation-threads 覆盖。cap 方法确保自定义值不会超过 CPU 核心数。

Worker 分配采用简单的轮询策略:

public PoolWorker getEventLoop() {
    int c = cores;
    if (c == 0) { return SHUTDOWN_WORKER; }
    return eventLoops[(int)(n++ % c)];
}

这里调用的并非 createWorker(),而是用于 scheduleDirect 的路径——此场景下任务不需要 Worker 级别的顺序保证。createWorker() 则会将 PoolWorker 包装为 EventLoopWorker,后者会追踪已调度的任务,并支持一次性取消所有任务。

flowchart TD
    CS[ComputationScheduler] --> FSP[FixedSchedulerPool]
    FSP --> PW1[PoolWorker 0<br>ScheduledExecutorService]
    FSP --> PW2[PoolWorker 1<br>ScheduledExecutorService]
    FSP --> PW3[PoolWorker 2<br>ScheduledExecutorService]
    FSP --> PWN[PoolWorker N-1<br>ScheduledExecutorService]
    
    SD[scheduleDirect] -->|"round-robin"| FSP
    CW[createWorker] -->|"wraps one PoolWorker"| ELW[EventLoopWorker]
    ELW -->|"tracks tasks"| PW2

CachedScheduler(IO)与 ScheduledRunnable 生命周期

位于 CachedScheduler.java#L27-L100 的 IO 调度器(CachedScheduler)采用了截然不同的策略——其线程池会根据负载动态伸缩。

调用 createWorker() 时,它首先检查 CachedWorkerPool 中是否有可复用的 Worker:

  1. expiringWorkerQueue(一个 ConcurrentLinkedQueue<ThreadWorker>)中取出条目
  2. 若找到未过期的 Worker,则直接复用
  3. 若队列为空,则创建一个新的 ThreadWorker(带有新线程)

被释放的 Worker 会携带时间戳重新进入过期队列。后台有一个回收线程定期运行(由 rx4.cached-keep-alive-time 配置,默认 60 秒),负责清理已过期的 Worker。

ScheduledRunnable 是所有调度器通用的可取消任务包装器。它通过 AtomicReferenceArray 管理状态机,槽位分别存储父级 DisposableContainer、提交后的 Future 以及执行线程。状态转换(READY → RUNNING → FINISHED,或 READY → DISPOSED)通过 CAS 操作完成,从而实现任务执行期间的安全并发取消。

sequenceDiagram
    participant Client
    participant CachedScheduler
    participant Pool as CachedWorkerPool
    participant Worker as ThreadWorker
    
    Client->>CachedScheduler: createWorker()
    CachedScheduler->>Pool: get()
    
    alt Worker available in queue
        Pool->>Pool: Poll expiringWorkerQueue
        Pool-->>CachedScheduler: Reused ThreadWorker
    else No worker available
        Pool->>Worker: new ThreadWorker(threadFactory)
        Pool-->>CachedScheduler: New ThreadWorker
    end
    
    Client->>Worker: schedule(runnable)
    Worker->>Worker: submit to ScheduledExecutorService
    
    Client->>Worker: dispose()
    Worker->>Pool: release(worker) with timestamp
    
    Note over Pool: Evictor runs every 60s
    Pool->>Pool: Remove expired workers

虚拟线程调度器(4.x 新特性)

DeferredExecutorScheduler 封装了一个 Supplier<Executor>,并为每个 Worker 创建独立的 Executor:

public final class DeferredExecutorScheduler extends Scheduler {
    final Supplier<? extends Executor> executorSupplier;
    final boolean interruptibleWorker;
    final boolean fair;

    public Worker createWorker() {
        return new ExecutorWorker(executorSupplier.get(), interruptibleWorker, fair);
    }
}

在虚拟线程调度器中,supplier 为 () -> Executors.newVirtualThreadPerTaskExecutor(),每个 Worker 拥有各自独立的虚拟线程 Executor。这是对传统池化模型的彻底颠覆:不再复用固定数量的平台线程,而是为每个任务分配专属的虚拟线程。

ExecutorWorker 采用了与第 3 篇相同的 WIP drain loop 模式——任务被排入 MpscLinkedQueue,同一时间只有一个虚拟线程负责消费队列。这一机制在充分利用虚拟线程轻量特性的同时,仍然保证了 Worker 的顺序执行语义。

调度器通过 fair 标志提供两种消费模式:

  • 激进模式fair=false):单次循环迭代中排空所有待处理任务
  • 公平模式fair=true):每次仅执行一个任务,然后重新提交到 Executor,允许其他虚拟线程穿插执行

虚拟线程调度器默认使用公平模式(VirtualHolder 初始化时传入 true, true),在多个 Worker 竞争 CPU 时能获得更好的公平性。

subscribeOn 与 observeOn:将调度器注入调用链

这两个 operator 是调度器进入 operator 链的主要方式,但它们的工作机制截然不同。

subscribeOn 位于 FlowableSubscribeOn.java#L42-L49,它控制订阅操作(以及数据源发射)发生在哪个线程上:

public void subscribeActual(final Subscriber<? super T> s) {
    Scheduler.Worker w = scheduler.createWorker();
    final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
    s.onSubscribe(sos);
    w.schedule(sos);  // The actual subscription happens on the scheduler's thread
}

w.schedule(sos) 执行时,run() 方法会在调度器线程上订阅上游数据源,这意味着数据源的发射逻辑也在该线程上运行。需要注意的是,subscribeOn 只影响订阅路径——一旦数据开始流动,后续事件仍在数据源原本的发射线程上传递。

observeOn(详见第 3 篇)控制的是下游接收信号所在的线程。它将上游到来的数据项排入队列,再由调度器的 Worker 线程负责消费并向下传递。

sequenceDiagram
    participant MainThread
    participant IOThread as IO Scheduler
    participant CompThread as Computation Scheduler
    participant Sub as Subscriber
    
    Note over MainThread: source.subscribeOn(io).observeOn(computation).subscribe(sub)
    
    MainThread->>IOThread: subscribeOn schedules subscription
    IOThread->>IOThread: Source emits onNext items
    IOThread->>CompThread: observeOn queues items
    CompThread->>Sub: Drain delivers onNext on comp thread

subscribeOn 在链中的位置无关紧要(它作用于数据源),但 observeOn 的位置至关重要(它影响其之后的所有下游操作)。多次调用 observeOn 会产生多次线程切换;多次调用 subscribeOn 则是冗余的——只有离数据源最近的那一个会生效。

下一篇预告

我们已经了解了调度器如何提供线程,以及 operator 如何使用它们。但还有一个最微妙的问题尚未触及:当一个快速生产者与一个慢速消费者分别运行在不同调度器上时,会发生什么?第 5 篇将深入探讨背压(backpressure)机制——包括 request(n) 协议、BackpressureHelper 的原子 CAS 运算、五种 BackpressureStrategy 选项,以及 FlowableFlatMap 如何在数十个并发内部数据源之间协调请求计数。