The Subscribe Chain: How Every RxJava Operator Actually Works
Prerequisites
- Article 1: architecture-and-codebase-navigation
- Reactive Streams specification basics (Publisher/Subscriber/Subscription contract)
- Decorator design pattern
Every RxJava pipeline you've ever built follows the same lifecycle: assemble a chain of operators, then subscribe to trigger data flow. What seems like a single subscribe() call actually cascades through plugin hooks, wraps your subscriber in safety layers, and triggers a reverse traversal up the operator chain. Understanding this mechanism is the key to understanding everything else in RxJava.
The subscribe() Entry Point
When you call subscribe() on a Flowable, the method at Flowable.java#L16009-L16031 executes three steps:
public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
    Objects.requireNonNull(subscriber, "subscriber is null");
    try {
        Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null...");
        subscribeActual(flowableSubscriber);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        // ...
    }
}
- Null-check the subscriber
- Run the subscribe hook: RxJavaPlugins.onSubscribe() can replace or wrap the subscriber
- Delegate to subscribeActual(), the abstract method that every operator implements
This is the Template Method pattern. The subscribe() method is final — operators cannot override it. They must implement subscribeActual() instead, which guarantees that plugin hooks always fire.
sequenceDiagram
participant User
participant Flowable
participant RxJavaPlugins
participant Operator as subscribeActual()
User->>Flowable: subscribe(subscriber)
Flowable->>Flowable: Objects.requireNonNull(subscriber)
Flowable->>RxJavaPlugins: onSubscribe(this, subscriber)
RxJavaPlugins-->>Flowable: possibly wrapped subscriber
Flowable->>Operator: subscribeActual(wrappedSubscriber)
The error handling is subtle: if subscribeActual throws, RxJava can't call onError (the subscriber might not have received onSubscribe yet) and can't call onSubscribe (it might have already been called). So it routes the error to RxJavaPlugins.onError() as an undeliverable error, then throws a NullPointerException whose cause is the original error. This is a pragmatic hack: the Reactive Streams specification allows subscribe() to throw only a NullPointerException.
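The entry-point behavior described in this section can be sketched without RxJava on the classpath. The following is a minimal illustration, not the library's code: MiniSource, MiniSubscriber, and subscribeHook are hypothetical stand-ins, and the undeliverable-error reporting step is reduced to a comment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.BiFunction;

// Hypothetical stand-ins for illustration; none of these are RxJava types.
interface MiniSubscriber<T> {
    void onNext(T item);
}

abstract class MiniSource<T> {
    // Stand-in for a global subscribe hook; null means no hook is installed.
    static volatile BiFunction<MiniSource<?>, MiniSubscriber<?>, MiniSubscriber<?>> subscribeHook;

    // Template method: final, so no subclass can skip the null check or the hook.
    @SuppressWarnings("unchecked")
    public final void subscribe(MiniSubscriber<? super T> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber is null");
        try {
            MiniSubscriber<? super T> actual = subscriber;
            BiFunction<MiniSource<?>, MiniSubscriber<?>, MiniSubscriber<?>> hook = subscribeHook;
            if (hook != null) {
                actual = (MiniSubscriber<? super T>) hook.apply(this, subscriber);
            }
            Objects.requireNonNull(actual, "the subscribe hook returned null");
            subscribeActual(actual);
        } catch (NullPointerException e) {
            throw e; // null-check failures propagate unchanged
        } catch (Throwable e) {
            // The subscriber's state is unknown here, so the failure is not sent to it.
            // (A real implementation would also report it to a global error handler.)
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(e);
            throw npe;
        }
    }

    // The single extension point each concrete source or operator implements.
    protected abstract void subscribeActual(MiniSubscriber<? super T> subscriber);
}

final class SubscribeTemplateSketch {
    static List<String> runWithLoggingHook() {
        List<String> log = new ArrayList<>();
        MiniSource.subscribeHook = (src, sub) -> {
            log.add("hook");
            return sub;
        };
        try {
            MiniSource<String> letters = new MiniSource<String>() {
                @Override
                protected void subscribeActual(MiniSubscriber<? super String> subscriber) {
                    subscriber.onNext("a");
                    subscriber.onNext("b");
                }
            };
            letters.subscribe(item -> log.add("onNext:" + item));
        } finally {
            MiniSource.subscribeHook = null;
        }
        return log;
    }

    static Throwable causeOfFailingSubscribe() {
        MiniSource<String> broken = new MiniSource<String>() {
            @Override
            protected void subscribeActual(MiniSubscriber<? super String> subscriber) {
                throw new IllegalStateException("boom");
            }
        };
        try {
            broken.subscribe(item -> { });
            return null;
        } catch (NullPointerException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        System.out.println(runWithLoggingHook());
        System.out.println(causeOfFailingSubscribe());
    }
}
```

Because subscribe() is final in the sketch, the hook runs before every subscribeActual call, and a failing subscribeActual surfaces as a NullPointerException carrying the original exception as its cause.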
AbstractFlowableWithUpstream: The Operator Base Class
When you chain operators — source.map(fn).filter(pred).observeOn(scheduler) — you're building a linked list of decorator objects. Each operator class extends Flowable and holds a reference to its upstream source.
The base class for this pattern is AbstractFlowableWithUpstream:
abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R>
        implements HasUpstreamPublisher<T> {
    protected final Flowable<T> source;

    AbstractFlowableWithUpstream(Flowable<T> source) {
        this.source = Objects.requireNonNull(source, "source is null");
    }
}
This is remarkably lean — just a field and a constructor. But it establishes the decorator chain:
classDiagram
class Flowable~T~ {
<<abstract>>
+subscribe(Subscriber)
#subscribeActual(Subscriber)*
}
class AbstractFlowableWithUpstream~T_R~ {
<<abstract>>
#source: Flowable~T~
}
class FlowableMap~T_U~ {
-mapper: Function
#subscribeActual(Subscriber)
}
class FlowableFilter~T~ {
-predicate: Predicate
#subscribeActual(Subscriber)
}
class FlowableObserveOn~T~ {
-scheduler: Scheduler
#subscribeActual(Subscriber)
}
Flowable <|-- AbstractFlowableWithUpstream
AbstractFlowableWithUpstream <|-- FlowableMap
AbstractFlowableWithUpstream <|-- FlowableFilter
AbstractFlowableWithUpstream <|-- FlowableObserveOn
When you write source.map(fn).filter(pred), assembly creates: FlowableFilter(source=FlowableMap(source=originalSource)). When you subscribe, the call propagates backward: FlowableFilter.subscribeActual subscribes to FlowableMap, which subscribes to originalSource.
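To make the two phases concrete, here is a small sketch of assembly followed by backward subscription. The node classes are hypothetical miniatures that only mimic the decorator shape; they carry no Subscription, backpressure, or terminal signals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;
import java.util.function.IntPredicate;
import java.util.function.IntUnaryOperator;

// Hypothetical miniature operators; they only mimic the shape described above.
abstract class ChainNode {
    final List<String> trace; // records the order in which subscribe() is called

    ChainNode(List<String> trace) {
        this.trace = trace;
    }

    abstract void subscribe(IntConsumer downstream);
}

final class RangeNode extends ChainNode {
    RangeNode(List<String> trace) {
        super(trace);
    }

    @Override
    void subscribe(IntConsumer downstream) {
        trace.add("subscribe:range");
        for (int i = 1; i <= 3; i++) {
            downstream.accept(i);
        }
    }
}

final class MapNode extends ChainNode {
    final ChainNode source;
    final IntUnaryOperator mapper;

    MapNode(ChainNode source, IntUnaryOperator mapper) {
        super(source.trace);
        this.source = source;
        this.mapper = mapper;
    }

    @Override
    void subscribe(IntConsumer downstream) {
        trace.add("subscribe:map");
        // Wrap the downstream consumer, then subscribe the wrapper to the upstream node.
        source.subscribe(v -> downstream.accept(mapper.applyAsInt(v)));
    }
}

final class FilterNode extends ChainNode {
    final ChainNode source;
    final IntPredicate predicate;

    FilterNode(ChainNode source, IntPredicate predicate) {
        super(source.trace);
        this.source = source;
        this.predicate = predicate;
    }

    @Override
    void subscribe(IntConsumer downstream) {
        trace.add("subscribe:filter");
        source.subscribe(v -> {
            if (predicate.test(v)) {
                downstream.accept(v);
            }
        });
    }
}

final class DecoratorChainSketch {
    static List<String> run() {
        List<String> trace = new ArrayList<>();
        // Assembly: nothing flows yet, only nested objects are created.
        ChainNode chain = new FilterNode(new MapNode(new RangeNode(trace), x -> x * 10), x -> x > 10);
        trace.add("assembled");
        // Subscription: travels from the last operator back to the source.
        chain.subscribe(v -> trace.add("onNext:" + v));
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The trace shows subscribe calls arriving in the reverse of assembly order (filter, then map, then range), after which items flow forward through the wrappers.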
Deep Dive: FlowableMap as the Canonical Simple Operator
FlowableMap at FlowableMap.java#L26-L83 is among the simplest complete operators, yet it demonstrates the core patterns shared by the rest:
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;

    // constructor omitted

    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<>(s, mapper));
        }
    }

    // MapSubscriber and MapConditionalSubscriber omitted
}
The subscribeActual wraps the downstream subscriber in a MapSubscriber and subscribes it to the upstream source. This is the decorator pattern in action — MapSubscriber intercepts onNext signals, applies the function, and forwards the result.
The MapSubscriber.onNext method tells the full story:
public void onNext(T t) {
    if (done) { return; }
    if (sourceMode != NONE) {
        downstream.onNext(null); // fusion mode: signal "poll me"
        return;
    }
    U v;
    try {
        v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}
Three key patterns are visible here:
- Done-guard: Check the done flag first, silently dropping late signals
- Fusion short-circuit: If operating in fusion mode (sourceMode != NONE), forward a null sentinel instead of a mapped value; the downstream will poll() the queue directly
- Try-catch with fail(): User code (mapper.apply(t)) is always wrapped. The fail() helper cancels upstream and signals onError downstream
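The done-guard and the try-catch-with-fail() pattern can be exercised in isolation. The sketch below uses the JDK's java.util.concurrent.Flow interfaces as a stand-in for the Reactive Streams types; SketchMapSubscriber is a hypothetical class, and the fusion branch is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Function;

// Hypothetical map-style subscriber built on java.util.concurrent.Flow; fusion is omitted.
final class SketchMapSubscriber<T, U> implements Flow.Subscriber<T>, Flow.Subscription {
    private final Flow.Subscriber<? super U> downstream;
    private final Function<? super T, ? extends U> mapper;
    private Flow.Subscription upstream;
    private boolean done;

    SketchMapSubscriber(Flow.Subscriber<? super U> downstream, Function<? super T, ? extends U> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this); // downstream talks to this wrapper, not to the source
    }

    @Override
    public void onNext(T item) {
        if (done) {
            return; // done-guard: drop late signals
        }
        U v;
        try {
            v = Objects.requireNonNull(mapper.apply(item), "mapper returned null");
        } catch (Throwable ex) {
            fail(ex); // user code failed: cancel upstream, signal onError
            return;
        }
        downstream.onNext(v);
    }

    @Override
    public void onError(Throwable t) {
        if (done) {
            return;
        }
        done = true;
        downstream.onError(t);
    }

    @Override
    public void onComplete() {
        if (done) {
            return;
        }
        done = true;
        downstream.onComplete();
    }

    @Override
    public void request(long n) {
        upstream.request(n);
    }

    @Override
    public void cancel() {
        upstream.cancel();
    }

    private void fail(Throwable ex) {
        upstream.cancel();
        onError(ex);
    }
}

final class MapSubscriberSketch {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<String> downstream = new Flow.Subscriber<String>() {
            @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); }
            @Override public void onNext(String item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable t) { events.add("onError:" + t.getMessage()); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        Flow.Subscription upstream = new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { events.add("upstream.cancel"); }
        };
        Function<String, String> mapper = s -> {
            if (s.isEmpty()) {
                throw new IllegalArgumentException("empty input");
            }
            return s + "!";
        };
        SketchMapSubscriber<String, String> map = new SketchMapSubscriber<>(downstream, mapper);
        map.onSubscribe(upstream);
        map.onNext("a");
        map.onNext("");     // mapper throws: upstream is cancelled, onError is signalled
        map.onNext("late"); // dropped by the done-guard
        map.onComplete();   // dropped by the done-guard
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

After the mapper throws, the late onNext and onComplete never reach the downstream, which is the effect of the done flag.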
The fusion support is also implemented via poll():
public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}
When fused, the map function is applied inside poll() rather than onNext() — the value passes directly from the upstream queue through the mapping function without any intermediate queueing. We'll explore this mechanism fully in Part 3.
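The pull-side idea can be illustrated with a plain java.util.Queue. This is only a sketch of mapping inside poll(): MappedQueue is a hypothetical class, and it ignores the mode negotiation that the real fusion protocol involves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical view over an upstream queue that applies the mapper lazily, on poll().
final class MappedQueue<T, U> {
    private final Queue<T> upstream;
    private final Function<? super T, ? extends U> mapper;

    MappedQueue(Queue<T> upstream, Function<? super T, ? extends U> mapper) {
        this.upstream = upstream;
        this.mapper = mapper;
    }

    // Returns the next mapped value, or null when the upstream queue is empty.
    U poll() {
        T t = upstream.poll();
        if (t == null) {
            return null;
        }
        return Objects.requireNonNull(mapper.apply(t), "mapper returned null");
    }
}

final class FusedPollSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Queue<Integer> upstream = new ArrayDeque<>(List.of(1, 2, 3));
        MappedQueue<Integer, Integer> fused = new MappedQueue<>(upstream, x -> {
            log.add("map:" + x);
            return x * 10;
        });
        log.add("before poll"); // the mapper has not run yet
        Integer v;
        while ((v = fused.poll()) != null) {
            log.add("polled:" + v);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The log interleaves each map call with the poll that triggered it, and no second queue ever holds the mapped values.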
sequenceDiagram
participant Source
participant MapSubscriber
participant Downstream
Source->>MapSubscriber: onSubscribe(subscription)
MapSubscriber->>Downstream: onSubscribe(mapSubscriber)
Note over Downstream: request(n)
Downstream->>MapSubscriber: request(n)
MapSubscriber->>Source: request(n)
Source->>MapSubscriber: onNext(t)
MapSubscriber->>MapSubscriber: v = mapper.apply(t)
MapSubscriber->>Downstream: onNext(v)
Source->>MapSubscriber: onComplete()
MapSubscriber->>Downstream: onComplete()
Tip: Most intermediate operators in RxJava follow this structure: extend AbstractFlowableWithUpstream, implement subscribeActual to wrap the downstream, and forward signals with transformations. Once you understand FlowableMap, you have the template for reading any of the other 500+ operator files.
Flowable vs Observable: Two Subscription Protocols
The Flowable and Observable types use fundamentally different subscription mechanisms. This design split affects every operator implementation.
Flowable uses the Reactive Streams protocol with Subscription:
- Subscriber.onSubscribe(Subscription s): delivers the subscription
- Subscription.request(long n): downstream requests N items (backpressure)
- Subscription.cancel(): downstream cancels
Observable uses Disposable:
- Observer.onSubscribe(Disposable d): delivers the disposable
- Disposable.dispose(): downstream cancels
- No request(): no backpressure
This means every operator that exists on both types has two implementations: FlowableMap and ObservableMap, FlowableFilter and ObservableFilter, and so on. The Flowable version must handle request(n) forwarding; the Observable version doesn't.
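What the request(n) obligation means for a source can be sketched with the JDK's Flow.Subscription. RangeSubscription below is a hypothetical, single-threaded illustration: it skips the validation of non-positive requests and the concurrency handling a production source needs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical, single-threaded range source that emits only what was requested.
final class RangeSubscription implements Flow.Subscription {
    private final Flow.Subscriber<? super Integer> downstream;
    private final int end;
    private int next;
    private boolean finished;

    RangeSubscription(Flow.Subscriber<? super Integer> downstream, int start, int end) {
        this.downstream = downstream;
        this.next = start;
        this.end = end;
    }

    @Override
    public void request(long n) {
        // Emit at most n items per request; this is the backpressure contract in miniature.
        for (long i = 0; i < n && next <= end && !finished; i++) {
            downstream.onNext(Integer.valueOf(next++));
        }
        if (next > end && !finished) {
            finished = true;
            downstream.onComplete();
        }
    }

    @Override
    public void cancel() {
        finished = true;
    }
}

final class BackpressureSketch {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { events.add("onSubscribe"); }
            @Override public void onNext(Integer item) { events.add("onNext:" + item); }
            @Override public void onError(Throwable t) { events.add("onError"); }
            @Override public void onComplete() { events.add("onComplete"); }
        };
        RangeSubscription subscription = new RangeSubscription(subscriber, 1, 5);
        subscriber.onSubscribe(subscription);
        subscription.request(2);  // only two items are delivered
        events.add("--");
        subscription.request(10); // the remaining three, then completion
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A Disposable-based source has no such bookkeeping: it would emit all five items in one loop and check only whether it has been disposed.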
classDiagram
class Subscription {
<<interface>>
+request(long n)
+cancel()
}
class Disposable {
<<interface>>
+dispose()
+isDisposed(): boolean
+close()
}
class FlowableMap {
Uses Subscription
Handles request(n)
}
class ObservableMap {
Uses Disposable
No backpressure
}
Subscription <.. FlowableMap : uses
Disposable <.. ObservableMap : uses
In RxJava 4.x, Disposable extends AutoCloseable, so you can use try-with-resources to manage subscription lifecycles. This small but meaningful ergonomic improvement is visible in Disposable.java#L28-L46.
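The try-with-resources usage might look like the sketch below. SketchDisposable is a hypothetical interface that mirrors the three methods shown in the diagram above; that close() simply delegates to dispose() is an assumption made for the example, not something taken from the RxJava source.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in mirroring dispose(), isDisposed(), and close(); not the RxJava interface.
interface SketchDisposable extends AutoCloseable {
    void dispose();

    boolean isDisposed();

    // Assumption for this sketch: close() delegates to dispose() and throws nothing checked.
    @Override
    default void close() {
        dispose();
    }
}

final class FlagDisposable implements SketchDisposable {
    private final AtomicBoolean disposed = new AtomicBoolean();

    @Override
    public void dispose() {
        disposed.set(true);
    }

    @Override
    public boolean isDisposed() {
        return disposed.get();
    }
}

final class TryWithResourcesSketch {
    static boolean disposedAfterBlock() {
        FlagDisposable disposable = new FlagDisposable();
        try (SketchDisposable resource = disposable) {
            // Work with the live subscription here; it is still active inside the block.
            if (resource.isDisposed()) {
                throw new IllegalStateException("disposed too early");
            }
        }
        // Leaving the block called close(), which disposed the resource.
        return disposable.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(disposedAfterBlock());
    }
}
```

The pattern suits scoped, blocking work; a subscription that must outlive the enclosing method still needs an explicit dispose() call later.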
Error Handling in Operators
Error handling in RxJava operators follows a strict protocol, visible in the FlowableMap example above.
First, fatal errors are rethrown immediately. The Exceptions.throwIfFatal() method at Exceptions.java#L66-L73 checks for VirtualMachineError and LinkageError:
public static void throwIfFatal(@NonNull Throwable t) {
    if (t instanceof VirtualMachineError) {
        throw (VirtualMachineError) t;
    } else if (t instanceof LinkageError) {
        throw (LinkageError) t;
    }
}
Everything else is catchable and gets routed through the reactive error channel. The fail() helper method in BasicFuseableSubscriber handles the standard sequence: cancel upstream, mark as done, call downstream.onError(ex).
When an error has nowhere to go (for example, onError is called after onComplete, or an error occurs during cancellation), RxJava routes it to RxJavaPlugins.onError(). If no error handler is set, it prints the stack trace to stderr and hands the error to the current thread's uncaught exception handler. This "undeliverable error" mechanism is the safety net that prevents exceptions from being silently swallowed.
flowchart TD
EX[Exception in operator] --> TIF{throwIfFatal?}
TIF -->|VirtualMachineError/LinkageError| RETHROW[Rethrow immediately]
TIF -->|Other| DELIVER{Can deliver onError?}
DELIVER -->|Yes| CANCEL[Cancel upstream] --> ONERROR[downstream.onError]
DELIVER -->|No: already terminal| PLUGIN[RxJavaPlugins.onError]
PLUGIN --> HANDLER{Handler set?}
HANDLER -->|Yes| CUSTOM[Custom handler]
HANDLER -->|No| STDERR[Print stack trace + call thread uncaught exception handler]
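The decision flow above can be sketched as plain Java. Everything here is a hypothetical stand-in: errorHandler plays the role of a global plugin handler, route() compresses an operator's error path into one method, and the fallback branch reflects one plausible reading of the default behavior rather than the library's exact code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ErrorRoutingSketch {
    // Stand-in for a global error handler; null means none is installed.
    static volatile Consumer<Throwable> errorHandler;

    static void throwIfFatal(Throwable t) {
        if (t instanceof VirtualMachineError) {
            throw (VirtualMachineError) t;
        } else if (t instanceof LinkageError) {
            throw (LinkageError) t;
        }
    }

    // Last-resort channel for errors that cannot be delivered downstream.
    static void onUndeliverable(Throwable t) {
        Consumer<Throwable> handler = errorHandler;
        if (handler != null) {
            handler.accept(t);
            return;
        }
        t.printStackTrace();
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, t);
    }

    // Compressed operator error path: fatal check, then deliver or fall back.
    static void route(Throwable error, boolean alreadyTerminated, List<String> log) {
        throwIfFatal(error);
        if (alreadyTerminated) {
            onUndeliverable(error);
        } else {
            log.add("upstream.cancel");
            log.add("downstream.onError:" + error.getMessage());
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        errorHandler = e -> log.add("plugin:" + e.getMessage());
        try {
            route(new IllegalStateException("first"), false, log); // deliverable
            route(new IllegalStateException("second"), true, log); // already terminal
            try {
                route(new StackOverflowError("fatal"), false, log); // fatal: rethrown as-is
            } catch (VirtualMachineError e) {
                log.add("rethrown:" + e.getMessage());
            }
        } finally {
            errorHandler = null;
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Installing a handler, as run() does, is what keeps an undeliverable error from reaching the thread's uncaught exception handler.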
Assembly and Subscribe Hooks
RxJavaPlugins provides two interception points for every reactive type: assembly hooks and subscribe hooks.
Assembly hooks fire when an operator is created (at chain-building time), not when subscription occurs. The onFlowableAssembly hook at RxJavaPlugins.java#L72-L73 can intercept or replace any Flowable being constructed:
static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;
Subscribe hooks fire when subscribe() is called, before subscribeActual() executes. The onFlowableSubscribe hook at RxJavaPlugins.java#L102-L104 receives both the Flowable and the Subscriber and can replace the latter:
static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;
These hooks are the foundation for debugging tools. An assembly hook can capture stack traces at operator creation time — solving the notorious "where was this operator built?" debugging problem. A subscribe hook can wrap every subscriber with instrumentation to log signals.
sequenceDiagram
participant Dev as Developer
participant Assembly as Assembly Hook
participant Chain as Operator Chain
participant Subscribe as Subscribe Hook
participant Actual as subscribeActual
Dev->>Chain: source.map(fn).filter(pred)
Chain->>Assembly: onFlowableAssembly(FlowableMap)
Assembly-->>Chain: possibly wrapped Flowable
Chain->>Assembly: onFlowableAssembly(FlowableFilter)
Dev->>Chain: .subscribe(subscriber)
Chain->>Subscribe: onFlowableSubscribe(flowable, subscriber)
Subscribe-->>Chain: possibly wrapped subscriber
Chain->>Actual: subscribeActual(wrappedSubscriber)
Both hook types support all reactive types — there are parallel hooks for Observable, Single, Maybe, Completable, and ParallelFlowable.
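As a rough illustration of why assembly hooks help with debugging, the sketch below records each operator and the stack trace at the moment it is created. SketchFlowable and assemblyHook are hypothetical miniatures; only the general shape (a static volatile function consulted by every operator factory) follows the description above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Hypothetical miniature of a reactive type with an assembly hook; not an RxJava class.
final class SketchFlowable {
    // Stand-in for the assembly hook; null means no hook is installed.
    static volatile UnaryOperator<SketchFlowable> assemblyHook;

    final String name;
    final SketchFlowable source;

    SketchFlowable(String name, SketchFlowable source) {
        this.name = name;
        this.source = source;
    }

    // Every operator factory passes the new instance through the hook.
    SketchFlowable map() {
        return onAssembly(new SketchFlowable("map", this));
    }

    SketchFlowable filter() {
        return onAssembly(new SketchFlowable("filter", this));
    }

    private static SketchFlowable onAssembly(SketchFlowable created) {
        UnaryOperator<SketchFlowable> hook = assemblyHook;
        return hook != null ? hook.apply(created) : created;
    }
}

final class AssemblyHookSketch {
    static final List<StackTraceElement[]> SITES = new ArrayList<>();

    static List<String> run() {
        List<String> assembled = new ArrayList<>();
        SITES.clear();
        SketchFlowable.assemblyHook = created -> {
            // Capture where the operator was built; this runs at chain-building time.
            SITES.add(new Throwable().getStackTrace());
            assembled.add(created.name);
            return created;
        };
        try {
            // No subscribe call is made: the hook fires during assembly alone.
            new SketchFlowable("source", null).map().filter();
        } finally {
            SketchFlowable.assemblyHook = null;
        }
        return assembled;
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println("captured " + SITES.size() + " assembly sites");
    }
}
```

A debugging tool built this way could attach the captured trace to the returned instance, so that a later failure can report where the operator was created.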
What's Next
We've seen how operators are assembled and how subscription cascades through the chain. But we've glossed over the most performance-critical details: how operators handle concurrent signals safely, and how fusion eliminates intermediate queues. In Part 3, we'll dive into the engine room — the atomic drain loop pattern, lock-free SPSC queues, and the QueueFuseable protocol that makes RxJava one of the fastest reactive libraries on the JVM.