订阅链:每个 RxJava 操作符的实际运作机制
前置知识
- ›第 1 篇:架构与代码库导航
- ›Reactive Streams 规范基础(Publisher/Subscriber/Subscription 契约)
- ›装饰器设计模式
订阅链:每个 RxJava 操作符的实际运作机制
你构建过的每一条 RxJava 管道,都遵循着相同的生命周期:先组装操作符链,再通过订阅触发数据流。表面上只是一次 subscribe() 调用,背后却暗藏玄机——它会依次经过插件钩子、将你的 Subscriber 包裹在安全层中,最终向上游反向遍历整条操作符链。理解这一机制,是读懂 RxJava 一切设计的关键所在。
subscribe() 入口
在 Flowable 上调用 subscribe() 时,Flowable.java#L16009-L16031 中的方法会依次执行三个步骤:
public final void subscribe(@NonNull FlowableSubscriber<? super T> subscriber) {
Objects.requireNonNull(subscriber, "subscriber is null");
try {
Subscriber<? super T> flowableSubscriber = RxJavaPlugins.onSubscribe(this, subscriber);
Objects.requireNonNull(flowableSubscriber, "The RxJavaPlugins.onSubscribe hook returned a null...");
subscribeActual(flowableSubscriber);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
// ...
}
}
- 空值检查 subscriber
- 执行订阅钩子 —
RxJavaPlugins.onSubscribe()可以替换或包装 subscriber - 委托给
subscribeActual()— 每个操作符都必须实现的抽象方法
这是模板方法模式的典型应用。subscribe() 方法被声明为 final,操作符无法覆盖它,只能实现 subscribeActual()——这从根本上保证了插件钩子始终会被触发。
sequenceDiagram
participant User
participant Flowable
participant RxJavaPlugins
participant Operator as subscribeActual()
User->>Flowable: subscribe(subscriber)
Flowable->>Flowable: Objects.requireNonNull(subscriber)
Flowable->>RxJavaPlugins: onSubscribe(this, subscriber)
RxJavaPlugins-->>Flowable: possibly wrapped subscriber
Flowable->>Operator: subscribeActual(wrappedSubscriber)
这里有一个微妙的错误处理细节:如果 subscribeActual 抛出异常,RxJava 既无法调用 onError(subscriber 可能还没收到 onSubscribe),也无法调用 onSubscribe(它可能已经被调用过了)。因此,RxJava 会将错误路由到 RxJavaPlugins.onError(),作为"不可投递错误"处理,同时抛出一个 NullPointerException 包装——这是为了满足 Reactive Streams 规范而采用的务实之举。
AbstractFlowableWithUpstream:操作符基类
当你链式调用 source.map(fn).filter(pred).observeOn(scheduler) 时,实际上是在构建一个由装饰器对象组成的链表。每个操作符类都继承自 Flowable,并持有对其上游 source 的引用。
这一模式的基类是 AbstractFlowableWithUpstream:
abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R>
implements HasUpstreamPublisher<T> {
protected final Flowable<T> source;
AbstractFlowableWithUpstream(Flowable<T> source) {
this.source = Objects.requireNonNull(source, "source is null");
}
}
代码极为精简——只有一个字段和一个构造函数,却奠定了整条装饰器链的基础:
classDiagram
class Flowable~T~ {
<<abstract>>
+subscribe(Subscriber)
#subscribeActual(Subscriber)*
}
class AbstractFlowableWithUpstream~T_R~ {
<<abstract>>
#source: Flowable~T~
}
class FlowableMap~T_U~ {
-mapper: Function
#subscribeActual(Subscriber)
}
class FlowableFilter~T~ {
-predicate: Predicate
#subscribeActual(Subscriber)
}
class FlowableObserveOn~T~ {
-scheduler: Scheduler
#subscribeActual(Subscriber)
}
Flowable <|-- AbstractFlowableWithUpstream
AbstractFlowableWithUpstream <|-- FlowableMap
AbstractFlowableWithUpstream <|-- FlowableFilter
AbstractFlowableWithUpstream <|-- FlowableObserveOn
当你写下 source.map(fn).filter(pred) 时,组装阶段会创建:FlowableFilter(source=FlowableMap(source=originalSource))。订阅时,调用向上游反向传播:FlowableFilter.subscribeActual 订阅 FlowableMap,FlowableMap 再订阅 originalSource。
深入解析:以 FlowableMap 为典型示例
FlowableMap.java#L26-L83 中的 FlowableMap 是最简单的完整操作符实现,却展示了所有关键模式:
public final class FlowableMap<T, U> extends AbstractFlowableWithUpstream<T, U> {
final Function<? super T, ? extends U> mapper;
@Override
protected void subscribeActual(Subscriber<? super U> s) {
if (s instanceof ConditionalSubscriber) {
source.subscribe(new MapConditionalSubscriber<>((ConditionalSubscriber<? super U>)s, mapper));
} else {
source.subscribe(new MapSubscriber<>(s, mapper));
}
}
subscribeActual 将下游 subscriber 包装进 MapSubscriber,再将其订阅到上游 source。这正是装饰器模式的体现——MapSubscriber 拦截 onNext 信号,应用转换函数,并将结果向下游转发。
MapSubscriber.onNext 揭示了完整的处理逻辑:
public void onNext(T t) {
if (done) { return; }
if (sourceMode != NONE) {
downstream.onNext(null); // fusion mode: signal "poll me"
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
这里有三个关键模式值得关注:
- 完成标志守卫:首先检查
done标志,静默丢弃迟到的信号 - 融合短路:如果处于融合模式(
sourceMode != NONE),向下游转发 null 哨兵值而非映射后的值——下游会直接调用poll()从队列中取数据 - 带
fail()的 try-catch:用户代码(mapper.apply(t))始终被包裹在 try-catch 中。fail()辅助方法会取消上游订阅,并向下游发出onError信号
融合支持同样通过 poll() 实现:
public U poll() throws Throwable {
T t = qs.poll();
return t != null ? Objects.requireNonNull(mapper.apply(t), "...") : null;
}
在融合模式下,map 函数在 poll() 内部执行,而非在 onNext() 中——数据从上游队列经过映射函数直接传递,省去了中间的排队环节。我们将在第 3 篇中深入探讨这一机制。
sequenceDiagram
participant Source
participant MapSubscriber
participant Downstream
Source->>MapSubscriber: onSubscribe(subscription)
MapSubscriber->>Downstream: onSubscribe(mapSubscriber)
Note over Downstream: request(n)
Downstream->>MapSubscriber: request(n)
MapSubscriber->>Source: request(n)
Source->>MapSubscriber: onNext(t)
MapSubscriber->>MapSubscriber: v = mapper.apply(t)
MapSubscriber->>Downstream: onNext(v)
Source->>MapSubscriber: onComplete()
MapSubscriber->>Downstream: onComplete()
提示: RxJava 中的每个操作符都遵循完全相同的结构:继承
AbstractFlowableWithUpstream,在subscribeActual中包装下游 subscriber,并在转换后转发信号。理解了FlowableMap,你就掌握了阅读其他 500+ 个操作符文件的通用模板。
Flowable 与 Observable:两种订阅协议
Flowable 和 Observable 使用了本质不同的订阅机制,这一设计差异影响着每一个操作符的实现。
Flowable 使用带 Subscription 的 Reactive Streams 协议:
Subscriber.onSubscribe(Subscription s)— 传递订阅对象Subscription.request(long n)— 下游请求 N 个元素(背压)Subscription.cancel()— 下游取消订阅
Observable 使用 Disposable:
Observer.onSubscribe(Disposable d)— 传递可处置对象Disposable.dispose()— 下游取消订阅- 无
request()— 不支持背压
因此,同时存在于两种类型上的每个操作符都有两套实现:FlowableMap 和 ObservableMap、FlowableFilter 和 ObservableFilter,以此类推。Flowable 版本必须处理 request(n) 的转发,Observable 版本则不需要。
classDiagram
class Subscription {
<<interface>>
+request(long n)
+cancel()
}
class Disposable {
<<interface>>
+dispose()
+isDisposed(): boolean
+close()
}
class FlowableMap {
Uses Subscription
Handles request(n)
}
class ObservableMap {
Uses Disposable
No backpressure
}
Subscription <.. FlowableMap : uses
Disposable <.. ObservableMap : uses
在 RxJava 4.x 中,Disposable 继承了 AutoCloseable——你现在可以使用 try-with-resources 来管理订阅生命周期。这是来自 Disposable.java#L28-L46 的一项实用改进,虽然细小,却意义明确。
操作符中的错误处理
RxJava 操作符中的错误处理遵循严格的协议,在前面的 FlowableMap 示例中已有所体现。
首先,致命错误会被立即重新抛出。Exceptions.java#L66-L73 中的 Exceptions.throwIfFatal() 方法会检查 VirtualMachineError 和 LinkageError:
public static void throwIfFatal(@NonNull Throwable t) {
if (t instanceof VirtualMachineError) {
throw (VirtualMachineError) t;
} else if (t instanceof LinkageError) {
throw (LinkageError) t;
}
}
其他所有异常都可被捕获,并通过响应式错误通道进行路由。BasicFuseableSubscriber 中的 fail() 辅助方法负责执行标准流程:取消上游订阅、标记为完成、调用 downstream.onError(ex)。
当错误无处可去时——例如,onError 在 onComplete 之后被调用,或在取消订阅期间发生了错误——RxJava 会将其路由到 RxJavaPlugins.onError()。如果未设置错误处理器,则会打印到 stderr 并在当前线程上抛出。这套"不可投递错误"机制是防止异常被静默吞掉的最后一道防线。
flowchart TD
EX[Exception in operator] --> TIF{throwIfFatal?}
TIF -->|VirtualMachineError/LinkageError| RETHROW[Rethrow immediately]
TIF -->|Other| DELIVER{Can deliver onError?}
DELIVER -->|Yes| CANCEL[Cancel upstream] --> ONERROR[downstream.onError]
DELIVER -->|No: already terminal| PLUGIN[RxJavaPlugins.onError]
PLUGIN --> HANDLER{Handler set?}
HANDLER -->|Yes| CUSTOM[Custom handler]
HANDLER -->|No| STDERR[Print + throw on thread]
组装钩子与订阅钩子
RxJavaPlugins 为每种响应式类型提供了两个拦截点:组装钩子和订阅钩子。
组装钩子在操作符创建时触发(即构建链的阶段),而非在订阅时。RxJavaPlugins.java#L72-L73 中的 onFlowableAssembly 钩子可以拦截或替换任何正在构造的 Flowable:
static volatile Function<? super Flowable, ? extends Flowable> onFlowableAssembly;
订阅钩子在 subscribe() 被调用时触发,早于 subscribeActual() 的执行。RxJavaPlugins.java#L102-L104 中的 onFlowableSubscribe 钩子同时接收 Flowable 和 Subscriber,并可替换后者:
static volatile BiFunction<? super Flowable, ? super Subscriber, ? extends Subscriber> onFlowableSubscribe;
这两个钩子是调试工具的基础。组装钩子可以在操作符创建时捕获堆栈跟踪,完美解决"这个操作符是在哪里构建的?"这一经典调试难题。订阅钩子则可以为每个 subscriber 包装埋点逻辑,用于记录信号。
sequenceDiagram
participant Dev as Developer
participant Assembly as Assembly Hook
participant Chain as Operator Chain
participant Subscribe as Subscribe Hook
participant Actual as subscribeActual
Dev->>Chain: source.map(fn).filter(pred)
Chain->>Assembly: onFlowableAssembly(FlowableMap)
Assembly-->>Chain: possibly wrapped Flowable
Chain->>Assembly: onFlowableAssembly(FlowableFilter)
Dev->>Chain: .subscribe(subscriber)
Chain->>Subscribe: onFlowableSubscribe(flowable, subscriber)
Subscribe-->>Chain: possibly wrapped subscriber
Chain->>Actual: subscribeActual(wrappedSubscriber)
两类钩子均支持所有响应式类型——Observable、Single、Maybe、Completable 和 ParallelFlowable 都有对应的并行钩子。
下一步
我们已经了解了操作符的组装方式,以及订阅如何在链中级联传播。但还有两个性能关键细节尚未触及:操作符如何安全地处理并发信号,以及融合机制如何消除中间队列。在第 3 篇中,我们将深入引擎室——探索原子 drain 循环模式、无锁 SPSC 队列,以及使 RxJava 跻身 JVM 上最快响应式库之列的 QueueFuseable 协议。