Read OSS

背压:Flowable 如何防止高速生产者压垮慢速消费者

高级

前置知识

  • 第 2 篇:subscribe-chain-and-operator-anatomy
  • 第 3 篇:drain-loops-and-operator-fusion
  • Reactive Streams 规范(Publisher/Subscriber/Subscription 契约)

背压:Flowable 如何防止高速生产者压垮慢速消费者

生产者与消费者速度不匹配,是所有异步系统迟早都要面对的难题。数据库游标产出数据行的速度,往往远超网络套接字的传输能力。一旦缺乏流量控制,中间缓冲区将无限膨胀,直至 JVM 内存耗尽。背压(Backpressure)正是 Reactive Streams 给出的解决方案:由消费者主动请求数据,生产者严格遵守这一限制。RxJava 的 Flowable 实现了这套协议,而其背后的实现细节远比概念本身复杂得多。

request(n) 协议与背压的意义

Reactive Streams 规范定义了一种推拉混合协议:

  1. Publisher.subscribe(Subscriber) — 建立连接
  2. Subscriber.onSubscribe(Subscription) — 传递控制句柄
  3. Subscription.request(n) — 订阅者请求最多 n 个数据项
  4. Publisher 最多发出 nonNext 信号
  5. 消费者处理完毕后继续请求更多数据
sequenceDiagram
    participant Producer as Flowable (Publisher)
    participant Sub as Subscriber
    
    Producer->>Sub: onSubscribe(subscription)
    Sub->>Producer: request(128)
    
    loop Up to 128 items
        Producer->>Sub: onNext(item)
    end
    
    Sub->>Producer: request(96)
    Note over Sub: Requests more after consuming ~75%
    
    loop Up to 96 items
        Producer->>Sub: onNext(item)
    end
    
    Producer->>Sub: onComplete()

Observable 有意不实现这套协议。如第 2 篇所述,它使用 Disposable 而非 Subscription,没有 request(),也没有流量控制。这正是两种类型分开设计的原因:Observable 适用于数据量天然有界或由 UI 驱动的场景,在这些场景中背压的开销是不必要的。

提示: 如果你不确定该用 Flowable 还是 Observable,可以问自己一个问题:"这个数据源产出数据的速度,会不会超过消费者的处理速度?"如果是,选 Flowable;如果数据源是事件驱动的(点击、固定频率的传感器读数等),选 Observable

BackpressureHelper:原子化的请求计数

下游的 request(n) 调用可能来自任意线程,因此必须以原子方式进行累加。BackpressureHelper 提供了基于 CAS 循环的工具方法:

public static long add(AtomicLong requested, long n) {
    for (;;) {
        long r = requested.get();
        if (r == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long u = addCap(r, n);
        if (requested.compareAndSet(r, u)) {
            return r;
        }
    }
}

为什么不直接用 AtomicLong.addAndGet(n)?原因有两点:

  1. Long.MAX_VALUE 上限保护:在 Reactive Streams 规范中,request(Long.MAX_VALUE) 意味着"无限制请求"。一旦计数器达到 Long.MAX_VALUE,就必须保持在这个值——继续追加请求不能导致数值溢出成负数。addCap 方法专门处理这一情况。

  2. 返回旧值:operator 需要判断这次是否是第一次请求(即旧值是否为 0),以决定是否启动 drain 循环。addAndGet 返回的是新值,在这个场景下用处不大。

addCap 方法本身的实现也很优雅:

public static long addCap(long a, long b) {
    long u = a + b;
    if (u < 0L) {
        return Long.MAX_VALUE;
    }
    return u;
}

通过符号位检测溢出——如果两个正数相加结果为负,说明发生了溢出。

produced() 方法从请求计数中减去已消费的数据量,同样保留 Long.MAX_VALUE 的特殊语义:

public static long produced(AtomicLong requested, long n) {
    for (;;) {
        long current = requested.get();
        if (current == Long.MAX_VALUE) {
            return Long.MAX_VALUE;
        }
        long update = current - n;
        if (update < 0L) {
            RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
            update = 0L;
        }
        if (requested.compareAndSet(current, update)) {
            return update;
        }
    }
}
flowchart TD
    REQ[request n arrives] --> GET[r = requested.get]
    GET --> MAX{r == MAX_VALUE?}
    MAX -->|Yes| SKIP[Return MAX_VALUE - unbounded mode]
    MAX -->|No| ADD[u = addCap r n]
    ADD --> CAS{compareAndSet r u}
    CAS -->|Success| DONE[Return r - previous value]
    CAS -->|Fail: concurrent update| GET

SubscriptionHelper:校验与 CANCELLED 哨兵值

SubscriptionHelper 是一个只有单个枚举值 CANCELLED 的枚举类,充当原子哨兵:

public enum SubscriptionHelper implements Subscription {
    CANCELLED;
    
    @Override public void request(long n) { /* deliberately ignored */ }
    @Override public void cancel() { /* deliberately ignored */ }
}

这个哨兵值使原子取消操作变得安全。operator 将上游的 Subscription 存储在 AtomicReference<Subscription> 中,取消时的逻辑如下:

public static boolean cancel(AtomicReference<Subscription> field) {
    Subscription current = field.get();
    if (current != CANCELLED) {
        current = field.getAndSet(CANCELLED);
        if (current != CANCELLED) {
            if (current != null) { current.cancel(); }
            return true;
        }
    }
    return false;
}

getAndSet(CANCELLED) 以原子方式将哨兵值写入,并返回之前存储的值。如果返回值已经是 CANCELLED,说明另一个线程抢先完成了取消;如果是真正的 subscription,则调用其 cancel()。整个过程无需加锁,线程安全。

setOnce 方法处理 subscription 只应被设置一次的常见场景:

public static boolean setOnce(AtomicReference<Subscription> field, Subscription s) {
    if (!field.compareAndSet(null, s)) {
        s.cancel();
        if (field.get() != CANCELLED) {
            reportSubscriptionSet();  // Protocol violation
        }
        return false;
    }
    return true;
}

BackpressureStrategy 与 FlowableCreate 的 Emitter

通过 Flowable.create() 以命令式代码创建 Flowable 时,必须选择一种 BackpressureStrategyBackpressureStrategy.java 中的枚举定义了五个选项:

策略 行为 风险
MISSING 不处理背压,交由下游自行应对 若下游未处理则抛出 MissingBackpressureException
ERROR 需求超出时发出错误信号 安全但会中断流
BUFFER 使用无界缓冲区 生产者速度远快于消费者时可能 OOM
DROP 无需求时丢弃数据项 数据丢失
LATEST 只保留最新的一项数据 数据丢失,但始终持有"最新值"

FlowableCreate.java#L40-L74 针对每种策略创建不同的 Emitter 子类:

public void subscribeActual(Subscriber<? super T> t) {
    BaseEmitter<T> emitter;
    switch (backpressure) {
        case MISSING:  emitter = new MissingEmitter<>(t); break;
        case ERROR:    emitter = new ErrorAsyncEmitter<>(t); break;
        case DROP:     emitter = new DropAsyncEmitter<>(t); break;
        case LATEST:   emitter = new LatestAsyncEmitter<>(t); break;
        default:       emitter = new BufferAsyncEmitter<>(t, bufferSize()); break;
    }
    t.onSubscribe(emitter);
    source.subscribe(emitter);
}
flowchart TD
    FC[FlowableCreate] --> SW{BackpressureStrategy?}
    SW -->|MISSING| ME[MissingEmitter<br>No backpressure handling]
    SW -->|ERROR| EE[ErrorAsyncEmitter<br>Throws MissingBackpressureException]
    SW -->|DROP| DE[DropAsyncEmitter<br>Discards when no demand]
    SW -->|LATEST| LE[LatestAsyncEmitter<br>Keeps newest in AtomicReference]
    SW -->|BUFFER| BE[BufferAsyncEmitter<br>Unbounded SpscLinkedArrayQueue]
    
    ME --> DS[downstream]
    EE --> DS
    DE --> DS
    LE --> DS
    BE --> DS

其中 LatestAsyncEmitter 的设计尤为精妙——它使用一个 AtomicReference 持有最新的值。当新的 onNext 到来但下游尚未请求数据时,它会原子地替换掉已存储的值。一旦下游发出需求,drain 循环便取出当前存储的最新值进行处理。

FlowableObserveOn 中的 Prefetch 与 Limit

prefetch/limit 模式是跨线程 operator(如 observeOn)管理背压的核心手段,它避免了逐条数据来回请求带来的额外开销。

FlowableObserveOn.java#L96-L101 中:

this.prefetch = prefetch;
this.limit = prefetch - (prefetch >> 2);  // 75% refill threshold

默认 prefetch 为 128,因此 limit = 128 - 32 = 96。整体流程如下:

  1. 初始向上游请求 prefetch(128)个数据项
  2. 从队列中消费数据项,并累计已消费数量
  3. 当消费量达到 limit(96)时,再向上游请求 limit 个数据项
  4. 重置消费计数器

这形成了一个 75% 补货阈值:当缓冲区消费了 75% 的数据时,立即请求补充同等数量的新数据。这是以下几点之间的平衡:

  • 吞吐量:批量请求减少了 request() 调用的开销
  • 内存:缓冲区容量始终不超过 prefetch
  • 延迟:在 75% 时触发补货,意味着新数据到来前缓冲区还有 25% 可继续处理,不会出现空窗期
sequenceDiagram
    participant Upstream
    participant ObserveOn
    participant Downstream
    
    ObserveOn->>Upstream: request(128)
    Upstream->>ObserveOn: 128 items
    
    loop Drain items 1-96
        ObserveOn->>Downstream: onNext(item)
    end
    
    Note over ObserveOn: consumed=96 ≥ limit=96
    ObserveOn->>Upstream: request(96)
    
    loop Drain items 97-128 + new items
        ObserveOn->>Downstream: onNext(item)
    end

FlowableFlatMap:跨 N 个内部数据源的请求计数

FlowableFlatMap 大概是 RxJava 中最复杂的 operator,而背压处理正是其复杂性的根源所在。MergeSubscriber 需要在动态数量的内部数据源之间协调请求计数:

static final class MergeSubscriber<T, U> extends AtomicInteger 
    implements FlowableSubscriber<T>, Subscription {
    
    final int maxConcurrency;
    final int bufferSize;
    volatile SimplePlainQueue<U> queue;
    final AtomicThrowable errors = new AtomicThrowable();
    final AtomicReference<InnerSubscriber<?, ?>[]> subscribers;
    final AtomicLong requested = new AtomicLong();
    long uniqueId;
    int scalarEmitted;
    final int scalarLimit;

复杂性来源于多个并发流的交织:

  1. 外部数据源发出数据项,触发内部 subscription 的创建
  2. maxConcurrency 限制同时活跃的内部数据源数量
  3. 所有内部数据源的数据项被合并到单一输出流
  4. 下游的 request(n) 需要分配给所有活跃的内部数据源
  5. 当某个内部数据源完成时,可从缓冲的外部数据项中订阅新的内部数据源

FlowableFlatMap.java#L107-L119 中的 onSubscribe 方法展示了初始请求策略:

public void onSubscribe(Subscription s) {
    if (SubscriptionHelper.validate(this.upstream, s)) {
        this.upstream = s;
        downstream.onSubscribe(this);
        if (!cancelled) {
            if (maxConcurrency == Integer.MAX_VALUE) {
                s.request(Long.MAX_VALUE);  // Unbounded: get everything
            } else {
                s.request(maxConcurrency);  // Bounded: get up to maxConcurrency items
            }
        }
    }
}

第 3 篇介绍的标量优化在这里发挥了关键作用——对于 Flowable.just() 这类内部数据源,flatMap 会跳过整套 subscription 机制。它将标量数据项累积起来,并批量发出补货请求:

int scalarEmitted;
final int scalarLimit;
// scalarLimit = Math.max(1, maxConcurrency >> 1)

scalarEmitted 达到 scalarLimit 时,一次性请求 scalarLimit 个新的外部数据项。这样就将 request() 的 CAS 开销分摊到了多次标量数据的处理中。

flowchart TD
    OUTER[Outer Flowable] -->|"request(maxConcurrency)"| MS[MergeSubscriber]
    MS -->|"onNext(T)"| MAP[mapper.apply T]
    MAP -->|"Scalar?"| SC{ScalarSupplier?}
    SC -->|Yes| EMIT[Emit directly + scalarEmitted++]
    SC -->|No| INNER[Subscribe InnerSubscriber]
    INNER --> IS1[InnerSubscriber 1]
    INNER --> IS2[InnerSubscriber 2]
    INNER --> ISN[InnerSubscriber N]
    IS1 -->|"items"| DRAIN[Drain loop merges all]
    IS2 -->|"items"| DRAIN
    ISN -->|"items"| DRAIN
    DRAIN -->|"onNext"| DOWN[Downstream]
    DOWN -->|"request n"| DRAIN
    EMIT --> DOWN

MergeSubscriber 中的 drain 循环是整个代码库中最复杂的一个。它轮询所有活跃的内部 subscriber(通过原子数组追踪),从各自的队列中读取数据,检查全局错误累加器(AtomicThrowable),处理取消逻辑,并维护 requested 计数器——全程无锁。深入理解这一个类,可以说是 RxJava 内部机制研究的必经之路。

下一篇

至此,我们已经全面覆盖了 RxJava 的内部运行机制:operator 模式、drain 循环、fusion、scheduler,以及背压。第 6 篇将把目光转向 4.x 的新特性——带来基于拉取的异步枚举能力的 Streamable 类型、用于虚拟线程上命令式代码的 VirtualGeneratorVirtualEmitter,以及 sealed types 和 records 等现代 Java 特性如何重塑 API 设计。