rx-java Backpressure Introduction


Example

Backpressure occurs when, in an Observable processing pipeline, some asynchronous stages can't process values fast enough and need a way to tell the upstream producer to slow down.

The classic case of the need for backpressure is when the producer is a hot source:

PublishSubject<Integer> source = PublishSubject.create();

source
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000); 

In this example, the main thread produces 1 million items for an end consumer that processes them on a background thread. The compute(int) method likely takes some time, but the overhead of the Observable operator chain may also add to the time it takes to process items. However, the producing thread with the for loop can't know this and keeps calling onNext.

Internally, asynchronous operators have buffers to hold such elements until they can be processed. In classical Rx.NET and early RxJava, these buffers were unbounded, meaning that they would likely hold nearly all 1 million elements from the example. The problem starts when there are, for example, 1 billion elements, or the same 1 million-element sequence appears 1000 times in a program, leading to OutOfMemoryError and general slowdowns due to excessive GC overhead.

Similar to how error-handling became a first-class citizen and received operators to deal with it (via onErrorXXX operators), backpressure is another property of dataflows that the programmer has to think about and handle (via onBackpressureXXX operators).

Beyond the PublishSubject above, there are other operators that don't support backpressure, mostly for functional reasons. For example, the operator interval emits values periodically; backpressuring it would shift the period relative to wall-clock time.

In modern RxJava, most asynchronous operators now have a bounded internal buffer, such as observeOn above, and any attempt to overflow this buffer terminates the whole sequence with a MissingBackpressureException. The documentation of each operator describes its backpressure behavior.
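The bounded-buffer behavior can be sketched in plain Java without an RxJava dependency. BoundedBuffer below is an illustrative stand-in for an operator's internal queue, not RxJava API; the exception stands in for MissingBackpressureException:

```java
import java.util.ArrayDeque;

// Illustrative stand-in for an asynchronous operator's bounded internal
// queue. Real operators (e.g. observeOn) signal MissingBackpressureException
// when the producer outruns the consumer; here we model that with
// IllegalStateException.
class BoundedBuffer<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    // Producer side: rejects elements once the buffer is full instead of
    // letting it grow without bound.
    void onNext(T value) {
        if (queue.size() == capacity) {
            throw new IllegalStateException("buffer full (MissingBackpressureException in RxJava)");
        }
        queue.add(value);
    }

    // Consumer side: drains elements as the downstream catches up.
    T poll() {
        return queue.poll();
    }
}
```

An unthrottled producer hits the capacity limit as soon as it gets ahead of the consumer by more than the buffer size, which is exactly when RxJava's bounded operators fail the sequence.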

However, backpressure is present more subtly in regular cold sequences (which don't and shouldn't yield MissingBackpressureException). If the first example is rewritten:

Observable.range(1, 1_000_000)
.observeOn(Schedulers.computation())
.subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000); 

There is no error and everything runs smoothly with small memory usage. The reason is that many source operators can "generate" values on demand, so the operator observeOn can tell range to generate at most as many values as the observeOn buffer can hold at once without overflowing.

This negotiation is based on the computer-science concept of co-routines (I call you, you call me). The operator range sends a callback, in the form of an implementation of the Producer interface, to observeOn by calling its (inner Subscriber's) setProducer. In return, observeOn calls Producer.request(n) with a value to tell range it is allowed to produce (i.e., onNext) that many additional elements. It is then observeOn's responsibility to call the request method at the right time and with the right value to keep the data flowing without overflowing.
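The request-based handshake can be sketched without RxJava. The Producer interface and RangeSource class below are illustrative stand-ins for the real rx.Producer contract, not the actual API:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for rx.Producer: the consumer calls request(n)
// to permit n more onNext calls from the source.
interface Producer {
    void request(long n);
}

// A range-like source that emits only as many values as were requested,
// mirroring how range honors requests coming from observeOn.
class RangeSource {
    private final int end;   // exclusive upper bound
    private int next;

    RangeSource(int start, int count) {
        this.next = start;
        this.end = start + count;
    }

    // The downstream holds on to this Producer and calls request(n)
    // whenever it has room for n more elements.
    Producer producer(List<Integer> consumer) {
        return n -> {
            for (long i = 0; i < n && next < end; i++) {
                consumer.add(next++);   // "onNext" the value downstream
            }
        };
    }
}
```

A consumer such as observeOn would call request with its buffer size up front and then re-request in batches as it drains its queue, so the source never gets more than one buffer's worth ahead.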

Expressing backpressure in end consumers is rarely necessary (because they are synchronous with respect to their immediate upstream, and backpressure naturally happens due to call-stack blocking), but it may make the mechanism easier to understand:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer v) {
        compute(v);

        request(1);
    }

    @Override
    public void onError(Throwable ex) {
        ex.printStackTrace();
    }

    @Override
    public void onCompleted() {
        System.out.println("Done!");
    }
});

Here the onStart implementation tells range to produce its first value, which is then received in onNext. Once compute(int) finishes, another value is requested from range. In a naive implementation of range, such a call would recursively invoke onNext, leading to a StackOverflowError, which is of course undesirable.

To prevent this, operators use so-called trampolining logic to avoid such reentrant calls. In range's terms, it remembers that a request(1) call happened while it was calling onNext(), and once onNext() returns, it makes another round and calls onNext() with the next integer value. Therefore, if the two are swapped, the example still works the same:

@Override
public void onNext(Integer v) {
    request(1);

    compute(v);
}
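A minimal sketch of this trampolining, in plain Java with no RxJava dependency (TrampolinedRange is an illustrative name, not the real operator, and completion signalling is omitted for brevity):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntConsumer;

// Illustrative sketch of the reentrancy guard used by sources such as range.
class TrampolinedRange {
    private final AtomicLong requested = new AtomicLong();
    private final int end;            // exclusive upper bound
    private final IntConsumer onNext;
    private int next;

    TrampolinedRange(int start, int count, IntConsumer onNext) {
        this.next = start;
        this.end = start + count;
        this.onNext = onNext;
    }

    void request(long n) {
        // If requested was already non-zero, an emission loop is running
        // further down the call stack: just record the extra demand and
        // return instead of recursing into onNext.
        if (requested.getAndAdd(n) != 0) {
            return;
        }
        long r = n;
        while (r != 0 && next != end) {
            onNext.accept(next++);        // may reentrantly call request()
            r = requested.addAndGet(-1);  // consume one, pick up new demand
        }
    }
}
```

A reentrant request(1) made from inside onNext merely increments the counter and returns; the already-running loop notices the extra demand when it decrements the counter, so the call stack stays flat no matter how many values flow.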

However, this is not true for onStart. Although the Observable infrastructure guarantees it is called at most once on each Subscriber, the call to request(1) may trigger the emission of an element right away. If you have initialization logic after the call to request(1) that onNext depends on, you may end up with exceptions:

Observable.range(1, 1_000_000)
.subscribe(new Subscriber<Integer>() {

    String name;

    @Override
    public void onStart() {
        request(1);

        name = "RangeExample";
    }

    @Override
    public void onNext(Integer v) {
        compute(name.length() + v);

        request(1);
    }

    // ... rest is the same
});

In this synchronous case, a NullPointerException is thrown immediately, while still executing onStart. A more subtle bug happens if the call to request(1) triggers an asynchronous call to onNext on another thread, and reading name in onNext races with writing it in onStart after the request.

Therefore, one should do all field initialization in onStart (or even before it) and call request() last. Implementations of request() in operators ensure a proper happens-before relation (in other terms, a memory release or full fence) when necessary.
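The ordering hazard can be demonstrated with a synchronous stand-in (SyncSource and RangeExample below are illustrative classes, not RxJava API): a source whose request() delivers an element before returning, just like a synchronous range.

```java
import java.util.function.IntConsumer;

// Illustrative stand-in for a synchronous source: request(n) delivers
// n elements to the callback before it returns.
class SyncSource {
    private final IntConsumer onNext;
    private int next = 1;

    SyncSource(IntConsumer onNext) {
        this.onNext = onNext;
    }

    void request(long n) {
        for (long i = 0; i < n; i++) {
            onNext.accept(next++);
        }
    }
}

class RangeExample {
    String name;
    int total;
    final SyncSource source = new SyncSource(v -> total += name.length() + v);

    // Buggy ordering from the text: request() runs onNext synchronously
    // while name is still null.
    void startBuggy() {
        source.request(1);        // NullPointerException inside onNext
        name = "RangeExample";
    }

    // Safe ordering: initialize all fields first, call request() last.
    void startSafe() {
        name = "RangeExample";
        source.request(1);
    }
}
```

With an asynchronous source the buggy ordering would fail only intermittently, which is why "initialize first, request() last" is the safer habit even when the code appears to work.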