The Engine Room: Drain Loops, Queue Fusion, and Lock-Free Concurrency
Prerequisites
- Article 2: subscribe-chain-and-operator-anatomy
- Java concurrency primitives (AtomicInteger, CAS operations, volatile semantics, happens-before guarantees)
- Queue data structure basics
RxJava operators often receive signals from multiple threads — a producer thread calling onNext, a consumer thread calling request, and potentially a cancellation from a third thread. Traditional synchronized blocks would work, but their cost compounds across operator chains with dozens of stages. RxJava solves this with two key innovations: the atomic WIP drain loop that serializes access without locks, and operator fusion that eliminates intermediate queues entirely. Together, they are a major reason RxJava can sustain throughput in the hundreds of millions of events per second in benchmarks of simple pipelines.
The Atomic WIP Drain Loop Pattern
The most important concurrency pattern in RxJava is the work-in-progress (WIP) drain loop. It appears in virtually every operator that crosses thread boundaries. The entry point at FlowableObserveOn.java#L164-L168 is deceptively simple:
final void trySchedule() {
    if (getAndIncrement() != 0) {
        return;
    }
    worker.schedule(this);
}
The class extends AtomicInteger, and this counter acts as a work-in-progress indicator. The pattern works as follows:
- Entry: getAndIncrement(). If the previous value was 0, this thread "wins" and enters the drain loop. If non-zero, another thread is already draining — just increment and return.
- Drain: Process items from the queue, checking for completion and cancellation.
- Exit: addAndGet(-missed). If the result is 0, no work arrived during draining — safe to exit. If non-zero, loop again to process newly arrived work.
flowchart TD
ENTER[trySchedule called] --> INC{getAndIncrement != 0?}
INC -->|"Yes: other thread draining"| RETURN[Return immediately]
INC -->|"No: we won the race"| SCHEDULE[Schedule drain on worker]
SCHEDULE --> DRAIN[Process queued items]
DRAIN --> EXIT{addAndGet -missed == 0?}
EXIT -->|"Yes: no new work"| DONE[Exit loop]
EXIT -->|"No: work arrived during drain"| DRAIN
This pattern is lock-free: no thread ever blocks waiting for another. It's also race-safe: the increment acts as both a "signal" that work is available and a "lock" that ensures only one thread drains at a time. The exit check (addAndGet(-missed)) handles the critical race where a producer increments the counter just as the drainer is about to exit — the drainer sees the non-zero result and loops back.
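The pattern can be sketched in isolation. The class and method names below are invented for illustration — the real FlowableObserveOn adds backpressure accounting, error handling, and scheduler hand-off on top of this skeleton:

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical illustration of the WIP drain loop: any number of threads
// may call onItem(), but only one thread at a time runs the drain() body.
public class WipDrain<T> extends AtomicInteger {
    private static final long serialVersionUID = 1L; // AtomicInteger is Serializable

    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<T> downstream;

    public WipDrain(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    public void onItem(T item) {
        queue.offer(item);
        // Entry: if the previous WIP value was 0, we won the race and drain;
        // otherwise another thread is already draining and will see our item.
        if (getAndIncrement() == 0) {
            drain();
        }
    }

    private void drain() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item); // serialized: only one drainer at a time
            }
            // Exit: subtract the work we observed; if new signals arrived
            // during draining, the counter is non-zero and we loop again.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

Even this toy version exhibits the key property: the increment in onItem and the decrement in drain together guarantee that no item offered to the queue is left behind when the drainer exits.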
Tip: When reading RxJava operator code, look for classes extending AtomicInteger — that's the telltale sign of a drain loop. The drain method is usually called drain() or named run() when the class implements Runnable.
FlowableObserveOn: The Canonical Complex Operator
FlowableObserveOn is the best operator for studying the drain loop in full because it does everything: thread-hopping, queue management, backpressure handling, and fusion negotiation.
The BaseObserveOnSubscriber at FlowableObserveOn.java#L62-L101 sets up the infrastructure:
abstract static class BaseObserveOnSubscriber<T>
        extends BasicIntQueueSubscription<T>
        implements FlowableSubscriber<T>, Runnable {

    final Worker worker;
    final int prefetch;
    final int limit;
    // ...

    BaseObserveOnSubscriber(Worker worker, boolean delayError, int prefetch) {
        this.worker = worker;
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2); // 75% refill threshold
    }
    // ...
}
The run() method dispatches to one of three drain modes:
public final void run() {
    if (outputFused) {
        runBackfused();
    } else if (sourceMode == SYNC) {
        runSync();
    } else {
        runAsync();
    }
}
The async drain at FlowableObserveOn.java#L362-L380 is where the full complexity lives — it reads from the queue, honors backpressure (requested.get()), checks for terminal events, and handles the refill pattern where consumed items trigger new upstream requests.
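The refill arithmetic is worth isolating. The sketch below is not the actual subscriber class — only the `limit = prefetch - (prefetch >> 2)` formula is taken from the constructor shown earlier; the rest is an invented stand-in for the bookkeeping runAsync performs:

```java
// Sketch of the 75%-consumed refill pattern: instead of calling request(1)
// per item, the subscriber requests `prefetch` up front and asks for a new
// batch of `limit` items once it has consumed `limit` of them.
public class RefillCounter {
    final int prefetch;
    final int limit;          // 75% of prefetch: the refill threshold
    long consumed;
    long totalRequested;      // stands in for cumulative upstream.request(n)

    public RefillCounter(int prefetch) {
        this.prefetch = prefetch;
        this.limit = prefetch - (prefetch >> 2);
        this.totalRequested = prefetch;   // initial request in onSubscribe
    }

    // Called once per item polled from the queue and emitted downstream.
    public void onConsumed() {
        if (++consumed == limit) {
            consumed = 0;
            totalRequested += limit;      // upstream.request(limit)
        }
    }
}
```

Requesting in batches of 75% keeps the queue from draining dry while amortizing the atomic request traffic over many items.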
sequenceDiagram
participant Upstream
participant ObserveOn as ObserveOnSubscriber
participant Queue as SpscArrayQueue
participant Worker as Scheduler.Worker
participant Downstream
Upstream->>ObserveOn: onNext(item)
ObserveOn->>Queue: offer(item)
ObserveOn->>ObserveOn: trySchedule() [WIP++]
Note over Worker: On scheduler thread
Worker->>ObserveOn: run()
ObserveOn->>Queue: poll()
Queue-->>ObserveOn: item
ObserveOn->>Downstream: onNext(item)
ObserveOn->>ObserveOn: consumed++ (replenish at limit)
Lock-Free SPSC Queues
The queues that operators use for buffering are not java.util.concurrent queues — they're custom SPSC (Single-Producer-Single-Consumer) implementations derived from the JCTools project.
SpscArrayQueue is the bounded variant — a ring buffer backed by an AtomicReferenceArray:
public final class SpscArrayQueue<E> extends AtomicReferenceArray<E>
        implements SimplePlainQueue<E> {

    final int mask;
    final AtomicLong producerIndex;
    final AtomicLong consumerIndex;
    final int lookAheadStep;

    public SpscArrayQueue(int capacity) {
        super(Pow2.roundToPowerOfTwo(capacity));
        this.mask = length() - 1;
        this.producerIndex = new AtomicLong();
        this.consumerIndex = new AtomicLong();
        lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
    }
    // ...
}
Key design decisions:
- Power-of-2 sizing: The capacity is rounded up so that index masking (index & mask) replaces modulo division — a significant micro-optimization.
- Separate producer/consumer indices: Each counter lives in its own AtomicLong object, keeping the hot producer and consumer fields apart in memory and reducing false sharing between the two threads.
- Look-ahead optimization: The producer checks multiple slots ahead before checking the actual offer slot, reducing the frequency of volatile reads on the consumer index.
The SpscLinkedArrayQueue variant provides unbounded capacity by linking fixed-size array segments together. It's used when the queue size can't be predicted at creation time.
For multi-producer scenarios (like flatMap merging multiple inner sources), RxJava uses MpscLinkedQueue — a lock-free linked queue based on Vyukov's non-intrusive MPSC algorithm, adapted for the multiple-producer-single-consumer case.
classDiagram
class SimplePlainQueue~E~ {
<<interface>>
+offer(E): boolean
+poll(): E
+isEmpty(): boolean
+clear()
}
class SpscArrayQueue~E~ {
Bounded ring buffer
Power-of-2 size
Wait-free offer/poll
}
class SpscLinkedArrayQueue~E~ {
Unbounded linked arrays
Grows on demand
}
class MpscLinkedQueue~E~ {
Multi-producer safe
Lock-free CAS offer
}
SimplePlainQueue <|.. SpscArrayQueue
SimplePlainQueue <|.. SpscLinkedArrayQueue
SimplePlainQueue <|.. MpscLinkedQueue
QueueFuseable: The Operator Fusion Protocol
Operator fusion is RxJava's mechanism for eliminating intermediate queues between adjacent operators. Instead of map putting items into a queue and observeOn reading from it, fusion lets observeOn pull directly from map's upstream, applying the mapping function inline during poll().
The protocol is defined by QueueFuseable:
public interface QueueFuseable<T> extends SimpleQueue<T> {
    int NONE = 0;     // No fusion
    int SYNC = 1;     // Synchronous: poll() yields items until null, which means complete
    int ASYNC = 2;    // Asynchronous: poll() may return null transiently; onNext signals availability
    int ANY = SYNC | ASYNC;
    int BOUNDARY = 4; // Flag: the requester crosses a thread boundary; don't fuse user code across it

    int requestFusion(int mode);
}
Fusion negotiation happens during onSubscribe(). When a downstream operator receives its upstream Subscription, it checks if the upstream also implements QueueSubscription. If so, it calls requestFusion(mode):
sequenceDiagram
participant Upstream as map (upstream)
participant Downstream as observeOn (downstream)
Upstream->>Downstream: onSubscribe(mapSubscriber)
Downstream->>Downstream: Is upstream QueueSubscription?
Downstream->>Upstream: requestFusion(ANY | BOUNDARY)
alt SYNC fusion granted
Upstream-->>Downstream: SYNC
Note over Downstream: Will call poll() directly
Note over Downstream: null from poll() = complete
else ASYNC fusion granted
Upstream-->>Downstream: ASYNC
Note over Downstream: onNext(null) = "poll me now"
Note over Downstream: poll() in drain loop
else No fusion
Upstream-->>Downstream: NONE
Note over Downstream: Normal onNext path
end
The BOUNDARY flag is critical. When observeOn requests fusion, it passes ANY | BOUNDARY — telling the upstream "I'm crossing a thread boundary." This prevents fusion from accidentally moving computation across threads. For example, if map().observeOn() fused completely, the map function would run on the observeOn scheduler's thread instead of the subscription thread — a semantic change that could break user assumptions.
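The negotiation can be sketched as plain code. Everything below is invented for illustration — the interface mirrors the shape of requestFusion but is not RxJava's actual type; the source models an operator like map, which refuses fusion when BOUNDARY is set because its user function would otherwise run on the wrong thread:

```java
// Self-contained sketch of fusion negotiation using QueueFuseable-style
// constants. Classes and interfaces here are stand-ins, not RxJava types.
public class FusionDemo {
    static final int NONE = 0, SYNC = 1, ASYNC = 2, ANY = SYNC | ASYNC, BOUNDARY = 4;

    interface FuseableSource {
        int requestFusion(int mode);
        Integer poll();
    }

    // A SYNC-fuseable stage whose poll() runs user code (like map), so it
    // refuses fusion when the requester flags a thread boundary.
    static class MappedArraySource implements FuseableSource {
        final int[] values;
        int index;

        MappedArraySource(int... values) { this.values = values; }

        public int requestFusion(int mode) {
            // Grant SYNC only if the caller accepts SYNC and is not asking
            // us to run our user function across a thread boundary.
            return (mode & SYNC) != 0 && (mode & BOUNDARY) == 0 ? SYNC : NONE;
        }

        public Integer poll() {
            // null signals completion in SYNC mode.
            return index < values.length ? values[index++] * 2 : null;
        }
    }

    // observeOn-style negotiation: ANY | BOUNDARY, so this source declines.
    static int negotiateAcrossBoundary(FuseableSource s) {
        return s.requestFusion(ANY | BOUNDARY);
    }

    // Same-thread negotiation: plain ANY, so SYNC fusion is granted.
    static int negotiateSameThread(FuseableSource s) {
        return s.requestFusion(ANY);
    }
}
```

The asymmetry between the two negotiate calls is exactly the BOUNDARY semantics described above: the flag lets a stage opt out of fusion rather than silently move its user code to another thread.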
In FlowableMap, fusion support is implemented via poll() at FlowableMap.java#L79-L82:
public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}
When fused, calling poll() on the map subscriber calls poll() on the map's upstream queue, applies the mapping function, and returns the result. No intermediate queue, no onNext call, no request() bookkeeping. The cost of map under fusion is essentially just the function call.
ConditionalSubscriber and ScalarSupplier Optimizations
Two additional micro-optimizations compound across large operator chains.
ConditionalSubscriber addresses a problem specific to filter-like operators. In a filter().map() chain, when the filter predicate returns false, the filter must call upstream.request(1) to ask for the next item. This creates a ping-pong of request/onNext signals.
ConditionalSubscriber.tryOnNext(T t) solves this — it returns boolean indicating whether the item was consumed. At FlowableFilter.java#L59-L78:
public boolean tryOnNext(T t) {
    if (done) {
        return false;
    }
    boolean b;
    try {
        b = filter.test(t);
    } catch (Throwable e) {
        fail(e);
        return true;
    }
    if (b) {
        downstream.onNext(t);
    }
    return b;
}
When a ConditionalSubscriber-aware upstream delivers items via tryOnNext, a false return means "give me the next one" without any request() call. This eliminates per-item CAS operations on the request counter for filtered-out items.
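The saving can be made concrete with a toy stage. Everything here is an invented stand-in (not RxJava's ConditionalSubscriber interface); the `requestCalls` counter models the atomic request-counter traffic that tryOnNext avoids:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Sketch of the ConditionalSubscriber idea: tryOnNext returns false for a
// rejected item, letting the producer just supply the next one in a loop
// instead of issuing an atomic request(1) per rejected item.
public class ConditionalDemo {

    static class FilterStage<T> {
        final Predicate<T> predicate;
        final List<T> downstream = new ArrayList<>();
        long requestCalls;   // stands in for CAS traffic on the request counter

        FilterStage(Predicate<T> predicate) { this.predicate = predicate; }

        // Conditional path: the boolean return replaces a request(1) call.
        public boolean tryOnNext(T t) {
            boolean pass = predicate.test(t);
            if (pass) {
                downstream.add(t);
            }
            return pass;   // false = "give me the next one", no request() needed
        }

        // Non-conditional path for comparison: every rejected item costs an
        // extra request(1) round-trip to keep the stream flowing.
        public void onNextPlain(T t) {
            if (!tryOnNext(t)) {
                requestCalls++;   // upstream.request(1)
            }
        }
    }
}
```

For a filter that rejects half its input, the plain path performs one extra atomic request per rejected item; the conditional path performs none.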
ScalarSupplier is a marker interface for sources that produce exactly one value synchronously (like Flowable.just(1)). When flatMap encounters a ScalarSupplier inner source at FlowableFlatMap.java#L47-L52, it can skip the entire subscribe machinery — no subscriber creation, no onSubscribe/onNext/onComplete protocol, no inner subscription tracking. Just get the value and emit it.
flowchart TD
FM[flatMap receives inner source] --> CHECK{ScalarSupplier?}
CHECK -->|Yes| FAST[Get value directly, emit]
CHECK -->|No| FULL[Full subscribe protocol]
FULL --> INNER[Create InnerSubscriber]
INNER --> TRACK[Add to active subscribers]
TRACK --> SUB[Subscribe to inner source]
SUB --> DRAIN[Drain loop merges items]
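The branch in the flowchart reduces to an instanceof check plus a direct value fetch. The types below are invented stand-ins meant to mirror the shape of the ScalarSupplier marker, not RxJava's actual interfaces:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Sketch of the ScalarSupplier fast path: if an inner source is known to
// hold exactly one synchronously available value, a flatMap-like merger can
// emit it directly and skip the subscribe protocol entirely.
public class ScalarDemo {

    interface Source<T> {
        void subscribe(List<T> downstream);
    }

    // Marker + accessor, analogous to ScalarSupplier: the value can be
    // obtained without subscribing.
    interface Scalar<T> extends Source<T>, Supplier<T> { }

    static <T> Scalar<T> just(T value) {
        return new Scalar<T>() {
            public T get() { return value; }
            public void subscribe(List<T> downstream) { downstream.add(value); }
        };
    }

    static <T> void merge(Source<T> inner, List<T> downstream, int[] subscribeCount) {
        if (inner instanceof Scalar) {
            downstream.add(((Scalar<T>) inner).get());   // fast path: no subscribe
        } else {
            subscribeCount[0]++;                          // full subscribe protocol
            inner.subscribe(downstream);
        }
    }
}
```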
These optimizations may seem small individually, but in a pipeline like Flowable.range(1, 1_000_000).flatMap(i -> Flowable.just(i * 2)).filter(i -> i > 100).map(i -> i.toString()), they eliminate millions of atomic operations per second.
What's Next
The drain loop pattern and fusion protocol are the foundation — but they need a thread to run on. In Part 4, we'll examine RxJava's scheduler system: how Schedulers.computation() manages its fixed thread pool with round-robin assignment, how the IO scheduler elastically grows and shrinks, how the new virtual thread scheduler integrates, and exactly how subscribeOn and observeOn use these schedulers differently.