背压:Flowable 如何防止高速生产者压垮慢速消费者
前置知识
- ›第 2 篇:subscribe-chain-and-operator-anatomy
- ›第 3 篇:drain-loops-and-operator-fusion
- ›Reactive Streams 规范(Publisher/Subscriber/Subscription 契约)
背压:Flowable 如何防止高速生产者压垮慢速消费者
生产者与消费者速度不匹配,是所有异步系统迟早都要面对的难题。数据库游标产出数据行的速度,往往远超网络套接字的传输能力。一旦缺乏流量控制,中间缓冲区将无限膨胀,直至 JVM 内存耗尽。背压(Backpressure)正是 Reactive Streams 给出的解决方案:由消费者主动请求数据,生产者严格遵守这一限制。RxJava 的 Flowable 实现了这套协议,而其背后的实现细节远比概念本身复杂得多。
request(n) 协议与背压的意义
Reactive Streams 规范定义了一种推拉混合协议:
Publisher.subscribe(Subscriber)— 建立连接Subscriber.onSubscribe(Subscription)— 传递控制句柄Subscription.request(n)— 订阅者请求最多n个数据项Publisher最多发出n个onNext信号- 消费者处理完毕后继续请求更多数据
sequenceDiagram
participant Producer as Flowable (Publisher)
participant Sub as Subscriber
Producer->>Sub: onSubscribe(subscription)
Sub->>Producer: request(128)
loop Up to 128 items
Producer->>Sub: onNext(item)
end
Sub->>Producer: request(96)
Note over Sub: Requests more after consuming ~75%
loop Up to 96 items
Producer->>Sub: onNext(item)
end
Producer->>Sub: onComplete()
Observable 有意不实现这套协议。如第 2 篇所述,它使用 Disposable 而非 Subscription,没有 request(),也没有流量控制。这正是两种类型分开设计的原因:Observable 适用于数据量天然有界或由 UI 驱动的场景,在这些场景中背压的开销是不必要的。
提示: 如果你不确定该用
Flowable还是Observable,可以问自己一个问题:"这个数据源产出数据的速度,会不会超过消费者的处理速度?"如果是,选Flowable;如果数据源是事件驱动的(点击、固定频率的传感器读数等),选Observable。
BackpressureHelper:原子化的请求计数
下游的 request(n) 调用可能来自任意线程,因此必须以原子方式进行累加。BackpressureHelper 提供了基于 CAS 循环的工具方法:
public static long add(AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}
为什么不直接用 AtomicLong.addAndGet(n)?原因有两点:
-
Long.MAX_VALUE 上限保护:在 Reactive Streams 规范中,
request(Long.MAX_VALUE)意味着"无限制请求"。一旦计数器达到Long.MAX_VALUE,就必须保持在这个值——继续追加请求不能导致数值溢出成负数。addCap方法专门处理这一情况。 -
返回旧值:operator 需要判断这次是否是第一次请求(即旧值是否为 0),以决定是否启动 drain 循环。
addAndGet返回的是新值,在这个场景下用处不大。
addCap 方法本身的实现也很优雅:
public static long addCap(long a, long b) {
long u = a + b;
if (u < 0L) {
return Long.MAX_VALUE;
}
return u;
}
通过符号位检测溢出——如果两个正数相加结果为负,说明发生了溢出。
produced() 方法从请求计数中减去已消费的数据量,同样保留 Long.MAX_VALUE 的特殊语义:
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
flowchart TD
REQ[request n arrives] --> GET[r = requested.get]
GET --> MAX{r == MAX_VALUE?}
MAX -->|Yes| SKIP[Return MAX_VALUE - unbounded mode]
MAX -->|No| ADD[u = addCap r n]
ADD --> CAS{compareAndSet r u}
CAS -->|Success| DONE[Return r - previous value]
CAS -->|Fail: concurrent update| GET
SubscriptionHelper:校验与 CANCELLED 哨兵值
SubscriptionHelper 是一个只有单个枚举值 CANCELLED 的枚举类,充当原子哨兵:
public enum SubscriptionHelper implements Subscription {
CANCELLED;
@Override public void request(long n) { /* deliberately ignored */ }
@Override public void cancel() { /* deliberately ignored */ }
}
这个哨兵值使原子取消操作变得安全。operator 将上游的 Subscription 存储在 AtomicReference<Subscription> 中,取消时的逻辑如下:
public static boolean cancel(AtomicReference<Subscription> field) {
Subscription current = field.get();
if (current != CANCELLED) {
current = field.getAndSet(CANCELLED);
if (current != CANCELLED) {
if (current != null) { current.cancel(); }
return true;
}
}
return false;
}
getAndSet(CANCELLED) 以原子方式将哨兵值写入,并返回之前存储的值。如果返回值已经是 CANCELLED,说明另一个线程抢先完成了取消;如果是真正的 subscription,则调用其 cancel()。整个过程无需加锁,线程安全。
setOnce 方法处理 subscription 只应被设置一次的常见场景:
public static boolean setOnce(AtomicReference<Subscription> field, Subscription s) {
if (!field.compareAndSet(null, s)) {
s.cancel();
if (field.get() != CANCELLED) {
reportSubscriptionSet(); // Protocol violation
}
return false;
}
return true;
}
BackpressureStrategy 与 FlowableCreate 的 Emitter
通过 Flowable.create() 以命令式代码创建 Flowable 时,必须选择一种 BackpressureStrategy。BackpressureStrategy.java 中的枚举定义了五个选项:
| 策略 | 行为 | 风险 |
|---|---|---|
MISSING |
不处理背压,交由下游自行应对 | 若下游未处理则抛出 MissingBackpressureException |
ERROR |
需求超出时发出错误信号 | 安全但会中断流 |
BUFFER |
使用无界缓冲区 | 生产者速度远快于消费者时可能 OOM |
DROP |
无需求时丢弃数据项 | 数据丢失 |
LATEST |
只保留最新的一项数据 | 数据丢失,但始终持有"最新值" |
FlowableCreate.java#L40-L74 针对每种策略创建不同的 Emitter 子类:
public void subscribeActual(Subscriber<? super T> t) {
BaseEmitter<T> emitter;
switch (backpressure) {
case MISSING: emitter = new MissingEmitter<>(t); break;
case ERROR: emitter = new ErrorAsyncEmitter<>(t); break;
case DROP: emitter = new DropAsyncEmitter<>(t); break;
case LATEST: emitter = new LatestAsyncEmitter<>(t); break;
default: emitter = new BufferAsyncEmitter<>(t, bufferSize()); break;
}
t.onSubscribe(emitter);
source.subscribe(emitter);
}
flowchart TD
FC[FlowableCreate] --> SW{BackpressureStrategy?}
SW -->|MISSING| ME[MissingEmitter<br>No backpressure handling]
SW -->|ERROR| EE[ErrorAsyncEmitter<br>Throws MissingBackpressureException]
SW -->|DROP| DE[DropAsyncEmitter<br>Discards when no demand]
SW -->|LATEST| LE[LatestAsyncEmitter<br>Keeps newest in AtomicReference]
SW -->|BUFFER| BE[BufferAsyncEmitter<br>Unbounded SpscLinkedArrayQueue]
ME --> DS[downstream]
EE --> DS
DE --> DS
LE --> DS
BE --> DS
其中 LatestAsyncEmitter 的设计尤为精妙——它使用一个 AtomicReference 持有最新的值。当新的 onNext 到来但下游尚未请求数据时,它会原子地替换掉已存储的值。一旦下游发出需求,drain 循环便取出当前存储的最新值进行处理。
FlowableObserveOn 中的 Prefetch 与 Limit
prefetch/limit 模式是跨线程 operator(如 observeOn)管理背压的核心手段,它避免了逐条数据来回请求带来的额外开销。
在 FlowableObserveOn.java#L96-L101 中:
this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2); // 75% refill threshold
默认 prefetch 为 128,因此 limit = 128 - 32 = 96。整体流程如下:
- 初始向上游请求
prefetch(128)个数据项 - 从队列中消费数据项,并累计已消费数量
- 当消费量达到
limit(96)时,再向上游请求limit个数据项 - 重置消费计数器
这形成了一个 75% 补货阈值:当缓冲区消费了 75% 的数据时,立即请求补充同等数量的新数据。这是以下几点之间的平衡:
- 吞吐量:批量请求减少了
request()调用的开销 - 内存:缓冲区容量始终不超过
prefetch - 延迟:在 75% 时触发补货,意味着新数据到来前缓冲区还有 25% 可继续处理,不会出现空窗期
sequenceDiagram
participant Upstream
participant ObserveOn
participant Downstream
ObserveOn->>Upstream: request(128)
Upstream->>ObserveOn: 128 items
loop Drain items 1-96
ObserveOn->>Downstream: onNext(item)
end
Note over ObserveOn: consumed=96 ≥ limit=96
ObserveOn->>Upstream: request(96)
loop Drain items 97-128 + new items
ObserveOn->>Downstream: onNext(item)
end
FlowableFlatMap:跨 N 个内部数据源的请求计数
FlowableFlatMap 大概是 RxJava 中最复杂的 operator,而背压处理正是其复杂性的根源所在。MergeSubscriber 需要在动态数量的内部数据源之间协调请求计数:
static final class MergeSubscriber<T, U> extends AtomicInteger
implements FlowableSubscriber<T>, Subscription {
final int maxConcurrency;
final int bufferSize;
volatile SimplePlainQueue<U> queue;
final AtomicThrowable errors = new AtomicThrowable();
final AtomicReference<InnerSubscriber<?, ?>[]> subscribers;
final AtomicLong requested = new AtomicLong();
long uniqueId;
int scalarEmitted;
final int scalarLimit;
复杂性来源于多个并发流的交织:
- 外部数据源发出数据项,触发内部 subscription 的创建
maxConcurrency限制同时活跃的内部数据源数量- 所有内部数据源的数据项被合并到单一输出流
- 下游的
request(n)需要分配给所有活跃的内部数据源 - 当某个内部数据源完成时,可从缓冲的外部数据项中订阅新的内部数据源
FlowableFlatMap.java#L107-L119 中的 onSubscribe 方法展示了初始请求策略:
public void onSubscribe(Subscription s) {
if (SubscriptionHelper.validate(this.upstream, s)) {
this.upstream = s;
downstream.onSubscribe(this);
if (!cancelled) {
if (maxConcurrency == Integer.MAX_VALUE) {
s.request(Long.MAX_VALUE); // Unbounded: get everything
} else {
s.request(maxConcurrency); // Bounded: get up to maxConcurrency items
}
}
}
}
第 3 篇介绍的标量优化在这里发挥了关键作用——对于 Flowable.just() 这类内部数据源,flatMap 会跳过整套 subscription 机制。它将标量数据项累积起来,并批量发出补货请求:
int scalarEmitted;
final int scalarLimit;
// scalarLimit = Math.max(1, maxConcurrency >> 1)
当 scalarEmitted 达到 scalarLimit 时,一次性请求 scalarLimit 个新的外部数据项。这样就将 request() 的 CAS 开销分摊到了多次标量数据的处理中。
flowchart TD
OUTER[Outer Flowable] -->|"request(maxConcurrency)"| MS[MergeSubscriber]
MS -->|"onNext(T)"| MAP[mapper.apply T]
MAP -->|"Scalar?"| SC{ScalarSupplier?}
SC -->|Yes| EMIT[Emit directly + scalarEmitted++]
SC -->|No| INNER[Subscribe InnerSubscriber]
INNER --> IS1[InnerSubscriber 1]
INNER --> IS2[InnerSubscriber 2]
INNER --> ISN[InnerSubscriber N]
IS1 -->|"items"| DRAIN[Drain loop merges all]
IS2 -->|"items"| DRAIN
ISN -->|"items"| DRAIN
DRAIN -->|"onNext"| DOWN[Downstream]
DOWN -->|"request n"| DRAIN
EMIT --> DOWN
MergeSubscriber 中的 drain 循环是整个代码库中最复杂的一个。它轮询所有活跃的内部 subscriber(通过原子数组追踪),从各自的队列中读取数据,检查全局错误累加器(AtomicThrowable),处理取消逻辑,并维护 requested 计数器——全程无锁。深入理解这一个类,可以说是 RxJava 内部机制研究的必经之路。
下一篇
至此,我们已经全面覆盖了 RxJava 的内部运行机制:operator 模式、drain 循环、fusion、scheduler,以及背压。第 6 篇将把目光转向 4.x 的新特性——带来基于拉取的异步枚举能力的 Streamable 类型、用于虚拟线程上命令式代码的 VirtualGenerator 与 VirtualEmitter,以及 sealed types 和 records 等现代 Java 特性如何重塑 API 设计。