The Reactive Extensions for the JVM, better known as RxJava, have become a popular tool in Android development over the past years. Many new and existing libraries added support for Rx and “reactive” solutions to existing problems that have popped up over time. RxJava is known for both its power and its steep learning curve, so if you aren’t using RxJava already, what are you missing out on? Should you even be using RxJava?
In this talk from Mobilization 2016, we’ll go over what RxJava is and how you can use it to solve problems in your Android apps one step at a time. We will also discuss how you can prevent shooting yourself in the foot by looking at common mistakes you may make when using RxJava, and review tips and tricks for keeping your reactive app manageable.
Introduction (0:00)
I’m Hugo, an independent Android developer at my company, Little Robots. I’m also a Google Developer Expert for Android. In this post, I’ll be discussing RxJava.
I want to give you a taste of what RxJava can look like, how to get started with it, and also show some real-life examples from apps that I’ve built. I’ll also give you some tips and tricks to get started with RxJava.
Typical Code (1:26)
// Everything is awesome and simple! (pulling the result)
public Article getArticle() throws Exception {}
// Asynchronous callback (awaiting the result)
public void getArticle(ArticleCallback callback) {}
// Asynchronous execution w/ callback on main thread
public AsyncTask<Void, Void, Article> fetchArticle() {}
Which problems are we trying to solve? For your typical code, if you’re on the happy path, you have something synchronous. You want to get an article, so you just call the getter and you’re done.
However, it’s not that simple. This article needs to come from the network; maybe a database, or something that you can’t really synchronously call to because you don’t want to block the main thread. In that case, you can resort to something like a callback interface, but this has a couple of problems: it’s hard to control lifecycle, and you generally can’t cancel it.
It’s hard to change multiple callbacks, so if you have to do multiple things, you have to coordinate all these callbacks. There’s no real standard way to do an error handling, so your callback will have a success or an error handling. There is also the question of what thread is this callback returning?
For Android code, you really want to return the result on the main thread because you want to show it in the UI. AsyncTask is sort of a solution to that. It allows you to run a background task on the background thread. It allows you to cancel it and to make sure that the result comes back on the main thread.
rx.Observables And Error Handling (2:58)
public Observable<Article> article() {}
There is still the problem of chaining and error handling. Error handling with AsyncTask is kind of ugly. When we apply RxJava to this problem, we create an observable that returns us an article. Once you invoke this method, nothing is actually executed. It’s setting up a stream where once you subscribe to it, it will start giving you these article objects. Once you execute, or subscribe to this code, it can be synchronous or asynchronous.
An observable also has built-in error handling. You can cancel it, it can be either synchronous or asynchronous, and we can handle errors. There are three types of events. There is the value event, which comes back in the onNext()
method, and that means that every time an article is produced by this observable, you would get a callback. Second is the error event, which would result in termination of the observable at that time. If you get an error from this observable, you are ensured that no other events are coming.
There’s a similar event called the completion event which means completing the stream without an error. You get this article or multiples, and at some point the stream will complete, and will tell you that it’s complete.
The Stream (5:11)
Subscribing is the way to get the data out of these observables. You just have to call this method, then subscribe to it and give it a subscriber. A subscriber has these three methods: onNext
, onError
, onComplete
.
Every time an article is produced, we get an onNext
event. If something happens, you get an error with a trouble, so it might be any exception, including exceptions that happened when you were handling stuff within your observable. If it’s completed, it will tell you as well.
Subscription (5:46
Subscription s = article().subscribe(new Subscriber<Article>() {
@Override
public void onNext(Article article) {
// this can be called multiple times
}
@Override
public void onError(Throwable e) {
// no more events, and automatically unsubscribed
}
@Override
public void onCompleted() {
// no more events, and automatically unsubscribed
}
});
There are a couple of ways to create a subscriber. One way is by subclassing a subscriber. A common way is to use inner classes.
Subscription s = article().subscribe(new Action1<Article>() {
@Override
public void call(Article article) {
}
}, new Action1<Throwable>() {
@Override
public void call(Throwable throwable) {
}
}, new Action0() {
@Override
public void call() {
}
});
If you tree inner classes for the same method, you can shorten it to this syntax.
Lambda Expressions (6:10)
Subscription s = article().subscribe(
article -> { ... },
error -> { ... },
() -> { ... });
Unsubscribe (6:48)
It’s pretty important that you keep track of your subscription, because at some point you need to unsubscribe.
s.unsubscribe();
If you are done with a stream, you want to unsubscribe. For example, if you have a fragment or an activity, you’re dealing with lifecycle, you want to cancel the subscription. You want to cancel the observable emitting events. If you don’t unsubscribe, you might leak memory. Especially when you use these inner classes, you will hold a reference to your fragment or your activity where you declared this callback. Don’t lose the subscriptions.
If you subscribe somewhere, you always want to be able to unsubscribe. There’s no hard or fast rule to know if it will leak memory, but just assume that it will.
Learning Curve (8:25)
RxJava has a reputation for being complicated and having a steep learning curve. It’s not because the syntax is so complicated, but because applying RxJava to your application is an architectural change.
It requires a different way of thinking about your application. If you are applying a library to do HTTP calls, you’re trying to reduce the amount of code that you’re writing. Maybe you could write the same thing yourself in 20 lines of code, but then you apply some fancy library and now it only takes one line of code. If you apply RxJava, you’re doing basically the same thing in a different way.
Stream vs Single Value (9:15)
You have to be thinking of a stream of events instead of doing a single callback. You also have to think, “Is this an observable that will emit one event or multiple events?” and “Does this stream ever complete?”
Then there’s the functional style of RxJava. What types are immutable, and where are we chaining all the operations on the stream? The observable itself is simple, but the power comes from manipulating the stream by using operators. Typically, I use about 10 operators, but there are more than that and you can combine them in all kinds of ways.
That also means that you have to learn some tricks to figure out which combination of operators works. There are many combinations for that.
Documentation (10:54)
Another thing that makes RxJava a little hard to learn is the documentation. There’s a lot of good documentation, but it might be very daunting to understand if you were to see it for the first time.
There’s this marble diagram, which is supposed to help you understand how an operator works. This tries to express that these are the events coming from the source observable.
flatMap (11:42)
flatMap
takes an event, and for every event that is emitted, it will produce a new observable that might produce one or more events. The red circle, for example, is transformed to two diamonds. You can see that for the green and the blue, the same applies, but you can also see that these things are not necessarily in order.
Gentle Start (12:30)
I think if you are starting out with RxJava, it’s probably a hurdle to understand all of this. I would really recommend a gentle start. Don’t try to go all reactive, because you’ll most likely fail. Instead, apply it where it makes sense.
Start with your domain layer, or in your business logic. Maybe you have some data transformation that you want to do. It’s very powerful for that. It’s also very powerful for doing something like an HTTP call, like REST. I would figure that most of you know RxJava from working with Retrofit. It makes it very easy to get this observable, and once you subscribe to it, it covers the HTTP calls being made and you get results.
RxJava as a Specialized EventBus
What RxJava does is quite similar to an EventBus. It sends events, but it also adds on some power where you can transform the events and subscribe to specific events that you are interested in.
Convert from Article to rfidTag String (15:28)
public Observable<String> rfIdTags(Observable<Article> observable) {
// map = convert an object to another object
return observable.map(article -> article.rfIdTag);
}
This is from an app where we scan inventory in shops with an RFID scanner. An article might be a t-shirt or something else. It has a tag, and we want to know which articles are actually in the store. A very simple example of using RxJava here is that we need to convert an observable of articles back into an observable of their RFID tags. We want to know a stream of tags.
We can apply a simple operator for that, given that we have this observable of articles. For now, it doesn’t really matter where it comes from.
We’re going to map the article to its RFID tag, to its string. You can see it’s taking the article and converting it to a string using this map. That’s pretty simple, but it’s important to realize that we are actually creating a new observable that’s wrapping the old one. It’s a bit like a string. If you were to do string.substring
, you are creating a new string; same for observables. That’s something to realize, because we are going to chain all these operations in the end. If you subscribe to this, you will get a callback for every article, but you will get this RFID tag and not the article itself.
Observable<String> newObservable = observable.map(article -> article.rfIdTag);
Count Same Articles (18:06)
We have a bunch of articles which have a unique RFID tag but might be the same article. I might have 10 red t-shirts, for example, which all have a different RFID tag. We want to group these articles by their article ID and then count how many there are in each group. Then, I want to create a new object out of that, emitting an article quantity.
public Observable<ArticleQuantity> quantity(Observable<Article> observable) {
return observable.filter(article -> article.isInStock).
groupBy(this::articleIdFromTag).
// convert each group to a new observable
flatMap(group ->
// each item in the group is converted to ArticleQuantity
group.map(articleInfo -> new ArticleQuantity(group.getKey(), 1)).
// then these items are reduced to a single item per group
reduce((q1, q2) ->
new ArticleQuantity(q1.articleId, q1.quantity + q2.quantity))
);
}
First, I want to filter out all the articles that are not supposed to be in stock. This filter operator will create a new observable, will chain it, and will filter out all the articles that are not in stock.
Then we’re going to group all the articles by using their RFID tag. This notation means that we are calling a method which takes an article and then returns the tag, the article IDs.
groupBy (19:45)
Your groupBy function emits not just new values, but it’s actually emitting a new observable of observables. The circles are grouped in a new observable, and all the triangles are also grouped. After we apply this operation, we now have an observable that has a GroupObservable<String, Article>
. The string is the key, and the article that I want to figure out, I want to count eventually.
Now that we have these nice groups, we can apply flatMap
on this GroupObservable
. We want to actually process every group. We take in the group, and then map it into a new Object. We have this article, grouped by its idea, and we can create our almost final object, which is the quantity. We get the article ID here and we have a quantity of one. They are still not unique.
Reduce (22:09)
Now we want to sum up every article in this group, and there’s an operator for that called reduce
.
It takes two Objects and expects you to create one Object out of it. For every ArticleQuantity that I get in here, I’ll just take the ArticleID from the first one, because I know they are the same. They are in the same group, and then I just sum up the quantities.
After all this work, we now have an observable that will emit an ArticleQuantity, unique per group, that give me how many of each I have in the group. I do think it’s very powerful. It’s basically just processing this data.
We have another situation where we needed to do something more advanced: we want to create an object called the expected stock.
Observable<ExpectedStock> expectedStock(Observable<Article> articles) {
Observable<Article> observable = articles.publish().autoConnect(2);
return Observable.combineLatest(rfIdTags(observable).toList(),
quantity(observable).toList(), (rfidTags, quantities) ->
new ExpectedStock(rfidTags, quantities));
}
We want to combine all the RFID tags that we have seen. That’s a simple thing to do there. We can combine these previous functions that we made. We can convert it to a list, and we can do the same for the quantity.
There’s a trick here: we’re going to take the original observable, then we’re going to create that into a connectable observable, and then we’re going to tell that observable that we only want to start emitting values once we have two subscribers.
The idea of a connectable observable is that you basically broadcast one observable to multiple subscribers at the same time.
ConnectableObservable<Article> connectable = observable.publish();
Subscription s1 = connectable.subscribe();
Subscription s2 = connectable.subscribe();
connectable.connect();
I know that I have two subscribers and once the second subscriber has subscribed, then the producing of these values starts. This is why this works.
ConnectableObservable<Article> connectable = observable.publish().autoConnect(2);
Subscription s1 = connectable.subscribe();
Subscription s2 = connectable.subscribe();
// happens automatically now
// connectable.connect();
I know that we have two subscribers because we have the RFID tags, and we have the quantity that we convert into a list and the net effect of this is that if this observable will trigger a network call. Only one network call is made and not two, which is of course what we want in order to be efficient. Data processing is really good case for RxJava.
Schedulers (26:28)
RxJava has a very powerful concept of schedulers, which allow you to control on which track stuff is happening. This is very important for Android because you can’t really block.
There is Schedulers.io()
, Schedulers.computation()
, and you can also create your own one from an executer. For Android, there’s a project called RxAndroid which supplies a schedule which does the work on the main thread.
There are also two operators again. Everything has an operator that controls how you apply the schedulers. They are subscribeOn
and observeOn
.
subscribeOn
I’ve seen some confusing statements about subscribeOn
, but the key thing to remember is that you can apply basically only once. If you have a chain of operators on your observable, you can only use subscribeOn
once because the net effect is that subscribeOn
controls on which thread the subscription is happening. The minute I call subscribe, it will create a new thread, and then do all the stuff that is needed to subscribe. If you apply that multiple times, it doesn’t have really an effect because it will just create a couple of threads and end up where in the first subscribeOn
basically.
observeOn
On the other hand, observeOn
controls where the event is emitted. You typically use that with the main thread to make sure that once you get your result, you get it back on the main thread. Then, you can safely update your views.
observable.map(...).subscribeOn(io()).observeOn(mainThread()).flatMap(...);
In this example, the map would be running on the IO’s thread, and then once the result from the map comes back, it will switch to the main thread and then it goes on with another operator, flatMapping or whatever. Usually in the more common sense, you will have this is the last operator and then apply subscribe.
This is also a nice caveat here that there are some operators that apply their own scheduler. That can be surprising. Sometimes you’ve done all these things right, and then you apply the schedule and all of a sudden it’s still telling you that your views are fetched on the wrong thread. That might be because operators like timer()
and debounce()
might use a different scheduler if you don’t tell them to use the scheduler that you like, so it’s kind of tricky.
Those are the basic ingredients you need to get started with RxJava I think, if you have schedulers and the observables and the operators.
Common Mistakes (30:06)
I’ve been doing code reviews lately, and these are a few things that I see often:
Subscriptions (30:16)
public class MyActivity extends Activity {
private CompositeSubscription mSubscriptions = new CompositeSubscription();
@Override
protected void onStart() {
super.onStart();
mSubscriptions.add(createObservable().subscribe(...));
}
@Override
protected void onStop() {
super.onStop();
// use clear(), not unsubscribe()
mSubscriptions.clear();
}
}
If you’re not keeping the reference to the subscription, you might leak memory. There is a trick to make this a little bit easier. There is this object called CompositeSubscription()
and it’s basically a backup subscription that you can put all your subscriptions into.
You can add your subscription and when you’re done, you call clear()
, which will unsubscribe all the subscriptions and basically cancel all the operations. There’s also an unsubscribe()
method on this thing, which you definitely do not want to use; if you call unsubscribe, then the next time this activity comes back, it will try to add a subscription, but it will immediately unsubscribe it again.
Error handling (31:38)
// Oops, no error handling!
createObservable().subscribe(value -> { … })
// errors are handled here
createObservable().subscribe(value -> { … }, error -> { … })
You should always make sure that you subscribe and supply an error handler. If you don’t, especially where you’re applying a scheduler, you will get a stack trace that shows basically nothing. It shows you that something went wrong deep down in some RxJava clause, and you have no clue where it happened. You should always be using error callbacks and if you are, then you should always log the error coming in because it might be something unexpected.
RxLint can help by flagging missing error handlers!
public void fetchValueFromTheNetwork() {
createObservable().subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(value -> {
EventBus.getDefault().post(new ValueEvent(value));
}, error -> {
EventBus.getDefault().post(new ValueErrorEvent(error));
});
}
public Observable<String> fetchValueFromTheNetwork() {
return createObservable().subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread());
}
mSubscriptions.add(fetchValueFromTheNetwork().subscribe(value -> { … },
error -> { … }));
These are people that are longing for an AsyncTask. They would create this observable, they subscribe to it, and apply these operators. Then something weird happens in the subscribe, so they go out to an EventBus and post the event. If you’re doing this, then just use AsyncTask!
What you should probably be doing here is creating this observable and subscribing to it, because that’s how you apply RxJava.
Breaking the stream (34:08)
createObservable().map(value -> {
// this is not the place to subscribe or call
// other functions.
createAnotherObservable().subscribe( … );
return value;
});
Another thing that’s common, which I see once in awhile, is people “breaking the stream”. For example, we have this map operator and then we realize we want to do something with that value coming into the map. Why not create another observable here and subscribe to it? That has all kinds of problems.
Instead, use flatMap
because that allows you to return a new observable based on a value that you’re getting. You would then return what you create in flatMap
.
Tips (35:33)
// this gets boring and cumbersome
retrieveArticle().observeOn(Schedulers.io()).subscribeOn(AndroidSchedulers.mainThread()).subscribe(article -> {});
Try to decompose the observables that you’re creating due to the chaining style. If you’re not careful, you’ll end up with a method which is chained like crazy and has a lot of operators.
/**
* Retrieve a single article. Will return the result on the main thread
* @return the observable
*/
public Observable<Article> retrieveArticle() {
return articleHttpCall().
subscribeOn(Schedulers.io()).
observeOn(AndroidSchedulers.mainThread());
}
// does the right thing
retrieveArticle().subscribe(article -> { … }, … );
Another tip is to apply the schedulers where it makes sense.
If you have this API in your app where you’re retrieving articles and you’re seeing yourself doing this all the time, it might make sense to set up these schedulers in your API. You probably want to do something like this, where you already make sure that it’s returning on the correct thread and then somewhere else in your app. You don’t have to do the whole subscribeOn
, observeOn
dance every time.
It makes your code a little bit more readable and makes it easier to change the schedulers.
Conclusion (37:36)
I think RxJava is a powerful but different way of doing things. It makes sense for a lot of projects to use it, but when you do, I would recommend to take small steps with it. Don’t go full-out reactive until you really get what you’re solving.
There are many places where callbacks and even AsyncTask are still okay to use. Don’t go in thinking you can remove AsyncTask completely and make everything RxJava.” Take small steps to learn it, look at the documentation, and try to figure out which combinations of operators work.
Q&A (39:05)
Q: Do you remember some statistics comparing the same project with and without RxJava in terms of complexity, number of lines of code, or other metrics?
Hugo: I don’t think I have hard figures but it’s a little bit of a double-edged sword. I noticed myself that when you’re writing these RxJava things, it’s concise and powerful, yet it comes with some cost that is hard to understand for some people. I think especially if you’re doing data processing and things like that, it’s probably way more powerful. Once you have this designed in the way that it should work, it probably reduces the amount of code.
Especially if you’re working on a team, you have to work with your team members to understand this all at the same level. You won’t be happy in a team where there’s “this RxJava guy”. There’s a little too much RxJava in places where it shouldn’t be probably.
Q: Do you use comments to ease the readability or are there any other methods?
Hugo: Yeah, so I have to keep this stuff readable. Try to compose these observables like I mentioned. I personally like to keep the the callbacks close to the observable itself. I like to keep these inner classes close to the observable.
My best tip is to try to keep the change short and then compose the various observables. That’s probably the best thing, because then you can reason about what the separate observables do. If you’re flatMapping to some observable, you might make a new function to create that particular observable, which also allows you to test that separate step better.
Q: At the beginning when you explained what the observables were about, you mentioned that once an observable receives an error, you can handle the error then complete the observable. I’m beginning with Rx, and I found myself that I would like to handle that error, maybe give some feedback to the user, but keep the Observer subscribed. What is the best way to do that on RxJava?
Hugo: When you get an error from your observable, you want to keep producing these events. There are a couple of options. There is onErrorResumeNext
. One pattern might be that you use the side effect, do an error, and then chain it with onErrorResumeNext
or retry
. Then you handle the error in your side effect call, but the error will never be reported to the subscriber in that sense, and you can tailor that to only retry on certain errors and let others fall through to your subscriber.
Q: If I want to take one action on results matching a filter and then another set of actions on all the other items, then in this use case, what would be the scenario using RxJava here?
Hugo: I would approach it like AutoConnect. You can have this observable and then filter it for the one case and leave it as-is for the other case. In that sense, you can either combine it or have two subscribers doing the same thing. It depends on how you want to handle that. Once you filter the stream, it’s being filtered; it’s not possible to filter it and then unfilter it at some point. You basically have to broadcast the same stream in two streams.
Receive news and updates from Realm straight to your inbox