Read OSS

The Subscribe Chain: How Every RxJava Operator Actually Works

Intermediate

Prerequisites

  • Article 1: architecture-and-codebase-navigation
  • Reactive Streams specification basics (Publisher/Subscriber/Subscription contract)
  • Decorator design pattern

Every RxJava pipeline you've ever built follows the same lifecycle: assemble a chain of operators, then subscribe to trigger data flow. What seems like a single subscribe() call actually cascades through plugin hooks, wraps your subscriber in safety layers, and triggers a reverse traversal up the operator chain. Understanding this mechanism is the key to understanding everything else in RxJava.

The subscribe() Entry Point

When you call subscribe() on a Flowable, the method at Flowable.java#L16009-L16031 executes three steps:

public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
    Objects.requireNonNull(subscriber, "subscriber is null");
    try {
        Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null...");
        subscribeActual(flowableSubscriber);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        // ...
    }
}

  1. Null-check the subscriber
  2. Run the subscribe hook: RxJavaPlugins.onSubscribe() can replace or wrap the subscriber
  3. Delegate to subscribeActual() — the abstract method that every operator implements

This is the Template Method pattern. The subscribe() method is final — operators cannot override it. They must implement subscribeActual() instead, which guarantees that plugin hooks always fire.
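To make the Template Method split concrete, here is a minimal sketch that does not use RxJava at all. MiniSource, subscribeHook, and TemplateMethodSketch are hypothetical names, and a plain Consumer stands in for the subscriber. It only illustrates why a final subscribe() guarantees that the hook runs before any operator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Hypothetical miniature of a reactive type; a Consumer stands in for the subscriber. */
abstract class MiniSource {
    /** Stand-in for the subscribe hook; identity means "no hook installed". */
    static volatile UnaryOperator<Consumer<String>> subscribeHook = UnaryOperator.identity();

    /** Final: subclasses cannot skip the null check or the hook. */
    public final void subscribe(Consumer<String> subscriber) {
        Objects.requireNonNull(subscriber, "subscriber is null");
        Consumer<String> hooked = subscribeHook.apply(subscriber);
        subscribeActual(Objects.requireNonNull(hooked, "hook returned null"));
    }

    /** The only extension point available to operators. */
    protected abstract void subscribeActual(Consumer<String> subscriber);
}

final class TemplateMethodSketch {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniSource.subscribeHook = s -> { log.add("hook"); return s; };
        try {
            MiniSource source = new MiniSource() {
                @Override
                protected void subscribeActual(Consumer<String> subscriber) {
                    log.add("subscribeActual");
                    subscriber.accept("a");
                }
            };
            source.subscribe(item -> log.add("onNext:" + item));
        } finally {
            MiniSource.subscribeHook = UnaryOperator.identity(); // restore the default
        }
        return log;
    }
}
```

Calling TemplateMethodSketch.run() records the hook first, then subscribeActual, then the emitted item, which mirrors the three steps listed above.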

sequenceDiagram
    participant User
    participant Flowable
    participant RxJavaPlugins
    participant Operator as subscribeActual()
    
    User->>Flowable: subscribe(subscriber)
    Flowable->>Flowable: Objects.requireNonNull(subscriber)
    Flowable->>RxJavaPlugins: onSubscribe(this, subscriber)
    RxJavaPlugins-->>Flowable: possibly wrapped subscriber
    Flowable->>Operator: subscribeActual(wrappedSubscriber)

The error handling is subtle. If subscribeActual throws, RxJava cannot safely call onError, because the subscriber may not have received onSubscribe yet, and it cannot call onSubscribe, because that may already have happened. Instead it routes the error to RxJavaPlugins.onError() as an undeliverable error, then throws a NullPointerException that wraps the original as its cause. The wrapper is a pragmatic hack: the Reactive Streams specification permits subscribe() to throw only a NullPointerException.
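The wrap-and-rethrow step can be sketched in isolation. Everything below is a hypothetical stand-in: lastUndeliverable plays the role of the global error sink, a Runnable plays the role of subscribeActual, and the fatal-error check is left out for brevity.

```java
/** Hypothetical model of the catch blocks in subscribe(); not RxJava code. */
final class SubscribeFailureSketch {
    /** Stand-in for the global error sink; remembers the last reported error. */
    static volatile Throwable lastUndeliverable;

    static void subscribe(Runnable subscribeActual) {
        try {
            subscribeActual.run();
        } catch (NullPointerException e) {
            throw e; // genuine NPEs pass through unchanged
        } catch (Throwable e) {
            lastUndeliverable = e; // report through the side channel
            NullPointerException npe = new NullPointerException("subscribeActual failed");
            npe.initCause(e); // keep the real failure attached as the cause
            throw npe;
        }
    }

    /** Runs a failing subscribeActual and returns the cause the caller can observe. */
    static Throwable causeSeenByCaller() {
        try {
            subscribe(() -> { throw new IllegalStateException("boom"); });
            return null;
        } catch (NullPointerException npe) {
            return npe.getCause();
        }
    }
}
```

The caller still sees only a NullPointerException, but the original IllegalStateException is available both through getCause() and through the side channel.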

AbstractFlowableWithUpstream: The Operator Base Class

When you chain operators — source.map(fn).filter(pred).observeOn(scheduler) — you're building a linked list of decorator objects. Each operator class extends Flowable and holds a reference to its upstream source.

The base class for this pattern is AbstractFlowableWithUpstream:

abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R> 
    implements HasUpstreamPublisher<T> {
    
    protected final Flowable<T> source;

    AbstractFlowableWithUpstream(Flowable<T> source) {
        this.source = Objects.requireNonNull(source, "source is null");
    }
}

This is remarkably lean — just a field and a constructor. But it establishes the decorator chain:

classDiagram
    class Flowable~T~ {
        <<abstract>>
        +subscribe(Subscriber)
        #subscribeActual(Subscriber)*
    }
    class AbstractFlowableWithUpstream~T_R~ {
        <<abstract>>
        #source: Flowable~T~
    }
    class FlowableMap~T_U~ {
        -mapper: Function
        #subscribeActual(Subscriber)
    }
    class FlowableFilter~T~ {
        -predicate: Predicate
        #subscribeActual(Subscriber)
    }
    class FlowableObserveOn~T~ {
        -scheduler: Scheduler
        #subscribeActual(Subscriber)
    }
    
    Flowable <|-- AbstractFlowableWithUpstream
    AbstractFlowableWithUpstream <|-- FlowableMap
    AbstractFlowableWithUpstream <|-- FlowableFilter
    AbstractFlowableWithUpstream <|-- FlowableObserveOn

When you write source.map(fn).filter(pred), assembly creates: FlowableFilter(source=FlowableMap(source=originalSource)). When you subscribe, the call propagates backward: FlowableFilter.subscribeActual subscribes to FlowableMap, which subscribes to originalSource.
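The assembly order and the reverse subscribe order are easy to confuse, so here is a small self-contained model. Node, Just, MapNode, and FilterNode are hypothetical names; a Consumer replaces the subscriber, and there is no backpressure or cancellation. The trace list records which subscribeActual runs first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

final class ChainSketch {
    /** Order in which subscribeActual was invoked during the last run. */
    static final List<String> trace = new ArrayList<>();

    abstract static class Node<T> {
        abstract void subscribeActual(Consumer<? super T> downstream);
        // Assembly: each call only allocates a decorator around "this".
        <R> Node<R> map(Function<? super T, ? extends R> fn) { return new MapNode<>(this, fn); }
        Node<T> filter(Predicate<? super T> p) { return new FilterNode<>(this, p); }
    }

    static final class Just<T> extends Node<T> {
        final List<T> items;
        Just(List<T> items) { this.items = items; }
        @Override void subscribeActual(Consumer<? super T> downstream) {
            trace.add("Just");
            items.forEach(downstream); // data flows only after the chain is wired
        }
    }

    static final class MapNode<T, R> extends Node<R> {
        final Node<T> source;
        final Function<? super T, ? extends R> fn;
        MapNode(Node<T> source, Function<? super T, ? extends R> fn) {
            this.source = source;
            this.fn = fn;
        }
        @Override void subscribeActual(Consumer<? super R> downstream) {
            trace.add("Map");
            source.subscribeActual(t -> downstream.accept(fn.apply(t)));
        }
    }

    static final class FilterNode<T> extends Node<T> {
        final Node<T> source;
        final Predicate<? super T> p;
        FilterNode(Node<T> source, Predicate<? super T> p) {
            this.source = source;
            this.p = p;
        }
        @Override void subscribeActual(Consumer<? super T> downstream) {
            trace.add("Filter");
            source.subscribeActual(t -> { if (p.test(t)) downstream.accept(t); });
        }
    }

    static List<Integer> run() {
        trace.clear();
        List<Integer> out = new ArrayList<>();
        new Just<Integer>(List.of(1, 2, 3))
                .map(x -> x * 10)
                .filter(x -> x > 10)
                .subscribeActual(out::add);
        return out;
    }
}
```

After run(), the trace reads Filter, Map, Just: the last operator assembled is the first one subscribed, while the values themselves travel in the opposite direction.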

Deep Dive: FlowableMap as the Canonical Simple Operator

FlowableMap at FlowableMap.java#L26-L83 is one of the simplest complete operators, yet it demonstrates the core patterns that recur in almost every other operator:

public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;

    // Constructor omitted from this excerpt.

    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        // Pick the wrapper that matches the downstream's capabilities.
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<>(s, mapper));
        }
    }

    // Nested MapSubscriber and MapConditionalSubscriber classes omitted.
}

The subscribeActual wraps the downstream subscriber in a MapSubscriber and subscribes it to the upstream source. This is the decorator pattern in action — MapSubscriber intercepts onNext signals, applies the function, and forwards the result.

The MapSubscriber.onNext method tells the full story:

public void onNext(T t) {
    if (done) { return; }
    if (sourceMode != NONE) {
        downstream.onNext(null);  // fusion mode: signal "poll me"
        return;
    }
    U v;
    try {
        v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}

Three key patterns are visible here:

  1. Done-guard: Check done flag first, silently dropping late signals
  2. Fusion short-circuit: If operating in fusion mode (sourceMode != NONE), forward a null sentinel instead of a mapped value — the downstream will poll() the queue directly
  3. Try-catch with fail(): User code (mapper.apply(t)) is always wrapped. The fail() helper cancels upstream and signals onError downstream
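The done-guard and the try-catch pattern can be reproduced with nothing but the JDK's java.util.concurrent.Flow interfaces. The sketch below is a simplified stand-in, not the RxJava class: MapSubscriberSketch is a hypothetical name, fusion is omitted entirely, and the failure path is inlined rather than delegated to a fail() helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Function;

final class MapSubscriberSketch<T, R> implements Flow.Subscriber<T>, Flow.Subscription {
    final Flow.Subscriber<? super R> downstream;
    final Function<? super T, ? extends R> mapper;
    Flow.Subscription upstream;
    boolean done;

    MapSubscriberSketch(Flow.Subscriber<? super R> downstream, Function<? super T, ? extends R> mapper) {
        this.downstream = downstream;
        this.mapper = mapper;
    }

    @Override public void onSubscribe(Flow.Subscription s) {
        upstream = s;
        downstream.onSubscribe(this); // downstream talks to us, we relay upstream
    }

    @Override public void onNext(T t) {
        if (done) { return; } // done-guard: drop late signals
        R v;
        try {
            v = Objects.requireNonNull(mapper.apply(t), "mapper returned null");
        } catch (Throwable ex) {
            upstream.cancel(); // stop the source first
            onError(ex);       // then terminate downstream
            return;
        }
        downstream.onNext(v);
    }

    @Override public void onError(Throwable e) {
        if (done) { return; }
        done = true;
        downstream.onError(e);
    }

    @Override public void onComplete() {
        if (done) { return; }
        done = true;
        downstream.onComplete();
    }

    @Override public void request(long n) { upstream.request(n); }
    @Override public void cancel() { upstream.cancel(); }

    /** Drives the subscriber by hand and returns the events the downstream saw. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Subscriber<Integer> sink = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer v) { events.add("next:" + v); }
            @Override public void onError(Throwable e) { events.add("error:" + e.getClass().getSimpleName()); }
            @Override public void onComplete() { events.add("complete"); }
        };
        MapSubscriberSketch<Integer, Integer> map =
                new MapSubscriberSketch<Integer, Integer>(sink, x -> 100 / x);
        map.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { }
            @Override public void cancel() { events.add("cancel"); }
        });
        map.onNext(4);    // mapped to 25
        map.onNext(0);    // mapper throws ArithmeticException
        map.onNext(5);    // dropped by the done-guard
        map.onComplete(); // dropped as well
        return events;
    }
}
```

In demo(), the division by zero cancels the upstream and terminates the downstream once; the signals that arrive afterwards never reach it.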

The fusion support is also implemented via poll():

public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}

When fused, the map function is applied inside poll() rather than onNext() — the value passes directly from the upstream queue through the mapping function without any intermediate queueing. We'll explore this mechanism fully in Part 3.
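As a rough illustration of mapping inside poll(), the following sketch uses a plain java.util.Queue as the upstream queue. FusedMapSketch is a hypothetical name, and the mode negotiation that real fusion requires is not modelled; the point is only that the consumer pulls and the mapper runs on the consumer's call.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.function.Function;

final class FusedMapSketch<T, R> {
    final Queue<T> upstreamQueue;
    final Function<? super T, ? extends R> mapper;

    FusedMapSketch(Queue<T> upstreamQueue, Function<? super T, ? extends R> mapper) {
        this.upstreamQueue = upstreamQueue;
        this.mapper = mapper;
    }

    /** Returns the next mapped value, or null when the upstream queue is empty. */
    R poll() {
        T t = upstreamQueue.poll();
        return t != null ? Objects.requireNonNull(mapper.apply(t), "mapper returned null") : null;
    }

    static List<String> demo() {
        Queue<Integer> queue = new ArrayDeque<>(List.of(1, 2, 3));
        FusedMapSketch<Integer, String> fused = new FusedMapSketch<Integer, String>(queue, i -> "v" + i);
        List<String> out = new ArrayList<>();
        // The consumer drains by polling; no mapped value is ever stored in a second queue.
        for (String s = fused.poll(); s != null; s = fused.poll()) {
            out.add(s);
        }
        return out;
    }
}
```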

sequenceDiagram
    participant Source
    participant MapSubscriber
    participant Downstream
    
    Source->>MapSubscriber: onSubscribe(subscription)
    MapSubscriber->>Downstream: onSubscribe(mapSubscriber)
    
    Note over Downstream: request(n)
    Downstream->>MapSubscriber: request(n)
    MapSubscriber->>Source: request(n)
    
    Source->>MapSubscriber: onNext(t)
    MapSubscriber->>MapSubscriber: v = mapper.apply(t)
    MapSubscriber->>Downstream: onNext(v)
    
    Source->>MapSubscriber: onComplete()
    MapSubscriber->>Downstream: onComplete()

Tip: Most intermediate Flowable operators follow this same structure: extend AbstractFlowableWithUpstream, implement subscribeActual to wrap the downstream, and forward signals with transformations. Source operators such as just() have no upstream to wrap, but once you understand FlowableMap, you have the template for reading the other 500+ operator files.

Flowable vs Observable: Two Subscription Protocols

The Flowable and Observable types use fundamentally different subscription mechanisms. This design split affects every operator implementation.

Flowable uses the Reactive Streams protocol with Subscription:

  • Subscriber.onSubscribe(Subscription s) — delivers the subscription
  • Subscription.request(long n) — downstream requests N items (backpressure)
  • Subscription.cancel() — downstream cancels

Observable uses Disposable:

  • Observer.onSubscribe(Disposable d) — delivers the disposable
  • Disposable.dispose() — downstream cancels
  • No request() — no backpressure

This means every operator that exists on both types has two implementations: FlowableMap and ObservableMap, FlowableFilter and ObservableFilter, and so on. The Flowable version must handle request(n) forwarding; the Observable version doesn't.
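The practical difference between the two protocols shows up in who controls the pace. The sketch below contrasts a demand-driven range with a push-style range. ProtocolSketch, RangeSubscription, and pushRange are hypothetical names; the backpressured side uses the JDK's Flow.Subscription, and the implementation is single-threaded and not reentrant-safe, so it is a teaching model only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

final class ProtocolSketch {
    /** Backpressured style: emits only while there is outstanding demand. */
    static final class RangeSubscription implements Flow.Subscription {
        final Flow.Subscriber<? super Integer> downstream;
        final int end;
        int next;
        boolean cancelled;

        RangeSubscription(Flow.Subscriber<? super Integer> downstream, int end) {
            this.downstream = downstream;
            this.end = end;
        }

        @Override public void request(long n) {
            for (long i = 0; i < n && next < end && !cancelled; i++) {
                downstream.onNext(next++);
            }
            if (next == end && !cancelled) {
                cancelled = true;
                downstream.onComplete();
            }
        }

        @Override public void cancel() { cancelled = true; }
    }

    /** Push style: emits everything unless the consumer flips the disposed flag. */
    static void pushRange(int end, AtomicBoolean disposed, Consumer<Integer> onNext) {
        for (int i = 0; i < end && !disposed.get(); i++) {
            onNext.accept(i);
        }
    }

    static List<Integer> backpressured() {
        List<Integer> got = new ArrayList<>();
        Flow.Subscriber<Integer> subscriber = new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { }
            @Override public void onNext(Integer v) { got.add(v); }
            @Override public void onError(Throwable e) { }
            @Override public void onComplete() { }
        };
        new RangeSubscription(subscriber, 5).request(2); // only two items are delivered
        return got;
    }

    static List<Integer> pushed() {
        List<Integer> got = new ArrayList<>();
        pushRange(5, new AtomicBoolean(), got::add); // all five arrive immediately
        return got;
    }
}
```

With a range of five items, the demand-driven version stops after the two requested values, while the push version delivers all five; the consumer's only lever there is the disposed flag.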

classDiagram
    class Subscription {
        <<interface>>
        +request(long n)
        +cancel()
    }
    class Disposable {
        <<interface>>
        +dispose()
        +isDisposed(): boolean
        +close()
    }
    
    class FlowableMap {
        Uses Subscription
        Handles request(n)
    }
    class ObservableMap {
        Uses Disposable
        No backpressure
    }
    
    Subscription <.. FlowableMap : uses
    Disposable <.. ObservableMap : uses

In RxJava 4.x, Disposable extends AutoCloseable, so you can use try-with-resources to manage subscription lifecycles. This small but meaningful ergonomic improvement is visible at Disposable.java#L28-L46.
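The try-with-resources usage might look like the sketch below. MiniDisposable is a hypothetical stand-in for the real interface, and the assumption that close() simply delegates to dispose() is mine rather than something confirmed by the source excerpt.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in; assumes close() delegates to dispose(). */
interface MiniDisposable extends AutoCloseable {
    void dispose();
    boolean isDisposed();

    // Narrowed to throw nothing, so callers need no catch block.
    @Override
    default void close() { dispose(); }
}

final class TryWithResourcesSketch {
    static boolean demo() {
        AtomicBoolean flag = new AtomicBoolean();
        MiniDisposable subscription = new MiniDisposable() {
            @Override public void dispose() { flag.set(true); }
            @Override public boolean isDisposed() { return flag.get(); }
        };
        try (MiniDisposable resource = subscription) {
            // Work with the live subscription here.
            if (resource.isDisposed()) {
                throw new IllegalStateException("disposed too early");
            }
        }
        // Leaving the block called close(), which disposed the subscription.
        return subscription.isDisposed();
    }
}
```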

Error Handling in Operators

Error handling in RxJava operators follows a strict protocol, visible in the FlowableMap example above.

First, fatal errors are rethrown immediately. The Exceptions.throwIfFatal() method at Exceptions.java#L66-L73 checks for VirtualMachineError and LinkageError:

public static void throwIfFatal(@NonNull Throwable t) {
    if (t instanceof VirtualMachineError) {
        throw (VirtualMachineError) t;
    } else if (t instanceof LinkageError) {
        throw (LinkageError) t;
    }
}

Everything else is catchable and gets routed through the reactive error channel. The fail() helper method in BasicFuseableSubscriber handles the standard sequence: cancel upstream, mark as done, call downstream.onError(ex).

When an error has nowhere to go (for example, onError is called after onComplete, or an error occurs during cancellation), RxJava routes it to RxJavaPlugins.onError(). If no error handler is set, it prints the stack trace to stderr and passes the error to the current thread's uncaught exception handler. This "undeliverable error" mechanism is the safety net that prevents exceptions from being silently swallowed.
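A global error sink of this kind can be sketched in a few lines. ErrorSinkSketch and errorHandler are hypothetical names, and the fallback shown here (print, then hand the error to the thread's uncaught exception handler) is an assumption about how such a fallback is typically realized, not a copy of the library code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ErrorSinkSketch {
    /** Optional global handler; null means "not set". */
    static volatile Consumer<? super Throwable> errorHandler;

    static void onError(Throwable error) {
        Consumer<? super Throwable> handler = errorHandler;
        if (handler != null) {
            try {
                handler.accept(error);
                return;
            } catch (Throwable handlerCrash) {
                // A crashing handler must not hide the error; use the fallback.
            }
        }
        error.printStackTrace();
        Thread current = Thread.currentThread();
        current.getUncaughtExceptionHandler().uncaughtException(current, error);
    }

    /** Installs a recording handler, reports one late error, then resets. */
    static List<String> demo() {
        List<String> seen = new ArrayList<>();
        errorHandler = e -> seen.add(e.getMessage());
        try {
            onError(new IllegalStateException("late error"));
        } finally {
            errorHandler = null;
        }
        return seen;
    }
}
```

Installing a handler in tests is a common way to assert that no undeliverable errors occurred; in this sketch, demo() shows the handler receiving the late error instead of the fallback path.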

flowchart TD
    EX[Exception in operator] --> TIF{throwIfFatal?}
    TIF -->|VirtualMachineError/LinkageError| RETHROW[Rethrow immediately]
    TIF -->|Other| DELIVER{Can deliver onError?}
    DELIVER -->|Yes| CANCEL[Cancel upstream] --> ONERROR[downstream.onError]
    DELIVER -->|No: already terminal| PLUGIN[RxJavaPlugins.onError]
    PLUGIN --> HANDLER{Handler set?}
    HANDLER -->|Yes| CUSTOM[Custom handler]
    HANDLER -->|No| STDERR[Print + uncaught exception handler]

Assembly and Subscribe Hooks

RxJavaPlugins provides two interception points for every reactive type: assembly hooks and subscribe hooks.

Assembly hooks fire when an operator is created (at chain-building time), not when subscription occurs. The onFlowableAssembly hook at RxJavaPlugins.java#L72-L73 can intercept or replace any Flowable being constructed:

static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;

Subscribe hooks fire when subscribe() is called, before subscribeActual() executes. The onFlowableSubscribe hook at RxJavaPlugins.java#L102-L104 receives both the Flowable and the Subscriber and can replace the latter:

static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;

These hooks are the foundation for debugging tools. An assembly hook can capture stack traces at operator creation time — solving the notorious "where was this operator built?" debugging problem. A subscribe hook can wrap every subscriber with instrumentation to log signals.
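The stack-trace idea can be modelled without the library. In this sketch, Op, TracedOp, onAssembly, and assemble are all hypothetical names; the hook is a plain UnaryOperator, and the wrapper captures the stack at construction time, which is the moment the operator is assembled.

```java
import java.util.function.UnaryOperator;

final class AssemblyHookSketch {
    /** Hypothetical operator node; only its name matters here. */
    static class Op {
        final String name;
        Op(String name) { this.name = name; }
    }

    /** Wrapper that remembers where the wrapped operator was assembled. */
    static final class TracedOp extends Op {
        final StackTraceElement[] assembledAt;
        TracedOp(Op inner) {
            super(inner.name);
            this.assembledAt = new Throwable().getStackTrace(); // captured at assembly time
        }
    }

    /** Stand-in for the assembly hook; null means "no hook installed". */
    static volatile UnaryOperator<Op> onAssembly;

    /** Every operator factory funnels its result through this method. */
    static Op assemble(Op op) {
        UnaryOperator<Op> hook = onAssembly;
        return hook == null ? op : hook.apply(op);
    }

    static Op map() { return assemble(new Op("map")); }

    static boolean demo() {
        onAssembly = TracedOp::new;
        try {
            Op op = map();
            return op instanceof TracedOp && ((TracedOp) op).assembledAt.length > 0;
        } finally {
            onAssembly = null; // always uninstall global hooks
        }
    }
}
```

Capturing a stack trace on every operator creation is expensive, so a hook like this would normally be enabled only while debugging.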

sequenceDiagram
    participant Dev as Developer
    participant Assembly as Assembly Hook
    participant Chain as Operator Chain
    participant Subscribe as Subscribe Hook
    participant Actual as subscribeActual
    
    Dev->>Chain: source.map(fn).filter(pred)
    Chain->>Assembly: onFlowableAssembly(FlowableMap)
    Assembly-->>Chain: possibly wrapped Flowable
    Chain->>Assembly: onFlowableAssembly(FlowableFilter)
    
    Dev->>Chain: .subscribe(subscriber)
    Chain->>Subscribe: onFlowableSubscribe(flowable, subscriber)
    Subscribe-->>Chain: possibly wrapped subscriber
    Chain->>Actual: subscribeActual(wrappedSubscriber)

Both hook types support all reactive types — there are parallel hooks for Observable, Single, Maybe, Completable, and ParallelFlowable.

What's Next

We've seen how operators are assembled and how subscription cascades through the chain. But we've glossed over the most performance-critical details: how operators handle concurrent signals safely, and how fusion eliminates intermediate queues. In Part 3, we'll dive into the engine room — the atomic drain loop pattern, lock-free SPSC queues, and the QueueFuseable protocol that makes RxJava one of the fastest reactive libraries on the JVM.