Using RxJava for Android Dev

Desc of RxJava

ReactiveX is a library for composing asynchronous and event-based programs by using observable sequences.

It extends the observer pattern to support sequences of data and/or events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

Observables fill the gap by being the ideal way to access asynchronous sequences of multiple items

single items multiple items
synchronous T getData() Iterable getData()
asynchronous Future getData() Observable getData()

Using of RxAndroid

1. Setting Up RxAndroid

To use RxAndroid in an Android Studio project, add it as a compile dependency in the app module’s build.gradle.

1
compile 'io.reactivex:rxandroid:0.25.0'

2. Basics of Observers and Observables

When working with ReactiveX, you will be using observables and observers extensively. You can think of an observable as an object that emits data and an observer as an object that consumes that data. In RxJava and RxAndroid, observers are instances of the Observer interface, and observables are instances of the Observable class.

The Observable class has many static methods, called operators, to create Observable objects. The following code shows you how to use the just operator to create a very simple Observable that emits a single String.

1
2
Observable<String> myObservable 
= Observable.just("Hello"); // Emits "Hello"

The observable we just created will emit its data only when it has at least one observer. To create an observer, you create a class that implements the Observer interface. The Observer interface has intuitively named methods to handle the different types of notifications it can receive from the observable. Here’s an observer that can print the String emitted by the observable we created earlier:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Observer<String> myObserver = new Observer<String>() {
@Override
public void onCompleted() {
// Called when the observable has no more data to emit
}

@Override
public void onError(Throwable e) {
// Called when the observable encounters an error
}

@Override
public void onNext(String s) {
// Called each time the observable emits data
Log.d("MY OBSERVER", s);
}
};

To assign an observer to an observable, you should use the subscribe method, which returns a Subscription object. The following code makes myObserver observe myObservable:

1
Subscription mySubscription = myObservable.subscribe(myObserver);

As soon as an observer is added to the observable, it emits its data. Therefore, if you execute the code now, you will see Hello printed in Android Studio’s logcat window.

You might have noticed that we didn’t use the onCompleted and the onError methods in myObserver. As these methods are often left unused, you also have the option of using the Action1 interface, which contains a single method named call.

1
2
3
4
5
6
Action1<String> myAction = new Action1<String>() {
@Override
public void call(String s) {
Log.d("My Action", s);
}
};

When you pass an instance of Action1 to the subscribe method, the call method is invoked whenever the observable emits data.

1
Subscription mySubscription = myObservable.subscribe(myAction1);

To detach an observer from its observable while the observable is still emitting data, you can call the unsubscribe method on the Subscription object.

1
mySubscription.unsubscribe();

3. Using Operators

Now that you know how to create observers and observables, let me show you how to use ReactiveX’s operators that can create, transform, and perform other operations on observables. Let’s start by creating a slightly more advanced Observable, one that emits items from an array of Integer objects. To do so, you have to use the from operator, which can generate an Observable from arrays and lists.

1
2
3
4
5
6
7
8
9
Observable<Integer> myArrayObservable 
= Observable.from(new Integer[]{1, 2, 3, 4, 5, 6}); // Emits each item of the array, one at a time

myArrayObservable.subscribe(new Action1<Integer>() {
@Override
public void call(Integer i) {
Log.d("My Action", String.valueOf(i)); // Prints the number received
}
});

When you run this code, you will see each of the numbers of the array printed one after another.

If you’re familiar with JavaScript, Ruby, or Kotlin, you might be familiar with higher-order functions such as map and filter, which can be used when working with arrays. ReactiveX has operators that can perform similar operations on observables. However, because Java 7 doesn’t have lambdas and higher-order functions, we’ll have to do it with classes that simulate lambdas. To simulate a lambda that takes one argument, you will have to create a class that implements the Func1 interface.

Here’s how you can use the map operator to square each item of myArrayObservable:

1
2
3
4
5
6
myArrayObservable = myArrayObservable.map(new Func1<Integer, Integer>() { // Input and Output are both Integer
@Override
public Integer call(Integer integer) {
return integer * integer; // Square the number
}
});

Note that the call to the map operator returns a new Observable, it doesn’t change the original Observable. If you subscribe to myArrayObservable now, you will receive squares of the numbers.

Operators can be chained. For example, the following code block uses the skip operator to skip the first two numbers, and then the filter operator to ignore odd numbers:

1
2
3
4
5
6
7
8
9
10
myArrayObservable
.skip(2) // Skip the first two items
.filter(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer integer) { // Ignores any item that returns false
return integer % 2 == 0;
}
});

// Emits 4 and 6

4. Handling Asynchronous Jobs

The observers and observables we created in the previous sections worked on a single thread, Android’s UI thread. In this section, I will show you how to use ReactiveX to manage multiple threads and how ReactiveX solves the problem of callback hell.

Assume you have a method named fetchData that can be used to fetch data from an API. Let’s say it accepts a URL as its parameter and returns the contents of the response as a String. The following code snippet shows how it could be used.

1
2
String content = fetchData("http://www.google.com");
// fetches the contents of google.com as a String

This method needs to run on its own thread, because Android does not allow network operations on the UI thread. This means you would either create an AsyncTask or create a Thread that uses a Handler.

With ReactiveX, however, you have a third option that is slightly more concise. Using the subscribeOn and observeOn operators, you can explicitly specify which thread should run the background job and which thread should handle the user interface updates.

The following code creates a custom Observable using the create operator. When you create an Observable in this manner, you have to implement the Observable.OnSubscribe interface and control what it emits by calling the onNext, onError, and onCompleted methods yourself.

1
2
3
4
5
6
7
8
9
10
11
12
Observable<String> fetchFromGoogle = Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
String data = fetchData("http://www.google.com");
subscriber.onNext(data); // Emit the contents of the URL
subscriber.onCompleted(); // Nothing more to emit
}catch(Exception e){
subscriber.onError(e); // In case there are network errors
}
}
});

When the Observable is ready, you can use subscribeOn and observeOn to specify the threads it should use and subscribe to it.

1
2
3
4
5
6
7
8
9
fetchFromGoogle
.subscribeOn(Schedulers.newThread()) // Create a new Thread
.observeOn(AndroidSchedulers.mainThread()) // Use the UI thread
.subscribe(new Action1<String>() {
@Override
public void call(String s) {
view.setText(view.getText() + "\n" + s); // Change a View
}
});

You might still be thinking that the reactive approach isn’t drastically better than using the AsyncTask or Handler classes. You are right, you don’t really need ReactiveX if you have to manage only one background job.

Now consider a scenario that would result in a complex codebase if you used the conventional approach. Let’s say you have to fetch data from two (or more) websites in parallel and update a View only when all the requests have completed. If you follow the conventional approach, you would have to write lots of unnecessary code to make sure that the requests completed without errors.

Consider another scenario in which you have to start a background job only after another background job has completed. Using the conventional approach, this would result in nested callbacks.

With ReactiveX’s operators, both scenarios can be handled with very little code. For example, if you have to use fetchData to fetch the contents of two websites, fore example Google and Yahoo, you would create two Observable objects, and use the subscribeOn method to make them run on different threads.

1
2
fetchFromGoogle = fetchFromGoogle.subscribeOn(Schedulers.newThread());
fetchFromYahoo = fetchFromYahoo.subscribeOn(Schedulers.newThread());

To handle the first scenario in which both requests need to run in parallel, you can use the zip operator and subscribe to the Observable it returns.

1
2
3
4
5
6
7
8
9
// Fetch from both simultaneously
Observable<String> zipped
= Observable.zip(fetchFromGoogle, fetchFromYahoo, new Func2<String, String, String>() {
@Override
public String call(String google, String yahoo) {
// Do something with the results of both threads
return google + "\n" + yahoo;
}
});

Similarly, to handle the second scenario, you can use the concat operator to run the threads one after another.

1
2
Observable<String> concatenated = Observable.concat(fetchFromGoogle, fetchFromYahoo);
// Emit the results one after another

5. Handling Events

RxAndroid has a class named ViewObservable that makes it easy to handle events associated with View objects. The following code snippet shows you how to create a ViewObservable that can be used to handle the click events of a Button.

1
2
3
4
5
Button myButton 
= (Button)findViewById(R.id.my_button); // Create a Button from a layout

Observable<OnClickEvent> clicksObservable
= ViewObservable.clicks(myButton); // Create a ViewObservable for the Button

You can now subscribe to clicksObservable and use any of the operators you learned about in the previous sections. For example, if you want your app to skip the first four clicks of the button and start responding from the fifth click onwards, you could use the following implementation:

1
2
3
4
5
6
7
8
9
clicksObservable
.skip(4) // Skip the first 4 clicks
.subscribe(new Action1<OnClickEvent>() {
@Override
public void call(OnClickEvent onClickEvent) {
Log.d("Click Action", "Clicked!");
// Printed from the fifth click onwards
}
});

##Conclusion

In this tutorial, you learned how to use ReactiveX’s observers, observables, and operators to handle multiple asynchronous operations and events. As working with ReactiveX involves functional, reactive programming, a programming paradigm most Android developers are not used to, don’t be too hard on yourself if you don’t get it right the first time. You should also know that ReactiveX code will be a lot more readable if you use a modern programming language, such as Kotlin, that supports higher-order functions.

reference articles:

ReactiveX

Getting Started With ReactiveX on Android