rx-java Observable Hot and Cold Observables


Example

Observables are broadly categorised as Hot or Cold, depending on their emission behaviour.
A Cold Observable starts emitting only when it is subscribed to, whereas a Hot Observable emits regardless of subscriptions.

Cold Observable

/* Demonstration of a Cold Observable */
Observable<Long> cold = Observable.interval(500, TimeUnit.MILLISECONDS); // emits a long every 500 milliseconds
cold.subscribe(l -> System.out.println("sub1, " + l)); // subscriber1
Thread.sleep(1000); // interval between the two subscribes
cold.subscribe(l -> System.out.println("sub2, " + l)); // subscriber2
Thread.sleep(1600); // keep the main thread alive so the emissions can be observed

The output of the above code looks like (may vary):

sub1, 0    -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 0    -> subscriber2 starts
sub1, 3
sub2, 1
sub1, 4
sub2, 2

Notice that even though sub2 starts late, it receives values from the start. In short, a Cold Observable emits items only when it is subscribed to, and each subscription starts its own independent pipeline.
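
The "one pipeline per subscription" behaviour can be modelled without RxJava at all. The following is a minimal sketch in plain Java, not RxJava's implementation: ColdSketch and ColdCounter are hypothetical names introduced only for illustration, and timing is left out, so the two subscribers run one after the other instead of interleaving.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// Hypothetical plain-Java model of a cold source; these are not RxJava types.
final class ColdSketch {

    /** A "cold" source: every call to subscribe() runs its own counter from 0. */
    static final class ColdCounter {
        private final int count;

        ColdCounter(int count) {
            this.count = count;
        }

        void subscribe(LongConsumer subscriber) {
            // All state lives inside this call, so each subscriber
            // gets a fresh, independent sequence.
            for (long i = 0; i < count; i++) {
                subscriber.accept(i);
            }
        }
    }

    /** Subscribes twice and records what each subscriber saw. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        ColdCounter cold = new ColdCounter(3);
        cold.subscribe(l -> log.add("sub1, " + l));
        cold.subscribe(l -> log.add("sub2, " + l)); // starts again from 0
        return log;
    }
}
```

Calling ColdSketch.demo() records 0, 1, 2 for sub1 and then 0, 1, 2 again for sub2, which is the same "late subscriber still starts from the beginning" effect shown in the output above.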

Hot Observable

Note: Hot Observables emit values independently of individual subscriptions. They have their own timeline, and events occur whether or not anyone is listening.

A Cold Observable can be converted to a Hot Observable with a simple publish.

Observable.interval(500, TimeUnit.MILLISECONDS)
    .publish(); // publish converts cold to hot

publish returns a ConnectableObservable, which adds the ability to connect to and disconnect from the underlying Observable.

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // returns ConnectableObservable
hot.connect(); // connect starts the emissions, even though nobody has subscribed yet

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000); // keep the main thread alive so the emissions can be observed

The above yields:

sub1, 0  -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2  -> subscriber2 starts
sub1, 3
sub2, 3

Notice that even though sub2 starts observing late, it is in sync with sub1.
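
The shared timeline can also be modelled in plain Java. This is a minimal sketch under stated assumptions, not how publish is implemented: HotSketch, HotCounter and tick() are hypothetical names, and tick() stands in for the timer that drives interval.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.LongConsumer;

// Hypothetical plain-Java model of a hot source; these are not RxJava types.
final class HotSketch {

    /** A "hot" source: one shared counter, delivered to the current subscribers. */
    static final class HotCounter {
        private final List<LongConsumer> subscribers = new CopyOnWriteArrayList<>();
        private long next = 0; // single timeline shared by all subscribers

        void subscribe(LongConsumer subscriber) {
            subscribers.add(subscriber);
        }

        /** Emits the next value to whoever is subscribed right now. */
        void tick() {
            long value = next++;
            for (LongConsumer subscriber : subscribers) {
                subscriber.accept(value);
            }
        }
    }

    /** Records what an early and a late subscriber observe. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        HotCounter hot = new HotCounter();
        hot.tick();                                 // value 0: nobody is listening, so it is lost
        hot.subscribe(l -> log.add("sub1, " + l));
        hot.tick();                                 // value 1 reaches sub1 only
        hot.subscribe(l -> log.add("sub2, " + l));
        hot.tick();                                 // value 2 reaches sub1 and sub2
        return log;
    }
}
```

Here the counter lives in the source and not in the subscription, so a late subscriber joins at the current value and never sees the earlier ones.
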
Disconnecting is a little more involved: it is done through the Subscription returned by connect, not through the Observable itself.

ConnectableObservable<Long> hot = Observable
                                    .interval(500, TimeUnit.MILLISECONDS)
                                    .publish(); // same as above
Subscription subscription = hot.connect(); // connect returns a subscription object, which we store for further use

hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe(); // disconnect, or unsubscribe from subscription

System.out.println("reconnecting");
/* reconnect and redo */
subscription = hot.connect();
hot.subscribe(l -> System.out.println("sub1, " + l));
Thread.sleep(1000);
hot.subscribe(l -> System.out.println("sub2, " + l));
Thread.sleep(1000);
subscription.unsubscribe();

The above produces:

sub1, 0   -> subscriber1 starts
sub1, 1
sub1, 2
sub2, 2   -> subscriber2 starts
sub1, 3
sub2, 3
reconnecting  -> reconnect after unsubscribe
sub1, 0
...

Upon disconnect, the Observable essentially "terminates"; calling connect again restarts it from the beginning, which is why sub1 sees 0 again after reconnecting.

A Hot Observable can be used to build an EventBus. Such buses (often called an RxBus) are generally lightweight and fast. The main downside is that every event must be created and passed to the bus manually.
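
To make the idea concrete, here is a minimal event-bus sketch in plain Java. It only illustrates the hot, "delivered to whoever is listening now" behaviour; it is not an RxJava-based bus, and EventBusSketch, EventBus, subscribe and post are hypothetical names chosen for this example. In an RxJava-based bus the same role would typically be played by a subject such as PublishSubject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical plain-Java event bus; a sketch of the idea, not an RxJava type.
final class EventBusSketch {

    static final class EventBus {
        private final List<Consumer<Object>> listeners = new CopyOnWriteArrayList<>();

        /** Registers a handler for one event type; the returned Runnable unregisters it. */
        <T> Runnable subscribe(Class<T> type, Consumer<? super T> handler) {
            Consumer<Object> listener = event -> {
                if (type.isInstance(event)) {
                    handler.accept(type.cast(event));
                }
            };
            listeners.add(listener);
            return () -> listeners.remove(listener);
        }

        /** Events are passed to the bus manually and reach only the current listeners. */
        void post(Object event) {
            for (Consumer<Object> listener : listeners) {
                listener.accept(event);
            }
        }
    }

    /** Posts a few events and records which ones were delivered. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        EventBus bus = new EventBus();
        bus.post("lost");                           // no listeners yet, so the event is dropped
        Runnable unsubscribe = bus.subscribe(String.class, s -> log.add("string: " + s));
        bus.subscribe(Integer.class, i -> log.add("int: " + i));
        bus.post("hello");                          // delivered to the String handler
        bus.post(42);                               // delivered to the Integer handler
        unsubscribe.run();
        bus.post("ignored");                        // String handler is gone
        return log;
    }
}
```

Filtering by class keeps the handlers type-safe while the bus itself stays a single stream of Object events; events posted before a handler subscribes, or after it unsubscribes, are simply never seen by it.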