A typical use case for RxJS is creating HTTP requests and caching their results for some period of time. Also, we always want to run only one request at a time and share its response.
For example the following code caches 1 item for max. 1000ms:
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1);
var counter = 1;
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100);
}
function requestCachedHttpResult() {
return updateRequest;
}
Function makeMockHttpRequest()
simulates an HTTP request that arrives with 100ms
delay.
Function requestCachedHttpResult()
is where we subscribe to get actual or cached response.
With .publishReplay(1, 1000)
we used RxJS multicasting to internally use ReplaySubject
and keep 1
item for maximum 1000ms
. Then refCount()
is used to keep always only one subscription to the source
which is Observable.defer()
. This Observable is used to create a new request and increments counter
to prove that cached values and new subscriptions share the same data.
When we want to get current data we call requestCachedHttpResult()
. To ensure the Observer will be completed properly after emitting data we used take(1)
operator.
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
This creates a single request with mockDataFetch()
and prints to console:
1
A more complicated example will call multiple requests at different times where we want to test that the mocked HTTP connections and responses are shared.
requestCachedHttpResult()
.subscribe(val => console.log("Response 0:", val));
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 50:", val))
, 50);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 200:", val))
, 200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1200:", val))
, 1200);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 1500:", val))
, 1500);
setTimeout(() => requestCachedHttpResult()
.subscribe(val => console.log("Response 3500:", val))
, 3500);
See live demo: https://jsbin.com/todude/5/edit?js,console
Each request is sent with delay and happen in the following order:
0
- First request that makes the refCount()
to subscribe to its source
which makes the mockDataFetch()
call. Its response is going to be delayed by 100ms
. At this moment ConnectableObservable
inside publishReplay()
operator has one Observer.
50
- Second request subscribes to the ConnectableObservable
as well. At this moment ConnectableObservable
inside publishReplay()
operator has two Observer. It doesn't create another request with makeMockHttpRequest()
because refCount()
is already subscribed.
100
- The first response is ready. It's first cached by the ReplaySubject
and then reemitted to the two Observers subscribed to ConnectableObservable
. Both Observers are completed thanks to take(1)
and unsubscribed.
200
- Subscribes to the ReplaySubject
that immediately emits its cached value which causes take(1)
to complete the Observer and unsubscribes right away. No HTTP requests are made and no subscription remains.
1200
- The same as the first event at 0
. At this point the cached value has been discarded because it's older than 1000ms
.
1500
- Same as the fourth event at 200
.
3500
- The same as the first event at 1200
.
The output in console is the following:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: 2
Response 1500: 2
Response 3500: 3
In RxJS 5 a similar functionality was covered by cache()
operator. However, it was removed in 5.0.0-rc.1
due to its limited functionality.
Handling errors
If we want to handle errors produced by the remote service (the makeMockHttpRequest
function) we need to catch them before they're merged into the main Observable chain because any error received by the ReplaySubject
inside publishReplay()
would mark its internal state as stopped
(Read more here Subject and its internal state ) which is definitelly not what we want.
In the following example we're simulating an error when counter === 2
and catching it with the catch()
operator. We're using catch()
to only transform the error
notification into a regular next
so we can handle the error in observers:
function makeMockHttpRequest() {
return Observable.of(counter++)
.delay(100)
.map(i => {
if (i === 2) {
throw new Error('Invalid URL');
}
return i;
})
.catch(err => Observable.of(err));
}
See live demo: https://jsbin.com/kavihu/10/edit?js,console
This will print to console the following output. Notice the errors are received in the next
handlers:
Response 0: 1
Response 50: 1
Response 200: 1
Response 1200: [object Error] { ... }
Response 1500: [object Error] { ... }
Response 3500: 3
If we want to handle errors as regular error
notifications in each observer we have to rethrow them after the publishReplay()
operator for the reasons explained above.
var updateRequest = Observable.defer(() => makeMockHttpRequest())
.publishReplay(1, 1000)
.refCount()
.take(1)
.map(val => {
if (val instanceof Error) {
throw val;
}
return val;
});
See live demo: https://jsbin.com/fabosam/5/edit?js,console (notice that we had to add also error callbacks for each observer).
Response 0: 1
Response 50: 1
Response 200: 1
error callback: Error: Invalid URL
error callback: Error: Invalid URL
Response 3500: 3