rx-java Backpressure: The onBackpressureXXX operators



Most developers encounter backpressure when their application fails with a MissingBackpressureException, and the exception's stack trace usually points to the observeOn operator. The actual cause, however, is typically the non-backpressured use of PublishSubject, timer() or interval(), or of custom operators created via create().

There are several ways of dealing with such situations.

Increasing the buffer sizes

Sometimes such overflows are caused by bursty sources: the user suddenly taps the screen too quickly, and observeOn's default 16-element internal buffer on Android overflows.

Most backpressure-sensitive operators in recent versions of RxJava allow programmers to specify the size of their internal buffers. The relevant parameters are usually called bufferSize, prefetch or capacityHint. For the overflowing example in the introduction, we can simply increase the buffer size of observeOn so that it has enough room for all values:

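import rx.schedulers.Schedulers;
import rx.subjects.PublishSubject;

PublishSubject<Integer> source = PublishSubject.create();

// the second argument raises observeOn's internal buffer to 1024 * 1024 elements
source.observeOn(Schedulers.computation(), 1024 * 1024)
      .subscribe(e -> { }, Throwable::printStackTrace);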

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}
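A small plain-Java model can illustrate the effect of the buffer size. It is not RxJava's implementation. In the model, a fixed-capacity queue receives a burst while its consumer is assumed to be busy, so nothing is drained in between. The class BurstModel and its fits method are hypothetical names used only for this sketch.

```java
import java.util.ArrayDeque;

// Simplified model of a fixed-size queue such as the one inside observeOn.
// Hypothetical code, not RxJava: the consumer is assumed to be busy, so nothing
// is drained while the burst arrives.
final class BurstModel {

    /** Returns true if a burst of burstSize items fits into a queue of the given capacity. */
    static boolean fits(int capacity, int burstSize) {
        ArrayDeque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < burstSize; i++) {
            if (queue.size() == capacity) {
                // this is where observeOn would signal MissingBackpressureException
                return false;
            }
            queue.offer(i);
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(fits(16, 100));                // false: a 16-element buffer overflows
        System.out.println(fits(1024 * 1024, 1_000_000)); // true: the enlarged buffer absorbs the burst
        System.out.println(fits(1024 * 1024, 2_000_000)); // false: a bigger burst overflows again
    }
}
```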

Note, however, that this is generally only a temporary fix, as the overflow can still happen if the source produces more items than the predicted buffer size. In that case, one can use one of the following operators.

Batching/skipping values with standard operators

If the source data can be processed more efficiently in batches, one can reduce the likelihood of a MissingBackpressureException by using one of the standard batching operators (by size and/or by time).

PublishSubject<Integer> source = PublishSubject.create();

source
      .buffer(1024)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(list -> {
          // process each batch of up to 1024 values in one go
          list.parallelStream().map(e -> e * e).findFirst();
      }, Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}
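The following plain-Java model shows why batching helps. It mimics size-based batching in the spirit of buffer(1024) but does not use RxJava, and BatchModel is a hypothetical name. The model assumes the source completes, so the final partial batch is emitted. With a never-completing PublishSubject as above, that last batch would stay pending.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of size-based batching: consecutive items are grouped into
// lists of at most batchSize elements. Hypothetical code, not RxJava's buffer().
final class BatchModel {

    static <T> List<List<T>> batch(List<T> items, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>(batchSize);
        for (T item : items) {
            current.add(item);
            if (current.size() == batchSize) {
                batches.add(current);               // a full batch is emitted immediately
                current = new ArrayList<>(batchSize);
            }
        }
        if (!current.isEmpty()) {
            batches.add(current);                   // partial batch, emitted when the source completes
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> items = new ArrayList<>();
        for (int i = 0; i < 1_000_000; i++) {
            items.add(i);
        }
        List<List<Integer>> batches = batch(items, 1024);
        System.out.println(batches.size());          // 977 (976 full batches + 1 partial)
        System.out.println(batches.get(976).size()); // 576
    }
}
```

With 1,000,000 values and a batch size of 1024, the downstream (including observeOn) receives 977 signals instead of a million.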

If some of the values can be safely ignored, one can use sampling (by time or by another Observable) and the throttling operators (throttleFirst, throttleLast, throttleWithTimeout).

PublishSubject<Integer> source = PublishSubject.create();

source
      .sample(1, TimeUnit.MILLISECONDS)
      .observeOn(Schedulers.computation(), 1024)
      .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Note, however, that these operators only reduce the rate at which the downstream receives values, and thus they may still lead to a MissingBackpressureException.
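The rate reduction performed by time-based sampling can be sketched as a deterministic model. It is plain Java, not RxJava's sample implementation, and SampleModel is a hypothetical name. Time is cut into windows of the given period. At the end of each window, the most recent value received in it is emitted, and windows without new values emit nothing. Timestamps are assumed to be non-decreasing.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of sampling by time: per window of periodMillis, only the
// last value received in that window is emitted. Hypothetical code, not RxJava.
final class SampleModel {

    static List<Integer> sample(long[] timesMillis, int[] values, long periodMillis) {
        List<Integer> out = new ArrayList<>();
        long currentWindow = -1;
        Integer latest = null;
        for (int i = 0; i < values.length; i++) {
            long window = timesMillis[i] / periodMillis;
            if (window != currentWindow && latest != null) {
                out.add(latest);   // the previous window closed: emit its last value
                latest = null;
            }
            currentWindow = window;
            latest = values[i];
        }
        if (latest != null) {
            out.add(latest);       // the final window closes as well
        }
        return out;
    }

    public static void main(String[] args) {
        // ten values, one per millisecond, sampled every 5 ms
        long[] times = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        int[] values = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        System.out.println(sample(times, values, 5)); // [4, 9]
    }
}
```

Ten values shrink to two, but the downstream still receives a value every period. If it is slower than even the sampled rate, overflows remain possible.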

onBackpressureBuffer()

This operator, in its parameterless form, reintroduces an unbounded buffer between the upstream source and the downstream operator. Being unbounded means that, as long as the JVM doesn't run out of memory, it can handle almost any amount of data coming from a bursty source.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer()
          .observeOn(Schedulers.computation(), 8)
          .subscribe(e -> { }, Throwable::printStackTrace);

In this example, observeOn uses a very small buffer, yet there is no MissingBackpressureException, because onBackpressureBuffer soaks up all 1 million values and hands them over to observeOn in small batches.

Note, however, that onBackpressureBuffer consumes its source in an unbounded manner, that is, without applying any backpressure to it. As a consequence, even a backpressure-supporting source such as range will be completely realized, regardless of how fast the downstream consumes it.
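This behaviour can be sketched with a simplified plain-Java model. The class UnboundedBufferModel is hypothetical and is not RxJava's implementation. The upstream side accepts every item into an unbounded queue, and the downstream side pulls small batches on request, similar to observeOn with a buffer size of 8.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of the parameterless onBackpressureBuffer(): the upstream is
// never told to slow down, and the downstream drains on request. Not RxJava code.
final class UnboundedBufferModel<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();

    /** Upstream side: never refuses an item; memory is the only limit. */
    void onNext(T item) {
        queue.offer(item);
    }

    /** Downstream side: hands over at most n buffered items. */
    List<T> request(int n) {
        List<T> batch = new ArrayList<>(Math.min(n, queue.size()));
        while (batch.size() < n && !queue.isEmpty()) {
            batch.add(queue.poll());
        }
        return batch;
    }

    int buffered() {
        return queue.size();
    }

    public static void main(String[] args) {
        UnboundedBufferModel<Integer> buffer = new UnboundedBufferModel<>();
        for (int i = 1; i <= 1_000_000; i++) {
            buffer.onNext(i);                     // the whole range is realized up front
        }
        System.out.println(buffer.buffered());    // 1000000
        System.out.println(buffer.request(8));    // [1, 2, 3, 4, 5, 6, 7, 8]
        System.out.println(buffer.buffered());    // 999992
    }
}
```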

The following additional overloads of onBackpressureBuffer are available:

onBackpressureBuffer(int capacity)

This is a bounded version that signals a BufferOverflowException if its buffer reaches the given capacity.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

This operator is becoming less relevant as more and more operators now allow setting their buffer sizes. For the rest, it gives an opportunity to "extend their internal buffer" by specifying a larger capacity with onBackpressureBuffer than their default.

onBackpressureBuffer(int capacity, Action0 onOverflow)

This overload calls a (shared) action when an overflow happens. Its usefulness is rather limited, as the action receives no information about the overflow other than the current call stack.

onBackpressureBuffer(int capacity, Action0 onOverflow, BackpressureOverflow.Strategy strategy)

This overload is actually more useful, as it lets one define what to do when the capacity has been reached. BackpressureOverflow.Strategy is an interface, but the class BackpressureOverflow offers 4 static fields holding implementations of it that represent typical actions:

  • ON_OVERFLOW_ERROR: this is the default behavior of the previous two overloads, signalling a BufferOverflowException
  • ON_OVERFLOW_DEFAULT: currently it is the same as ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST: if an overflow would happen, the current value is simply ignored, and only the old values are delivered once the downstream requests more.
  • ON_OVERFLOW_DROP_OLDEST: drops the oldest element in the buffer and adds the current value to it.

Observable.range(1, 1_000_000)
          .onBackpressureBuffer(16, () -> { },
              BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST)
          .observeOn(Schedulers.computation())
          .subscribe(e -> { }, Throwable::printStackTrace);

Note that the last two strategies cause discontinuities in the stream, as they drop elements. In addition, they won't signal BufferOverflowException.
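The strategies can be compared with a simplified plain-Java model. It is not RxJava code. A bounded queue applies one of three strategies when full and runs an onOverflow callback on every overflow. The enum constants mirror ON_OVERFLOW_ERROR, ON_OVERFLOW_DROP_LATEST and ON_OVERFLOW_DROP_OLDEST. The names BoundedBufferModel and Strategy are assumptions of this sketch, and so is the IllegalStateException that stands in for the overflow error. The model does not reproduce the exact exception type RxJava signals.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

// Simplified model of onBackpressureBuffer(capacity, onOverflow, strategy).
// Hypothetical class; the strategy names only mirror BackpressureOverflow's fields.
final class BoundedBufferModel {

    enum Strategy { ERROR, DROP_LATEST, DROP_OLDEST }

    private final ArrayDeque<Integer> queue = new ArrayDeque<>();
    private final int capacity;
    private final Runnable onOverflow;
    private final Strategy strategy;

    BoundedBufferModel(int capacity, Runnable onOverflow, Strategy strategy) {
        this.capacity = capacity;
        this.onOverflow = onOverflow;
        this.strategy = strategy;
    }

    /** Upstream side: buffers the item, applying the strategy when the buffer is full. */
    void onNext(int item) {
        if (queue.size() < capacity) {
            queue.offer(item);
            return;
        }
        onOverflow.run();                 // in this model: called on every overflow, without details
        switch (strategy) {
            case ERROR:
                // stand-in for the overflow error signalled downstream
                throw new IllegalStateException("buffer full, capacity " + capacity);
            case DROP_LATEST:
                break;                    // the incoming item is ignored
            case DROP_OLDEST:
                queue.poll();             // evict the oldest buffered item
                queue.offer(item);
                break;
        }
    }

    /** Downstream side: takes everything currently buffered, oldest first. */
    List<Integer> drain() {
        List<Integer> out = new ArrayList<>(queue);
        queue.clear();
        return out;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            int[] overflows = {0};
            BoundedBufferModel buffer = new BoundedBufferModel(4, () -> overflows[0]++, s);
            try {
                for (int i = 1; i <= 10; i++) {
                    buffer.onNext(i);
                }
                System.out.println(s + " -> " + buffer.drain() + ", overflows: " + overflows[0]);
            } catch (IllegalStateException e) {
                System.out.println(s + " -> error after " + overflows[0] + " overflow(s)");
            }
        }
        // prints:
        // ERROR -> error after 1 overflow(s)
        // DROP_LATEST -> [1, 2, 3, 4], overflows: 6
        // DROP_OLDEST -> [7, 8, 9, 10], overflows: 6
    }
}
```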

onBackpressureDrop()

Whenever the downstream is not ready to receive values, this operator drops the incoming element from the sequence. One can think of it as an onBackpressureBuffer with 0 capacity and the ON_OVERFLOW_DROP_LATEST strategy.

This operator is useful when one can safely ignore values from a source (such as mouse moves or current GPS location signals) as there will be more up-to-date values later on.

component.mouseMoves()
         .onBackpressureDrop()
         .observeOn(Schedulers.computation(), 1)
         .subscribe(event -> compute(event.x, event.y));

It may be useful in conjunction with the source operator interval(). For example, if one wants to perform some periodic background task but each iteration may last longer than the period, it is safe to drop the excess interval notifications, as there will be more later on:

Observable.interval(1, TimeUnit.MINUTES)
          .onBackpressureDrop()
          .observeOn(Schedulers.io())
          .doOnNext(e -> networkCall.doStuff())
          .subscribe(v -> { }, Throwable::printStackTrace);
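A small deterministic model in plain Java can work out which interval ticks end up running the task. IntervalDropModel is a hypothetical name. The model assumes a single worker and accepts a tick only if the worker is idle at that instant, so no pending tick is held anywhere downstream. Any buffering between onBackpressureDrop and the worker (for example, inside observeOn) would let some ticks queue up instead of being dropped.

```java
import java.util.ArrayList;
import java.util.List;

// Deterministic model of interval() + drop-when-busy: ticks arrive every
// periodMinutes, and each accepted tick keeps the single worker busy for
// taskMinutes. Ticks arriving while the worker is busy are dropped.
final class IntervalDropModel {

    /** Returns the indices of the ticks that actually trigger a task. */
    static List<Integer> acceptedTicks(int tickCount, double periodMinutes, double taskMinutes) {
        List<Integer> accepted = new ArrayList<>();
        double busyUntil = 0.0;
        for (int tick = 0; tick < tickCount; tick++) {
            double now = tick * periodMinutes;
            if (now >= busyUntil) {
                accepted.add(tick);
                busyUntil = now + taskMinutes;
            } // otherwise the worker is still busy and this tick is dropped
        }
        return accepted;
    }

    public static void main(String[] args) {
        // a 1-minute interval with a task that takes 2.5 minutes
        System.out.println(acceptedTicks(10, 1.0, 2.5)); // [0, 3, 6, 9]
    }
}
```

Under these assumptions, only every third tick triggers the network call, and no backlog builds up.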

There is one overload of this operator, onBackpressureDrop(Action1<? super T> onDrop), where the (shared) action is called with the value being dropped. This variant allows cleaning up the values themselves (e.g., releasing associated resources).
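Request-driven dropping with an onDrop callback can be modelled in plain Java. DropModel is hypothetical and is not RxJava's implementation. Items are forwarded only while the downstream has outstanding requests. Otherwise they are passed to the onDrop callback, where, for example, resources could be released, and then discarded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Simplified model of onBackpressureDrop(onDrop). Hypothetical code, not RxJava.
final class DropModel<T> {
    private final Consumer<? super T> onDrop;
    private final List<T> delivered = new ArrayList<>();
    private long requested;

    DropModel(Consumer<? super T> onDrop) {
        this.onDrop = onDrop;
    }

    /** Downstream side: signals readiness for n more items. */
    void request(long n) {
        requested += n;
    }

    /** Upstream side: called for every emitted item, whether requested or not. */
    void onNext(T item) {
        if (requested > 0) {
            requested--;
            delivered.add(item);
        } else {
            onDrop.accept(item);   // not requested: hand it to onDrop and discard it
        }
    }

    List<T> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        List<Integer> dropped = new ArrayList<>();
        DropModel<Integer> drop = new DropModel<Integer>(dropped::add);
        drop.request(2);
        for (int i = 1; i <= 5; i++) {
            drop.onNext(i);        // 3, 4 and 5 arrive while nothing is requested
        }
        drop.request(1);
        drop.onNext(6);
        System.out.println(drop.delivered()); // [1, 2, 6]
        System.out.println(dropped);          // [3, 4, 5]
    }
}
```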

onBackpressureLatest()

The final operator keeps only the latest value and practically overwrites older, undelivered values. One can think of this as a variant of onBackpressureBuffer with a capacity of 1 and the ON_OVERFLOW_DROP_OLDEST strategy.

Unlike with onBackpressureDrop, there is always a value available for consumption if the downstream happens to lag behind. This can be useful in telemetry-like situations where the data may arrive in bursts but only the very latest value is interesting for processing.

For example, if the user clicks a lot on the screen, we'd still want to react to their latest input.

component.mouseClicks()
         .onBackpressureLatest()
         .observeOn(Schedulers.computation())
         .subscribe(event -> compute(event.x, event.y), Throwable::printStackTrace);

Using onBackpressureDrop in this case could lead to a situation where the very last click gets dropped, leaving the user wondering why the business logic wasn't executed.
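A simplified plain-Java model can sketch the difference between the two operators in the click scenario. LatestVsDrop is hypothetical and is not RxJava code. Clicks arrive while the downstream is busy. Once the downstream becomes idle and requests one more item, the latest-style buffer still holds the last click in its single slot, whereas the drop-style operator has discarded all of them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model contrasting onBackpressureLatest with onBackpressureDrop
// for items that arrive while the downstream is busy. Hypothetical code.
final class LatestVsDrop {

    /** Returns what the downstream receives when it finally requests one item. */
    static List<String> afterBurst(List<String> clicksWhileBusy, boolean keepLatest) {
        String slot = null;                  // Latest's single-element buffer
        for (String click : clicksWhileBusy) {
            if (keepLatest) {
                slot = click;                // overwrite any older, undelivered click
            }                                // Drop: the click is simply discarded
        }
        List<String> received = new ArrayList<>();
        if (slot != null) {
            received.add(slot);              // delivered as soon as the downstream requests
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> clicks = Arrays.asList("click-1", "click-2", "click-3");
        System.out.println(afterBurst(clicks, true));   // [click-3]
        System.out.println(afterBurst(clicks, false));  // []
    }
}
```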