RxJS Operator: publishReplay
How does publishReplay work?


Example

publishReplay internally creates a ReplaySubject and multicasts the source through it; it is shorthand for multicast() with a ReplaySubject. The smallest buffer a ReplaySubject keeps is 1 emission. This results in the following behaviour:

  • The first subscription triggers publishReplay(1) (through refCount()) to subscribe internally to the source stream and pipe all emissions through the ReplaySubject, effectively caching the last n (= 1) emissions.
  • If a second subscription starts while the source is still active, multicast() connects it to the same ReplaySubject, and it receives any cached emissions followed by all subsequent emissions until the source stream completes.
  • If a subscription starts after the source has already completed, the ReplaySubject has cached the last n emissions; the late subscriber receives only those and then completes.
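
To make the three cases above concrete without depending on RxJS, here is a minimal sketch of the caching behaviour. TinyReplaySubject and recorder are hypothetical names invented for this illustration; this is a simplified model of a replay buffer, not the RxJS implementation, and it leaves out schedulers, errors, unsubscription and window time. In this sketch sub2 joins after the first emission, so it also receives the cached value on subscribe.

```javascript
// Simplified stand-in for a replay subject. NOT the RxJS implementation:
// no error handling, no unsubscribe, no windowTime, no scheduler.
class TinyReplaySubject {
  constructor(bufferSize) {
    // Assumption taken from the text: at least one emission is always kept.
    this.bufferSize = Math.max(1, bufferSize);
    this.buffer = [];      // the last n emissions
    this.observers = [];   // currently subscribed observers
    this.completed = false;
  }

  next(value) {
    if (this.completed) return;
    this.buffer.push(value);
    // Drop the oldest value once the buffer exceeds its size.
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.slice().forEach(o => o.next(value));
  }

  complete() {
    if (this.completed) return;
    this.completed = true;
    this.observers.slice().forEach(o => o.complete());
    this.observers = [];
  }

  subscribe(observer) {
    // Every new subscriber first gets the cached emissions...
    this.buffer.forEach(v => observer.next(v));
    // ...and completes right away if the source has already finished.
    if (this.completed) observer.complete();
    else this.observers.push(observer);
  }
}

// Hypothetical helper that records what a subscriber sees.
function recorder(name, log) {
  return {
    next: v => log.push(`${name}:${v}`),
    complete: () => log.push(`${name} completed`)
  };
}

const log = [];
const subject = new TinyReplaySubject(1);

subject.subscribe(recorder('sub1', log)); // before any emission
subject.next('emission:1');
subject.subscribe(recorder('sub2', log)); // source still active
subject.next('emission:2');
subject.complete();
subject.subscribe(recorder('late', log)); // after completion

console.log(log.join('\n'));
```

The late subscriber's entries in the log are only the single cached value and the completion, which is the third case in the list. In real code the multicasting and connection handling are done by publishReplay() and refCount(), as in the RxJS example that follows.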

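// RxJS 5 example: the source emits 'emission:1' after 100 ms and 'emission:2' after 200 ms, then completes.
const source = Rx.Observable.from([1,2])
  .mergeMap(i => Rx.Observable.of('emission:'+i).delay(i * 100))
  // Log when the source itself completes (the third argument of do() is the complete handler).
  .do(null,null,() => console.log('source stream completed'))
  // Multicast through a ReplaySubject that caches the last emission.
  .publishReplay(1)
  // Connect on the first subscriber; disconnect when the last one unsubscribes.
  .refCount();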

// Two subscriptions that both start before the stream completes
source.subscribe(val => console.log(`sub1:${val}`), null, () => console.log('sub1 completed'));
source.subscribe(val => console.log(`sub2:${val}`), null, () => console.log('sub2 completed'));

// A new subscription that starts after the stream has already completed
setTimeout(() => {
  source.subscribe(val => console.log(`sub_late-to-the-party:${val}`), null, () => console.log('sub_late-to-the-party completed'));
}, 500);

The example assumes RxJS 5.0.3 is loaded as the global Rx:
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>