Reactive programming and RxJava are hot topics and often cause for intense debate, with many questions and uncertainty. This talk will cover Airbnb’s experience adopting these new paradigms and technologies, including motivations, implementation difficulties, and lessons learned along the way. We’ll walk through some production code examples, comparing imperative versus reactive approaches, and discuss their respective advantages and limitations.
Introduction (0:00)
In this post, I’ll discuss RxJava and how we adopted it at Airbnb, where I’ve been working for a little over a year. To give you a sense of scale, we have around 15 people on the team right now. The company is growing pretty fast, and it’s always a challenge to integrate new technologies into a team.
Why RxJava? (0:47)
We all know that mobile engineering is hard. Mobile users expect an instant response to everything, and there’s also the need to switch back and forth between different threads. There’s the main thread, you need to make network calls, and you need to make different things happen in the background. On top of that, you cannot block the UI thread.
RxJava is a good candidate for helping us out because it’s easy to switch back and forth between threads. It’s built right into the framework. Async code can be very cumbersome and error-prone, and RxJava is one reason you don’t need to write it by hand anymore, and why you can compose different tasks together.
The real reason we need RxJava, though, is because we make shitty software. Why do we have so many bugs? Why do we need crash reporting tools that track how many hundreds of thousands of crashes we had, or how many users are mad at us? Something must be wrong.
We need a change; I think imperative programming is not the way we should be doing this. Of course, object-oriented programming has been around for many years. That’s built into our core as modern programmers. Everyone does this with eyes closed, but it isn’t necessarily the way we should be doing software.
Functional programming is the concept behind RxJava, and I think it’s one way you can achieve more solid code that doesn’t keep state around forever. The code is more reliable and you know that it works.
Bottom line: our problem is that we write bad code and mobile engineering is hard, and this is one way of tackling that problem.
Streams (3:48)
RxJava is part of ReactiveX, a group of open source libraries. They have libraries in many different languages, including JavaScript, Groovy, Ruby, Java, C#, and the list goes on. However, they all share the same concept, which is functional programming.
Streams are the main part of this concept. Everything in your code is a “stream”, and you have to reconceptualize the way you think about it. Today, we think about code procedurally, since that’s the way we’ve been doing it. You do one instruction, then another instruction, and then you have a loop, and then you call a method, and then it returns. Add in the fact that you do this on different threads, so you have parallelism. You always have to consider what happens if a thread comes back, but you’re somewhere else. It gets really hard, especially for mobile programming.
Learning about streams was really different for me. I started learning them about six months ago, when we first adopted RxJava at Airbnb, and it was so complicated. The first time I looked at it, I would get to a page, and I couldn’t grasp all of the different concepts. Observables, observers, it was too much. However, after a few times working with it, you start to internalize some of the concepts, and it starts to make more sense. The core idea, though, is that everything is a stream.
So Many Concepts (5:54)
We can all agree that reactive programming is hard, but it is definitely growing. I can see a trend with React Native, React, and all those different libraries coming up. There are also new tools, for example Cycle, Elm, and other languages that are focusing on this.
However, there are too many concepts to grasp! Off the top of my head, there are two main concepts: observable and observer, the main two classes that you have to know about. However, there’s also subscriber, subscription, producer, hot/cold observables, backpressure, scheduler, subject, and more. This isn’t even 10%. It can get overwhelming!
If you feel confused, it’s fine, and I can sympathize with you. Just keep going, keep trying, and eventually things will make sense. (I’m not trying to scare you, but it’s the reality.)
Adoption Challenges (7:15)
I want to focus on how this went for us at Airbnb: our process, things we learned, things that were good, and things that were not so great.
Team Size - We have a team of 15 people, which is a pretty big team, and definitely the biggest I’ve worked on. Everyone is working and pushing code to the same repo (our Android app). It’s one code base, and everyone is reviewing everyone else’s code. It’s really important that everyone knows what’s going on and everyone understands what code you are writing.
We use Phabricator for reviewing code, which has a similar process to a GitHub pull request. You write comments, make suggestions, give feedback, and other stuff. Before you decide to adopt RxJava, it’s really important to get everyone on the same page. If you just start using it, people on the team aren’t going to understand what’s going on, and it will be hard to see what’s happening behind the scenes. Maybe if you’re just two people, it’s fine, but when you have more on the team, it’s going to be a challenge to get everyone up to speed.
Learning Curve - There are so many things to understand that you often make silly mistakes. You’re going to write code that makes no sense, and you’re going to crash in production, but eventually it will be fine. In my experience, it’s taken around two months for everyone to understand it. I recommend talking about it as a group, and trying to explain these concepts to the rest of the team if you’re planning to adopt this technology. Try to get a good grasp of it yourself, and once you’re feeling confident, get everyone in the same room and talk about it. Go hands-on: open Android Studio and try to demo some code.
Debugging - It’s a really big problem. Everyone in the community knows that, and they know that it is something that needs to be fixed. I recently got a stack trace from our bug tracking system that had so many exceptions. It’s just a massive amount of stuff with a lot of noise. I don’t know if anyone is actively working to fix this, so if there are any startups looking for good technology to focus on, here’s something you could try to fix!
Common Pitfalls (11:34)
I want to point to some blockers for us. These caused a lot of trouble while adopting RxJava.
observeOn() (11:49)
If you’re planning to work with RxJava, you need to know this important core concept.
return observableFactory.<T>toObservable(this)
    .compose(this.<T>transform(observableRequest))
    .observeOn(Schedulers.io())
    .map(new ResponseMetadataOperator<>(this))
    .flatMap(this::mapResponse)
    .observeOn(AndroidSchedulers.mainThread())
    .<AirResponse<T>>compose(group.transform(tag))
    .doOnError(new ErrorLoggingAction(request))
    .doOnError(NetworkUtil::checkForExpiredToken)
    .subscribe(request.observer());
This is a piece of code we have in our app in which we are building an RxJava observable stream. We call observeOn twice here, which may seem to make no sense. Effectively, every time you call observeOn, everything after that point will run on that scheduler, and when you call it again later on, it switches again.
When using RxJava, you’re building a stream. One common misconception about RxJava is that it’s asynchronous, but everything is actually synchronous by default. When you build a stream, you are just constructing it up to the point where you actually subscribe to it. When you subscribe, everything is put together and then actually executed. Until you call subscribe, you’re only constructing a stream. It’s like a declaration process. When you say observeOn, you are switching to a different thread. If you don’t call observeOn, everything’s going to run on the same thread that subscribed to the observable. Here, we have subscribe, so if this is called from the main thread, everything will happen on the main thread regardless of what you are doing. So, observeOn is effectively a way of offloading that work to a different thread, and that’s what makes it asynchronous.
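To make the “declaration first, execution on subscribe” idea concrete, here is a minimal sketch against the RxJava 1.x API. It is not taken from the Airbnb code base; the class name LazyStreamSketch and the log messages are made up for illustration. It assumes nothing but a plain Observable.just source with no observeOn or subscribeOn.

```java
import rx.Observable;

import java.util.ArrayList;
import java.util.List;

class LazyStreamSketch {
    /** Builds a stream, subscribes to it, and returns a log of what happened, in order. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        String caller = Thread.currentThread().getName();

        // Declaring the stream runs none of the operators yet.
        Observable<Integer> stream = Observable.just(1, 2)
            .map(n -> {
                boolean sameThread = Thread.currentThread().getName().equals(caller);
                log.add("map " + n + " sameThread=" + sameThread);
                return n * 10;
            });

        log.add("built");

        // With no scheduler involved, subscribe() does all the work on the
        // calling thread and only returns once every item has been delivered.
        stream.subscribe(n -> log.add("next " + n));
        log.add("subscribed");
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log starts with "built" and ends with "subscribed", with every map and next entry in between and on the caller’s thread, which is the synchronous-by-default behavior described above.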
The first time we call observeOn, we’re passing a scheduler. RxJava has a few of these schedulers built in, and one of them is the I/O scheduler, which runs work on a thread pool meant for I/O-bound work. The map and flatMap operators will execute on that thread, and when that’s finished, we send the result back to the main thread. So, you start on the main thread (assuming this is called from the main thread), offload the work to the background, and then move back to the main thread.
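The thread hopping can be observed outside of Android with a small sketch like the one below. It is only an approximation of the production snippet: AndroidSchedulers.mainThread() needs an Android runtime, so Schedulers.computation() stands in for “the thread we hop back to”, and the class name ObserveOnSketch and the stage labels are invented for this example.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ObserveOnSketch {
    /** Runs a three-stage stream and reports which thread each stage ran on. */
    static Map<String, Thread> run() {
        Map<String, Thread> seen = new ConcurrentHashMap<>();

        Observable.just("request")
            // No observeOn yet: this runs on the subscribing (calling) thread.
            .doOnNext(x -> seen.put("before", Thread.currentThread()))
            .observeOn(Schedulers.io())
            // Everything from here on runs on an I/O scheduler thread...
            .map(x -> {
                seen.put("map", Thread.currentThread());
                return x + " -> response";
            })
            // ...until the next observeOn switches threads again.
            // Stand-in for AndroidSchedulers.mainThread() (assumption).
            .observeOn(Schedulers.computation())
            .doOnNext(x -> seen.put("after", Thread.currentThread()))
            .toBlocking()   // block the caller until the stream completes
            .last();

        return seen;
    }

    public static void main(String[] args) {
        run().forEach((stage, thread) ->
            System.out.println(stage + ": " + thread.getName()));
    }
}
```

The "before" stage reports the caller’s thread, while "map" and "after" report two different background threads, one per observeOn call.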
If you didn’t use RxJava, this would be a pretty complicated thing to do. Instead, we have this simple and short declarative way of saying what you want to do, and that’s one of the reasons why RxJava seems so complicated: it’s a very short amount of code, but it takes a long time to actually understand what’s going on in there.
subscribeOn() (16:14)
Another concept tied closely to observeOn is subscribeOn. It changes the thread on which the observable is subscribed to, which sounds complicated if you aren’t familiar with the terms.
return observableFactory.<T>toObservable(this)
    .compose(this.<T>transform(observableRequest))
    .observeOn(Schedulers.io())
    .map(new ResponseMetadataOperator<>(this))
    .flatMap(this::mapResponse)
    .observeOn(AndroidSchedulers.mainThread())
    .<AirResponse<T>>compose(group.transform(tag))
    .doOnError(new ErrorLoggingAction(request))
    .doOnError(NetworkUtil::checkForExpiredToken)
    .subscribeOn(Schedulers.io())
    .subscribe(request.observer());
The first call here, observableFactory.<T>toObservable, is where the observable object is created, and that’s exactly the piece of code that’s affected by subscribeOn. There’s the code that runs upon subscription, and then there are all the other transformations on that stream. subscribeOn changes where the subscription code runs, not the rest. It doesn’t matter where in the chain you call it; it only changes the thread where the subscription is executed.
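Here is a small sketch of that behavior, again using plain RxJava 1.x rather than our request classes. Observable.defer plays the role of the “code that runs upon subscription”; the class and method names are hypothetical. Note that subscribeOn is placed at the very end of the chain and still moves the source.

```java
import rx.Observable;
import rx.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicReference;

class SubscribeOnSketch {
    /** Returns true if the source's subscription code ran on the calling thread. */
    static boolean sourceRanOnCallerThread(boolean useSubscribeOn) {
        AtomicReference<Thread> sourceThread = new AtomicReference<>();

        Observable<String> stream = Observable
            .<String>defer(() -> {
                // This lambda is the subscription-time code: it runs once per subscribe().
                sourceThread.set(Thread.currentThread());
                return Observable.just("response");
            })
            .map(String::toUpperCase);

        if (useSubscribeOn) {
            // Last operator in the chain, yet it decides where defer() above runs.
            stream = stream.subscribeOn(Schedulers.io());
        }

        stream.toBlocking().last();   // subscribe from the calling thread and wait
        return sourceThread.get() == Thread.currentThread();
    }

    public static void main(String[] args) {
        System.out.println("without subscribeOn: " + sourceRanOnCallerThread(false));
        System.out.println("with subscribeOn:    " + sourceRanOnCallerThread(true));
    }
}
```

Without subscribeOn the source runs on the thread that subscribed; with it, the source runs on an I/O scheduler thread instead.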
Error handling (18:05)
return observableFactory.<T>toObservable(this)
    .compose(this.<T>transform(observableRequest))
    .observeOn(Schedulers.io())
    .map(new ResponseMetadataOperator<>(this))
    .flatMap(this::mapResponse)
    .observeOn(AndroidSchedulers.mainThread())
    .<AirResponse<T>>compose(group.transform(tag))
    .doOnError(new ErrorLoggingAction(request))
    .doOnError(NetworkUtil::checkForExpiredToken)
    .subscribeOn(Schedulers.io())
    .subscribe(request.observer());
We use doOnError as one way of logging our errors. Let’s say your network failed, and you want to log that to your analytics service, and you want to know how many times that happened. doOnError is an action that’s executed whenever you get an error in the stream, and you can have several of them, so you can handle an error in multiple ways for one stream. When it sees an error event, it’s going to call its action, but it’s just a side effect.
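The “just a side effect” part is easy to miss, so here is a minimal sketch of it. The failing source, the log messages, and the class name DoOnErrorSketch are made up; the point is only that both doOnError actions fire and the error still reaches the subscriber afterwards.

```java
import rx.Observable;
import rx.observers.TestSubscriber;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class DoOnErrorSketch {
    /** Returns a log showing that doOnError observes the error without consuming it. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        TestSubscriber<String> subscriber = new TestSubscriber<>();

        Observable.<String>error(new IOException("network down"))
            // Hypothetical stand-ins for an error logger and an analytics counter.
            .doOnError(e -> log.add("logged: " + e.getMessage()))
            .doOnError(e -> log.add("counted"))
            .subscribe(subscriber);

        // The subscriber still receives the original error.
        log.add("subscriber errors: " + subscriber.getOnErrorEvents().size());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```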
return observableRequest
    .rawRequest()
    .<Observable<Response<T>>>newCall()
    .observeOn(Schedulers.io())
    .unsubscribeOn(Schedulers.io())
    .flatMap(responseMapper(airRequest))
    .onErrorResumeNext(errorMapper(airRequest));
Another construct that can be used is onErrorResumeNext, which works kind of like a catch block, which is something that otherwise doesn’t make sense at all in the reactive world. It’s like you’re saying, “Hey, whenever I see an error, I want to run this action here, which is going to catch that error and keep going, then wrap that exception, log it, and maybe return an empty data set or something.” If you think imperatively, it works like a catch block.
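As a rough illustration of the catch-block analogy, the sketch below recovers from a failing stream by switching to a fallback observable. It does not reproduce our errorMapper; the source, the fallback value, and the class name are assumptions made for the example.

```java
import rx.Observable;

import java.io.IOException;
import java.util.List;

class OnErrorResumeNextSketch {
    /** Emits one item, fails, then recovers with a fallback instead of calling onError. */
    static List<String> run() {
        return Observable.just("page 1")
            .concatWith(Observable.<String>error(new IOException("network down")))
            // Like a catch block: swap the failed stream for another one.
            .onErrorResumeNext(e -> Observable.just("fallback after: " + e.getMessage()))
            .toList()
            .toBlocking()
            .single();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Returning Observable.empty() from the function instead would give the “empty data set” variant: the stream simply completes without the error reaching the subscriber.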
Unit testing (20:04)
It sounds complicated to do unit testing on an asynchronous stream of data, so RxJava comes with this nice class called TestSubscriber.
@Test public void testErrorResponseNonJSON() {
    server.enqueue(new MockResponse()
        .setBody("something bad happened")
        .setResponseCode(500));
    TestRequest request = new TestRequest.Builder<String>().build();
    TestSubscriber<AirResponse<String>> subscriber = new TestSubscriber<>();
    observableFactory.<String>toObservable(request).subscribe(subscriber);
    subscriber.awaitTerminalEvent(3L, TimeUnit.SECONDS);
    NetworkException exception = (NetworkException)
        subscriber.getOnErrorEvents().get(0);
    assertThat(exception.errorResponse(), equalTo(null));
    assertThat(exception.bodyString(), equalTo("something bad happened"));
}
You can use TestSubscriber to subscribe to your stream, and then you can block on it and wait until you get an event. It has handy methods, for example .awaitTerminalEvent, which is going to block the thread until it gets a terminal event (i.e. onCompleted or onError). For every stream, you get onNext from zero to n times, and then when it finishes, you get onCompleted, or if it fails, you get onError, and afterwards you cannot get any more events, so that ends the stream.
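The same pattern works without a mock server. The sketch below is a stripped-down version under simple assumptions: Observable.range stands in for a network request, subscribeOn(Schedulers.io()) makes it asynchronous, and the class name TestSubscriberSketch is invented.

```java
import rx.Observable;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;

import java.util.concurrent.TimeUnit;

class TestSubscriberSketch {
    /** Subscribes to an asynchronous stream and waits for its terminal event. */
    static TestSubscriber<Integer> run() {
        TestSubscriber<Integer> subscriber = new TestSubscriber<>();

        Observable.range(1, 3)              // onNext three times, then onCompleted
            .subscribeOn(Schedulers.io())   // emit from a background thread
            .subscribe(subscriber);

        // Blocks the calling thread until onCompleted or onError (or the timeout).
        subscriber.awaitTerminalEvent(3, TimeUnit.SECONDS);
        return subscriber;
    }

    public static void main(String[] args) {
        TestSubscriber<Integer> subscriber = run();
        System.out.println("items:  " + subscriber.getOnNextEvents());
        System.out.println("errors: " + subscriber.getOnErrorEvents());
    }
}
```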
@Test public void testUnicodeHeader() {
    server.enqueue(new MockResponse().setBody("\"Hello World\""));
    TestRequest request = new TestRequest.Builder<String>()
        .header("Bogus", "中華電信")
        .build();
    observableFactory.toObservable(request)
        .toBlocking()
        .first();
    RecordedRequest recordedRequest = server.takeRequest();
    assertThat(recordedRequest.getHeader("Bogus"), equalTo("????"));
}
Another thing you can use is toBlocking. That’s effectively going to block the thread, which is very handy in unit testing. It’s not as useful in production code, of course. If you’re using RxJava, you likely don’t want to block your thread, but it comes in really handy when testing. It’s a little shorter than using TestSubscriber. If you know it’s not going to fail, you can just block and get the first event.
In this case, we’re using OkHttp, where we can mock the response and then send the fake response back. We’re adding a header there because there was a bug in OkHttp with special characters in headers. So, we’re just testing that we cleaned up the header correctly.
Memory leaks (22:41)
If you do mobile, you know all about memory exceptions.
When you subscribe to a stream, you get a subscription back. Once you have a subscription, you can unsubscribe from it, which frees its resources. You don’t hold a reference to that stream anymore. We all know that’s important when making requests, for example, from an Android activity or a fragment. You don’t want to just fire and forget; you want to actually clean up the resources that were allocated when the activity is destroyed, which is a pretty common pattern.
private final CompositeSubscription pendingSubscriptions =
    new CompositeSubscription();

@Override public void onCreate() {
    pendingSubscriptions.add(
        observable.subscribe(observer));
}

@Override public void onDestroy() {
    pendingSubscriptions.clear();
}
You can use CompositeSubscription, which is a class that can group multiple subscriptions together, and then you just add the subscription to it. Then, whenever you destroy the activity, you just clear it out.
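To see what clearing actually does, here is a sketch that runs without Android. A PublishSubject stands in for a long-lived source such as a pending request, and the comments mark where onCreate and onDestroy would be; these names and the class CompositeSubscriptionSketch are assumptions for illustration only.

```java
import rx.subjects.PublishSubject;
import rx.subscriptions.CompositeSubscription;

import java.util.ArrayList;
import java.util.List;

class CompositeSubscriptionSketch {
    /** Returns the items received before the subscriptions were cleared. */
    static List<String> run() {
        List<String> received = new ArrayList<>();
        CompositeSubscription pendingSubscriptions = new CompositeSubscription();
        PublishSubject<String> responses = PublishSubject.create();

        // What onCreate() would do: subscribe and remember the subscription.
        pendingSubscriptions.add(responses.subscribe(received::add));
        responses.onNext("first");

        // What onDestroy() would do: unsubscribe everything added so far.
        pendingSubscriptions.clear();

        // Nobody is listening anymore, so this item is dropped.
        responses.onNext("second");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

One design note: clear() leaves the CompositeSubscription reusable, whereas calling unsubscribe() on it also unsubscribes anything added afterwards.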
Additional Resources (23:55)
Q&A (23:35)
Q: What’s your strategy for retrying when you get an error? For example, for a network request to your server to go get data.
Felipe: In our case specifically, we don’t really retry requests; we just fail and that’s it. But if you want to do that, one way of doing that would be with onErrorResumeNext. You could resume with another request, and do that multiple times. If you get an error, resume with a new request, and you can do that twice to retry it two times if you want. That’s one way of doing it. I haven’t really implemented that with RxJava, though, so I don’t have a specific suggestion.
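For reference, RxJava 1.x also ships a retry operator that resubscribes to the source when it fails. The sketch below is not something we run in production; it assumes a hypothetical flaky source that fails on its first two attempts, and the class name RetrySketch is made up.

```java
import rx.Observable;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

class RetrySketch {
    /** Resubscribes to a flaky source until it succeeds, at most two extra times. */
    static String run() {
        AtomicInteger attempts = new AtomicInteger();

        // defer() re-runs this lambda on every (re)subscription.
        Observable<String> flaky = Observable.<String>defer(() ->
            attempts.incrementAndGet() < 3
                ? Observable.<String>error(new IOException("attempt failed"))
                : Observable.just("ok after " + attempts.get() + " attempts"));

        // retry(2): one initial attempt plus up to two resubscriptions.
        return flaky.retry(2).toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```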
Q: In Java 8, lambdas don’t necessarily carry a reference to the context in which they were created, but in Android, I believe they do? Android hasn’t really implemented Java 8. I’m curious about your experience with memory leaks regarding holding references back.
Felipe: We didn’t really have any issues with lambdas, as far as I could tell, regarding memory management. It was pretty surprising. We use Retrolambda which backports Java 8 lambdas into Java 7 byte code, because Android doesn’t support Java 8 natively. It’s like a hack that changes byte code into something compatible with Java 7. I haven’t seen any issues, our code seems to be fine, regarding lambdas.
Q: I don’t know if you follow the RxJava repo, but Ben Christensen left back in October to go work for Facebook, and since then, the project has kind of taken a steep drop off. It’s being maintained by one guy right now. Do you have any concerns about kind of like the long term viability of basing your development around this framework?
Felipe: That’s definitely true. As far as I heard, Netflix is committed to allocating people to work on this, but that doesn’t seem to be happening yet. David Karnok, the guy who has been maintaining this product alone, is really, really smart. The thing is, RxJava 1.0 is really stable. I don’t think I am worried about future bug fixes or security or anything like that. It should be good for years to come. Regarding future development, like 2.0, that’s kind of an unknown. We don’t really know what’s going to happen. I’m quite comfortable with 1.0, though, since it’s like pretty stable.
Q: Could you elaborate on techniques for splitting streams? There’s a lot of material on combining streams, but the only technique I’ve really found for splitting would be the publish-connect model, and I feel like that’s often probably better suited for adding multiple subscriptions, rather than splitting the observable into multiple observable streams.
Felipe: Right, so I’ve used share before, which is one way of having one observable being subscribed by multiple people, like you said. I can’t remember exactly the difference between share and publish-connect, which are basically the same thing. We’ve used that for allowing multiple people to subscribe to the same observable, but I don’t really know other ways we could do that.
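For readers comparing the two: in RxJava 1.x, share() is shorthand for publish().refCount(), so it connects automatically when the first subscriber arrives, while publish() waits for an explicit connect(). The sketch below shows the publish-connect form, with two filtered branches as one possible way to “split” a stream. The source, the even/odd split, and the class name are assumptions for the example, not a recommendation from the talk.

```java
import rx.Observable;
import rx.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class PublishConnectSketch {
    /** Feeds two branches from one source and reports how often the source was subscribed. */
    static String run() {
        AtomicInteger sourceSubscriptions = new AtomicInteger();
        Observable<Integer> source = Observable.defer(() -> {
            sourceSubscriptions.incrementAndGet();   // count subscriptions to the source
            return Observable.just(1, 2, 3);
        });

        // publish() does not subscribe to the source until connect() is called.
        ConnectableObservable<Integer> published = source.publish();

        List<Integer> evens = new ArrayList<>();
        List<Integer> odds = new ArrayList<>();
        published.filter(n -> n % 2 == 0).subscribe(evens::add);
        published.filter(n -> n % 2 != 0).subscribe(odds::add);

        // Both branches are in place; now start the single shared subscription.
        published.connect();

        return "sourceSubscriptions=" + sourceSubscriptions.get()
            + " evens=" + evens + " odds=" + odds;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```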
Q: You mentioned that one of the difficulties was hot vs. cold observables. Do you have documentation or naming conventions in your code to try and make clear which is which, so like, “Oh, surprise! You subscribe and there are side effects!”
Felipe: It’s just one more concept that you have to grasp. We didn’t really have that problem because most of our usage of RxJava was around Retrofit, which is pretty straightforward. You subscribe to it and make a network request. I haven’t really had much problem with that, since I haven’t touched it that much. But the RxJava repo has a really good wiki, they have like great documentation, and also the ReactiveX website has a lot of stuff.
Q: When you decided to switch to RxJava, you decided to migrate a whole bunch of things to your repository. Can you talk about some of those challenges at Airbnb? Was it all just people problems, or was it something else?
Felipe: I would say it was mostly a people problem, including myself, because I was also learning. It’s a problem when you are trying to teach a tool that you are also learning yourself. It’s hard to explain some of these concepts because they’re so complicated. That was the biggest challenge in my opinion, getting everyone up to speed. We didn’t have many production issues. We had a few crashes here and there due to naive usage, I can’t remember exactly. We had a legacy setup using Volley for our network, and then we switched to Retrofit. We had a lot of legacy code around that, with request classes, and it didn’t really fit the way Retrofit works, so we actually had to fork Retrofit to make it work with our current setup. However, most of the challenges were around getting everyone to understand it. It just takes time, and you cannot skip that.
Q: You mentioned RxJava is hard to take on. Now, looking back and seeing that you’ve migrated over, do you think it was worth the trade off for the pros you’ve mentioned?
Felipe: I would think it’s worth the investment. It’s a big investment, of course. You’re going to spend a lot of time learning, and I think it’s definitely worth it. You get more benefit once you start to use it more. Once you start to combine streams and mutate them, you’ll look and think, “Hey, I am accomplishing a lot of stuff with a tiny bit of code.” That’s really amazing once you reach that point. So I definitely think it was worth the price.
Q: Do you use RxJava for everything, or do you draw a line where you want to use RxJava only in certain complicated places in terms of asynchronous tasks?
Felipe: We are only using it around our network layer. We’re not even exposing it to the UI, because that’s a much bigger bet. Getting everyone on board and the knowledge thing comes into play, because if you expose that to your activity layer or your views, then it means that everyone literally has to understand it to use it. In the network layer, fewer people are touching it, so you don’t have to have everyone understand what’s going on there. We just expose an API, and it works. It’s like a bridge where we use RxJava on one end, and on the other side it’s like normal land. It’s kind of limited use, even though it’s a core part of our app.