Backpressure: How Flowable Keeps Fast Producers from Overwhelming Slow Consumers
Prerequisites
- Article 2: subscribe-chain-and-operator-anatomy
- Article 3: drain-loops-and-operator-fusion
- ›Reactive Streams specification (Publisher/Subscriber/Subscription contract)
Every async system eventually faces the producer-consumer speed mismatch problem. A database cursor can produce rows far faster than a network socket can transmit them. Without flow control, the intermediary buffers grow without bound until the JVM runs out of memory. Backpressure is the Reactive Streams solution: the consumer requests items, and the producer respects those limits. RxJava's Flowable implements this protocol — and the implementation details are far more subtle than the concept suggests.
The request(n) Protocol and Why Backpressure Exists
The Reactive Streams contract defines a pull-push hybrid protocol:
1. `Publisher.subscribe(Subscriber)` — establish the connection
2. `Subscriber.onSubscribe(Subscription)` — delivers the control handle
3. `Subscription.request(n)` — subscriber requests at most `n` items
4. `Publisher` emits up to `n` `onNext` signals
5. Subscriber requests more when ready
```mermaid
sequenceDiagram
    participant Producer as Flowable (Publisher)
    participant Sub as Subscriber
    Producer->>Sub: onSubscribe(subscription)
    Sub->>Producer: request(128)
    loop Up to 128 items
        Producer->>Sub: onNext(item)
    end
    Sub->>Producer: request(96)
    Note over Sub: Requests more after consuming ~75%
    loop Up to 96 items
        Producer->>Sub: onNext(item)
    end
    Producer->>Sub: onComplete()
```
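The handshake can be reduced to a dependency-free sketch. The `Subscription` and `Subscriber` interfaces and the `range` source below are hand-rolled stand-ins for illustration, not the real Reactive Streams or RxJava types: the source emits only while it holds credit, and the subscriber replenishes its credit in batches of 4.

```java
import java.util.ArrayList;
import java.util.List;

public class RequestProtocolSketch {
    interface Subscription { void request(long n); }
    interface Subscriber<T> {
        void onSubscribe(Subscription s);
        void onNext(T item);
        void onComplete();
    }

    // A synchronous range source that emits only as much as was requested.
    static void range(int count, Subscriber<Integer> sub) {
        int[] index = {0};
        long[] credit = {0};
        boolean[] emitting = {false};
        boolean[] done = {false};
        Subscription subscription = n -> {
            credit[0] += n;
            if (emitting[0]) return;          // reentrant request(): the loop below uses the new credit
            emitting[0] = true;
            while (credit[0] > 0 && index[0] < count) {
                credit[0]--;
                sub.onNext(index[0]++);       // one item per unit of credit
            }
            emitting[0] = false;
            if (index[0] == count && !done[0]) {
                done[0] = true;
                sub.onComplete();
            }
        };
        sub.onSubscribe(subscription);
    }

    public static void main(String[] args) {
        List<Integer> seen = new ArrayList<>();
        range(10, new Subscriber<Integer>() {
            Subscription s;
            int sinceLastRequest;
            public void onSubscribe(Subscription s) { this.s = s; s.request(4); }
            public void onNext(Integer item) {
                seen.add(item);
                // refill once the batch is consumed, like a tiny prefetch window
                if (++sinceLastRequest == 4) { sinceLastRequest = 0; s.request(4); }
            }
            public void onComplete() { System.out.println("done: " + seen); }
        });
        // the subscriber sees all ten items, four credits at a time
    }
}
```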
Observable deliberately does not implement this protocol. As discussed in Part 2, it uses Disposable instead of Subscription — no request(), no flow control. This is why the type split exists: Observable is for inherently bounded or UI-driven sources where backpressure overhead is unnecessary.
Tip: If you're unsure whether to use `Flowable` or `Observable`, ask: "Can this source produce data faster than the consumer can process it?" If yes → `Flowable`. If the source is event-driven (clicks, sensor readings at fixed intervals) → `Observable`.
BackpressureHelper: Atomic Request Accounting
The request(n) calls from downstream must be accumulated atomically, because they can arrive from any thread. BackpressureHelper provides the CAS-loop utilities:
```java
public static long add(AtomicLong requested, long n) {
    for (;;) {
        long r = requested.get();
        if (r == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long u = addCap(r, n);
        if (requested.compareAndSet(r, u)) {
            return r;
        }
    }
}
```
Why not just `AtomicLong.addAndGet(n)`? Two reasons:

1. `Long.MAX_VALUE` capping: In the Reactive Streams spec, `request(Long.MAX_VALUE)` means "unbounded." Once the counter reaches `Long.MAX_VALUE`, it must stay there — additional requests should not overflow to negative values. The `addCap` method handles this.
2. Return the previous value: Operators need to know if this was the first request (previous value was 0) to start draining. `addAndGet` returns the new value, which is less useful for this pattern.
The addCap method itself is elegant:
```java
public static long addCap(long a, long b) {
    long u = a + b;
    if (u < 0L) {
        return Long.MAX_VALUE;
    }
    return u;
}
```
Overflow detection via sign check — if adding two positive longs produces a negative, it overflowed.
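A minimal reproduction of both helpers makes the capping and the previous-value return visible. This uses only `java.util.concurrent.atomic` and mirrors the code above; the class name `AddCapDemo` is illustrative.

```java
import java.util.concurrent.atomic.AtomicLong;

public class AddCapDemo {
    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0L ? Long.MAX_VALUE : u;   // sign flip means the add overflowed
    }

    static long add(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            if (r == Long.MAX_VALUE) return Long.MAX_VALUE;  // already unbounded
            long u = addCap(r, n);
            if (requested.compareAndSet(r, u)) return r;     // return the PREVIOUS value
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        System.out.println(add(requested, 10));                 // 0: this was the first request
        System.out.println(add(requested, Long.MAX_VALUE));     // 10, and the counter is now capped
        System.out.println(requested.get() == Long.MAX_VALUE);  // true: stays unbounded forever
    }
}
```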
The produced() method subtracts consumed items from the requested count, again with Long.MAX_VALUE preservation:
```java
public static long produced(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current - n;
        if (update < 0L) {
            RxJavaPlugins.onError(new IllegalStateException(
                "More produced than requested: " + update));
            update = 0L;
        }
        if (requested.compareAndSet(current, update)) {
            return update;
        }
    }
}
```
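The two helpers pair up into the accounting cycle an operator actually runs: credit flows in via `add`, emissions are deducted via `produced`. A self-contained sketch (the class name `ProducedDemo` is illustrative, and the error hook is reduced to a clamp):

```java
import java.util.concurrent.atomic.AtomicLong;

public class ProducedDemo {
    static long addCap(long a, long b) {
        long u = a + b;
        return u < 0L ? Long.MAX_VALUE : u;
    }

    static void add(AtomicLong requested, long n) {
        for (;;) {
            long r = requested.get();
            if (r == Long.MAX_VALUE) return;                  // unbounded: nothing to track
            if (requested.compareAndSet(r, addCap(r, n))) return;
        }
    }

    static long produced(AtomicLong requested, long n) {
        for (;;) {
            long current = requested.get();
            if (current == Long.MAX_VALUE) return Long.MAX_VALUE; // unbounded: no accounting
            long update = current - n;
            if (update < 0L) update = 0L;                          // protocol violation, clamped
            if (requested.compareAndSet(current, update)) return update;
        }
    }

    public static void main(String[] args) {
        AtomicLong requested = new AtomicLong();
        add(requested, 128);                       // downstream: request(128)
        long remaining = produced(requested, 96);  // drain loop emitted 96 items
        System.out.println(remaining);             // 32 credits left

        add(requested, Long.MAX_VALUE);            // switch to unbounded mode
        System.out.println(produced(requested, 1_000_000)); // stays Long.MAX_VALUE
    }
}
```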
```mermaid
flowchart TD
    REQ[request n arrives] --> GET[r = requested.get]
    GET --> MAX{r == MAX_VALUE?}
    MAX -->|Yes| SKIP[Return MAX_VALUE - unbounded mode]
    MAX -->|No| ADD[u = addCap r n]
    ADD --> CAS{compareAndSet r u}
    CAS -->|Success| DONE[Return r - previous value]
    CAS -->|Fail: concurrent update| GET
```
SubscriptionHelper: Validation and the CANCELLED Sentinel
SubscriptionHelper is an enum with a single value — CANCELLED — that serves as an atomic sentinel:
```java
public enum SubscriptionHelper implements Subscription {
    CANCELLED;

    @Override public void request(long n) { /* deliberately ignored */ }
    @Override public void cancel() { /* deliberately ignored */ }
}
```
This sentinel enables safe atomic cancellation. Operators store their upstream Subscription in an AtomicReference<Subscription>. To cancel:
```java
public static boolean cancel(AtomicReference<Subscription> field) {
    Subscription current = field.get();
    if (current != CANCELLED) {
        current = field.getAndSet(CANCELLED);
        if (current != CANCELLED) {
            if (current != null) { current.cancel(); }
            return true;
        }
    }
    return false;
}
```
The getAndSet(CANCELLED) atomically swaps in the sentinel and returns whatever was there before. If it was already CANCELLED, another thread beat us. If it was a real subscription, we cancel it. This pattern is thread-safe without locks.
The setOnce method handles the common case where a subscription should be set exactly once:
```java
public static boolean setOnce(AtomicReference<Subscription> field, Subscription s) {
    if (!field.compareAndSet(null, s)) {
        s.cancel();
        if (field.get() != CANCELLED) {
            reportSubscriptionSet(); // Protocol violation
        }
        return false;
    }
    return true;
}
```
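Both patterns can be exercised in a few lines with a hand-rolled `Subscription` type (the names `CancelSentinelDemo` and `CANCELLED` below are local stand-ins, not the RxJava classes; the protocol-violation report is omitted for brevity):

```java
import java.util.concurrent.atomic.AtomicReference;

public class CancelSentinelDemo {
    interface Subscription { void request(long n); void cancel(); }

    static final Subscription CANCELLED = new Subscription() {
        public void request(long n) { /* ignored */ }
        public void cancel() { /* ignored */ }
    };

    static boolean cancel(AtomicReference<Subscription> field) {
        Subscription current = field.get();
        if (current != CANCELLED) {
            current = field.getAndSet(CANCELLED);   // atomic swap: only one caller wins
            if (current != CANCELLED) {
                if (current != null) current.cancel();
                return true;
            }
        }
        return false;
    }

    static boolean setOnce(AtomicReference<Subscription> field, Subscription s) {
        if (!field.compareAndSet(null, s)) {
            s.cancel();                             // late arrival: cancel the newcomer
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        AtomicReference<Subscription> field = new AtomicReference<>();
        boolean[] upstreamCancelled = {false};
        Subscription real = new Subscription() {
            public void request(long n) { }
            public void cancel() { upstreamCancelled[0] = true; }
        };
        System.out.println(setOnce(field, real));   // true: first set wins
        System.out.println(cancel(field));          // true: we cancelled the real subscription
        System.out.println(upstreamCancelled[0]);   // true
        System.out.println(cancel(field));          // false: already CANCELLED, idempotent
    }
}
```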
BackpressureStrategy and FlowableCreate Emitters
When you create a Flowable from imperative code via Flowable.create(), you must choose a BackpressureStrategy. The enum at BackpressureStrategy.java defines five options:
| Strategy | Behavior | Risk |
|---|---|---|
| `MISSING` | No strategy — delegate to downstream | `MissingBackpressureException` if unhandled |
| `ERROR` | Signal error when demand is exceeded | Safe but disruptive |
| `BUFFER` | Unbounded buffer | OOM if producer is much faster |
| `DROP` | Discard items when no demand | Data loss |
| `LATEST` | Keep only the newest item | Data loss, but always has "freshest" |
FlowableCreate at FlowableCreate.java#L40-L74 creates a different Emitter subclass for each strategy:
```java
public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;
    switch (backpressure) {
        case MISSING: emitter = new MissingEmitter<>(t); break;
        case ERROR: emitter = new ErrorAsyncEmitter<>(t); break;
        case DROP: emitter = new DropAsyncEmitter<>(t); break;
        case LATEST: emitter = new LatestAsyncEmitter<>(t); break;
        default: emitter = new BufferAsyncEmitter<>(t, bufferSize()); break;
    }
    t.onSubscribe(emitter);
    source.subscribe(emitter);
}
```
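The DROP strategy is simple enough to sketch in full: deliver an item only when credit is outstanding, otherwise discard it. This single-threaded illustration (`DropEmitterSketch` is a made-up name, not the real `DropAsyncEmitter`, which also handles concurrent emission and cancellation):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

public class DropEmitterSketch {
    final AtomicLong requested = new AtomicLong();
    final List<Integer> delivered = new ArrayList<>();

    void request(long n) { requested.addAndGet(n); }  // downstream grants credit

    void onNext(int item) {
        if (requested.get() > 0) {
            requested.decrementAndGet();
            delivered.add(item);                      // demand available: deliver
        }
        // no demand: silently drop the item
    }

    public static void main(String[] args) {
        DropEmitterSketch e = new DropEmitterSketch();
        e.request(2);
        for (int i = 0; i < 5; i++) e.onNext(i);      // only the first 2 fit the demand
        System.out.println(e.delivered);              // [0, 1]
    }
}
```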
```mermaid
flowchart TD
    FC[FlowableCreate] --> SW{BackpressureStrategy?}
    SW -->|MISSING| ME[MissingEmitter<br>No backpressure handling]
    SW -->|ERROR| EE[ErrorAsyncEmitter<br>Throws MissingBackpressureException]
    SW -->|DROP| DE[DropAsyncEmitter<br>Discards when no demand]
    SW -->|LATEST| LE[LatestAsyncEmitter<br>Keeps newest in AtomicReference]
    SW -->|BUFFER| BE[BufferAsyncEmitter<br>Unbounded SpscLinkedArrayQueue]
    ME --> DS[downstream]
    EE --> DS
    DE --> DS
    LE --> DS
    BE --> DS
```
The LatestAsyncEmitter is particularly clever — it uses an AtomicReference to hold the latest value. When a new onNext arrives but the downstream hasn't requested, it atomically replaces the stored value. When demand arrives, the drain loop picks up whatever value is currently stored.
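The core of that idea is just a one-slot `AtomicReference` that new values overwrite while there is no demand, with the drain taking whatever is stored when credit arrives. A minimal single-threaded sketch of the slot discipline (not the real emitter, which wraps this in a full drain loop):

```java
import java.util.concurrent.atomic.AtomicReference;

public class LatestSlotSketch {
    public static void main(String[] args) {
        AtomicReference<String> slot = new AtomicReference<>();
        // producer runs ahead of the consumer: each set replaces the previous value
        slot.set("v1");
        slot.set("v2");
        slot.set("v3");
        // demand arrives: the drain atomically takes the slot's content and empties it
        String taken = slot.getAndSet(null);
        System.out.println(taken);        // v3: only the freshest value survived
        System.out.println(slot.get());   // null: slot is empty until the next emission
    }
}
```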
Prefetch and Limit in FlowableObserveOn
The prefetch/limit pattern is how operators that cross thread boundaries (like observeOn) manage backpressure without per-item request/response overhead.
At FlowableObserveOn.java#L96-L101:
```java
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2); // 75% refill threshold
```
With the default prefetch of 128, limit is 128 - 32 = 96. The pattern:
1. Request `prefetch` (128) items initially from upstream
2. Process items from the queue, counting consumed items
3. When consumed reaches `limit` (96), request another `limit` from upstream
4. Reset the consumed counter
This creates a 75% refill threshold: when 75% of the buffer has been consumed, request a new batch equal to that consumed amount. It's a balance between:
- Throughput: Large batches reduce the overhead of `request()` calls
- Memory: The buffer never needs more than `prefetch` capacity
- Latency: Starting the refill at 75% means there's still 25% of the buffer to process while new items arrive
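The arithmetic of the refill loop can be simulated in a few lines (the class and method names are illustrative, not RxJava APIs):

```java
public class PrefetchLimitSketch {
    // Total items requested upstream after consuming `itemsConsumed` items.
    static long totalRequested(int prefetch, int itemsConsumed) {
        int limit = prefetch - (prefetch >> 2);   // 75% threshold: 96 for prefetch=128
        long total = prefetch;                    // initial request(prefetch)
        int consumed = 0;
        for (int i = 0; i < itemsConsumed; i++) {
            if (++consumed == limit) {
                total += limit;                   // request(limit) to refill
                consumed = 0;                     // reset the counter
            }
        }
        return total;
    }

    public static void main(String[] args) {
        // 300 items consumed triggers 3 refills: 128 + 3 * 96 = 416
        System.out.println(totalRequested(128, 300));
    }
}
```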
```mermaid
sequenceDiagram
    participant Upstream
    participant ObserveOn
    participant Downstream
    ObserveOn->>Upstream: request(128)
    Upstream->>ObserveOn: 128 items
    loop Drain items 1-96
        ObserveOn->>Downstream: onNext(item)
    end
    Note over ObserveOn: consumed=96 ≥ limit=96
    ObserveOn->>Upstream: request(96)
    loop Drain items 97-128 + new items
        ObserveOn->>Downstream: onNext(item)
    end
```
FlowableFlatMap: Request Accounting Across N Inner Sources
FlowableFlatMap is arguably the most complex operator in RxJava, and its backpressure handling is the primary reason. The MergeSubscriber must coordinate request accounting across a dynamic set of inner sources:
```java
static final class MergeSubscriber<T, U> extends AtomicInteger
        implements FlowableSubscriber<T>, Subscription {

    final int maxConcurrency;
    final int bufferSize;
    volatile SimplePlainQueue<U> queue;
    final AtomicThrowable errors = new AtomicThrowable();
    final AtomicReference<InnerSubscriber<?, ?>[]> subscribers;
    final AtomicLong requested = new AtomicLong();
    long uniqueId;
    int scalarEmitted;
    final int scalarLimit;
    // ...
```
The complexity comes from multiple concurrent flows:
- Outer source emits items that trigger inner subscriptions
- `maxConcurrency` limits how many inner sources can be active simultaneously
- Each inner source's items are merged into a single output stream
- The downstream's `request(n)` must be distributed across all active inner sources
- When an inner source completes, a new one can be subscribed from a buffered outer item
The onSubscribe method at FlowableFlatMap.java#L107-L119 shows the initial request strategy:
```java
public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.validate(this.upstream, s)) {
        this.upstream = s;
        downstream.onSubscribe(this);
        if (!cancelled) {
            if (maxConcurrency == Integer.MAX_VALUE) {
                s.request(Long.MAX_VALUE); // Unbounded: get everything
            } else {
                s.request(maxConcurrency); // Bounded: get up to maxConcurrency items
            }
        }
    }
}
```
The scalar optimization we saw in Part 3 plays a crucial role here — for Flowable.just() inner sources, flatMap skips the entire subscription machinery. It accumulates scalar emissions and batches the replenishment request:
```java
int scalarEmitted;
final int scalarLimit;
// scalarLimit = Math.max(1, maxConcurrency >> 1)
```
When scalarEmitted reaches scalarLimit, a batch request for scalarLimit more outer items is made. This amortizes the CAS cost of request() across multiple scalar emissions.
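The amortization is easy to quantify with a small simulation (illustrative names, not RxJava APIs): instead of one `request(1)` per scalar emission, one batched `request(scalarLimit)` is issued per `scalarLimit` scalars.

```java
public class ScalarBatchSketch {
    // Number of upstream request() calls issued for `scalars` scalar emissions.
    static int batchedRequests(int maxConcurrency, int scalars) {
        int scalarLimit = Math.max(1, maxConcurrency >> 1);
        int scalarEmitted = 0;
        int requestCalls = 0;
        for (int i = 0; i < scalars; i++) {
            if (++scalarEmitted == scalarLimit) {
                requestCalls++;              // one batched request(scalarLimit)
                scalarEmitted = 0;
            }
        }
        return requestCalls;
    }

    public static void main(String[] args) {
        // maxConcurrency=8 gives scalarLimit=4: 20 scalars cost 5 request() calls, not 20
        System.out.println(batchedRequests(8, 20));  // 5
    }
}
```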
```mermaid
flowchart TD
    OUTER[Outer Flowable] -->|"request(maxConcurrency)"| MS[MergeSubscriber]
    MS -->|"onNext(T)"| MAP[mapper.apply T]
    MAP -->|"Scalar?"| SC{ScalarSupplier?}
    SC -->|Yes| EMIT[Emit directly + scalarEmitted++]
    SC -->|No| INNER[Subscribe InnerSubscriber]
    INNER --> IS1[InnerSubscriber 1]
    INNER --> IS2[InnerSubscriber 2]
    INNER --> ISN[InnerSubscriber N]
    IS1 -->|"items"| DRAIN[Drain loop merges all]
    IS2 -->|"items"| DRAIN
    ISN -->|"items"| DRAIN
    DRAIN -->|"onNext"| DOWN[Downstream]
    DOWN -->|"request n"| DRAIN
    EMIT --> DOWN
```
The drain loop in MergeSubscriber is the most complex in the entire codebase. It round-robins across active inner subscribers (tracked in an atomic array), reads from their individual queues, checks the global error accumulator (AtomicThrowable), handles cancellation, and manages the requested counter — all in a lock-free drain loop. Understanding this single class is a rite of passage for RxJava internals work.
What's Next
We've now covered the full internal machinery of RxJava: the operator pattern, drain loops, fusion, schedulers, and backpressure. In Part 6, we shift to what's new in 4.x — the Streamable type that brings pull-based async enumeration to Java, VirtualGenerator and VirtualEmitter for imperative code on virtual threads, and how modern Java features like sealed types and records are reshaping the API.