引擎室:Drain 循环、队列融合与无锁并发
前置知识
- ›第 2 篇:subscribe-chain-and-operator-anatomy
- ›Java 并发基础(AtomicInteger、CAS 操作、volatile 语义、happens-before 保证)
- ›队列数据结构基础
引擎室:Drain 循环、队列融合与无锁并发
RxJava 的算子经常需要同时处理来自多个线程的信号——生产者线程调用 onNext、消费者线程调用 request,还可能有第三个线程发出取消信号。传统的 synchronized 块虽然能解决问题,但在拥有数十个阶段的算子链中,其性能开销会不断叠加。RxJava 通过两项关键创新解决了这一问题:原子 WIP drain 循环(无需加锁即可序列化访问)和算子融合(彻底消除中间队列)。正是这两者的结合,使 RxJava 能够达到每秒处理数亿事件的吞吐量。
原子 WIP Drain 循环模式
WIP(work-in-progress)drain 循环是 RxJava 中最重要的并发模式,几乎出现在每一个跨线程边界的算子中。FlowableObserveOn.java#L164-L168 中的入口看起来出奇地简单:
final void trySchedule() {
if (getAndIncrement() != 0) {
return;
}
worker.schedule(this);
}
该类继承自 AtomicInteger,这个计数器充当"处理中"指示器。整个模式的运作逻辑如下:
- 进入:调用
getAndIncrement()。若返回值为 0,当前线程"抢到锁"并进入 drain 循环;若返回值非 0,说明已有其他线程正在 drain——直接递增计数器后返回即可。 - 处理:从队列中取出数据,同时检查完成状态和取消状态。
- 退出:调用
addAndGet(-missed)。若结果为 0,说明 drain 期间没有新任务到达,可以安全退出;若结果非 0,则继续循环处理新到达的任务。
flowchart TD
ENTER[trySchedule called] --> INC{getAndIncrement != 0?}
INC -->|"Yes: other thread draining"| RETURN[Return immediately]
INC -->|"No: we won the race"| SCHEDULE[Schedule drain on worker]
SCHEDULE --> DRAIN[Process queued items]
DRAIN --> EXIT{addAndGet -missed == 0?}
EXIT -->|"Yes: no new work"| DONE[Exit loop]
EXIT -->|"No: work arrived during drain"| DRAIN
这一模式是无锁的:没有任何线程会阻塞等待其他线程。同时它也是线程安全的:计数器的递增操作既作为"有任务可处理"的信号,也作为"同一时间只有一个线程在 drain"的保证。退出时的检查(addAndGet(-missed))处理了一个关键竞态:当 drainer 即将退出时,生产者恰好递增了计数器——drainer 会看到非零的结果并重新进入循环。
提示: 阅读 RxJava 算子源码时,留意那些继承自
AtomicInteger的类——这是 drain 循环的典型标志。drain 方法通常命名为drain(),若类实现了Runnable接口,则也可能命名为run()。
FlowableObserveOn:最具代表性的复杂算子
FlowableObserveOn 是研究完整 drain 循环的最佳范本,因为它涵盖了所有关键机制:线程切换、队列管理、背压处理以及融合协商。
FlowableObserveOn.java#L62-L101 中的 BaseObserveOnSubscriber 完成了基础设施的搭建:
abstract static class BaseObserveOnSubscriber<T>
extends BasicIntQueueSubscription<T>
implements FlowableSubscriber<T>, Runnable {
final Worker worker;
final int prefetch;
final int limit;
// ...
BaseObserveOnSubscriber(Worker worker, boolean delayError, int prefetch) {
this.worker = worker;
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2); // 75% refill threshold
}
run() 方法会根据情况分发到三种不同的 drain 模式:
public final void run() {
if (outputFused) {
runBackfused();
} else if (sourceMode == SYNC) {
runSync();
} else {
runAsync();
}
}
FlowableObserveOn.java#L362-L380 中的异步 drain 是复杂度最高的部分——它从队列中读取数据、遵守背压约束(requested.get())、检查终止事件,并处理"消费一定数量的元素后触发新的上游请求"的补充模式。
sequenceDiagram
participant Upstream
participant ObserveOn as ObserveOnSubscriber
participant Queue as SpscArrayQueue
participant Worker as Scheduler.Worker
participant Downstream
Upstream->>ObserveOn: onNext(item)
ObserveOn->>Queue: offer(item)
ObserveOn->>ObserveOn: trySchedule() [WIP++]
Note over Worker: On scheduler thread
Worker->>ObserveOn: run()
ObserveOn->>Queue: poll()
Queue-->>ObserveOn: item
ObserveOn->>Downstream: onNext(item)
ObserveOn->>ObserveOn: consumed++ (replenish at limit)
无锁 SPSC 队列
算子用于缓冲数据的队列并非 java.util.concurrent 中的标准队列,而是从 JCTools 项目衍生而来的自定义 SPSC(单生产者-单消费者)实现。
SpscArrayQueue 是有界变体,底层是一个以 AtomicReferenceArray 为支撑的环形缓冲区:
public final class SpscArrayQueue<E> extends AtomicReferenceArray<E>
implements SimplePlainQueue<E> {
final int mask;
final AtomicLong producerIndex;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscArrayQueue(int capacity) {
super(Pow2.roundToPowerOfTwo(capacity));
this.mask = length() - 1;
this.producerIndex = new AtomicLong();
this.consumerIndex = new AtomicLong();
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
}
几个关键设计决策:
- 2 的幂次容量:容量向上取整为 2 的幂,使得索引计算可以用位掩码(
index & mask)代替取模运算——这是一项显著的微优化 - 生产者/消费者索引分离:各自独立分配为
AtomicLong对象,位于不同的缓存行,避免伪共享 - 预读优化(look-ahead):生产者在实际检查 offer 槽位之前,会提前检查多个槽位,从而降低对消费者索引进行 volatile 读取的频率
SpscLinkedArrayQueue 变体通过将多个固定大小的数组段链接在一起,提供无界容量,适用于创建时无法预估队列大小的场景。
在多生产者场景下(例如 flatMap 合并多个内部数据源),RxJava 使用 MpscLinkedQueue——一种基于 Michael-Scott 算法的无锁队列,专为多生产者-单消费者场景设计。
classDiagram
class SimplePlainQueue~E~ {
<<interface>>
+offer(E): boolean
+poll(): E
+isEmpty(): boolean
+clear()
}
class SpscArrayQueue~E~ {
Bounded ring buffer
Power-of-2 size
Wait-free offer/poll
}
class SpscLinkedArrayQueue~E~ {
Unbounded linked arrays
Grows on demand
}
class MpscLinkedQueue~E~ {
Multi-producer safe
Lock-free CAS offer
}
SimplePlainQueue <|.. SpscArrayQueue
SimplePlainQueue <|.. SpscLinkedArrayQueue
SimplePlainQueue <|.. MpscLinkedQueue
QueueFuseable:算子融合协议
算子融合是 RxJava 用于消除相邻算子之间中间队列的机制。在没有融合的情况下,map 会将数据放入队列,observeOn 再从队列中读取;而融合之后,observeOn 可以直接从 map 的上游拉取数据,在 poll() 调用期间内联执行映射函数。
该协议由 QueueFuseable 定义:
public interface QueueFuseable<T> extends SimpleQueue<T> {
int NONE = 0; // No fusion
int SYNC = 1; // Synchronous: poll() is blocking and complete
int ASYNC = 2; // Asynchronous: poll() may return null, use onNext as signal
int ANY = SYNC | ASYNC;
int BOUNDARY = 4; // Don't fuse across thread boundaries
int requestFusion(int mode);
}
融合协商发生在 onSubscribe() 期间。当下游算子收到上游的 Subscription 时,它会检查上游是否同时实现了 QueueSubscription。如果是,则调用 requestFusion(mode):
sequenceDiagram
participant Upstream as map (upstream)
participant Downstream as observeOn (downstream)
Upstream->>Downstream: onSubscribe(mapSubscriber)
Downstream->>Downstream: Is upstream QueueSubscription?
Downstream->>Upstream: requestFusion(ANY | BOUNDARY)
alt SYNC fusion granted
Upstream-->>Downstream: SYNC
Note over Downstream: Will call poll() directly
Note over Downstream: null from poll() = complete
else ASYNC fusion granted
Upstream-->>Downstream: ASYNC
Note over Downstream: onNext(null) = "poll me now"
Note over Downstream: poll() in drain loop
else No fusion
Upstream-->>Downstream: NONE
Note over Downstream: Normal onNext path
end
BOUNDARY 标志至关重要。observeOn 在请求融合时会传入 ANY | BOUNDARY,向上游表明"我处于线程边界处",从而防止融合意外地将计算迁移到另一个线程。举个例子:如果 map().observeOn() 完全融合,map 函数将在 observeOn 调度器的线程上执行,而非订阅线程——这一语义变化可能打破用户的假设。
在 FlowableMap 中,融合支持通过 FlowableMap.java#L79-L82 中的 poll() 实现:
public U poll() throws Throwable {
T t = qs.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}
融合模式下,调用 map subscriber 的 poll() 会触发对上游队列 poll() 的调用,并就地执行映射函数后返回结果。没有中间队列,没有 onNext 调用,也没有 request() 的计数开销。融合状态下 map 的代价,本质上只剩下一次函数调用。
ConditionalSubscriber 与 ScalarSupplier 优化
另外两项微优化在大型算子链中同样能带来可观的收益。
ConditionalSubscriber 解决的是 filter 类算子特有的问题。在 filter().map() 链中,当过滤谓词返回 false 时,filter 必须调用 upstream.request(1) 来请求下一个元素,由此产生 request/onNext 信号的来回传递。
ConditionalSubscriber.tryOnNext(T t) 通过返回 boolean 来解决这个问题——返回值表示该元素是否被消费。见 FlowableFilter.java#L59-L78:
public boolean tryOnNext(T t) {
if (done) { return false; }
boolean b;
try {
b = filter.test(t);
} catch (Throwable e) {
fail(e);
return true;
}
if (b) {
downstream.onNext(t);
}
return b;
}
当感知 ConditionalSubscriber 的上游通过 tryOnNext 投递元素时,返回 false 意味着"再给我下一个",无需任何 request() 调用。这消除了被过滤掉的元素在 request 计数器上产生的逐元素 CAS 操作。
ScalarSupplier 是一个标记接口,用于标识那些能同步产出恰好一个值的数据源(例如 Flowable.just(1))。当 flatMap 在 FlowableFlatMap.java#L47-L52 遇到实现了 ScalarSupplier 的内部数据源时,可以跳过整套订阅机制——无需创建 subscriber、无需走 onSubscribe/onNext/onComplete 协议、无需追踪内部订阅。直接取值并发出即可。
flowchart TD
FM[flatMap receives inner source] --> CHECK{ScalarSupplier?}
CHECK -->|Yes| FAST[Get value directly, emit]
CHECK -->|No| FULL[Full subscribe protocol]
FULL --> INNER[Create InnerSubscriber]
INNER --> TRACK[Add to active subscribers]
TRACK --> SUB[Subscribe to inner source]
SUB --> DRAIN[Drain loop merges items]
这些优化单独来看或许微不足道,但在 Flowable.range(1, 1_000_000).flatMap(i -> Flowable.just(i * 2)).filter(i -> i > 100).map(i -> i.toString()) 这样的 pipeline 中,它们每秒可以省去数百万次原子操作。
下一篇
drain 循环模式和融合协议是性能的基础,但它们需要线程来驱动。第 4 篇将深入 RxJava 的调度器系统:Schedulers.computation() 如何通过轮询方式管理固定线程池、IO 调度器如何弹性扩缩容、新的虚拟线程调度器如何集成,以及 subscribeOn 和 observeOn 在使用这些调度器时有何本质区别。