In this tutorials, we’ll be discussing RxJava Observables and Observers in length. We’ll discuss their various types and what each of them has to offer.
Observables and Observers
In RxJava, Observables are the source which emits items to the Observers. For Observers to listen to the Observables, they need to subscribe first. The instance created after subscribing in RxJava2 is called Disposable
.
In order to stop listening to Observables, we can call unsubscribe by calling the method dispose()
on the Disposable instance.
Creating Observables
We can create Observables in many ways. One of the ways are:
1 2 3 4 5 6 7 8 9 10 |
Observable<Integer> observable = new ObservableCreate<Integer>(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(ObservableEmitter<Integer> emitter) throws Exception { emitter.onNext(10); emitter.onNext(20); emitter.onComplete(); } }); |
Observable.OnSubscribe
is an interface which defines the action to be taken when a subscriber subscribes to the Observable. The subscribe method would only run when an Observer is subscribed to the Observable.
onNext
is used to emit the next item.
onError
is triggered when an error occurs.
onComplete
is called after the last item is emitted.
Now in order to catch these values, we must subscriber an observer. For that we have to create an observer first:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onNext(Integer o) { System.out.println("onNext " + o); } @Override public void onError(Throwable e) { } @Override public void onComplete() { System.out.println("onComplete"); } }; |
Let’s subscribe to it:
1 2 3 |
observable.subscribe(observer); |
This creates a Subscription between the observer and observable. The Observable would now emit values which would be caught by the onNext
of the Observer.
The output from the console is:
1 2 3 4 5 6 7 |
Output: onSubscribe onNext 10 onNext 20 onComplete |
If you subscribe()
multiple times, each time the items would be emitted.
Methods to Create Observables
We can create Observables in the following ways:
- Observable.from()
- Observable.just() – Pass one or more values inside this.
- Observable.range – The first argument expects the starting value. The second expects the size. Eg:
Observable.range(1,2)
would emit 1 and 2. - Observable.interval() – Emits the values in the interval defined. The values emitted would be of the type Long. More on this later.
For more info check out the RxJava Tutorial.
Cold Observables and Hot Observables
Cold Observables are Observables that emit one or values such that each Subscriber would receive all the values from the beginning.
Hot Observables are Observables in which the Observer won’t be able to receive items emitted before it subscribed. Only items emitted after the Observer is emitted could be received.
The example we’d defined above was a Cold Observable.
To create a Hot Observable we do:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
Observable<Long> observableInterval = Observable.interval(2, TimeUnit.SECONDS); PublishSubject<Long> publishSubject = PublishSubject.create(); observableInterval.subscribe(publishSubject); publishSubject.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l)); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } publishSubject.subscribe(l -> System.out.println("Subscriber #2 onNext: " + l)); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } |
To create a Hot Observable we need to use Subject. A Subject can act as an Observable or Observer at any given time.
Values from 0 would be emitted every 2 seconds. We’ve set the thread to sleep for 2 seconds after the first observer is subscribed. Hence the second observer won’t get the initial emitted items as shown in the output below:
Types of Observables
Following are the major types of Observables with each have a slightly different functionality and use case:
- Observable – Emits one or more values. We have already discussed this above.
- Single – Emits a single value or throws an error.
- Maybe – This may or may not emit a value. Should be used when you need some data optionally.
- Flowable – Used when a huge amount of data has to be emitted. It is used for backpressure. More on this later.
- Completable – This just emits success or failure. No data is emitted.
Types of Observers
For every Observable type above we have an Observer type as well in RxJava.
- Observer.
- SingleObservable
- MaybeObservable
- CompletableObserver
Flowable Observable uses the default Observer. Since RxJava2 for Flowables, Subscribers are used instead of Observers
Let’s now look at the basic implementation of each of the Observables with the Observers.
Single
This emits just one value. This can be used with Retrofit network calls.
Following is an example of Single:
1 2 3 4 5 6 7 |
Observable<Integer> integerObservable = Observable.just(1,2,3); Single<Integer> integerSingle = integerObservable.single(1); integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError")); integerSingle = integerObservable.singleOrError(); integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError")); |
This would give an onError
in both the cases since neither of has a single value.
single(Integer defaultValue)
and singleOrError()
are just two of the methods.
We can add plenty of other operators as well just as all
, any
,contains
, count
etc. Generally, a predicate is set in these methods which would return a single value.
Example:
1 2 3 4 5 6 7 8 9 10 11 12 |
Observable<Integer> integerObservable = Observable.just(1, 2, 3); Single<Boolean> booleanSingle = integerObservable.any(new Predicate<Integer>() { @Override public boolean test(Integer integer) throws Exception { return integer % 2 == 0; } }); booleanSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError")); Single<Long> integerSingle = integerObservable.count(); integerSingle.subscribe(l -> System.out.println("Subscriber #1 onNext: " + l), (Throwable e) -> System.out.println("onError")); |
This prints true
and 3
respectively.
Maybe
Maybe emits 0 or 1 items. The MaybeObserver has the method onSuccess
in place of onNext()
.
Following is an example using Maybe in which we print the maximum number from an Observable of Integers.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
Observable<Integer> integerObservable = Observable.just(1, 2, 3, 4, 5); Maybe<Integer> integerMaybe = integerObservable.reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { if (integer > integer2) return integer; else return integer2; } }); MaybeObserver<Integer> maybeObserver = new MaybeObserver<Integer>() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onSuccess(Integer o) { System.out.println("onSuccess : " + o); } @Override public void onError(Throwable e) { System.out.println("onError"); } @Override public void onComplete() { System.out.println("onComplete"); } }; integerMaybe.subscribe(maybeObserver); |
This prints 5.
Besides the reduce
function, there are plenty of other functions such as firstElement(), lastElement() etc.
To create a zero emission observable, do:
1 2 |
Maybe<Integer> emptySource = Maybe.empty(); |
Completable
Completable is used in cases where you need to know whether an operation is completable successfully or not. Example: Uploading an image to the server. Unlike Maybe and Single, the CompletableObserver doesn’t return any value at all. Neither does the Completable Observable has a type.
Example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Observable<Integer> integerObservable = Observable.empty(); Completable completable = integerObservable.ignoreElements(); CompletableObserver completableObserver = new CompletableObserver() { @Override public void onSubscribe(Disposable d) { System.out.println("onSubscribe"); } @Override public void onComplete() { System.out.println("onComplete"); } @Override public void onError(Throwable e) { System.out.println("onError"); } }; completable.subscribe(completableObserver); |
Flowable
Flowable is used when you need to handle lots of data. It supports backpressure. We’ll discuss it at length in another tutorial. For now, a Flowable Observable needs a Subscriber class as the Observer since RxJava2.
Following is a sample of Flowable:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
Flowable<Integer> integerFlowable = Flowable.range(1,500000); integerFlowable.reduce(new BiFunction<Integer, Integer, Integer>() { @Override public Integer apply(Integer integer, Integer integer2) throws Exception { return integer + integer2; } }); Subscriber<Integer> integerSubscriber = new Subscriber<Integer>() { @Override public void onSubscribe(Subscription s) { System.out.println("onSubscribe"); s.request(Long.MAX_VALUE); } @Override public void onNext(Integer integer) { System.out.println("onNext: " + integer); } @Override public void onError(Throwable t) { } @Override public void onComplete() { System.out.println("onComplete"); } }; integerFlowable.subscribe(integerSubscriber); |
For a Subscriber to start receiving emissions we must manually invoke request() on the Subscription instance as done above.
Converting Between Observables
We have various helper methods to convert an Observable type into another.
For example:
To convert any type to a Completable, either of the methods are available:
toCompletable()
ignoreElements()
Similarly, to convert to Observable, toObservable()
method is suffice.
Flowable – toFlowable()
Maybe – toMaybe()
Single – reduce()/firstElement() etc.
This brings an end to this tutorial on RxJava Observables.