rx-java PublishSubject


Example

A PublishSubject emits to an observer only those items that the source Observable emits after that observer has subscribed.

A simple PublishSubject example:

Observable<Long> clock = Observable.interval(500, TimeUnit.MILLISECONDS);
Subject<Long, Long> subjectLong = PublishSubject.create();

clock.subscribe(subjectLong);

System.out.println("sub1 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub1 -> " + l));
Thread.sleep(3000);
System.out.println("sub2 subscribing...");
subjectLong.subscribe(l -> System.out.println("sub2 -> " + l));
Thread.sleep(5000);

Output (illustrative; the exact values depend on timing):

sub1 subscribing...
sub1 -> 0
sub1 -> 1
sub2 subscribing...
sub1 -> 2
sub2 -> 2
sub1 -> 3
sub2 -> 3

In the above example, a PublishSubject subscribes to an Observable that acts like a clock and emits an item (a Long) every 500 milliseconds. As the output shows, the PublishSubject passes the values it receives from the source (clock) on to its subscribers (sub1 and sub2).

A PublishSubject can start emitting items as soon as it is created, even without any observer. This runs the risk of one or more items being lost before an observer can subscribe.

createClock(); // 3 lines moved for brevity. same as above example

Thread.sleep(5000); // introduces a delay before first subscribe

sub1andsub2(); // 6 lines moved for brevity. same as above example

Output (illustrative; the exact values depend on timing):

sub1 subscribing...
sub1 -> 10
sub1 -> 11
sub2 subscribing...
sub1 -> 12
sub2 -> 12
sub1 -> 13
sub2 -> 13

Notice that sub1 receives values starting from 10. The 5 second delay before the first subscription caused the earlier items to be lost, and they cannot be reproduced. This essentially makes PublishSubject a Hot Observable.

Also, note that if an observer subscribes to the PublishSubject after it has emitted n items, these n items cannot be reproduced for this observer.
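
The "late observers miss earlier items" behavior can be illustrated without timers. The following is a minimal plain-Java sketch of the idea, not RxJava's actual implementation: TinyPublishSubject and LateSubscriberDemo are hypothetical names, and the sketch leaves out threading, completion, and error handling.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified stand-in for a PublishSubject: no replay, no threading, no terminal events. */
class TinyPublishSubject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();

    /** Registers an observer; it only sees items emitted from now on. */
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    /** Forwards an item to everyone subscribed at this moment; nothing is stored. */
    void onNext(T item) {
        for (Consumer<T> observer : new ArrayList<>(observers)) {
            observer.accept(item);
        }
    }
}

class LateSubscriberDemo {
    /** Emits 0..3, with sub1 subscribed before item 0 and sub2 subscribed before item 2. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TinyPublishSubject<Long> subject = new TinyPublishSubject<>();
        subject.subscribe(l -> log.add("sub1 -> " + l));
        subject.onNext(0L);
        subject.onNext(1L);
        subject.subscribe(l -> log.add("sub2 -> " + l)); // late: misses 0 and 1
        subject.onNext(2L);
        subject.onNext(3L);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the sketch keeps no buffer of past items, sub2 has nothing to catch up on; it only sees items 2 and 3.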

[Figure: marble diagram of PublishSubject]

The PublishSubject emits each item to every observer that is subscribed at the time of the emission, until onCompleted of the source Observable is called.

If the source Observable terminates with an error, the PublishSubject will not emit any items to subsequent observers, but will simply pass along the error notification from the source Observable.

[Figure: marble diagram of PublishSubject when the source terminates with an error]
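
The error case can be sketched in plain Java as well. This is an illustration under stated assumptions, not RxJava's code: ErrorAwareSubject and ErrorDemo are hypothetical names, and the sketch is single-threaded. The subject remembers only its terminal error, so an observer that subscribes afterwards receives the error and no items.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: a subject that remembers its terminal error but never its items. */
class ErrorAwareSubject<T> {
    private final List<Consumer<T>> itemHandlers = new ArrayList<>();
    private final List<Consumer<Throwable>> errorHandlers = new ArrayList<>();
    private Throwable terminalError; // null while the subject is still live

    void subscribe(Consumer<T> onNext, Consumer<Throwable> onError) {
        if (terminalError != null) {
            onError.accept(terminalError); // late observer: error only, no items
            return;
        }
        itemHandlers.add(onNext);
        errorHandlers.add(onError);
    }

    void onNext(T item) {
        if (terminalError != null) {
            return; // items after termination are dropped
        }
        for (Consumer<T> handler : new ArrayList<>(itemHandlers)) {
            handler.accept(item);
        }
    }

    void onError(Throwable error) {
        if (terminalError != null) {
            return; // already terminated
        }
        terminalError = error;
        for (Consumer<Throwable> handler : new ArrayList<>(errorHandlers)) {
            handler.accept(error);
        }
        itemHandlers.clear();
        errorHandlers.clear();
    }
}

class ErrorDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        ErrorAwareSubject<Integer> subject = new ErrorAwareSubject<>();
        subject.subscribe(i -> log.add("early -> " + i),
                          e -> log.add("early error: " + e.getMessage()));
        subject.onNext(1);
        subject.onError(new IllegalStateException("source failed"));
        // Subscribes after termination: receives the error notification only.
        subject.subscribe(i -> log.add("late -> " + i),
                          e -> log.add("late error: " + e.getMessage()));
        subject.onNext(2); // dropped, the subject has already terminated
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```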

Use Case
Suppose you want to build an application that monitors the stock price of a certain company and forwards it to every client that requests it.

/* Dummy stock prices */
Observable<Integer> prices = Observable.just(11, 12, 14, 11, 10, 12, 15, 11, 10);

/* Your server */
PublishSubject<Integer> watcher = PublishSubject.create();
/* subscribe to listen to stock price changes and push to observers/clients */
prices.subscribe(watcher);

/* Client application */
PublishSubject<Integer> stockWatcher = getWatcherInstance(); // returns the shared subject (watcher)
Subscription steve = stockWatcher.subscribe(i -> System.out.println("steve watching " + i));
Thread.sleep(1000);
System.out.println("steve stops watching");
steve.unsubscribe();

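In the above use case, the PublishSubject acts as a bridge that passes values from your server on to every client subscribed to your watcher. Note that Observable.just emits all of its items synchronously when it is subscribed to, so with this dummy source the subject has already completed before any client subscribes. A real price feed would emit its values asynchronously over time.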
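
The subscribe and unsubscribe flow of the use case can be traced deterministically with a small plain-Java sketch. It is not RxJava code: PriceBroadcaster, StockWatchDemo, and the second client "maria" are hypothetical, and the Runnable returned by subscribe only stands in for the role that Subscription.unsubscribe() plays above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for the watcher subject: pushes each price to the current clients. */
class PriceBroadcaster {
    private final List<Consumer<Integer>> clients = new ArrayList<>();

    /** Returns a handle; running it removes the client, similar to unsubscribing. */
    Runnable subscribe(Consumer<Integer> client) {
        clients.add(client);
        return () -> clients.remove(client);
    }

    /** Sends a price to every client subscribed at this moment. */
    void publish(int price) {
        for (Consumer<Integer> client : new ArrayList<>(clients)) {
            client.accept(price);
        }
    }
}

class StockWatchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        PriceBroadcaster watcher = new PriceBroadcaster();

        Runnable steve = watcher.subscribe(p -> log.add("steve watching " + p));
        watcher.publish(11);
        watcher.publish(12);

        // A second client joins later and misses the first two prices.
        watcher.subscribe(p -> log.add("maria watching " + p));
        watcher.publish(14);

        log.add("steve stops watching");
        steve.run(); // steve unsubscribes; maria keeps receiving prices
        watcher.publish(11);
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Prices are published only after a client has subscribed, which is the ordering a live feed would give you and the one the dummy Observable.just source does not.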

Further reading:

  • PublishSubject javadocs
  • Blog by Thomas Nield (Advanced reading)