深入解析调度器:线程池、虚拟线程与时间控制
前置知识
- ›第 3 篇:drain-loops-and-operator-fusion
- ›Java 线程池与 ExecutorService
- ›虚拟线程基础知识(Java 21+)
深入解析调度器:线程池、虚拟线程与时间控制
响应式编程中的线程管理与传统的"一请求一线程"模型有着本质区别。在 RxJava 中,你不需要指定某段代码运行在哪个线程上——你只需选择一个 Scheduler,由 subscribeOn、observeOn 这类 operator 将它注入到数据管道中。Scheduler 抽象将 operator 逻辑与线程管理彻底解耦。在 RxJava 4.x 中,这一抽象已经延伸至平台线程、弹性线程池以及 Java 虚拟线程。
Scheduler 与 Worker 抽象基类
一切都从 Scheduler.java#L93 的 Scheduler 抽象类开始。它的契约看似简单:
public abstract class Scheduler {
public abstract Worker createWorker();
public Disposable scheduleDirect(Runnable run) { ... }
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) { ... }
public void start() { }
public void shutdown() { }
public long now(TimeUnit unit) { ... }
}
其中最核心的抽象是 Worker。Worker 是一个顺序执行上下文——调度到同一个 Worker 上的任务保证串行、互不重叠地执行。这正是 observeOn 能保证线程安全的原因:它创建单个 Worker,并将所有 drain loop 的迭代调度到该 Worker 上,从而确保 onNext、onError、onComplete 永远不会并发调用。
classDiagram
class Scheduler {
<<abstract>>
+createWorker(): Worker
+scheduleDirect(Runnable): Disposable
+shutdown()
+now(TimeUnit): long
}
class Worker {
<<abstract>>
+schedule(Runnable): Disposable
+schedule(Runnable, long, TimeUnit): Disposable
+dispose()
+isDisposed(): boolean
}
class ComputationScheduler {
FixedSchedulerPool pool
Round-robin assignment
}
class CachedScheduler {
CachedWorkerPool pool
Elastic grow/shrink
}
class DeferredExecutorScheduler {
Supplier~Executor~ supplier
Virtual thread support
}
class TrampolineScheduler {
Current thread execution
Queue-based ordering
}
Scheduler <|-- ComputationScheduler
Scheduler <|-- CachedScheduler
Scheduler <|-- DeferredExecutorScheduler
Scheduler <|-- TrampolineScheduler
Scheduler *-- Worker
scheduleDirect 方法是一项优化——对于只需执行一次的任务,无需创建 Worker,可以直接提交到线程池。其默认实现会在内部临时创建一个 Worker,但像 ComputationScheduler 这样的具体实现会覆盖这一行为,直接在池层面进行更高效的调度。
延迟初始化与 Schedulers 工厂
Schedulers.java#L49-L100 中的 Schedulers 工厂类采用 holder 模式实现懒加载单例:
static final class ComputationHolder {
static final Scheduler DEFAULT = new ComputationScheduler();
}
static final class IoHolder {
static final Scheduler DEFAULT = new CachedScheduler();
}
static final class VirtualHolder {
static final Scheduler DEFAULT = new DeferredExecutorScheduler(
() -> Executors.newVirtualThreadPerTaskExecutor(), true, true);
}
静态初始化块通过 RxJavaPlugins 完成串联:
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
CACHED = RxJavaPlugins.initCachedScheduler(new IOTask());
VIRTUAL = RxJavaPlugins.initVirtualScheduler(new VirtualTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
init* 钩子允许在初始化阶段(即任何调度器实例被创建之前)完全替换默认实现。运行时的 set*SchedulerHandler 钩子则是对已创建实例进行包装。这种两层机制意味着,只需一次钩子调用,就能将整个应用的 Schedulers.io() 替换为基于虚拟线程的调度器。
flowchart LR
subgraph Initialization ["Init time (once)"]
IT[initComputationScheduler] --> CH[ComputationHolder.DEFAULT]
end
subgraph Runtime ["Runtime (per call)"]
SC[Schedulers.computation] --> RH[onComputationHandler]
RH --> RETURN[Return Scheduler]
end
IT -.->|creates| SC
提示:
Schedulers.virtual()在 RxJava 4.x 中已成为正式的一等调度器。如果你目前使用Schedulers.io()处理阻塞 I/O,不妨考虑迁移到Schedulers.virtual()——由于虚拟线程的创建成本极低,它可以完全消除线程池容量规划的烦恼。
ComputationScheduler:固定线程池与轮询分配
位于 ComputationScheduler.java#L29-L117 的计算调度器管理着一组固定数量的 PoolWorker 实例,每个实例背后都有一个单线程的 ScheduledExecutorService。
static {
MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(),
Integer.getInteger(KEY_MAX_THREADS, 0));
}
线程池大小默认等于 availableProcessors(),也可以通过 rx4.computation-threads 覆盖。cap 方法确保自定义值不会超过 CPU 核心数。
Worker 分配采用简单的轮询策略:
public PoolWorker getEventLoop() {
int c = cores;
if (c == 0) { return SHUTDOWN_WORKER; }
return eventLoops[(int)(n++ % c)];
}
这里调用的并非 createWorker(),而是用于 scheduleDirect 的路径——此场景下任务不需要 Worker 级别的顺序保证。createWorker() 则会将 PoolWorker 包装为 EventLoopWorker,后者会追踪已调度的任务,并支持一次性取消所有任务。
flowchart TD
CS[ComputationScheduler] --> FSP[FixedSchedulerPool]
FSP --> PW1[PoolWorker 0<br>ScheduledExecutorService]
FSP --> PW2[PoolWorker 1<br>ScheduledExecutorService]
FSP --> PW3[PoolWorker 2<br>ScheduledExecutorService]
FSP --> PWN[PoolWorker N-1<br>ScheduledExecutorService]
SD[scheduleDirect] -->|"round-robin"| FSP
CW[createWorker] -->|"wraps one PoolWorker"| ELW[EventLoopWorker]
ELW -->|"tracks tasks"| PW2
CachedScheduler(IO)与 ScheduledRunnable 生命周期
位于 CachedScheduler.java#L27-L100 的 IO 调度器(CachedScheduler)采用了截然不同的策略——其线程池会根据负载动态伸缩。
调用 createWorker() 时,它首先检查 CachedWorkerPool 中是否有可复用的 Worker:
- 从
expiringWorkerQueue(一个ConcurrentLinkedQueue<ThreadWorker>)中取出条目 - 若找到未过期的 Worker,则直接复用
- 若队列为空,则创建一个新的
ThreadWorker(带有新线程)
被释放的 Worker 会携带时间戳重新进入过期队列。后台有一个回收线程定期运行(由 rx4.cached-keep-alive-time 配置,默认 60 秒),负责清理已过期的 Worker。
ScheduledRunnable 是所有调度器通用的可取消任务包装器。它通过 AtomicReferenceArray 管理状态机,槽位分别存储父级 DisposableContainer、提交后的 Future 以及执行线程。状态转换(READY → RUNNING → FINISHED,或 READY → DISPOSED)通过 CAS 操作完成,从而实现任务执行期间的安全并发取消。
sequenceDiagram
participant Client
participant CachedScheduler
participant Pool as CachedWorkerPool
participant Worker as ThreadWorker
Client->>CachedScheduler: createWorker()
CachedScheduler->>Pool: get()
alt Worker available in queue
Pool->>Pool: Poll expiringWorkerQueue
Pool-->>CachedScheduler: Reused ThreadWorker
else No worker available
Pool->>Worker: new ThreadWorker(threadFactory)
Pool-->>CachedScheduler: New ThreadWorker
end
Client->>Worker: schedule(runnable)
Worker->>Worker: submit to ScheduledExecutorService
Client->>Worker: dispose()
Worker->>Pool: release(worker) with timestamp
Note over Pool: Evictor runs every 60s
Pool->>Pool: Remove expired workers
虚拟线程调度器(4.x 新特性)
DeferredExecutorScheduler 封装了一个 Supplier<Executor>,并为每个 Worker 创建独立的 Executor:
public final class DeferredExecutorScheduler extends Scheduler {
final Supplier<? extends Executor> executorSupplier;
final boolean interruptibleWorker;
final boolean fair;
public Worker createWorker() {
return new ExecutorWorker(executorSupplier.get(), interruptibleWorker, fair);
}
}
在虚拟线程调度器中,supplier 为 () -> Executors.newVirtualThreadPerTaskExecutor(),每个 Worker 拥有各自独立的虚拟线程 Executor。这是对传统池化模型的彻底颠覆:不再复用固定数量的平台线程,而是为每个任务分配专属的虚拟线程。
ExecutorWorker 采用了与第 3 篇相同的 WIP drain loop 模式——任务被排入 MpscLinkedQueue,同一时间只有一个虚拟线程负责消费队列。这一机制在充分利用虚拟线程轻量特性的同时,仍然保证了 Worker 的顺序执行语义。
调度器通过 fair 标志提供两种消费模式:
- 激进模式(
fair=false):单次循环迭代中排空所有待处理任务 - 公平模式(
fair=true):每次仅执行一个任务,然后重新提交到 Executor,允许其他虚拟线程穿插执行
虚拟线程调度器默认使用公平模式(VirtualHolder 初始化时传入 true, true),在多个 Worker 竞争 CPU 时能获得更好的公平性。
subscribeOn 与 observeOn:将调度器注入调用链
这两个 operator 是调度器进入 operator 链的主要方式,但它们的工作机制截然不同。
subscribeOn 位于 FlowableSubscribeOn.java#L42-L49,它控制订阅操作(以及数据源发射)发生在哪个线程上:
public void subscribeActual(final Subscriber<? super T> s) {
Scheduler.Worker w = scheduler.createWorker();
final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<>(s, w, source, nonScheduledRequests);
s.onSubscribe(sos);
w.schedule(sos); // The actual subscription happens on the scheduler's thread
}
w.schedule(sos) 执行时,run() 方法会在调度器线程上订阅上游数据源,这意味着数据源的发射逻辑也在该线程上运行。需要注意的是,subscribeOn 只影响订阅路径——一旦数据开始流动,后续事件仍在数据源原本的发射线程上传递。
observeOn(详见第 3 篇)控制的是下游接收信号所在的线程。它将上游到来的数据项排入队列,再由调度器的 Worker 线程负责消费并向下传递。
sequenceDiagram
participant MainThread
participant IOThread as IO Scheduler
participant CompThread as Computation Scheduler
participant Sub as Subscriber
Note over MainThread: source.subscribeOn(io).observeOn(computation).subscribe(sub)
MainThread->>IOThread: subscribeOn schedules subscription
IOThread->>IOThread: Source emits onNext items
IOThread->>CompThread: observeOn queues items
CompThread->>Sub: Drain delivers onNext on comp thread
subscribeOn 在链中的位置无关紧要(它作用于数据源),但 observeOn 的位置至关重要(它影响其之后的所有下游操作)。多次调用 observeOn 会产生多次线程切换;多次调用 subscribeOn 则是冗余的——只有离数据源最近的那一个会生效。
下一篇预告
我们已经了解了调度器如何提供线程,以及 operator 如何使用它们。但还有一个最微妙的问题尚未触及:当一个快速生产者与一个慢速消费者分别运行在不同调度器上时,会发生什么?第 5 篇将深入探讨背压(backpressure)机制——包括 request(n) 协议、BackpressureHelper 的原子 CAS 运算、五种 BackpressureStrategy 选项,以及 FlowableFlatMap 如何在数十个并发内部数据源之间协调请求计数。