In this tutorial, we’ll discuss and implement the various operators that RxJava offers. We’ll see how each operator transforms the Observable sequence and what the subscriber ends up receiving. Let’s start!
Table of Contents
- RxJava Operators
  - 1. map and filter
  - 2. take, count and skip
  - 3. startWith, reduce, repeat, scan
  - 4. all, contains, elementAt
  - 5. distinct, toList, toSortedList
  - 6. concat, merge, zip
  - 7. debounce, delay
RxJava Operators
As we saw in the first RxJava tutorial, an operator acts on an Observable and passes the transformed data on to the Subscribers. When several operators are chained, each one finishes its own task and then passes its output to the next operator.
Create a new IntelliJ Java Project and add the RxJava dependency to get started. We’ll use functional programming and lambdas to make the code simpler and less verbose.
1. map and filter
map() transforms each emitted item by applying a function to it.
    import rx.Observable;

    public class RxOperators {

        public static void main(String[] args) {
            //map operator
            Observable<String> mapObservable = Observable.just("hello world", "the observable emits lower case sentences", "subscriber sees it as upper case", "map operator");
            mapObservable.map(String::toUpperCase).subscribe(System.out::println);
        }
    }

    //Prints
    //HELLO WORLD
    //THE OBSERVABLE EMITS LOWER CASE SENTENCES
    //SUBSCRIBER SEES IT AS UPPER CASE
    //MAP OPERATOR
filter() passes on only the items that satisfy a condition.
    //filter operator
    Observable<String> filterObservable = Observable.from(new String[]{"Hello", "How are you?", "doing"});
    filterObservable.filter(string -> string.contains(" ")).subscribe(System.out::println);

    //Prints
    //How are you?
Using map and filter together: the following code emits the square of every even number in the range.
    //map and filter
    Observable<Integer> mapFilter = Observable.range(0, 10);
    mapFilter.filter(i -> i % 2 == 0).map(i -> i * i).subscribe(System.out::println);
2. take, count and skip
The take operator has several variants. Let’s look at each of them.
- take() passes the first n items emitted by the Observable.
- takeFirst() emits the first item that meets the condition specified in the operator.
- takeLast() emits the last n items. It is the opposite of take().
- takeUntil() emits items until a second observable starts emitting values. Alternatively, it accepts a condition and stops after the first item that satisfies it.
- takeWhile() emits items as long as a condition is true and ignores the rest.
    Observable<Integer> takeObservable = Observable.range(0, 100);
    takeObservable.take(5).subscribe(System.out::println); //prints 0 to 4
    takeObservable.takeFirst(i -> i * i > 1000).subscribe(System.out::println); //prints 32
    takeObservable.takeLast(1).subscribe(System.out::println); //prints 99

    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    Observable<Integer> takeUntilObservable = Observable.from(numbers);
    takeUntilObservable.takeUntil(number -> number > 2)
            .subscribe(System.out::println); //prints 1 to 3

    takeObservable.takeWhile(number -> number < 4)
            .subscribe(System.out::println); //prints 0 to 3
    takeObservable.takeWhile(number -> number > 2)
            .subscribe(System.out::println); //prints nothing, since the first value (0) already fails the condition
The takeFirst() call emits the first value whose square is greater than 1000.
The takeUntil() operator behaves like a do-while loop: it emits the current item first and only then checks the condition to decide whether to continue. That is why 3 is printed even though it satisfies number > 2.
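The difference between the two loop styles can be modelled on a plain List, without RxJava. This is only a sketch to illustrate the ordering of "emit" and "test"; the class and method names (TakeSemantics, takeWhile, takeUntil) are made up for this example and are not part of the library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: models the two operators on a plain List, without RxJava.
final class TakeSemantics {

    // Like takeWhile(): test first, emit only if the condition holds (a while loop).
    static <T> List<T> takeWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            if (!condition.test(item)) {
                break;
            }
            out.add(item);
        }
        return out;
    }

    // Like takeUntil(condition): emit first, then test whether to stop (a do-while loop).
    static <T> List<T> takeUntil(List<T> source, Predicate<T> stopCondition) {
        List<T> out = new ArrayList<>();
        for (T item : source) {
            out.add(item);
            if (stopCondition.test(item)) {
                break;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        System.out.println(takeUntil(numbers, n -> n > 2)); // prints [1, 2, 3]
        System.out.println(takeWhile(numbers, n -> n < 3)); // prints [1, 2]
    }
}
```

Note that the item which triggers the stop is included by takeUntil but excluded by takeWhile.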
As the name says, count() emits the number of values that reach it. In the code below, that is the number of strings that pass the filter.
    Observable<String> countObservable = Observable.from(new String[]{"First", "Second", "Third", "Seventh"});
    countObservable.filter(string -> string.length() > 5).count().subscribe(System.out::println);
    //prints 2
If one of the values were null, string.length() would throw a NullPointerException inside the filter. With no error handler supplied, that exception crashes the program, so we also need to pass an action for onError():
    Observable<String> countObservable = Observable.from(new String[]{"First", "Second", "Third", null});
    countObservable.filter(string -> string.length() > 5).count().subscribe(System.out::println, throwable -> System.out.println("One of the values is not valid"));

    //prints
    //One of the values is not valid
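If a null can slip into the source, an alternative to relying on onError() is to make the condition itself null-safe. The sketch below uses plain java.util.stream rather than RxJava, purely to show the predicate in isolation; NullSafeCount and countLongerThan are hypothetical names chosen for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: NullSafeCount and countLongerThan are hypothetical names.
final class NullSafeCount {

    // Counts the non-null strings whose length exceeds minLength.
    static long countLongerThan(List<String> values, int minLength) {
        // The null check runs first, so length() is never called on null.
        Predicate<String> isLongEnough = s -> s != null && s.length() > minLength;
        return values.stream().filter(isLongEnough).count();
    }

    public static void main(String[] args) {
        List<String> values = Arrays.asList("First", "Second", "Third", null);
        System.out.println(countLongerThan(values, 5)); // prints 1
    }
}
```

The same lambda body (a null check followed by the length check) can be used inside filter() in the RxJava snippet above, in which case the null is silently dropped instead of triggering onError().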
The skip operator has a few variants, which are described and implemented below.
- skip() ignores the first n values.
- skipLast() ignores the last n values.
- skipWhile() ignores values for as long as a condition holds. Once the condition fails for the first time, it emits that value and all the remaining ones.
- skipUntil() ignores all the values until another observable starts emitting. We’ll look at this later.
    List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
    Observable<Integer> skipObservable = Observable.from(numbers);

    skipObservable.skip(3).subscribe(System.out::println); //prints 4 and 5
    skipObservable.skipLast(3).subscribe(System.out::println); //prints 1 and 2
    skipObservable.skipWhile(i -> i < 3).subscribe(System.out::println); //prints 3, 4 and 5
skip is somewhat the opposite of take.
skip can be used to drop values at known positions, such as a leading null. To drop null values wherever they occur, filter is the better fit.
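One detail of skipWhile() that is easy to miss: the condition is no longer consulted once it has failed, so later values pass through even if they would satisfy it again. Here is a plain-Java model of that behaviour, operating on a List instead of an Observable. The class name SkipSemantics is invented for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: models skipWhile() on a plain List, without RxJava.
final class SkipSemantics {

    static <T> List<T> skipWhile(List<T> source, Predicate<T> condition) {
        List<T> out = new ArrayList<>();
        boolean skipping = true;
        for (T item : source) {
            // While still skipping, drop items that satisfy the condition.
            if (skipping && condition.test(item)) {
                continue;
            }
            // The first failing item ends the skipping phase for good.
            skipping = false;
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> numbers = Arrays.asList(1, 2, 3, 1, 5);
        // The second 1 is kept because skipping stopped at 3.
        System.out.println(skipWhile(numbers, i -> i < 3)); // prints [3, 1, 5]
    }
}
```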
3. startWith, reduce, repeat, scan
startWith() prepends the given element to the emission, so it is emitted before everything else.
    Observable<String> startWithObservable = Observable.just(" Rx", "Java", " Operators", " Tutorial");
    startWithObservable.startWith("Welcome to the").subscribe(System.out::print);

    //Prints
    //Welcome to the RxJava Operators Tutorial
The reduce() operator acts as an accumulator. It combines the next value with the result accumulated from the previous values, and emits only the final accumulated value to the subscriber.
    Observable<Integer> reduceObservable = Observable.range(1, 5);
    reduceObservable.reduce((integer, integer2) -> integer + integer2).subscribe(System.out::println);
    //prints 15
In the above code, on the first call of the function integer is 1 and integer2 is 2. On the second call they are 3 (the sum so far) and 3 (the next value), and so on.
The reduce operator is useful for calculating sums, concatenating strings and similar tasks.
The following code finds the length of the concatenated strings.
    Observable<String> reduceStringObservable = Observable.just("Rx", "Java");
    reduceStringObservable.reduce((x, y) -> x + y).map(String::length).subscribe(System.out::println);
The repeat operator re-emits the whole sequence. With repeat(2), as in the code below, the sequence is emitted twice in total.
    Observable.just("Android", "iOS", "Windows")
            .repeat(2)
            .subscribe(System.out::println);

    //Prints
    //Android
    //iOS
    //Windows
    //Android
    //iOS
    //Windows
The scan operator, unlike reduce, emits every intermediate value of the accumulator rather than only the final one.
    Observable.range(1, 5).scan((integer, integer2) -> integer + integer2).subscribe(System.out::println);

    //prints
    //1
    //3
    //6
    //10
    //15
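To make the relationship between the two operators concrete, here is a plain-Java model of scan over a List (no RxJava involved). The value that reduce would emit corresponds to the last element of the scan result. ReduceVsScan and its scan method are names made up for this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Illustrative only: models scan() on a plain List, without RxJava.
final class ReduceVsScan {

    // Returns every intermediate accumulator value, in order.
    static <T> List<T> scan(List<T> source, BinaryOperator<T> accumulator) {
        List<T> running = new ArrayList<>();
        T acc = null;
        boolean first = true;
        for (T item : source) {
            // The first item is passed through as-is; later items are combined.
            acc = first ? item : accumulator.apply(acc, item);
            first = false;
            running.add(acc);
        }
        return running;
    }

    public static void main(String[] args) {
        List<Integer> totals = scan(Arrays.asList(1, 2, 3, 4, 5), Integer::sum);
        System.out.println(totals);                        // prints [1, 3, 6, 10, 15]
        // What reduce would emit is simply the last running value.
        System.out.println(totals.get(totals.size() - 1)); // prints 15
    }
}
```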
4. all, contains, elementAt
The all() operator checks whether every value meets the condition. It emits a single Boolean: true if all of them do, false otherwise.
    Observable.range(1, 5).all(i -> i % 2 == 0).subscribe(System.out::println); //prints false
    Observable.range(1, 5).map(i -> i * 2).all(i -> i % 2 == 0).subscribe(System.out::println); //prints true
contains() checks whether the given value is among the values emitted by the Observable.
    Observable.range(1, 5).contains(6).subscribe(System.out::println); //prints false
    Observable.range(1, 5).contains(4).subscribe(System.out::println); //prints true
elementAt() emits the value at the given zero-based index among the emitted values.
    Observable.range(1, 5).elementAt(4).subscribe(System.out::println); //prints 5
5. distinct, toList, toSortedList
The distinct() operator eliminates duplicate values.
    Observable.just(1, 2, 3, 1, 5, 1, 2, 3).count().subscribe(System.out::println); //prints 8
    Observable.just(1, 2, 3, 1, 5, 1, 2, 3).distinct().count().subscribe(System.out::println); //prints 4
toList and toSortedList collect all the emitted values and emit them as a single List.
toSortedList sorts the values in ascending order by default.
We can also supply our own comparator to control the sort order.
    Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toList().subscribe(System.out::println);
    Observable.just(1, 2, 3, 1, 5, 1, 2, 3).toSortedList().subscribe(System.out::println);
    //comparing integer2 against integer reverses the natural order
    Observable.just(1, 10, 20, 2).toSortedList((integer, integer2) -> Integer.compare(integer2, integer)).subscribe(System.out::println);

    //prints
    //[1, 2, 3, 1, 5, 1, 2, 3]
    //[1, 1, 1, 2, 2, 3, 3, 5]
    //[20, 10, 2, 1]
In the above code, the third observable sorts the values in descending order.
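A comparator has to return a negative number, zero, or a positive number depending on whether its first argument should come before, at the same position as, or after the second. Swapping the arguments of Integer.compare is a simple way to get descending order that satisfies this contract. The sketch below demonstrates it on a plain List; DescendingSort and sortedDescending are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: shows a descending comparator on a plain List.
final class DescendingSort {

    static List<Integer> sortedDescending(List<Integer> values) {
        List<Integer> copy = new ArrayList<>(values); // leave the input untouched
        // Integer.compare(b, a) is negative when a is larger, so larger values come first.
        copy.sort((a, b) -> Integer.compare(b, a));
        return copy;
    }

    public static void main(String[] args) {
        System.out.println(sortedDescending(Arrays.asList(1, 10, 20, 2))); // prints [20, 10, 2, 1]
    }
}
```

Using Integer.compare also avoids comparing boxed Integer objects with ==, which compares references rather than values.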
6. concat, merge, zip
concat is used to combine observables without interleaving them.
merge is used to combine observables by interleaving them.
That means that with concat, the new observable contains all the values of the first observable followed by those of the second.
With merge, the values can be mixed, depending on when they arrive.
    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "CA" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
            .subscribe(System.out::println);
    Thread.sleep(5000);

    //Prints:
    //CA0
    //CA1
    //CA2
    //CA3
    //CA4
Note: We need to add a sleep to prevent the main function from exiting early.
With concat(), the second observable is not subscribed to until the first one has completed.
So in the above code, nothing from the second observable gets printed: the first one never completes, and it keeps emitting for the whole 5 seconds.
Let’s set a limit on the first using take.
    Observable.concat(
            Observable.interval(1, TimeUnit.SECONDS).take(2).map(id -> "CA" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "CB" + id))
            .subscribe(System.out::println);
    Thread.sleep(5000);

    //prints
    //CA0
    //CA1
    //CB0
    //CB1
    //CB2
Now let’s merge two observables:
    Observable.merge(
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id),
            Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id))
            .subscribe(System.out::println);
    Thread.sleep(5000);

    //prints
    //MB0
    //MA0
    //MB1
    //MA1
    //MA2
    //MB2
    //MB3
    //MA3
    //MA4
    //MB4
As you can see, merge interleaves the two sources. However, it doesn’t guarantee the relative order of values that are emitted at the same time.
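The ordering difference between the two can be modelled in plain Java by attaching an emission time to each value. This is a simplified sketch, not how RxJava is implemented: the sources are finite lists, and ConcatVsMerge, Event and ticks are names invented for the example. In particular, the stable sort used here gives ties a fixed order, which the real merge does not promise.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only: a timestamp-based model of concat and merge, without RxJava.
final class ConcatVsMerge {

    // A value together with the second at which its source emits it.
    static final class Event {
        final long atSecond;
        final String value;

        Event(long atSecond, String value) {
            this.atSecond = atSecond;
            this.value = value;
        }
    }

    // Finite stand-in for interval(1, SECONDS).take(count).map(id -> prefix + id).
    static List<Event> ticks(String prefix, int count) {
        List<Event> events = new ArrayList<>();
        for (int id = 0; id < count; id++) {
            events.add(new Event(id + 1, prefix + id));
        }
        return events;
    }

    // concat: everything from the first source, then everything from the second.
    static List<String> concat(List<Event> first, List<Event> second) {
        List<String> out = new ArrayList<>();
        for (Event e : first) {
            out.add(e.value);
        }
        for (Event e : second) {
            out.add(e.value);
        }
        return out;
    }

    // merge: values ordered by emission time, regardless of which source they came from.
    static List<String> merge(List<Event> first, List<Event> second) {
        List<Event> all = new ArrayList<>(first);
        all.addAll(second);
        all.sort((Event a, Event b) -> Long.compare(a.atSecond, b.atSecond));
        List<String> out = new ArrayList<>();
        for (Event e : all) {
            out.add(e.value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(concat(ticks("CA", 2), ticks("CB", 3))); // prints [CA0, CA1, CB0, CB1, CB2]
        System.out.println(merge(ticks("MA", 3), ticks("MB", 3)));  // prints [MA0, MB0, MA1, MB1, MA2, MB2]
    }
}
```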
merge and concat, as used above, are static methods that take the Observables as arguments.
To use them as operators in a chain, on an existing Observable, we need mergeWith and concatWith.
    Observable<String> sourceA = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MA" + id);
    Observable<String> sourceB = Observable.interval(1, TimeUnit.SECONDS).map(id -> "MB" + id);
    sourceB.mergeWith(sourceA).subscribe(System.out::println);
    Thread.sleep(5000);
Replace mergeWith with concatWith in the above code to get the equivalent operator for concat.
zip is used to pair up the emissions of several observables. Each observable waits for the others to emit their value at the same position, and then all of those values are handed to a function that combines them.
zip is useful for combining values of different types.
    Observable.zip(
            Observable.interval(1, TimeUnit.SECONDS)
                    .map(id -> String.valueOf((char) (id + 65))),
            Observable.interval(2, TimeUnit.SECONDS),
            (s1, s2) -> s1 + s2)
            .subscribe(System.out::println);
    Thread.sleep(9000);

    //prints
    //A0
    //B1
    //C2
    //D3
In the above code, the first observable emits a number (a Long) every second. The map operator converts that number to the corresponding character using its ASCII code and returns it as a string.
The second observable emits a number every two seconds, so each value from the first observable has to wait for its partner from the second.
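Leaving the timing aside, the pairing rule itself can be shown on two plain Lists: values are combined position by position, and the result is only as long as the shorter input. ZipSemantics and its zip method are names made up for this sketch; it does not use RxJava.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Illustrative only: models the pairing rule of zip on plain Lists, without RxJava.
final class ZipSemantics {

    static <A, B, R> List<R> zip(List<A> first, List<B> second, BiFunction<A, B, R> combiner) {
        // A pair needs one value from each side, so the shorter list sets the limit.
        int pairs = Math.min(first.size(), second.size());
        List<R> out = new ArrayList<>(pairs);
        for (int i = 0; i < pairs; i++) {
            out.add(combiner.apply(first.get(i), second.get(i)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> letters = Arrays.asList("A", "B", "C", "D", "E");
        List<Long> numbers = Arrays.asList(0L, 1L, 2L, 3L);
        // "E" has no partner yet, so it is not part of the output.
        System.out.println(zip(letters, numbers, (s, n) -> s + n)); // prints [A0, B1, C2, D3]
    }
}
```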
Try implementing the zipWith operator yourself. It’s analogous to concatWith and mergeWith.
7. debounce, delay
The debounce operator only emits an item from an Observable if a particular timespan has passed without the Observable emitting another item. This operator is really useful in places like the EditText of a SearchView in Android.
Typically, the search view would call a backend API with the current text on every keystroke. Using the debounce operator avoids these immediate calls. It gives the user time to look at the text they’ve entered and decide whether to modify it. Only after the given timespan has passed without further input is the backend API called.
    Observable.just(1, 2, 3, 4, 5, 6)
            .debounce(1, TimeUnit.SECONDS)
            .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));

    //Prints 6
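Only the last element is printed here. The six items are emitted back to back, so each one is superseded by the next before the one-second window has elapsed. When the source completes, debounce emits the last pending item, 6, and then completes.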
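The rule can be modelled on a list of timestamped values: a value survives if the next one arrives at least one timeout later, or if it is the last value before completion. This is a simplified plain-Java sketch, not RxJava’s implementation, and the names DebounceSemantics and Timed are invented for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: a timestamp-based model of debounce, without RxJava.
final class DebounceSemantics {

    // A value together with the time (in milliseconds) at which it is emitted.
    static final class Timed {
        final long atMillis;
        final int value;

        Timed(long atMillis, int value) {
            this.atMillis = atMillis;
            this.value = value;
        }
    }

    // Assumes the source is ordered by time and completes right after its last item.
    static List<Integer> debounce(List<Timed> source, long timeoutMillis) {
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < source.size(); i++) {
            boolean isLast = i == source.size() - 1;
            // Keep the item if nothing newer arrives within the timeout.
            if (isLast || source.get(i + 1).atMillis - source.get(i).atMillis >= timeoutMillis) {
                out.add(source.get(i).value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Three quick keystrokes, a pause, then two more quick keystrokes.
        List<Timed> typing = Arrays.asList(
                new Timed(0, 1), new Timed(200, 2), new Timed(400, 3),
                new Timed(2000, 4), new Timed(2100, 5));
        System.out.println(debounce(typing, 1000)); // prints [3, 5]
    }
}
```

In the search example, this is why only the text present at each pause in typing would reach the backend.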
The delay operator delays the start of emission of values from the Observable by a certain time.
    Observable.just(1, 2, 3, 4, 5, 6)
            .delay(5, TimeUnit.SECONDS)
            .subscribe(System.out::println, System.out::println, () -> System.out.print("OnComplete"));
    Thread.sleep(4000);
In the above code, nothing is printed: the main function returns after 4 seconds, before the emission that was due after 5 seconds can take place.
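The underlying issue is that the caller stops waiting before the delayed work runs. The following plain-Java sketch reproduces that situation with a ScheduledExecutorService and a CountDownLatch instead of RxJava, using much shorter times. The class DelayedEmission and the method receiveWithin are hypothetical names for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: models "delayed emission vs. how long the caller waits".
final class DelayedEmission {

    // Delivers the values after delayMillis on a background thread and returns
    // whatever has arrived once maxWaitMillis has passed (or delivery finished).
    static List<Integer> receiveWithin(List<Integer> values, long delayMillis, long maxWaitMillis) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        try {
            scheduler.schedule(() -> {
                received.addAll(values);
                done.countDown();
            }, delayMillis, TimeUnit.MILLISECONDS);
            // Unlike a fixed sleep, await returns as soon as the delivery is done.
            done.await(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        } finally {
            scheduler.shutdownNow(); // cancels the delivery if it has not run yet
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3);
        // Waiting for less than the delay: nothing arrives.
        System.out.println(receiveWithin(values, 500, 100));
        // Waiting for longer than the delay: all values arrive.
        System.out.println(receiveWithin(values, 100, 2000));
    }
}
```

In the RxJava snippet above, the equivalent change is simply to sleep for longer than the 5-second delay.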
This brings us to the end of this tutorial. We’ve covered some of the major RxJava operators.