PublishSubject emits to an Observer only those items that are emitted by the source Observable subsequent to the time of the subscription.
A simple PublishSubject example:
Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();
clock.subscribe(subjectLong);
System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(1000); // two ticks (0 and 1) reach sub1 before sub2 joins, matching the output below
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);
Output:
sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3
In the above example, a PublishSubject subscribes to an Observable that acts like a clock and emits an item (a Long) every 500 milliseconds. As seen in the output, the PublishSubject passes on the values it gets from the source (clock) to its subscribers (sub1 and sub2).
A PublishSubject can start emitting items as soon as it is created, even with no observer subscribed, which runs the risk of one or more items being lost before an observer can subscribe.
createClock(); // 3 lines moved for brevity. same as above example
Thread.sleep(5000); // introduces a delay before first subscribe
sub1andsub2(); // 6 lines moved for brevity. same as above example
Output:
sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13
Notice that sub1 receives values starting from 10. The items emitted during the 5-second delay were lost and cannot be replayed. This essentially makes PublishSubject a hot Observable.
Also, note that if an observer subscribes to the PublishSubject after it has emitted n items, these n items cannot be reproduced for this observer.
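The loss of early items can be reproduced without timers. The following is a minimal plain-Java sketch of the idea, not the RxJava implementation: ToyPublishSubject and LateSubscriberDemo are hypothetical names introduced here, and the model ignores threading, unsubscription and terminal events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy stand-in for a hot subject (an assumption for illustration, NOT rx.subjects.PublishSubject). */
class ToyPublishSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    /** Registers an observer; it only sees items emitted from now on. */
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    /** Pushes an item to the observers registered right now; nothing is buffered. */
    void onNext(T item) {
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(item);
        }
    }
}

class LateSubscriberDemo {
    /** Emits 0, 1, 2 while observers join in between, and records what each one receives. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyPublishSubject<Long> subject = new ToyPublishSubject<>();

        subject.onNext(0L);                               // no observer yet: this item is lost
        subject.subscribe(l -> log.add("sub1 -> " + l));
        subject.onNext(1L);                               // only sub1 is listening
        subject.subscribe(l -> log.add("sub2 -> " + l));
        subject.onNext(2L);                               // both observers receive this item
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Running main prints sub1 -> 1, sub1 -> 2 and sub2 -> 2: item 0 is never seen by anyone, and sub2 never sees item 1.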
Below is the marble diagram of PublishSubject
The PublishSubject emits items to all observers that have subscribed, at any point before the onCompleted of the source Observable is called.
If the source Observable terminates with an error, the PublishSubject will not emit any items to subsequent observers, but will simply pass along the error notification from the source Observable.
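The terminal rule described above can be sketched in plain Java as well. This is a simplified model under stated assumptions, not RxJava's code: ToyObserver, ToyTerminalSubject and TerminalDemo are hypothetical names, and the sketch is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical observer interface for this sketch (RxJava's own Observer type is not used here). */
interface ToyObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onCompleted();
}

/** Toy model of the terminal-state rule (an assumption for illustration, NOT rx.subjects.PublishSubject). */
class ToyTerminalSubject<T> {
    private final List<ToyObserver<T>> observers = new ArrayList<>();
    private boolean terminated = false;
    private Throwable error = null; // stays null after a normal completion

    void subscribe(ToyObserver<T> observer) {
        if (!terminated) {
            observers.add(observer);
            return;
        }
        // Late subscriber: no items are replayed, only the terminal notification.
        if (error != null) {
            observer.onError(error);
        } else {
            observer.onCompleted();
        }
    }

    void onNext(T item) {
        if (terminated) return; // nothing is emitted after termination
        for (ToyObserver<T> o : new ArrayList<>(observers)) o.onNext(item);
    }

    void onError(Throwable e) {
        if (terminated) return;
        terminated = true;
        error = e;
        for (ToyObserver<T> o : new ArrayList<>(observers)) o.onError(e);
        observers.clear();
    }

    void onCompleted() {
        if (terminated) return;
        terminated = true;
        for (ToyObserver<T> o : new ArrayList<>(observers)) o.onCompleted();
        observers.clear();
    }
}

class TerminalDemo {
    /** Builds an observer that records every notification under the given name. */
    static ToyObserver<Integer> logger(String name, List<String> log) {
        return new ToyObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add(name + " -> " + item); }
            @Override public void onError(Throwable e) { log.add(name + " error: " + e.getMessage()); }
            @Override public void onCompleted() { log.add(name + " completed"); }
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        ToyTerminalSubject<Integer> subject = new ToyTerminalSubject<>();
        subject.subscribe(logger("early", log));
        subject.onNext(1);
        subject.onError(new IllegalStateException("feed down")); // source fails
        subject.subscribe(logger("late", log));                  // subscribes after the failure
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The early observer logs the item and then the error, while the late observer logs only the error ("late error: feed down") and never sees item 1.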
Use Case
Suppose you want to create an application that monitors the stock prices of a certain company and forwards them to all clients that request them.
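/* Dummy stock prices. Note: just() emits all of its items synchronously when subscribed to; a real price feed would emit over time */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);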
/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);
/* Client application */
PublishSubject<Integer> stockWatcher = getWatcherInstance(); // gets the subject (watcher) from the server
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();
In the above example use case, the PublishSubject acts as a bridge to pass on the values from your server to all the clients that subscribe to your watcher.
Further reading: