A common problem with remote services is rate limiting. The remote service allows us to send only a limited number of requests or amount of data per time period.
In RxJS 5, very similar functionality is provided by the bufferTime operator, especially when we leave its second parameter unspecified. That parameter defines how often a new buffer is created; if it is undefined or null, a new buffer is created right after the current one is emitted.
A typical usage of bufferTime looks like this:
bufferTime(1000, null, 5)
This buffers items until one of two conditions is met, then emits the buffer and starts a new one:
- 1000ms have elapsed
- 5 items have been collected
For demonstration purposes we can create a source Observable that emits quickly, so that bufferTime hits the size limit (5) and emits more often than once every 1000ms:
const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));
Then we'll chain it with bufferTime and concatMap. The concatMap operator is where we force the 1000ms delay:
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
  // Hold each buffer for 1000ms before passing it on.
  .concatMap(buffer => Observable.of(buffer).delay(1000))
  .timestamp()
  .map(obj => {
    // Make the timestamp relative to the start of the script.
    obj.timestamp = obj.timestamp - startTime;
    return obj;
  })
  .subscribe(obj => console.log(obj));
See live demo: https://jsbin.com/kotibow/3/edit?js,console
We also added timestamp() to see the emission times and confirm that the delay is really at least 1000ms.
Note that we didn't have to use Observable.of(buffer) at all. We're using it here just to manually check that the number of buffered items is correct.
From the console output we can see that the delay between two emissions is roughly 1000ms:
Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }
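The timestamps above can be approximated with a little arithmetic. The following is only a sketch under idealized assumptions (no scheduling overhead, buffers filling exactly every 5 × 75ms = 375ms); the helper name concatDelayTimes is hypothetical and is not part of RxJS. It models the fact that concatMap starts the delay for a buffer only after the previous buffer has been emitted:

```javascript
// Idealized model of .concatMap(buffer => Observable.of(buffer).delay(delayMs)).
// Hypothetical helper for illustration; it does not use RxJS.
function concatDelayTimes(readyTimes, delayMs) {
  const emitted = [];
  let previous = 0; // time at which the previous buffer was emitted
  for (const ready of readyTimes) {
    // A buffer waits until it is ready AND the previous one has been emitted.
    const start = Math.max(ready, previous);
    previous = start + delayMs;
    emitted.push(previous);
  }
  return emitted;
}

// Assumed ready times: one full buffer every 375ms, plus the empty buffer
// that is flushed when the source completes at 1875ms.
const readyTimes = [375, 750, 1125, 1500, 1875, 1875];
const emitTimes = concatDelayTimes(readyTimes, 1000);
console.log(emitTimes); // 1375, 2375, 3375, 4375, 5375, 6375
```

The measured timestamps are each 100-200ms later than these idealized values, which is consistent with timer and scheduling overhead, but the spacing of about 1000ms is the same.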
Now we can also test a situation where the source emits slowly, so the bufferTime operator is going to hit the max interval condition:
const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(300));
See live demo: https://jsbin.com/tuwowan/2/edit?js,console
The output should then start after about 2s, because bufferTime takes 1s to emit the first buffer and concatMap then adds the 1s delay:
Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }
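The grouping in both outputs can be reproduced with a small synchronous model of the two conditions (time span and size limit). This is a simplified sketch, not how RxJS implements bufferTime. The names simulateBufferTime and makeSource are hypothetical, and the model assumes ideal timings, that a value landing exactly on a window boundary still joins the old buffer, and that the current buffer is flushed when the source completes. The last two assumptions were chosen to match the outputs shown above.

```javascript
// Simplified model of bufferTime(timeSpan, null, maxSize). Illustration only.
function simulateBufferTime(events, timeSpan, maxSize) {
  const buffers = [];
  let current = [];
  let windowStart = 0;

  for (const { time, value } of events) {
    // Close every time window that expired before this value arrived.
    // Assumption: a value exactly on the boundary still joins the old buffer.
    while (time > windowStart + timeSpan) {
      buffers.push(current);
      current = [];
      windowStart += timeSpan;
    }
    current.push(value);
    // Size limit reached: emit immediately and restart the timer.
    if (current.length === maxSize) {
      buffers.push(current);
      current = [];
      windowStart = time;
    }
  }
  // The source completed: flush whatever is left, even an empty buffer.
  buffers.push(current);
  return buffers;
}

// Idealized source: value k arrives at k * period milliseconds.
function makeSource(count, period) {
  return Array.from({ length: count }, (_, i) => ({
    time: (i + 1) * period,
    value: i + 1,
  }));
}

const fast = simulateBufferTime(makeSource(25, 75), 1000, 5);
const slow = simulateBufferTime(makeSource(25, 300), 1000, 5);
console.log(fast.map(b => b.length)); // sizes of the buffers for the 75ms source
console.log(slow.map(b => b.length)); // sizes of the buffers for the 300ms source
```

With the 75ms source the size limit always wins (five buffers of five items plus a trailing empty one); with the 300ms source the time limit wins and the buffers hold three or four items each.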
If we wanted to use this approach in a real-world application, we'd put the remote call into the concatMap operator. This way we can control whether we want to force the 1s delay between requests or between responses from the remote service.
For example, we can force a minimum 1s delay between requests by using forkJoin in the concatMap callback:
// Simulates a remote call that responds after up to 1500ms.
function mockHTTPRequest(buffer) {
  return Observable.of(true).delay(Math.random() * 1500);
}
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
  .concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
  // Each buffer takes at least 1000ms, or longer if the request is slower.
  .concatMap(buffer => Observable.forkJoin(
    mockHTTPRequest(buffer),
    Observable.of(buffer).delay(1000)
  ))
  .timestamp()
  .map(obj => {
    // Make the timestamp relative to the start of the script.
    obj.timestamp = obj.timestamp - startTime;
    return obj;
  })
  .subscribe(obj => console.log(obj));
See live demo: https://jsbin.com/xijaver/edit?js,console
Thanks to forkJoin, concatMap has to wait for both Observables to complete before it moves on to the next buffer.
On the other hand, if we wanted to force a 1s delay between responses, we'd just append the delay() operator after mockHTTPRequest():
.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))
See live demo: https://jsbin.com/munopot/2/edit?js,console
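To compare the two strategies, here is a rough arithmetic sketch of when each request would start. It assumes a buffer is always waiting when the previous one finishes and that request durations are known up front. The helper requestStartTimes and the sample durations are hypothetical and are not part of RxJS.

```javascript
// Idealized comparison of the two strategies. Illustration only.
// 'forkJoin': a buffer occupies concatMap for max(duration, minGap).
// 'delay':    a buffer occupies concatMap for duration + minGap.
function requestStartTimes(durations, minGap, strategy) {
  const starts = [];
  let t = 0;
  for (const duration of durations) {
    starts.push(t);
    t += strategy === 'forkJoin'
      ? Math.max(duration, minGap)
      : duration + minGap;
  }
  return starts;
}

// Assumed response times of three requests, in milliseconds.
const durations = [200, 1500, 700];
const withForkJoin = requestStartTimes(durations, 1000, 'forkJoin');
const withDelay = requestStartTimes(durations, 1000, 'delay');
console.log(withForkJoin); // 0, 1000, 2500
console.log(withDelay);    // 0, 1200, 3700
```

With forkJoin a fast response does not postpone the next request beyond the 1s minimum, while with delay() every request waits a full second after the previous response arrived.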