In this tutorial, we’ll discuss two important RxJava operators, combineLatest and withLatestFrom, and how they differ from one another. To learn the basics of RxJava and RxJava operators, refer to this and this respectively.
combineLatest
The combineLatest operator is used when you’re combining multiple observables. Its typed overloads allow for 2 to 9 observables.
combineLatest emits an item whenever any of the source Observables emits a value, provided that every source has already emitted at least once. When that happens, combineLatest combines the most recently emitted items from each of the source Observables.
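To make the rule concrete, here is a minimal plain-Java sketch that replays a timeline of events the way a combineLatest-style operator with a summing combiner would. It is only a model of the semantics under that assumption, not RxJava's implementation; the class `CombineLatestModel` and the method `combineLatestSum` are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class CombineLatestModel {

    // Replays a timeline of {sourceIndex, value} events and returns the sums
    // that a combineLatest-style operator with a summing combiner would emit.
    // Hypothetical helper: a model of the semantics, not RxJava code.
    public static List<Integer> combineLatestSum(int sourceCount, int[][] events) {
        Integer[] latest = new Integer[sourceCount]; // null = source has not emitted yet
        List<Integer> emitted = new ArrayList<>();
        for (int[] event : events) {
            latest[event[0]] = event[1];             // remember the newest value of this source
            int sum = 0;
            boolean allSeen = true;
            for (Integer value : latest) {
                if (value == null) {
                    allSeen = false;                 // some source is still silent
                    break;
                }
                sum += value;
            }
            if (allSeen) {
                emitted.add(sum);                    // combine the latest value of every source
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Source 0 emits 1, 2, 3; source 1 then emits 4, 5.
        int[][] events = { {0, 1}, {0, 2}, {0, 3}, {1, 4}, {1, 5} };
        System.out.println(combineLatestSum(2, events)); // prints [7, 8]
    }
}
```

Nothing is emitted for the first three events because source 1 is still silent; after that, every new event produces a combined value.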
The combineLatest form used in this tutorial expects two arguments:
- List Of Observables
- Function
Following is the diagram of it from the docs:
combineLatest is often used in cases such as Android login form validation.
zip emits items only when all of the zipped source Observables have emitted a previously unzipped item.
combineLatest emits whenever any of the Observables emits an item, once each of them has emitted at least once.
Example:
In the following code we’ve added two observables:
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4);
List<Observable<Integer>> observableList = Arrays.asList(observable1, observable2);
Observable<Integer> observable = Observable.combineLatest(observableList, new Function<Object[], Integer>() {
    @Override
    public Integer apply(Object[] objects) throws Exception {
        // Sum the latest value from each source.
        int sum = 0;
        for (Object value : objects) {
            if (value instanceof Integer) {
                sum += (Integer) value;
            }
        }
        return sum;
    }
});
In the above code, we pass the List of Observables to the combineLatest operator. Along with that, we pass a function. Once every observable has emitted at least one value, this function is called each time any of the observables emits a value.
It receives the most recently emitted value from each of the observables.
Let’s subscribe to the above observable.
observable.subscribe(value -> System.out.println("Value is " + value));
The output is Value is 7, because 3 and 4 are the last items emitted by observable1 and observable2.
Let’s change the Observables to:
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4,5,6);
The output will be:
Value is 7
Value is 8
Value is 9
So, combineLatest emits an item every time observable2 emits one.
Observable.just() emits its items one by one, and observable1 has already emitted all of its items by the time observable2 starts. So each item from observable2 becomes its latest item and is combined with 3, the last item emitted by observable1.
Let’s add a Subject which is like a Hot Observable to the mix.
PublishSubject<Integer> publishSubject = PublishSubject.create();
Observable<Integer> observable1 = Observable.just(1, 2, 3);
Observable<Integer> observable2 = Observable.just(4);
List<Observable<Integer>> observableList = Arrays.asList(observable1, observable2, publishSubject);
Observable<Integer> observable = Observable.combineLatest(observableList, new Function<Object[], Integer>() {
    @Override
    public Integer apply(Object[] objects) throws Exception {
        // Sum the latest value from each source.
        int sum = 0;
        for (Object value : objects) {
            if (value instanceof Integer) {
                sum += (Integer) value;
            }
        }
        return sum;
    }
});
publishSubject.onNext(-10);
observable.subscribe(value -> System.out.println("Value is " + value));
publishSubject.onNext(10);
The output is Value is 17 (3 + 4 + 10).
The -10 is emitted before the subscription, so it is lost: values emitted by a PublishSubject are received only after subscription.
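The reason the -10 is lost can be illustrated without RxJava. The following is a minimal sketch of a hot source that keeps no history; `TinySubject` and `LateSubscriberModel` are hypothetical names, and the class is only a stand-in for the behaviour described above, not the real PublishSubject.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class LateSubscriberModel {

    // Hypothetical stand-in for a hot source: it forwards each value to the
    // observers registered at that moment and keeps no history.
    public static class TinySubject<T> {
        private final List<Consumer<T>> observers = new ArrayList<>();

        public void subscribe(Consumer<T> observer) {
            observers.add(observer);
        }

        public void onNext(T value) {
            for (Consumer<T> observer : observers) {
                observer.accept(value);
            }
        }
    }

    // Emits -10 before subscribing and 10 after, and returns what was received.
    public static List<Integer> receivedValues() {
        TinySubject<Integer> subject = new TinySubject<>();
        List<Integer> received = new ArrayList<>();
        subject.onNext(-10);              // no observers yet, so the value is lost
        subject.subscribe(received::add);
        subject.onNext(10);               // delivered to the new observer
        return received;
    }

    public static void main(String[] args) {
        System.out.println(receivedValues()); // prints [10]
    }
}
```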
withLatestFrom
Unlike combineLatest, withLatestFrom is called on one observable and, in its simplest form, takes one other observable, so it combines two observables.
withLatestFrom emits an item only when the first observable emits, and only after the second observable has started emitting items.
Till then, it keeps dropping the items emitted by the first Observable.
Once the second Observable has emitted an item, every new item from the first Observable is combined with the last item emitted by the second Observable.
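The dropping and pairing rules can be modelled in a few lines of plain Java. This is a sketch of the semantics only, assuming a combiner that concatenates the two strings; `WithLatestFromModel` and its `withLatestFrom` method are hypothetical names, and "source" and "other" stand for the first and second observable.

```java
import java.util.ArrayList;
import java.util.List;

public class WithLatestFromModel {

    // Replays a timeline of {origin, value} events, where origin is "source"
    // (the first observable) or "other" (the second observable), and returns
    // what a withLatestFrom-style operator with a concatenating combiner emits.
    // Hypothetical helper: a model of the semantics, not RxJava code.
    public static List<String> withLatestFrom(String[][] events) {
        String latestOther = null;                   // null = "other" has not emitted yet
        List<String> emitted = new ArrayList<>();
        for (String[] event : events) {
            if (event[0].equals("other")) {
                latestOther = event[1];              // remember the value, never emit here
            } else if (latestOther != null) {
                emitted.add(event[1] + latestOther); // source item + latest "other" item
            }
            // otherwise the source item arrived too early and is dropped
        }
        return emitted;
    }

    public static void main(String[] args) {
        String[][] events = {
            {"source", "1"}, {"other", "x"}, {"source", "2"},
            {"other", "y"}, {"other", "z"}, {"source", "3"}
        };
        System.out.println(withLatestFrom(events)); // prints [2x, 3z]
    }
}
```

Note that "1" is dropped because nothing has arrived from the other side yet, and "y" never appears in the output because it was replaced by "z" before the source emitted again.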
Following is the diagram for the above operator:
Let’s look at an example using the withLatestFrom operator.
Subject<String> subject = PublishSubject.create();
Subject<String> publishSubject = PublishSubject.create();
subject.onNext("a");
Observable<String> o = publishSubject.withLatestFrom(subject, new BiFunction<String, String, String>() {
    @Override
    public String apply(String s, String s2) throws Exception {
        return s + s2;
    }
});
The BiFunction receives the values to be combined, one from each of the observables.
In the above code, s is the item just emitted by publishSubject, the observable that withLatestFrom is called on, and s2 is the last string emitted by subject.
We concatenate both of the strings. Now let’s subscribe to the combined observable and emit items.
subject.onNext("b");
o.subscribe(value -> System.out.println("Value is " + value), error -> System.out.println("onError"), () -> System.out.println("onComplete"), onSubscribe -> System.out.println("onSubscribe"));
publishSubject.onNext("c");
subject.onNext("d");
publishSubject.onNext("e");
publishSubject.onNext("f");
publishSubject.onNext("g");
subject.onNext("h");
The subscribe function here takes four lambda functions, in this order: onNext, onError, onComplete and onSubscribe.
Now we’ve started emitting values from each Subject.
Values from a PublishSubject are only received by the observer after the point of subscription.
To also receive the most recent value emitted before the subscription, you can use a BehaviorSubject.
Now in the above code, until subject emits a value, values from the publishSubject object are dropped.
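So the output of the above code is:
onSubscribe
Value is ed
Value is fd
Value is gd
The strings a and b are emitted before the subscription, and c is emitted before subject has emitted anything after the subscription, so none of them appear.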
As you can see, in the last statement, subject emits the string h, but since the publishSubject doesn’t emit anything after that, it isn’t printed.
So in withLatestFrom, one observable acts as a checkpoint for the other.
That brings an end to this tutorial. We’ve discussed two vital operators, combineLatest and withLatestFrom, and seen how they differ from one another.