Read OSS

订阅链:每个 RxJava 操作符的实际运作机制

中级

前置知识

  • 第 1 篇:架构与代码库导航
  • Reactive Streams 规范基础(Publisher/Subscriber/Subscription 契约)
  • 装饰器设计模式

订阅链:每个 RxJava 操作符的实际运作机制

你构建过的每一条 RxJava 管道,都遵循着相同的生命周期:先组装操作符链,再通过订阅触发数据流。表面上只是一次 subscribe() 调用,背后却暗藏玄机——它会依次经过插件钩子、将你的 Subscriber 包裹在安全层中,最终向上游反向遍历整条操作符链。理解这一机制,是读懂 RxJava 一切设计的关键所在。

subscribe() 入口

Flowable 上调用 subscribe() 时,Flowable.java#L16009-L16031 中的方法会依次执行三个步骤:

public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
    Objects.requireNonNull(subscriber, "subscriber is null");
    try {
        Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
        Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null...");
        subscribeActual(flowableSubscriber);
    } catch (NullPointerException e) {
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        // ...
    }
}
  1. 空值检查 subscriber
  2. 执行订阅钩子RxJavaPlugins.onSubscribe() 可以替换或包装 subscriber
  3. 委托给 subscribeActual() — 每个操作符都必须实现的抽象方法

这是模板方法模式的典型应用。subscribe() 方法被声明为 final,操作符无法覆盖它,只能实现 subscribeActual()——这从根本上保证了插件钩子始终会被触发。

sequenceDiagram
    participant User
    participant Flowable
    participant RxJavaPlugins
    participant Operator as subscribeActual()
    
    User->>Flowable: subscribe(subscriber)
    Flowable->>Flowable: Objects.requireNonNull(subscriber)
    Flowable->>RxJavaPlugins: onSubscribe(this, subscriber)
    RxJavaPlugins-->>Flowable: possibly wrapped subscriber
    Flowable->>Operator: subscribeActual(wrappedSubscriber)

这里有一个微妙的错误处理细节:如果 subscribeActual 抛出异常,RxJava 既无法调用 onError(subscriber 可能还没收到 onSubscribe),也无法调用 onSubscribe(它可能已经被调用过了)。因此,RxJava 会将错误路由到 RxJavaPlugins.onError(),作为"不可投递错误"处理,同时抛出一个 NullPointerException 包装——这是为了满足 Reactive Streams 规范而采用的务实之举。

AbstractFlowableWithUpstream:操作符基类

当你链式调用 source.map(fn).filter(pred).observeOn(scheduler) 时,实际上是在构建一个由装饰器对象组成的链表。每个操作符类都继承自 Flowable,并持有对其上游 source 的引用。

这一模式的基类是 AbstractFlowableWithUpstream

abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R> 
    implements HasUpstreamPublisher<T> {
    
    protected final Flowable<T> source;

    AbstractFlowableWithUpstream(Flowable<T> source) {
        this.source = Objects.requireNonNull(source, "source is null");
    }
}

代码极为精简——只有一个字段和一个构造函数,却奠定了整条装饰器链的基础:

classDiagram
    class Flowable~T~ {
        <<abstract>>
        +subscribe(Subscriber)
        #subscribeActual(Subscriber)*
    }
    class AbstractFlowableWithUpstream~T_R~ {
        <<abstract>>
        #source: Flowable~T~
    }
    class FlowableMap~T_U~ {
        -mapper: Function
        #subscribeActual(Subscriber)
    }
    class FlowableFilter~T~ {
        -predicate: Predicate
        #subscribeActual(Subscriber)
    }
    class FlowableObserveOn~T~ {
        -scheduler: Scheduler
        #subscribeActual(Subscriber)
    }
    
    Flowable <|-- AbstractFlowableWithUpstream
    AbstractFlowableWithUpstream <|-- FlowableMap
    AbstractFlowableWithUpstream <|-- FlowableFilter
    AbstractFlowableWithUpstream <|-- FlowableObserveOn

当你写下 source.map(fn).filter(pred) 时,组装阶段会创建:FlowableFilter(source=FlowableMap(source=originalSource))。订阅时,调用向上游反向传播:FlowableFilter.subscribeActual 订阅 FlowableMapFlowableMap 再订阅 originalSource

深入解析:以 FlowableMap 为典型示例

FlowableMap.java#L26-L83 中的 FlowableMap 是最简单的完整操作符实现,却展示了所有关键模式:

public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
    final Function<? super T, ? extends U> mapper;
    
    @Override
    protected void subscribeActual(Subscriber<? super U> s) {
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new MapConditionalSubscriber<>((ConditionalSubscriber<? super U>)s, mapper));
        } else {
            source.subscribe(new MapSubscriber<>(s, mapper));
        }
    }

subscribeActual 将下游 subscriber 包装进 MapSubscriber,再将其订阅到上游 source。这正是装饰器模式的体现——MapSubscriber 拦截 onNext 信号,应用转换函数,并将结果向下游转发。

MapSubscriber.onNext 揭示了完整的处理逻辑:

public void onNext(T t) {
    if (done) { return; }
    if (sourceMode != NONE) {
        downstream.onNext(null);  // fusion mode: signal "poll me"
        return;
    }
    U v;
    try {
        v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    downstream.onNext(v);
}

这里有三个关键模式值得关注:

  1. 完成标志守卫:首先检查 done 标志,静默丢弃迟到的信号
  2. 融合短路:如果处于融合模式(sourceMode != NONE),向下游转发 null 哨兵值而非映射后的值——下游会直接调用 poll() 从队列中取数据
  3. fail() 的 try-catch:用户代码(mapper.apply(t))始终被包裹在 try-catch 中。fail() 辅助方法会取消上游订阅,并向下游发出 onError 信号

融合支持同样通过 poll() 实现:

public U poll() throws Throwable {
    T t = qs.poll();
    return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}

在融合模式下,map 函数在 poll() 内部执行,而非在 onNext() 中——数据从上游队列经过映射函数直接传递,省去了中间的排队环节。我们将在第 3 篇中深入探讨这一机制。

sequenceDiagram
    participant Source
    participant MapSubscriber
    participant Downstream
    
    Source->>MapSubscriber: onSubscribe(subscription)
    MapSubscriber->>Downstream: onSubscribe(mapSubscriber)
    
    Note over Downstream: request(n)
    Downstream->>MapSubscriber: request(n)
    MapSubscriber->>Source: request(n)
    
    Source->>MapSubscriber: onNext(t)
    MapSubscriber->>MapSubscriber: v = mapper.apply(t)
    MapSubscriber->>Downstream: onNext(v)
    
    Source->>MapSubscriber: onComplete()
    MapSubscriber->>Downstream: onComplete()

提示: RxJava 中的每个操作符都遵循完全相同的结构:继承 AbstractFlowableWithUpstream,在 subscribeActual 中包装下游 subscriber,并在转换后转发信号。理解了 FlowableMap,你就掌握了阅读其他 500+ 个操作符文件的通用模板。

Flowable 与 Observable:两种订阅协议

Flowable 和 Observable 使用了本质不同的订阅机制,这一设计差异影响着每一个操作符的实现。

Flowable 使用带 Subscription 的 Reactive Streams 协议:

  • Subscriber.onSubscribe(Subscription s) — 传递订阅对象
  • Subscription.request(long n) — 下游请求 N 个元素(背压)
  • Subscription.cancel() — 下游取消订阅

Observable 使用 Disposable

  • Observer.onSubscribe(Disposable d) — 传递可处置对象
  • Disposable.dispose() — 下游取消订阅
  • request() — 不支持背压

因此,同时存在于两种类型上的每个操作符都有两套实现FlowableMapObservableMapFlowableFilterObservableFilter,以此类推。Flowable 版本必须处理 request(n) 的转发,Observable 版本则不需要。

classDiagram
    class Subscription {
        <<interface>>
        +request(long n)
        +cancel()
    }
    class Disposable {
        <<interface>>
        +dispose()
        +isDisposed(): boolean
        +close()
    }
    
    class FlowableMap {
        Uses Subscription
        Handles request(n)
    }
    class ObservableMap {
        Uses Disposable
        No backpressure
    }
    
    Subscription <.. FlowableMap : uses
    Disposable <.. ObservableMap : uses

在 RxJava 4.x 中,Disposable 继承了 AutoCloseable——你现在可以使用 try-with-resources 来管理订阅生命周期。这是来自 Disposable.java#L28-L46 的一项实用改进,虽然细小,却意义明确。

操作符中的错误处理

RxJava 操作符中的错误处理遵循严格的协议,在前面的 FlowableMap 示例中已有所体现。

首先,致命错误会被立即重新抛出Exceptions.java#L66-L73 中的 Exceptions.throwIfFatal() 方法会检查 VirtualMachineErrorLinkageError

public static void throwIfFatal(@NonNull Throwable t) {
    if (t instanceof VirtualMachineError) {
        throw (VirtualMachineError) t;
    } else if (t instanceof LinkageError) {
        throw (LinkageError) t;
    }
}

其他所有异常都可被捕获,并通过响应式错误通道进行路由。BasicFuseableSubscriber 中的 fail() 辅助方法负责执行标准流程:取消上游订阅、标记为完成、调用 downstream.onError(ex)

当错误无处可去时——例如,onErroronComplete 之后被调用,或在取消订阅期间发生了错误——RxJava 会将其路由到 RxJavaPlugins.onError()。如果未设置错误处理器,则会打印到 stderr 并在当前线程上抛出。这套"不可投递错误"机制是防止异常被静默吞掉的最后一道防线。

flowchart TD
    EX[Exception in operator] --> TIF{throwIfFatal?}
    TIF -->|VirtualMachineError/LinkageError| RETHROW[Rethrow immediately]
    TIF -->|Other| DELIVER{Can deliver onError?}
    DELIVER -->|Yes| CANCEL[Cancel upstream] --> ONERROR[downstream.onError]
    DELIVER -->|No: already terminal| PLUGIN[RxJavaPlugins.onError]
    PLUGIN --> HANDLER{Handler set?}
    HANDLER -->|Yes| CUSTOM[Custom handler]
    HANDLER -->|No| STDERR[Print + throw on thread]

组装钩子与订阅钩子

RxJavaPlugins 为每种响应式类型提供了两个拦截点:组装钩子订阅钩子

组装钩子在操作符创建时触发(即构建链的阶段),而非在订阅时。RxJavaPlugins.java#L72-L73 中的 onFlowableAssembly 钩子可以拦截或替换任何正在构造的 Flowable:

static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;

订阅钩子在 subscribe() 被调用时触发,早于 subscribeActual() 的执行。RxJavaPlugins.java#L102-L104 中的 onFlowableSubscribe 钩子同时接收 Flowable 和 Subscriber,并可替换后者:

static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;

这两个钩子是调试工具的基础。组装钩子可以在操作符创建时捕获堆栈跟踪,完美解决"这个操作符是在哪里构建的?"这一经典调试难题。订阅钩子则可以为每个 subscriber 包装埋点逻辑,用于记录信号。

sequenceDiagram
    participant Dev as Developer
    participant Assembly as Assembly Hook
    participant Chain as Operator Chain
    participant Subscribe as Subscribe Hook
    participant Actual as subscribeActual
    
    Dev->>Chain: source.map(fn).filter(pred)
    Chain->>Assembly: onFlowableAssembly(FlowableMap)
    Assembly-->>Chain: possibly wrapped Flowable
    Chain->>Assembly: onFlowableAssembly(FlowableFilter)
    
    Dev->>Chain: .subscribe(subscriber)
    Chain->>Subscribe: onFlowableSubscribe(flowable, subscriber)
    Subscribe-->>Chain: possibly wrapped subscriber
    Chain->>Actual: subscribeActual(wrappedSubscriber)

两类钩子均支持所有响应式类型——Observable、Single、Maybe、Completable 和 ParallelFlowable 都有对应的并行钩子。

下一步

我们已经了解了操作符的组装方式,以及订阅如何在链中级联传播。但还有两个性能关键细节尚未触及:操作符如何安全地处理并发信号,以及融合机制如何消除中间队列。在第 3 篇中,我们将深入引擎室——探索原子 drain 循环模式、无锁 SPSC 队列,以及使 RxJava 跻身 JVM 上最快响应式库之列的 QueueFuseable 协议。