A common problem with remote services is rate limiting. The remote service allows us to send only a limited number of requests or amount of data per time period.
In RxJS 5 a very similar functionality is provided by the bufferTime operator and especially if we leave the second parameter unspecified (it defines how often we want to create a new buffer. If we leave it undefined/null it'll create a new buffer right after emitting the current one).
A typical usage of bufferTime will look like this:
bufferTime(1000, null, 5)
This will buffer items until one of the two conditions are met. Then it'll emit the buffer and start another one:
1000ms5 itemsFor demonstrational purposes we can create a source Observable that emits very fast so the bufferTime will hit the size limit (5) and emit more often than once every 1000ms:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
Then we'll chain it with bufferTime and concatMap. The concatMap operator is where we force the 1000ms delay:
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.of(buffer).delay(1000))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
See live demo: https://jsbin.com/kotibow/3/edit?js,console
We added also timestamp() to see the emission times to make sure the delay is really at least 1000ms.
Note that we didn't have to use Observable.of(buffer) at all. We're using it here just to manually check that the number of buffered items is correct.
From the console output we can see that the delay between two emissions is roughly 1000ms:
Timestamp { value: [ 1, 2, 3, 4, 5 ], timestamp: 1475 }
Timestamp { value: [ 6, 7, 8, 9, 10 ], timestamp: 2564 }
Timestamp { value: [ 11, 12, 13, 14, 15 ], timestamp: 3567 }
Timestamp { value: [ 16, 17, 18, 19, 20 ], timestamp: 4572 }
Timestamp { value: [ 21, 22, 23, 24, 25 ], timestamp: 5573 }
Timestamp { value: [], timestamp: 6578 }
Now we can also test a situation where the source emits slowly so the bufferTime operator is going to hit the max interval condition:
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(300));
See live demo: https://jsbin.com/tuwowan/2/edit?js,console
Then the output should start after about 2s because it took 1s for the bufferTime operator to emit and then we added the 1s delay;
Timestamp { value: [ 1, 2, 3 ], timestamp: 2017 }
Timestamp { value: [ 4, 5, 6 ], timestamp: 3079 }
Timestamp { value: [ 7, 8, 9, 10 ], timestamp: 4088 }
Timestamp { value: [ 11, 12, 13 ], timestamp: 5093 }
Timestamp { value: [ 14, 15, 16 ], timestamp: 6094 }
Timestamp { value: [ 17, 18, 19, 20 ], timestamp: 7098 }
Timestamp { value: [ 21, 22, 23 ], timestamp: 8103 }
Timestamp { value: [ 24, 25 ], timestamp: 9104 }
If we wanted to use this approach in a real world application we'd put the remote call into the concatMap operator. This way we can control whether we want to force the 1s delay between requests or responses from the remote service.
For example we can force the minimum 1s delay between requests by using forkJoin in the concatMap callback:
function mockHTTPRequest(buffer) {
return Observable.of(true).delay(Math.random() * 1500)
}
const startTime = (new Date()).getTime();
const source = Observable.range(1, 25)
.concatMap(val => Observable.of(val).delay(75));
source.bufferTime(1000, null, 5)
.concatMap(buffer => Observable.forkJoin(
mockHTTPRequest(buffer),
Observable.of(buffer).delay(1000)
))
.timestamp()
.map(obj => {
obj.timestamp = obj.timestamp - startTime;
return obj;
})
.subscribe(obj => console.log(obj));
See live demo: https://jsbin.com/xijaver/edit?js,console
Thanks to forkJoin the concatMap needs to wait for both Observables to complete.
On the other hand, if we wanted to force 1s delay between responses we'd just append the delay() operator after mockHTTPRequest():
.concatMap(buffer => mockHTTPRequest(buffer).delay(1000))
See live demo: https://jsbin.com/munopot/2/edit?js,console