Read OSS

The Engine Room: Drain Loops, Queue Fusion, and Lock-Free Concurrency

Advanced

Prerequisites

  • Article 2: subscribe-chain-and-operator-anatomy
  • Java concurrency primitives (AtomicInteger, CAS operations, volatile semantics, happens-before guarantees)
  • Queue data structure basics

The Engine Room: Drain Loops, Queue Fusion, and Lock-Free Concurrency

RxJava operators often receive signals from multiple threads — a producer thread calling onNext, a consumer thread calling request, and potentially a cancellation from a third thread. Traditional synchronized blocks would work, but their cost compounds across operator chains with dozens of stages. RxJava solves this with two key innovations: the atomic WIP drain loop that serializes access without locks, and operator fusion that eliminates intermediate queues entirely. Together, they are the reason RxJava can sustain throughput in the hundreds of millions of events per second in synthetic benchmarks.

The Atomic WIP Drain Loop Pattern

The most important concurrency pattern in RxJava is the work-in-progress (WIP) drain loop. It appears in virtually every operator that crosses thread boundaries. The entry point at FlowableObserveOn.java#L164-L168 is deceptively simple:

final void trySchedule() {
    if (getAndIncrement() != 0) {
        return;
    }
    worker.schedule(this);
}

The class extends AtomicInteger, and this counter acts as a work-in-progress indicator. The pattern works as follows:

  1. Entry: getAndIncrement(). If the previous value was 0, this thread "wins" and enters the drain loop. If non-zero, another thread is already draining — the increment itself has recorded the pending work, so simply return.
  2. Drain: Process items from the queue, checking for completion and cancellation.
  3. Exit: addAndGet(-missed). If the result is 0, no work arrived during draining — safe to exit. If non-zero, loop again to process newly arrived work.
flowchart TD
    ENTER[trySchedule called] --> INC{getAndIncrement != 0?}
    INC -->|"Yes: other thread draining"| RETURN[Return immediately]
    INC -->|"No: we won the race"| SCHEDULE[Schedule drain on worker]
    SCHEDULE --> DRAIN[Process queued items]
    DRAIN --> EXIT{addAndGet -missed == 0?}
    EXIT -->|"Yes: no new work"| DONE[Exit loop]
    EXIT -->|"No: work arrived during drain"| DRAIN

This pattern is lock-free: no thread ever blocks waiting for another. It's also race-safe: the increment acts as both a "signal" that work is available and a "lock" that ensures only one thread drains at a time. The exit check (addAndGet(-missed)) handles the critical race where a producer increments the counter just as the drainer is about to exit — the drainer sees the non-zero result and loops back.
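The skeleton of the pattern can be reproduced outside RxJava. The sketch below is illustrative only — the class name, the ConcurrentLinkedQueue, and the Consumer downstream are our own stand-ins, not RxJava's types — but it uses the same getAndIncrement entry and addAndGet(-missed) exit described above:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Minimal sketch of the WIP drain loop. The AtomicInteger value counts
// pending drain signals; only the thread that moves it 0 -> 1 drains.
final class WipDrain<T> extends AtomicInteger {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<T> downstream;

    WipDrain(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T item) {
        queue.offer(item);
        drain();
    }

    void drain() {
        // Entry: if the previous value was non-zero, another thread is
        // already draining; our increment alone records the new work.
        if (getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);  // deliver on the winning thread
            }
            // Exit: subtract the signals we handled. A non-zero result
            // means work arrived mid-drain, so loop back instead of exiting.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

Even if onNext is called from many threads at once, items are delivered one at a time, in order, by whichever thread won the 0-to-1 race.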

Tip: When reading RxJava operator code, look for classes extending AtomicInteger — that's the telltale sign of a drain loop. The drain method is usually called drain() or named run() when the class implements Runnable.

FlowableObserveOn: The Canonical Complex Operator

FlowableObserveOn is the best operator for studying the drain loop in full because it does everything: thread-hopping, queue management, backpressure handling, and fusion negotiation.

The BaseObserveOnSubscriber at FlowableObserveOn.java#L62-L101 sets up the infrastructure:

abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
    
    final Worker worker;
    final int prefetch;
    final int limit;
    // ...
    
    BaseObserveOnSubscriber(Worker worker, boolean delayError, int prefetch) {
        this.worker = worker;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);  // 75% refill threshold
    }

The run() method dispatches to one of three drain modes:

public final void run() {
    if (outputFused) {
        runBackfused();
    } else if (sourceMode == SYNC) {
        runSync();
    } else {
        runAsync();
    }
}

The async drain at FlowableObserveOn.java#L362-L380 is where the full complexity lives — it reads from the queue, honors backpressure (requested.get()), checks for terminal events, and handles the refill pattern where consumed items trigger new upstream requests.
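The refill arithmetic is easy to model in isolation. This hypothetical helper (the class and method names are ours, purely for demonstration) replays the consumed/limit bookkeeping and records at which item counts a replenishing upstream request would fire:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of observeOn's refill pattern: instead of one
// request(1) per delivered item, the subscriber re-requests a batch each
// time 75% of the prefetch amount has been consumed.
public final class RefillDemo {
    public static List<Long> replenishPoints(int prefetch, int totalItems) {
        int limit = prefetch - (prefetch >> 2); // 75% threshold, e.g. 96 for 128
        List<Long> requestAt = new ArrayList<>();
        long consumed = 0;
        for (int i = 0; i < totalItems; i++) {
            if (++consumed == limit) {
                requestAt.add((long) i + 1);    // a real drain calls upstream.request(limit) here
                consumed = 0;
            }
        }
        return requestAt;
    }
}
```

With the default prefetch of 128, a 300-item stream triggers refills after items 96, 192, and 288 — three request calls instead of three hundred.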

sequenceDiagram
    participant Upstream
    participant ObserveOn as ObserveOnSubscriber
    participant Queue as SpscArrayQueue
    participant Worker as Scheduler.Worker
    participant Downstream
    
    Upstream->>ObserveOn: onNext(item)
    ObserveOn->>Queue: offer(item)
    ObserveOn->>ObserveOn: trySchedule() [WIP++]
    
    Note over Worker: On scheduler thread
    Worker->>ObserveOn: run()
    ObserveOn->>Queue: poll()
    Queue-->>ObserveOn: item
    ObserveOn->>Downstream: onNext(item)
    ObserveOn->>ObserveOn: consumed++ (replenish at limit)

Lock-Free SPSC Queues

The queues that operators use for buffering are not java.util.concurrent queues — they're custom SPSC (Single-Producer-Single-Consumer) implementations derived from the JCTools project.

SpscArrayQueue is the bounded variant — a ring buffer backed by an AtomicReferenceArray:

public final class SpscArrayQueue<E> extends AtomicReferenceArray<E> 
    implements SimplePlainQueue<E> {
    
    final int mask;
    final AtomicLong producerIndex;
    final AtomicLong consumerIndex;
    final int lookAheadStep;

    public SpscArrayQueue(int capacity) {
        super(Pow2.roundToPowerOfTwo(capacity));
        this.mask = length() - 1;
        this.producerIndex = new AtomicLong();
        this.consumerIndex = new AtomicLong();
        lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
    }

Key design decisions:

  • Power-of-2 sizing: The capacity is rounded up so that index masking (index & mask) replaces modulo division — a significant micro-optimization
  • Separate producer/consumer indices: The producer and consumer counters are distinct AtomicLong objects, keeping the two hot fields apart in memory and reducing false sharing between the producer and consumer threads
  • Look-ahead optimization: The producer checks multiple slots ahead before checking the actual offer slot, reducing the frequency of volatile reads on the consumer index
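These decisions show up clearly in a stripped-down sketch. The queue below is a teaching model, not JCTools' tuned implementation — it omits the look-ahead step and cache-line padding — but it demonstrates the power-of-2 mask and the ordered lazySet writes:

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Minimal single-producer/single-consumer ring buffer illustrating the
// index-masking trick: because size is a power of two, (index & mask)
// replaces the much slower (index % size).
final class TinySpscQueue<E> {
    private final AtomicReferenceArray<E> buffer;
    private final int mask;
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();

    TinySpscQueue(int capacity) {
        // Round capacity up to the next power of two (minimum 2).
        int size = Math.max(2, Integer.highestOneBit(capacity - 1) << 1);
        buffer = new AtomicReferenceArray<>(size);
        mask = size - 1;
    }

    boolean offer(E e) {
        long pi = producerIndex.get();
        int slot = (int) pi & mask;
        if (buffer.get(slot) != null) {
            return false;              // slot not yet freed by consumer: full
        }
        buffer.lazySet(slot, e);       // ordered store, no full memory fence
        producerIndex.lazySet(pi + 1);
        return true;
    }

    E poll() {
        long ci = consumerIndex.get();
        int slot = (int) ci & mask;
        E e = buffer.get(slot);
        if (e == null) {
            return null;               // empty
        }
        buffer.lazySet(slot, null);    // free the slot for the producer
        consumerIndex.lazySet(ci + 1);
        return e;
    }
}
```

Note that fullness is detected by inspecting the target slot rather than comparing the two indices, which spares the producer a volatile read of the consumer index on the happy path — the same motivation behind JCTools' look-ahead optimization.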

The SpscLinkedArrayQueue variant provides unbounded capacity by linking fixed-size array segments together. It's used when the queue size can't be predicted at creation time.

For multi-producer scenarios (like flatMap merging multiple inner sources), RxJava uses MpscLinkedQueue — a lock-free linked-node queue in which any number of threads can offer concurrently while a single consumer polls without contention.

classDiagram
    class SimplePlainQueue~E~ {
        <<interface>>
        +offer(E): boolean
        +poll(): E
        +isEmpty(): boolean
        +clear()
    }
    class SpscArrayQueue~E~ {
        Bounded ring buffer
        Power-of-2 size
        Wait-free offer/poll
    }
    class SpscLinkedArrayQueue~E~ {
        Unbounded linked arrays
        Grows on demand
    }
    class MpscLinkedQueue~E~ {
        Multi-producer safe
        Lock-free CAS offer
    }
    
    SimplePlainQueue <|.. SpscArrayQueue
    SimplePlainQueue <|.. SpscLinkedArrayQueue
    SimplePlainQueue <|.. MpscLinkedQueue

QueueFuseable: The Operator Fusion Protocol

Operator fusion is RxJava's mechanism for eliminating intermediate queues between adjacent operators. Instead of map putting items into a queue and observeOn reading from it, fusion lets observeOn pull directly from map's upstream, applying the mapping function inline during poll().

The protocol is defined by QueueFuseable:

public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;      // No fusion
    int SYNC = 1;      // Synchronous: poll() until null, which signals completion
    int ASYNC = 2;     // Asynchronous: poll() may be transiently empty; onNext(null) signals "poll now"
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4;  // Requester is a thread boundary; don't fuse computation across it
    
    int requestFusion(int mode);
}

Fusion negotiation happens during onSubscribe(). When a downstream operator receives its upstream Subscription, it checks if the upstream also implements QueueSubscription. If so, it calls requestFusion(mode):

sequenceDiagram
    participant Upstream as map (upstream)
    participant Downstream as observeOn (downstream)
    
    Upstream->>Downstream: onSubscribe(mapSubscriber)
    Downstream->>Downstream: Is upstream QueueSubscription?
    Downstream->>Upstream: requestFusion(ANY | BOUNDARY)
    
    alt SYNC fusion granted
        Upstream-->>Downstream: SYNC
        Note over Downstream: Will call poll() directly
        Note over Downstream: null from poll() = complete
    else ASYNC fusion granted
        Upstream-->>Downstream: ASYNC
        Note over Downstream: onNext(null) = "poll me now"
        Note over Downstream: poll() in drain loop
    else No fusion
        Upstream-->>Downstream: NONE
        Note over Downstream: Normal onNext path
    end

The BOUNDARY flag is critical. When observeOn requests fusion, it passes ANY | BOUNDARY — telling the upstream "I'm crossing a thread boundary." This prevents fusion from accidentally moving computation across threads. For example, if map().observeOn() fused completely, the map function would run on the observeOn scheduler's thread instead of the subscription thread — a semantic change that could break user assumptions.

In FlowableMap, fusion support is implemented via poll() at FlowableMap.java#L79-L82:

public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}

When fused, calling poll() on the map subscriber calls poll() on the map's upstream queue, applies the mapping function, and returns the result. No intermediate queue, no onNext call, no request() bookkeeping. The cost of map under fusion is essentially just the function call.
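A toy model makes the mechanics concrete. The class below is a simplified stand-in of our own devising, not RxJava's implementation, but it follows the same SYNC contract: requestFusion grants a mode, poll() applies the mapper inline against the upstream queue, and null signals completion:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Self-contained model of SYNC fusion for a map stage. No intermediate
// queue exists: the downstream pulls through the mapper directly.
final class FusedMapModel<T, R> {
    static final int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC;

    private final Queue<T> upstreamQueue;
    private final Function<T, R> mapper;

    FusedMapModel(Queue<T> upstreamQueue, Function<T, R> mapper) {
        this.upstreamQueue = upstreamQueue;
        this.mapper = mapper;
    }

    int requestFusion(int mode) {
        // Grant SYNC when the caller accepts it; a real operator would
        // also consult the BOUNDARY flag before agreeing.
        return (mode & SYNC) != 0 ? SYNC : NONE;
    }

    R poll() {
        T t = upstreamQueue.poll();                 // pull from upstream directly
        return t != null ? mapper.apply(t) : null;  // null = upstream is done
    }
}
```

The downstream's drain loop simply calls poll() until it returns null — there is no onNext, no request accounting, and no buffer between the two stages.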

ConditionalSubscriber and ScalarSupplier Optimizations

Two additional micro-optimizations compound across large operator chains.

ConditionalSubscriber addresses a problem specific to filter-like operators. In a filter().map() chain, when the filter predicate returns false, the filter must call upstream.request(1) to ask for the next item. This creates a ping-pong of request/onNext signals.

ConditionalSubscriber.tryOnNext(T t) solves this — it returns boolean indicating whether the item was consumed. At FlowableFilter.java#L59-L78:

public boolean tryOnNext(T t) {
    if (done) { return false; }
    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);          // cancel upstream, route the error downstream
        return true;      // report "consumed" so the caller stops requesting
    }
    if (b) {
        downstream.onNext(t);
    }
    return b;             // false = "item dropped, hand me the next one"
}

When a ConditionalSubscriber-aware upstream delivers items via tryOnNext, a false return means "give me the next one" without any request() call. This eliminates per-item CAS operations on the request counter for filtered-out items.
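A back-of-the-envelope accounting model (entirely illustrative, not RxJava code) shows the saving: a plain filter pays one request(1) — a CAS on the request counter — per rejected item, while the conditional path pays nothing:

```java
import java.util.List;
import java.util.function.Predicate;

// Counts the per-item request(1) traffic each filtering strategy generates.
final class FilterRequestModel {
    // Plain onNext contract: every rejected item must be replaced by
    // explicitly requesting one more from upstream.
    static <T> long plainFilterRequests(List<T> items, Predicate<T> p) {
        long requests = 0;
        for (T t : items) {
            if (!p.test(t)) {
                requests++;   // upstream.request(1) for the dropped item
            }
        }
        return requests;
    }

    // tryOnNext contract: a false return already means "next item, please",
    // so no request call is ever needed for rejected items.
    static <T> long conditionalFilterRequests(List<T> items, Predicate<T> p) {
        for (T t : items) {
            p.test(t);
        }
        return 0;
    }
}
```

For a selective predicate over a large stream, that difference is millions of avoided atomic operations.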

ScalarSupplier is a marker interface for sources that produce exactly one value synchronously (like Flowable.just(1)). When flatMap encounters a ScalarSupplier inner source at FlowableFlatMap.java#L47-L52, it can skip the entire subscribe machinery — no subscriber creation, no onSubscribe/onNext/onComplete protocol, no inner subscription tracking. Just get the value and emit it.

flowchart TD
    FM[flatMap receives inner source] --> CHECK{ScalarSupplier?}
    CHECK -->|Yes| FAST[Get value directly, emit]
    CHECK -->|No| FULL[Full subscribe protocol]
    FULL --> INNER[Create InnerSubscriber]
    INNER --> TRACK[Add to active subscribers]
    TRACK --> SUB[Subscribe to inner source]
    SUB --> DRAIN[Drain loop merges items]

These optimizations may seem small individually, but in a pipeline like Flowable.range(1, 1_000_000).flatMap(i -> Flowable.just(i * 2)).filter(i -> i > 100).map(i -> i.toString()), they eliminate millions of atomic operations over the course of the run.
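The instanceof dispatch at the heart of the scalar fast path can be sketched with hypothetical interfaces (ours, not RxJava's — they only mirror the idea of a marker type exposing its single value):

```java
import java.util.function.Consumer;
import java.util.function.Supplier;

// Illustrative scalar fast path: a source marked as scalar exposes its one
// value directly, letting the consumer bypass the subscribe protocol.
final class ScalarFastPath {
    interface Source<T> { void subscribe(Consumer<T> onNext); }
    // Marker + accessor for "exactly one value, available synchronously".
    interface ScalarSource<T> extends Source<T>, Supplier<T> { }

    static <T> void deliver(Source<T> source, Consumer<T> downstream) {
        if (source instanceof ScalarSource) {
            // Fast path: no subscriber object, no onSubscribe/onNext dance.
            downstream.accept(((ScalarSource<T>) source).get());
        } else {
            source.subscribe(downstream);  // full subscribe machinery
        }
    }

    static <T> ScalarSource<T> just(T value) {
        return new ScalarSource<T>() {
            public T get() { return value; }
            public void subscribe(Consumer<T> onNext) { onNext.accept(value); }
        };
    }
}
```

The observable behavior is identical either way; only the per-inner-source overhead (subscriber allocation, protocol calls, subscription tracking) differs.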

What's Next

The drain loop pattern and fusion protocol are the foundation — but they need a thread to run on. In Part 4, we'll examine RxJava's scheduler system: how Schedulers.computation() manages its fixed thread pool with round-robin assignment, how the IO scheduler elastically grows and shrinks, how the new virtual thread scheduler integrates, and exactly how subscribeOn and observeOn use these schedulers differently.