A very common use case in web applications is performing multiple asynchronous (e.g. HTTP) requests and gathering their results either as they arrive or all at once (e.g. in Angular 2 with the HTTP service).
1. Gathering async responses one by one as they arrive
This is typically done with the mergeMap() operator, which takes a projection function that has to return an Observable. Internally, mergeMap() subscribes to each returned Observable immediately, even if the previous Observable hasn't completed yet.
function mockHTTPRequest(url) {
  // Emit a fake response after a random delay of up to 1 second.
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url))
  .subscribe(val => console.log(val));
This prints the responses to the console in a varying order because of the random delay, for example:
Response from url-3
Response from url-4
Response from url-2
Response from url-1
See live demo: https://jsbin.com/xaqudan/2/edit?js,console
Each response (an item emitted via a next call) is re-emitted by mergeMap() immediately.
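For our purpose of sending multiple HTTP requests it's useful to mention that mergeMap() can take three arguments in total: the projection function that has to return an Observable, an optional result selector function, and the maximum number of concurrently subscribed Observables.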
Controlling the number of parallel requests
With the third argument we can control how many parallel requests we'll handle (assuming that each Observable performing an HTTP request is "cold").
In the following example we'll run only 2 requests at the same time.
function mockHTTPRequest(url) {
  // Every mocked request takes exactly 1 second.
  return Observable.of(`Response from ${url}`)
    .delay(1000);
}
let urls = ['url-1', 'url-2', 'url-3', 'url-4'];
let start = (new Date()).getTime();
Observable.from(urls)
  // The third argument limits mergeMap() to 2 concurrent subscriptions.
  .mergeMap(url => mockHTTPRequest(url), undefined, 2)
  .timestamp()
  .map(stamp => [stamp.timestamp - start, stamp.value])
  .subscribe(val => console.log(val));
See live demo: https://jsbin.com/sojejal/4/edit?js,console
Notice that the first two requests completed after about 1s, while the other two completed after about 2s:
[1004, "Response from url-1"]
[1010, "Response from url-2"]
[2007, "Response from url-3"]
[2012, "Response from url-4"]
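To make the queueing behind the concurrency limit more concrete, here is a minimal plain-JavaScript sketch of the same idea, written with Promises instead of RxJS. It only illustrates the behaviour and is not how mergeMap() is implemented; runWithLimit, fakeRequest and the 50 ms delay are hypothetical names and values chosen for this example.

```javascript
// Hypothetical stand-in for an HTTP call: resolves after `ms` milliseconds.
function fakeRequest(url, ms) {
  return new Promise(resolve =>
    setTimeout(() => resolve(`Response from ${url}`), ms));
}

// Run `task(item)` for every item, keeping at most `limit` tasks in flight.
// Assumes limit >= 1. Results are collected in the order the tasks finish.
function runWithLimit(items, limit, task) {
  return new Promise((resolve, reject) => {
    const queue = items.slice(); // items that have not been started yet
    const results = [];
    let active = 0;    // tasks currently running
    let maxActive = 0; // highest number of tasks seen running at once

    function startNext() {
      if (queue.length === 0 && active === 0) {
        resolve({ results, maxActive });
        return;
      }
      // Start queued items until the limit is reached.
      while (active < limit && queue.length > 0) {
        const item = queue.shift();
        active++;
        maxActive = Math.max(maxActive, active);
        task(item).then(value => {
          active--;
          results.push(value);
          startNext(); // a slot is free, so start the next queued item
        }, reject);
      }
    }

    startNext();
  });
}

const limited = runWithLimit(['url-1', 'url-2', 'url-3', 'url-4'], 2,
  url => fakeRequest(url, 50));

limited.then(({ results, maxActive }) => {
  console.log(maxActive);      // 2
  console.log(results.length); // 4
});
```

The third and fourth requests only start once one of the first two has finished, which corresponds to the two groups of timestamps in the output above.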
Handling errors
If any of the source Observables fails (sends an error notification), mergeMap() passes the error on as an error notification of its own. If we want each Observable to fail gracefully, we need to use, for example, the catch() operator.
function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000)
    .map(value => {
      // Simulate a failing request for one particular URL.
      if (url === 'url-3') {
        throw new Error(`Error response from ${url}`);
      }
      return value;
    });
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
  // Each request recovers from its own error, so the others are unaffected.
  .mergeMap(url => mockHTTPRequest(url).catch(() => Observable.empty()))
  .subscribe(val => console.log(val));
The response for url-3 throws an error, which is sent as an error notification. It is then caught by the catch() operator and replaced with Observable.empty(), which is just a complete notification. For this reason the response is ignored.
The output for this example is the following:
Response from url-4
Response from url-1
Response from url-2
See live demo: https://jsbin.com/kuqumud/4/edit?js,console
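For comparison, the same idea of letting each request recover from its own failure can be sketched with plain Promises. This is an analogy rather than RxJS code; fakeRequest, the fixed delays and the failing 'url-3' are hypothetical and only mirror the mock above.

```javascript
// Hypothetical stand-in for an HTTP call; 'url-3' always fails.
function fakeRequest(url, ms) {
  return new Promise((resolve, reject) => setTimeout(() => {
    if (url === 'url-3') {
      reject(new Error(`Error response from ${url}`));
    } else {
      resolve(`Response from ${url}`);
    }
  }, ms));
}

const received = [];
const requests = [
  { url: 'url-1', ms: 30 },
  { url: 'url-2', ms: 40 },
  { url: 'url-3', ms: 10 },
  { url: 'url-4', ms: 20 },
];

// Every request starts immediately. Each one handles its own failure,
// so a rejected request is skipped instead of aborting the others.
const finished = Promise.all(requests.map(({ url, ms }) =>
  fakeRequest(url, ms)
    .then(response => { received.push(response); })
    .catch(() => { /* skip this request, similar to Observable.empty() */ })
));

finished.then(() => console.log(received));
// ["Response from url-4", "Response from url-1", "Response from url-2"]
```

If the catch were attached to the combined Promise instead of to each request, the first failure would hide every other response, which is the situation the per-request catch() avoids.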
2. Gathering all async responses at once
Following the preceding examples, we could gather all responses with the toArray() operator.
function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
Observable.from(urls)
  .mergeMap(url => mockHTTPRequest(url))
  // Buffer every response and emit them as one array on completion.
  .toArray()
  .subscribe(val => console.log(val));
However, using the toArray() operator has an important consequence. The subscriber receives the results only after all the HTTP requests have completed and the source Observable (Observable.from in our case) has completed as well. This means that we can't use source Observables that never complete (e.g. Observable.fromEvent).
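Why completion matters can be illustrated without RxJS by a rough analogy with async iterators. In this sketch collectAll and finiteSource are hypothetical names; the point is only that "gather everything into one array" cannot produce a result until the source signals that it is finished.

```javascript
// Collect every value from an async iterable into one array.
// The returned Promise can only resolve once the source has ended.
async function collectAll(source) {
  const values = [];
  for await (const value of source) {
    values.push(value);
  }
  return values;
}

// A finite source: it yields three values and then ends.
async function* finiteSource() {
  yield 'a';
  yield 'b';
  yield 'c';
}

const collected = collectAll(finiteSource());
collected.then(values => console.log(values)); // ["a", "b", "c"]

// With a source that never ends (for example a stream of click events),
// collectAll() would keep waiting and its Promise would never resolve.
```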
Another way to achieve the same result is to use Observable.forkJoin(), which takes an array of Observables, subscribes to all of them, and waits until every one of them has emitted at least one value and completed.
function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000);
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
// Create one Observable per URL; nothing is subscribed yet.
var observables = urls.map(url => mockHTTPRequest(url));
Observable.forkJoin(observables)
  .subscribe(val => console.log(val));
This prints all responses as a single array:
["Response from url-1", "Response from url-2", "Response from url-3", "Response from url-4"]
See live demo: https://jsbin.com/fomoye/2/edit?js,console
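Note that the array follows the order of the input Observables, not the order in which the responses arrived. Readers who know Promises may find the rough analogy with Promise.all() helpful here. The sketch below is plain JavaScript, not RxJS; fakeRequest and the delays are hypothetical and chosen so that the last request finishes first.

```javascript
// Hypothetical stand-in for an HTTP call: resolves after `ms` milliseconds.
function fakeRequest(url, ms) {
  return new Promise(resolve =>
    setTimeout(() => resolve(`Response from ${url}`), ms));
}

// url-1 is the slowest request and url-4 the fastest.
const requests = [
  { url: 'url-1', ms: 40 },
  { url: 'url-2', ms: 30 },
  { url: 'url-3', ms: 20 },
  { url: 'url-4', ms: 10 },
];

// Promise.all() waits for every request and keeps the input order.
const all = Promise.all(requests.map(({ url, ms }) => fakeRequest(url, ms)));

all.then(responses => console.log(responses));
// ["Response from url-1", "Response from url-2", "Response from url-3", "Response from url-4"]
```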
Observable.forkJoin() also takes an optional result selector function as an argument, which lets us modify the final result before emitting it further:
Observable.forkJoin(observables, (...results) => {
  // The rest parameter gathers the separately passed results into an array.
  return results.length;
})
  .subscribe(val => console.log(val));
This prints to console:
4
See live demo: https://jsbin.com/muwiqic/1/edit?js,console
Note that the arguments to the result selector function are unpacked: each result is passed as a separate argument rather than as a single array.
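The difference between the two calling styles can be shown in plain JavaScript, without RxJS. In this small sketch the selector names are hypothetical, and the spread call merely stands in for a caller that passes every result as its own argument.

```javascript
const results = ['Response from url-1', 'Response from url-2'];

// Written for unpacked arguments: each result is its own parameter.
const unpackedSelector = (first, second) => `${first} + ${second}`;

// Written for a single array argument.
const packedSelector = responses => responses.length;

console.log(unpackedSelector(...results)); // "Response from url-1 + Response from url-2"
console.log(packedSelector(results));      // 2

// Called the unpacked way, a selector written for one array only sees
// the first result, here a string of 19 characters.
console.log(packedSelector(...results));   // 19
```

This is why the example above uses a rest parameter, (...results), to gather the separate arguments back into an array.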
Handling errors
For error handling we can use the same approach as in the preceding example, with the catch() operator.
However, there's one important thing to be aware of. forkJoin() requires every source Observable to emit at least one value. If we used catch(() => Observable.empty()) as we did before, forkJoin() would never emit anything, because Observable.empty() is just a complete notification.
This is why we need to use, for example, Observable.of(null), which is a null value followed by a complete notification.
function mockHTTPRequest(url) {
  return Observable.of(`Response from ${url}`)
    .delay(Math.random() * 1000)
    .map(value => {
      // Simulate a failing request for one particular URL.
      if (url === 'url-3') {
        throw new Error(`Error response from ${url}`);
      }
      return value;
    });
}
var urls = ['url-1', 'url-2', 'url-3', 'url-4'];
// Replace a failed request with a single null value so forkJoin() still emits.
var observables = urls.map(url => mockHTTPRequest(url).catch(() => Observable.of(null)));
Observable.forkJoin(observables)
  .subscribe(val => console.log(val));
See live demo: https://jsbin.com/yidiked/2/edit?js,console
This prints to console:
["Response from url-1", "Response from url-2", null, "Response from url-4"]
Notice that the error is replaced by null. If we had used just Observable.empty(), forkJoin() would never emit anything.
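The rule can be summarised with a deliberately simplified, synchronous model. This is not RxJS code and ignores timing and errors entirely; forkJoinModel is a hypothetical helper in which each source is described only by the list of values it emitted before completing.

```javascript
// Simplified model: returns the array that would be emitted (the last value
// of every source), or undefined when nothing would be emitted at all.
function forkJoinModel(sources) {
  if (sources.length === 0) {
    return undefined; // nothing to join
  }
  if (sources.some(values => values.length === 0)) {
    return undefined; // one source completed without emitting a value
  }
  return sources.map(values => values[values.length - 1]);
}

// A failed request replaced by an empty source: no result at all.
const withEmpty = forkJoinModel([['Response from url-1'], [], ['Response from url-4']]);

// A failed request replaced by a single null value: the result keeps its slot.
const withNull = forkJoinModel([['Response from url-1'], [null], ['Response from url-4']]);

console.log(withEmpty); // undefined
console.log(withNull);  // ["Response from url-1", null, "Response from url-4"]
```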