Read OSS

引擎室:Drain 循环、队列融合与无锁并发

高级

前置知识

  • 第 2 篇:subscribe-chain-and-operator-anatomy
  • Java 并发基础(AtomicInteger、CAS 操作、volatile 语义、happens-before 保证)
  • 队列数据结构基础

引擎室:Drain 循环、队列融合与无锁并发

RxJava 的算子经常需要同时处理来自多个线程的信号——生产者线程调用 onNext、消费者线程调用 request,还可能有第三个线程发出取消信号。传统的 synchronized 块虽然能解决问题,但在拥有数十个阶段的算子链中,其性能开销会不断叠加。RxJava 通过两项关键创新解决了这一问题:原子 WIP drain 循环(无需加锁即可序列化访问)和算子融合(彻底消除中间队列)。正是这两者的结合,使 RxJava 能够达到每秒处理数亿事件的吞吐量。

原子 WIP Drain 循环模式

WIP(work-in-progress)drain 循环是 RxJava 中最重要的并发模式,几乎出现在每一个跨线程边界的算子中。FlowableObserveOn.java#L164-L168 中的入口看起来出奇地简单:

final void trySchedule() {
    if (getAndIncrement() != 0) {
        return;
    }
    worker.schedule(this);
}

该类继承自 AtomicInteger,这个计数器充当"处理中"指示器。整个模式的运作逻辑如下:

  1. 进入:调用 getAndIncrement()。若返回值为 0,当前线程"抢到锁"并进入 drain 循环;若返回值非 0,说明已有其他线程正在 drain——直接递增计数器后返回即可。
  2. 处理:从队列中取出数据,同时检查完成状态和取消状态。
  3. 退出:调用 addAndGet(-missed)。若结果为 0,说明 drain 期间没有新任务到达,可以安全退出;若结果非 0,则继续循环处理新到达的任务。
flowchart TD
    ENTER[trySchedule called] --> INC{getAndIncrement != 0?}
    INC -->|"Yes: other thread draining"| RETURN[Return immediately]
    INC -->|"No: we won the race"| SCHEDULE[Schedule drain on worker]
    SCHEDULE --> DRAIN[Process queued items]
    DRAIN --> EXIT{addAndGet -missed == 0?}
    EXIT -->|"Yes: no new work"| DONE[Exit loop]
    EXIT -->|"No: work arrived during drain"| DRAIN

这一模式是无锁的:没有任何线程会阻塞等待其他线程。同时它也是线程安全的:计数器的递增操作既作为"有任务可处理"的信号,也作为"同一时间只有一个线程在 drain"的保证。退出时的检查(addAndGet(-missed))处理了一个关键竞态:当 drainer 即将退出时,生产者恰好递增了计数器——drainer 会看到非零的结果并重新进入循环。

提示: 阅读 RxJava 算子源码时,留意那些继承自 AtomicInteger 的类——这是 drain 循环的典型标志。drain 方法通常命名为 drain(),若类实现了 Runnable 接口,则也可能命名为 run()

FlowableObserveOn:最具代表性的复杂算子

FlowableObserveOn 是研究完整 drain 循环的最佳范本,因为它涵盖了所有关键机制:线程切换、队列管理、背压处理以及融合协商。

FlowableObserveOn.java#L62-L101 中的 BaseObserveOnSubscriber 完成了基础设施的搭建:

abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
    
    final Worker worker;
    final int prefetch;
    final int limit;
    // ...
    
    BaseObserveOnSubscriber(Worker worker, boolean delayError, int prefetch) {
        this.worker = worker;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);  // 75% refill threshold
    }

run() 方法会根据情况分发到三种不同的 drain 模式:

public final void run() {
    if (outputFused) {
        runBackfused();
    } else if (sourceMode == SYNC) {
        runSync();
    } else {
        runAsync();
    }
}

FlowableObserveOn.java#L362-L380 中的异步 drain 是复杂度最高的部分——它从队列中读取数据、遵守背压约束(requested.get())、检查终止事件,并处理"消费一定数量的元素后触发新的上游请求"的补充模式。

sequenceDiagram
    participant Upstream
    participant ObserveOn as ObserveOnSubscriber
    participant Queue as SpscArrayQueue
    participant Worker as Scheduler.Worker
    participant Downstream
    
    Upstream->>ObserveOn: onNext(item)
    ObserveOn->>Queue: offer(item)
    ObserveOn->>ObserveOn: trySchedule() [WIP++]
    
    Note over Worker: On scheduler thread
    Worker->>ObserveOn: run()
    ObserveOn->>Queue: poll()
    Queue-->>ObserveOn: item
    ObserveOn->>Downstream: onNext(item)
    ObserveOn->>ObserveOn: consumed++ (replenish at limit)

无锁 SPSC 队列

算子用于缓冲数据的队列并非 java.util.concurrent 中的标准队列,而是从 JCTools 项目衍生而来的自定义 SPSC(单生产者-单消费者)实现。

SpscArrayQueue 是有界变体,底层是一个以 AtomicReferenceArray 为支撑的环形缓冲区:

public final class SpscArrayQueue<E> extends AtomicReferenceArray<E> 
    implements SimplePlainQueue<E> {
    
    final int mask;
    final AtomicLong producerIndex;
    final AtomicLong consumerIndex;
    final int lookAheadStep;

    public SpscArrayQueue(int capacity) {
        super(Pow2.roundToPowerOfTwo(capacity));
        this.mask = length() - 1;
        this.producerIndex = new AtomicLong();
        this.consumerIndex = new AtomicLong();
        lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
    }

几个关键设计决策:

  • 2 的幂次容量:容量向上取整为 2 的幂,使得索引计算可以用位掩码(index & mask)代替取模运算——这是一项显著的微优化
  • 生产者/消费者索引分离:各自独立分配为 AtomicLong 对象,位于不同的缓存行,避免伪共享
  • 预读优化(look-ahead):生产者在实际检查 offer 槽位之前,会提前检查多个槽位,从而降低对消费者索引进行 volatile 读取的频率

SpscLinkedArrayQueue 变体通过将多个固定大小的数组段链接在一起,提供无界容量,适用于创建时无法预估队列大小的场景。

在多生产者场景下(例如 flatMap 合并多个内部数据源),RxJava 使用 MpscLinkedQueue——一种基于 Michael-Scott 算法的无锁队列,专为多生产者-单消费者场景设计。

classDiagram
    class SimplePlainQueue~E~ {
        <<interface>>
        +offer(E): boolean
        +poll(): E
        +isEmpty(): boolean
        +clear()
    }
    class SpscArrayQueue~E~ {
        Bounded ring buffer
        Power-of-2 size
        Wait-free offer/poll
    }
    class SpscLinkedArrayQueue~E~ {
        Unbounded linked arrays
        Grows on demand
    }
    class MpscLinkedQueue~E~ {
        Multi-producer safe
        Lock-free CAS offer
    }
    
    SimplePlainQueue <|.. SpscArrayQueue
    SimplePlainQueue <|.. SpscLinkedArrayQueue
    SimplePlainQueue <|.. MpscLinkedQueue

QueueFuseable:算子融合协议

算子融合是 RxJava 用于消除相邻算子之间中间队列的机制。在没有融合的情况下,map 会将数据放入队列,observeOn 再从队列中读取;而融合之后,observeOn 可以直接从 map 的上游拉取数据,在 poll() 调用期间内联执行映射函数。

该协议由 QueueFuseable 定义:

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;      // No fusion
    int SYNC = 1;      // Synchronous: poll() is blocking and complete
    int ASYNC = 2;     // Asynchronous: poll() may return null, use onNext as signal
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4;  // Don't fuse across thread boundaries
    
    int requestFusion(int mode);
}

融合协商发生在 onSubscribe() 期间。当下游算子收到上游的 Subscription 时,它会检查上游是否同时实现了 QueueSubscription。如果是,则调用 requestFusion(mode)

sequenceDiagram
    participant Upstream as map (upstream)
    participant Downstream as observeOn (downstream)
    
    Upstream->>Downstream: onSubscribe(mapSubscriber)
    Downstream->>Downstream: Is upstream QueueSubscription?
    Downstream->>Upstream: requestFusion(ANY | BOUNDARY)
    
    alt SYNC fusion granted
        Upstream-->>Downstream: SYNC
        Note over Downstream: Will call poll() directly
        Note over Downstream: null from poll() = complete
    else ASYNC fusion granted
        Upstream-->>Downstream: ASYNC
        Note over Downstream: onNext(null) = "poll me now"
        Note over Downstream: poll() in drain loop
    else No fusion
        Upstream-->>Downstream: NONE
        Note over Downstream: Normal onNext path
    end

BOUNDARY 标志至关重要。observeOn 在请求融合时会传入 ANY | BOUNDARY,向上游表明"我处于线程边界处",从而防止融合意外地将计算迁移到另一个线程。举个例子:如果 map().observeOn() 完全融合,map 函数将在 observeOn 调度器的线程上执行,而非订阅线程——这一语义变化可能打破用户的假设。

FlowableMap 中,融合支持通过 FlowableMap.java#L79-L82 中的 poll() 实现:

public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}

融合模式下,调用 map subscriber 的 poll() 会触发对上游队列 poll() 的调用,并就地执行映射函数后返回结果。没有中间队列,没有 onNext 调用,也没有 request() 的计数开销。融合状态下 map 的代价,本质上只剩下一次函数调用。

ConditionalSubscriber 与 ScalarSupplier 优化

另外两项微优化在大型算子链中同样能带来可观的收益。

ConditionalSubscriber 解决的是 filter 类算子特有的问题。在 filter().map() 链中,当过滤谓词返回 false 时,filter 必须调用 upstream.request(1) 来请求下一个元素,由此产生 request/onNext 信号的来回传递。

ConditionalSubscriber.tryOnNext(T t) 通过返回 boolean 来解决这个问题——返回值表示该元素是否被消费。见 FlowableFilter.java#L59-L78

public boolean tryOnNext(T t) {
    if (done) { return false; }
    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    if (b) {
        downstream.onNext(t);
    }
    return b;
}

当感知 ConditionalSubscriber 的上游通过 tryOnNext 投递元素时,返回 false 意味着"再给我下一个",无需任何 request() 调用。这消除了被过滤掉的元素在 request 计数器上产生的逐元素 CAS 操作。

ScalarSupplier 是一个标记接口,用于标识那些能同步产出恰好一个值的数据源(例如 Flowable.just(1))。当 flatMapFlowableFlatMap.java#L47-L52 遇到实现了 ScalarSupplier 的内部数据源时,可以跳过整套订阅机制——无需创建 subscriber、无需走 onSubscribe/onNext/onComplete 协议、无需追踪内部订阅。直接取值并发出即可。

flowchart TD
    FM[flatMap receives inner source] --> CHECK{ScalarSupplier?}
    CHECK -->|Yes| FAST[Get value directly, emit]
    CHECK -->|No| FULL[Full subscribe protocol]
    FULL --> INNER[Create InnerSubscriber]
    INNER --> TRACK[Add to active subscribers]
    TRACK --> SUB[Subscribe to inner source]
    SUB --> DRAIN[Drain loop merges items]

这些优化单独来看或许微不足道,但在 Flowable.range(1, 1_000_000).flatMap(i -> Flowable.just(i * 2)).filter(i -> i > 100).map(i -> i.toString()) 这样的 pipeline 中,它们每秒可以省去数百万次原子操作。

下一篇

drain 循环模式和融合协议是性能的基础,但它们需要线程来驱动。第 4 篇将深入 RxJava 的调度器系统:Schedulers.computation() 如何通过轮询方式管理固定线程池、IO 调度器如何弹性扩缩容、新的虚拟线程调度器如何集成,以及 subscribeOnobserveOn 在使用这些调度器时有何本质区别。