RxJS Operator: publishReplay

Unexpected emissions when using publishReplay


Example

Based on a question. The following snippet does not cache the expected emission or prevent further calls to the source. Instead, it re-subscribes to realSource for every subscription.

var state = 5
var realSource = Rx.Observable.create(observer =>  {
  console.log("creating expensive HTTP-based emission"); 
  observer.next(state++);
//  observer.complete(); //absent on purpose
  
  return () => {
    console.log('unsubscribing from source')
  }
});


var source = realSource
.do(null, null, () => console.log('stream completed'))
.publishReplay()
.refCount();
    
// observer A subscribes and unsubscribes before observer B arrives
var subscription1 = source.subscribe({next: (v) => console.log('observerA: ' + v)});
subscription1.unsubscribe();

var subscription2 = source.subscribe(v => console.log('observerB: ' + v));
subscription2.unsubscribe();
        
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.0/Rx.js"></script>

When running this snippet, we can clearly see that it does not simply replay the cached value to observer B; instead, it creates a new emission for every subscription. How come?

Every subscription is unsubscribed before the next subscription takes place. This makes the refCount drop back to zero, so no multicasting takes place.

The issue is that the realSource stream does not complete. Because we are not multicasting, the next subscriber triggers a fresh subscription to realSource through the ReplaySubject, and the new emission is preceded by the values that were already emitted.
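To make the mechanism above easier to follow, here is a minimal, library-free sketch of it. It is not the RxJS implementation: replayRefCount, run and the log array are hypothetical names invented for this illustration, and the model assumes a source that never completes and a replay buffer that is never cleared. It only shows how a subscriber count that drops to zero leads to a new source subscription, while subscriptions that overlap share a single one.

```javascript
// Simplified model of a replay buffer combined with reference counting.
// Hypothetical helper, NOT an RxJS API.
function replayRefCount(subscribeToSource) {
  var buffer = [];       // every value emitted so far (plays the role of the ReplaySubject)
  var observers = [];    // currently subscribed observers
  var disconnect = null; // teardown of the single source subscription

  return function subscribe(observer) {
    // 1. Replay previously emitted values to the new observer.
    buffer.forEach(function (v) { observer(v); });
    observers.push(observer);

    // 2. Count went from 0 to 1: connect to the source.
    if (observers.length === 1) {
      disconnect = subscribeToSource(function (v) {
        buffer.push(v);
        observers.slice().forEach(function (o) { o(v); });
      });
    }

    // 3. When the count drops back to 0, disconnect from the source.
    return function unsubscribe() {
      var i = observers.indexOf(observer);
      if (i === -1) { return; }
      observers.splice(i, 1);
      if (observers.length === 0 && disconnect) {
        disconnect();
        disconnect = null;
      }
    };
  };
}

// Runs observers A and B either one after the other or overlapping.
function run(overlap) {
  var log = [];
  var state = 5;
  var source = replayRefCount(function realSource(next) {
    log.push('creating expensive emission');
    next(state++); // never completes, as in the snippet above
    return function () { log.push('unsubscribing from source'); };
  });

  var unsubscribeA = source(function (v) { log.push('observerA: ' + v); });
  if (!overlap) { unsubscribeA(); } // count drops to 0 before B arrives
  var unsubscribeB = source(function (v) { log.push('observerB: ' + v); });
  if (overlap) { unsubscribeA(); }
  unsubscribeB();
  return log;
}

var sequential = run(false);  // B gets the replayed 5, then a fresh 6
var overlapping = run(true);  // B only gets the replayed 5
console.log(sequential.join('\n'));
console.log('---');
console.log(overlapping.join('\n'));
```

In the sequential run the source is created twice and observer B sees both the replayed value and a new one. In the overlapping run the count never reaches zero between the two subscriptions, so the source is created only once.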