Backpressure arises when, in an Observable processing pipeline, some asynchronous stages can't process values fast enough and need a way to tell the upstream producer to slow down.
The classic case of the need for backpressure is when the producer is a hot source:
PublishSubject<Integer> source = PublishSubject.create();

source
    .observeOn(Schedulers.computation())
    .subscribe(v -> compute(v), Throwable::printStackTrace);

for (int i = 0; i < 1_000_000; i++) {
    source.onNext(i);
}

Thread.sleep(10_000);
In this example, the main thread produces 1 million items for an end consumer that processes them on a background thread. The compute(int) method likely takes some time, and the overhead of the Observable operator chain may also add to the time it takes to process items. However, the producing thread with the for loop can't know this and keeps calling onNext.
Internally, asynchronous operators have buffers to hold such elements until they can be processed. In classic Rx.NET and early RxJava, these buffers were unbounded, meaning that they would likely hold nearly all 1 million elements from the example. The problem starts when there are, for example, 1 billion elements, or the same 1-million-element sequence appears 1,000 times in a program, leading to OutOfMemoryError and general slowdowns due to excessive GC overhead.
Similar to how error handling became a first-class citizen and received operators to deal with it (the onErrorXXX operators), backpressure is another property of dataflows that the programmer has to think about and handle (via the onBackpressureXXX operators).
Beyond the PublishSubject above, there are other operators that don't support backpressure, mostly for functional reasons. For example, the operator interval emits values periodically; backpressuring it would shift the period relative to a wall clock.
In modern RxJava, most asynchronous operators now have a bounded internal buffer, like observeOn above, and any attempt to overflow this buffer will terminate the whole sequence with MissingBackpressureException. The documentation of each operator describes its backpressure behavior.
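As a simplified model (not RxJava's actual implementation, and the class and exception names here are hypothetical), a bounded operator buffer can fail loudly on overflow instead of growing without bound, which is the behavior MissingBackpressureException represents:

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Simplified model of a bounded operator buffer: offering a value past
// capacity fails loudly instead of letting the queue grow without bound.
class BoundedBuffer<T> {
    final Queue<T> queue = new ArrayDeque<>();
    final int capacity;

    BoundedBuffer(int capacity) {
        this.capacity = capacity;
    }

    void onNext(T value) {
        if (queue.size() == capacity) {
            // RxJava would terminate the sequence with
            // MissingBackpressureException at this point
            throw new IllegalStateException("Buffer overflow: missing backpressure");
        }
        queue.offer(value);
    }

    T poll() {
        return queue.poll();
    }
}

public class Overflow {
    public static void main(String[] args) {
        BoundedBuffer<Integer> buffer = new BoundedBuffer<>(16);
        try {
            for (int i = 0; i < 1_000_000; i++) {
                buffer.onNext(i); // nobody drains the buffer: overflows at element 17
            }
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
    }
}
```

The point of the model is that a full buffer is treated as a protocol violation by the producer, not as a reason to allocate more memory.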
However, backpressure is present more subtly in regular cold sequences (which don't and shouldn't yield MissingBackpressureException). If the first example is rewritten:
Observable.range(1, 1_000_000)
    .observeOn(Schedulers.computation())
    .subscribe(v -> compute(v), Throwable::printStackTrace);

Thread.sleep(10_000);
There is no error and everything runs smoothly with small memory usage. The reason is that many source operators can "generate" values on demand, and observeOn can thus tell range to generate at most as many values as observeOn's buffer can hold at once without overflowing.
This negotiation is based on the computer-science concept of co-routines (I call you, you call me). The operator range sends a callback, in the form of an implementation of the Producer interface, to observeOn by calling the inner Subscriber's setProducer. In return, observeOn calls Producer.request(n) with a value to tell range that it is allowed to produce (i.e., onNext) that many additional elements. It is then observeOn's responsibility to call the request method at the right time and with the right value to keep the data flowing but not overflowing.
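The handshake can be sketched in plain Java (a minimal model with hypothetical names, not RxJava's actual classes): a range-like source hands back a Producer, and it only emits as many values as the downstream has requested through it.

```java
// Minimal sketch of the request(n) handshake: the source tracks outstanding
// demand and emits only up to that many values.
interface Producer {
    void request(long n);
}

class RangeSource {
    // Returns the Producer the consumer uses to pull values on demand.
    static Producer subscribe(int start, int count, java.util.function.IntConsumer onNext) {
        return new Producer() {
            int index = start;
            final int end = start + count;
            long requested;

            @Override
            public void request(long n) {
                requested += n;
                while (requested > 0 && index < end) {
                    requested--;
                    onNext.accept(index++);
                }
            }
        };
    }
}

public class RequestDemo {
    public static void main(String[] args) {
        java.util.List<Integer> received = new java.util.ArrayList<>();
        Producer producer = RangeSource.subscribe(1, 1_000_000, received::add);
        producer.request(3);          // downstream buffer has room for 3
        System.out.println(received); // [1, 2, 3] — nothing more was produced
        producer.request(2);
        System.out.println(received); // [1, 2, 3, 4, 5]
    }
}
```

Even though one million elements are "available", only the requested five are ever generated; the rest stay latent until more demand arrives.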
Expressing backpressure in end consumers is rarely necessary (because they are synchronous with respect to their immediate upstream, and backpressure naturally happens due to call-stack blocking), but it may make the mechanism easier to understand:
Observable.range(1, 1_000_000)
    .subscribe(new Subscriber<Integer>() {
        @Override
        public void onStart() {
            request(1);
        }

        @Override
        public void onNext(Integer v) {
            compute(v);
            request(1);
        }

        @Override
        public void onError(Throwable ex) {
            ex.printStackTrace();
        }

        @Override
        public void onCompleted() {
            System.out.println("Done!");
        }
    });
Here the onStart implementation signals range to produce its first value, which is then received in onNext. Once compute(int) finishes, another value is requested from range. In a naive implementation of range, such a call would recursively call onNext, leading to a StackOverflowError, which is of course undesirable.
To prevent this, operators use so-called trampolining logic that prevents such reentrant calls. In range's terms, it will remember that there was a request(1) call while it was calling onNext(), and once onNext() returns, it will make another round and call onNext() with the next integer value. Therefore, if the two calls are swapped, the example still works the same:
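The trampolining idea can be sketched in plain Java (a simplified model with hypothetical names, not RxJava's actual code): if request() is called while an emission loop is already running, the extra demand is only recorded, and the active loop picks it up after onNext returns, so there is never a recursive onNext -> request -> onNext call chain.

```java
// Sketch of trampolining: reentrant request() calls only add to the demand
// counter; the single active emission loop serves them iteratively.
class TrampolinedRange {
    final int end;
    int index;
    long requested;
    boolean emitting;

    TrampolinedRange(int start, int count) {
        this.index = start;
        this.end = start + count;
    }

    void request(long n, java.util.function.IntConsumer onNext) {
        requested += n;
        if (emitting) {
            return; // reentrant call: the loop below will see the new demand
        }
        emitting = true;
        while (requested > 0 && index < end) {
            requested--;
            onNext.accept(index++); // may call request() reentrantly
        }
        emitting = false;
    }
}

public class TrampolineDemo {
    public static void main(String[] args) {
        TrampolinedRange range = new TrampolinedRange(1, 1_000_000);
        int[] count = {0};
        // The consumer re-requests from inside onNext; with naive recursion
        // this would blow the stack after a few thousand elements.
        java.util.function.IntConsumer[] consumer = new java.util.function.IntConsumer[1];
        consumer[0] = v -> {
            count[0]++;
            range.request(1, consumer[0]);
        };
        range.request(1, consumer[0]);
        System.out.println(count[0]); // 1000000, at constant stack depth
    }
}
```

The `emitting` flag is what turns recursion into iteration: the reentrant call returns immediately, and the outer while loop does the actual work.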
@Override
public void onNext(Integer v) {
    request(1);
    compute(v);
}
However, this is not true for onStart. Although the Observable infrastructure guarantees it will be called at most once on each Subscriber, the call to request(1) may trigger the emission of an element right away. If there is initialization logic after the call to request(1) that onNext needs, you may end up with exceptions:
Observable.range(1, 1_000_000)
    .subscribe(new Subscriber<Integer>() {
        String name;

        @Override
        public void onStart() {
            request(1);
            name = "RangeExample";
        }

        @Override
        public void onNext(Integer v) {
            compute(name.length() + v);
            request(1);
        }

        // ... rest is the same
    });
In this synchronous case, a NullPointerException will be thrown immediately, while still executing onStart. A more subtle bug happens if the call to request(1) triggers an asynchronous call to onNext on some other thread, and reading name in onNext races with writing it in onStart after the request.
Therefore, one should do all field initialization in onStart, or even before it, and call request() last. Implementations of request() in operators ensure a proper happens-before relation (in other terms, a memory release or full fence) when necessary.
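The ordering rule can be demonstrated without RxJava (a simplified model, with a hypothetical requestOne helper standing in for request): when requesting may deliver a value synchronously, any state the handler reads must already be assigned.

```java
import java.util.function.Consumer;

// Model of a source whose request may invoke the handler synchronously,
// like range emitting on the caller's thread.
public class InitOrderDemo {
    static String name;

    static void requestOne(Consumer<Integer> onNext) {
        onNext.accept(1); // synchronous delivery, before requestOne returns
    }

    public static void main(String[] args) {
        // Wrong order: the handler runs before 'name' is assigned.
        name = null;
        try {
            requestOne(v -> System.out.println(name.length() + v));
        } catch (NullPointerException ex) {
            System.out.println("NPE: handler ran before initialization");
        }

        // Correct order: initialize all state first, request last.
        name = "RangeExample";
        requestOne(v -> System.out.println(name.length() + v)); // prints 13
    }
}
```

The same discipline also covers the asynchronous race described above: if every field is written before request() and the operator's request() establishes a happens-before edge, onNext is guaranteed to see the initialized values.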