PublishSubject emits to an
Observer only those items that are emitted by the source
Observable subsequent to the time of the subscription.
    Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
    Subject<Long, Long> subjectLong = PublishSubject.create();
    clock.subscribe(subjectLong);

    System.out.println("sub1 subscribing...");
    subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
    Thread.sleep(3000);

    System.out.println("sub2 subscribing...");
    subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
    Thread.sleep(5000);
    sub1 subscribing...
    sub1 -> 0
    sub1 -> 1
    sub2 subscribing...
    sub1 -> 2
    sub2 -> 2
    sub1 -> 3
    sub2 -> 3
In the above example, a PublishSubject subscribes to an Observable that acts like a clock and emits an item (a Long) every 500 milliseconds. As seen in the output, the PublishSubject passes on the values it gets from the source (clock) to its subscribers (sub1 and sub2).
PublishSubject can start emitting items as soon as it is created, even without any observer, which runs the risk of one or more items being lost before an observer can subscribe.
    createClock();      // 3 lines moved for brevity, same as the example above
    Thread.sleep(5000); // introduces a delay before the first subscribe
    sub1andsub2();      // 6 lines moved for brevity, same as the example above
    sub1 subscribing...
    sub1 -> 10
    sub1 -> 11
    sub2 subscribing...
    sub1 -> 12
    sub2 -> 12
    sub1 -> 13
    sub2 -> 13
sub1 receives values starting from 10. The 5-second delay before the first subscription caused the first ten items (0 to 9) to be lost, and they cannot be reproduced. This essentially makes PublishSubject a hot Observable.
Also, note that if an observer subscribes to the
PublishSubject after it has emitted n items, these n items cannot be reproduced for this observer.
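The reason earlier items cannot be reproduced can be sketched with a small plain-Java model. This is only an illustration under simplifying assumptions (single-threaded, no terminal events); `HotSubjectSketch` and `MiniPublishSubject` are hypothetical names and are not RxJava classes. The point it shows is that a publish-style subject keeps a list of current observers and no history of items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, single-threaded stand-in for a publish-style subject (not RxJava's class). */
public class HotSubjectSketch {

    /** Forwards each item to whoever is subscribed right now; keeps no history. */
    static class MiniPublishSubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        void onNext(T item) {
            // No buffer: an item that arrives while nobody is subscribed is dropped.
            for (Consumer<T> observer : new ArrayList<>(observers)) {
                observer.accept(item);
            }
        }
    }

    /** Emits four items around two subscriptions and returns what each observer saw. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniPublishSubject<Long> subject = new MiniPublishSubject<>();

        subject.onNext(0L); // nobody subscribed yet: lost
        subject.onNext(1L); // lost as well

        subject.subscribe(l -> log.add("sub1 -> " + l));
        subject.onNext(2L); // seen by sub1 only

        subject.subscribe(l -> log.add("sub2 -> " + l));
        subject.onNext(3L); // seen by sub1 and sub2

        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model sub1 never sees 0 or 1 and sub2 never sees 2, because nothing was stored for them to receive later.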
[Figure: marble diagram of PublishSubject]
PublishSubject emits items to all observers that have subscribed at any point before the onCompleted of the source Observable is called.
If the source
Observable terminates with an error, the
PublishSubject will not emit any items to subsequent observers, but will simply pass along the error notification from the source Observable.
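The error behaviour described above can be modelled in a few lines of plain Java. This is a hedged sketch, not RxJava's implementation: `TerminalSubjectSketch`, `MiniObserver`, and `MiniPublishSubject` are hypothetical names, the model is single-threaded, and it covers only onNext and onError. It assumes the subject remembers the error so that it can hand it to observers that subscribe afterwards.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified, single-threaded model of error handling in a publish-style subject. */
public class TerminalSubjectSketch {

    interface MiniObserver<T> {
        void onNext(T item);
        void onError(Throwable error);
    }

    static class MiniPublishSubject<T> implements MiniObserver<T> {
        private final List<MiniObserver<T>> observers = new ArrayList<>();
        private Throwable error; // non-null once the source has failed

        void subscribe(MiniObserver<T> observer) {
            if (error != null) {
                observer.onError(error); // late observer: no items, only the error
            } else {
                observers.add(observer);
            }
        }

        @Override
        public void onNext(T item) {
            if (error != null) {
                return; // nothing is emitted after termination
            }
            for (MiniObserver<T> o : new ArrayList<>(observers)) {
                o.onNext(item);
            }
        }

        @Override
        public void onError(Throwable e) {
            if (error != null) {
                return;
            }
            error = e;
            for (MiniObserver<T> o : observers) {
                o.onError(e);
            }
            observers.clear();
        }
    }

    /** Builds an observer that records what it receives under the given name. */
    static MiniObserver<Integer> logging(String name, List<String> log) {
        return new MiniObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add(name + " -> " + item); }
            @Override public void onError(Throwable e) { log.add(name + " error: " + e.getMessage()); }
        };
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniPublishSubject<Integer> subject = new MiniPublishSubject<>();

        subject.subscribe(logging("early", log));
        subject.onNext(11);
        subject.onError(new IllegalStateException("feed down"));

        subject.subscribe(logging("late", log)); // subscribes after the error
        subject.onNext(12);                      // ignored: the subject has terminated
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The late observer receives the error notification and nothing else, which matches the description in the text.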
Suppose you want to create an application that monitors the stock prices of a certain company and forwards them to all clients that request them.
    /* Dummy stock prices. Observable.just emits synchronously on subscribe,
       so a real application would use a live price feed here. */
    Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

    /* Your server */
    PublishSubject<Integer> watcher = PublishSubject.create();
    /* subscribe to listen to stock price changes and push to observers/clients */
    prices.subscribe(watcher);

    /* Client application */
    PublishSubject<Integer> stockWatcher = getWatcherInstance(); // gets subject
    Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
    Thread.sleep(1000);
    System.out.println("steve stops watching");
    steve.unsubscribe();
In the above example use case, the PublishSubject acts as a bridge that passes on the values from your server to all the clients that subscribe to it.
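The bridge role, including a client that stops watching, can be sketched in plain Java as follows. This is an illustrative model only: `StockWatcherSketch` and `MiniPublishSubject` are hypothetical names, the second client "anna" is invented for the illustration, and the `Runnable` returned by `subscribe` merely stands in for the role that `Subscription.unsubscribe()` plays in the example above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Simplified, single-threaded model of a subject bridging a price feed to clients. */
public class StockWatcherSketch {

    static class MiniPublishSubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        /** Registers an observer and returns a handle that removes it again. */
        Runnable subscribe(Consumer<T> observer) {
            observers.add(observer);
            return () -> observers.remove(observer);
        }

        void onNext(T item) {
            // Iterate over a copy so that observers may unsubscribe while being notified.
            for (Consumer<T> o : new ArrayList<>(observers)) {
                o.accept(item);
            }
        }
    }

    public static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniPublishSubject<Integer> watcher = new MiniPublishSubject<>();

        Runnable steve = watcher.subscribe(i -> log.add("steve watching " + i));
        watcher.subscribe(i -> log.add("anna watching " + i)); // hypothetical second client

        watcher.onNext(11);
        watcher.onNext(12);
        steve.run();        // steve stops watching
        watcher.onNext(14); // delivered to anna only
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Each client only affects its own registration: removing steve leaves anna's stream untouched.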