Development of the next major version of RxJava is underway. While the operators remain largely unchanged, observable creation, subscription management, and backpressure have been completely overhauled in this new version. Jake’s talk from GOTO Copenhagen 2016 will explore the what and why of RxJava 2’s changes. You’ll learn how both libraries and applications can migrate to supporting RxJava 2, and how to interop between the two versions.
Why Reactive? (0:53)
Why is Reactive suddenly becoming something that you hear a lot of people talking about? Unless you can model the entire system of your app in a synchronous fashion, having a single asynchronous source will ultimately break the traditional imperative style of programming we’re used to. Not “break” in the sense that it stops working, but in the sense that it pushes the complexity onto you, and you start losing the things that imperative programming is really good for.
Here’s an example to understand why I think this is such a big problem.
interface UserManager {
User getUser();
void setName(String name);
void setAge(int age);
}
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe");
System.out.println(um.getUser());
Start with a simple class that can get us a user object and has some mutators on it. If we live in a synchronous, single-threaded world, we can rely on this to behave exactly as we expect: create an instance, print out the user, modify some of its properties, print out the user.
The problem comes when we have to start modeling things in an asynchronous fashion. Let’s say we need to reflect property changes on the server side; the bottom two methods need to become asynchronous. How would we change our code in order to reflect this?
One thing you could do is nothing: you can assume that the asynchronous call to update the server will succeed, and you can make the change locally, so that it’s reflected instantly when we print out the user object. Obviously, this is not a good idea. Networks are flaky, the server might return an error, and now you have to reconcile dealing with your state locally.
We could do simple things like providing a Runnable that gets called whenever this async call succeeds. Now we are being reactive: we only update the data that is displayed once we are sure that it was successfully changed.
interface UserManager {
User getUser();
void setName(String name, Runnable callback);
void setAge(int age, Runnable callback);
}
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe", new Runnable() {
@Override public void run() {
System.out.println(um.getUser());
}
});
However, we aren’t modeling any problems that could occur, like if the network ends up failing. Maybe we create our own listener, so when an error actually does happen, we can do something with that.
UserManager um = new UserManager();
System.out.println(um.getUser());
um.setName("Jane Doe", new UserManager.Listener() {
@Override public void success() {
System.out.println(um.getUser());
}
@Override public void failure(IOException e) {
// TODO show the error...
}
});
We could report it to the user. We can automatically retry. Stuff like this works, and it’s the avenue people go down when they need to mix in asynchronous code with code that’s running on a single thread, which is traditionally an Android main thread.
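The Listener type used above is never spelled out, so here is a minimal sketch of what it might look like. Everything in it is an assumption made for illustration: the in-memory UserManager, the simulateFailure switch that stands in for a flaky network, and the fact that the callback is invoked synchronously to keep the example short.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: these names are illustrative and not part of any library.
final class ListenerSketch {
    /** Two-outcome callback: exactly one method is invoked per request. */
    interface Listener {
        void success();
        void failure(IOException e);
    }

    static final class UserManager {
        private String name = "John Doe";
        private boolean failNext; // stands in for a flaky network

        String getUser() { return name; }

        void simulateFailure() { failNext = true; }

        void setName(String newName, Listener listener) {
            if (failNext) {
                failNext = false;
                listener.failure(new IOException("simulated network error"));
                return;
            }
            name = newName; // local state changes only after the "server" accepted it
            listener.success();
        }
    }

    /** Runs one successful and one failing update and records what happened. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        UserManager um = new UserManager();
        Listener listener = new Listener() {
            @Override public void success() { log.add("ok:" + um.getUser()); }
            @Override public void failure(IOException e) { log.add("error:" + e.getMessage()); }
        };
        um.setName("Jane Doe", listener);
        um.simulateFailure();
        um.setName("Jane Roe", listener);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that the failed update leaves the local user untouched, which is the property the plain Runnable version could not express.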
Where the problems start arising is when you have to do more. You have to support multiple calls happening: the user is changing multiple properties in your app while filling out a form, or maybe there are flows of these asynchronous calls where the success of the first one has to then trigger another asynchronous call, which can also succeed or fail.
public final class UserActivity extends Activity {
private final UserManager um = new UserManager();
@Override protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.user);
TextView tv = (TextView) findViewById(R.id.user_name);
tv.setText(um.getUser().toString());
um.setName("Jane Doe", new UserManager.Listener() {
@Override public void success() {
tv.setText(um.getUser().toString());
}
@Override public void failure(IOException e) {
// TODO show the error...
}
});
}
}
We also have to remember that this is in the context of Android, so there are a lot of additional considerations we have to take into account. For example, in our success callback, maybe we are propagating information directly into the UI, but the problem here is that activities on Android are ephemeral. They might disappear at any time. If this asynchronous call returns after the UI has been torn down, you’re going to run into problems.
There are imperative ways to work around this. We can check some state before we end up modifying the view. We are also creating an anonymous type, which will ultimately lead to a short-term memory leak, because it retains a reference to our activity; if that activity disappears, the asynchronous call still happens in the background.
The last thing is that we didn’t define what threads these callbacks run on. Maybe they come back on the background thread, so it becomes our responsibility to imperatively write code that does the thread hopping back onto the main thread.
public final class UserActivity extends Activity {
private final UserManager um = new UserManager();
@Override protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.user);
TextView tv = (TextView) findViewById(R.id.user_name);
tv.setText(um.getUser().toString());
um.setName("Jane Doe", new UserManager.Listener() {
@Override public void success() {
runOnUiThread(new Runnable() {
@Override public void run() {
if (!isDestroyed()) {
tv.setText(um.getUser().toString());
}
}
});
}
@Override public void failure(IOException e) {
// TODO show the error...
}
});
}
}
We’ve cluttered this activity with a bunch of things that are really unrelated to the intent of the code, which is simply to start this asynchronous work and handle the asynchronous result. And this code only fires the asynchronous request eagerly. We are not dealing with disabling user input, handling button clicks, or multiple fields. When you have code like this that deals with only the simple case, and suddenly you start to turn it into a real app, all these problems compound. You are left with managing a bunch of states and a bunch of checks in your activities.
Reactive Thinking (6:16)
Everything in an app is asynchronous in some way. We have our network, which we are sending requests to, and whose responses are returning after a long time. We can’t block the main thread for that, so it has to be done in a background thread. We have things like the file system, whether it’s databases writing to storage, or even shared preferences where we can’t block the main thread.
The user is also an asynchronous source of data. We push data to them in the UI, and then they react using button clicks or changing inputs in text fields. Those things happen asynchronously. We are not synchronously polling the user for data, but waiting until they do something.
A lot of people think that you can write single-threaded apps, which by default run on the main thread, but really the UI is itself an asynchronous source. You have to react to the user’s input in ways that make sense to them.
With data flowing into the app at different times, the app has to be receptive and reactive so that you don’t wind up in a state where the main thread is blocked, or some piece of data comes in asynchronously when you weren’t expecting it and the app either doesn’t reflect it or just crashes. Ultimately, your code is what’s responsible for managing everything.
This is where the complexity lies: you have to maintain all this state in your fragments to reconcile the fact that these multiple asynchronous sources are producing and consuming data at potentially different rates. That doesn’t even take into account Android itself, which is a fundamentally asynchronous platform. We have things like push notifications, broadcasts throughout the system, and even things like a configuration change. The user may rotate the device at any time, and if your code is not expecting that, you will run into crashes.
Network Requests (9:19)
Unless you can model your entire system synchronously, a single asynchronous source breaks imperative programming.
It’s hard to find an app without network requests, which are fundamentally asynchronous. You have the disk and the database, which are also fundamentally asynchronous sources, and then the UI itself should ultimately be thought of as an asynchronous source, so by default, everything in Android is already asynchronous. By clinging to more traditional imperative programming and state management techniques you are ultimately harming yourself.
We should consider a model where, instead of our code sitting in the middle as the arbiter of state and trying to coordinate all these asynchronous things, we remove that responsibility by hooking them up directly. We can change the UI to subscribe directly to the database and just react when the data changes. We can change the database and the network call to react when the button is clicked, instead of us having to receive that click and then dispatch it.
Similarly, when the network response comes back, it would be nice if that updated the data. We know that when the data updates, the UI automatically updates, and so we removed our responsibility for doing that. If Android does something asynchronously, like rotating, it would be nice if that was automatically reflected in the UI, or automatically started some background job.
Ultimately, this removes a lot of the code that we had to write in order to maintain those states ourselves. We are still writing code, but all we are doing is now connecting the bits together in a way that makes sense instead of trying to manage the state of all of them, emitting and consuming events.
RxJava (11:14)
This is where we get to RxJava. This has become the de facto reactive library for Android, mostly due to the fact that it was one of the first available for Java that was really comprehensive. RxJava 2 retains support for all the versions of Java that we need on Android.
It comes down to providing three major things:
- A set of classes for representing data sources.
- A set of classes for listening to data sources.
- A set of methods for modifying and composing the data.
A source of data will start or stop doing some work when you start or stop listening to it. You can think of this like a network request that’s not going to fire until you start listening for a response, and if you unsubscribe from that source of data before it completes, it can clean up by canceling the network request.
These can be both synchronous and asynchronous, and so you can model something like a network request, which is blocking, but running on a background thread. Alternatively, it can be asynchronous, which is something like calling out to Android and expecting an activity result, or even clicks in the UI can be thought of as asynchronous.
A network response would be like a single item for a single request, but a stream of button clicks would be potentially infinite, as long as your UI is there, even though you’re only subscribing to a single button. These also can be empty, and so there is the idea of a source that really only succeeds or fails and doesn’t actually have any items. You could think of this like writing to a database or writing to a file. It doesn’t really have a response or an item that it can return to you; it either completes or fails. That completion and failure aspect is modeled by sources in RxJava with these so-called “terminal events.” This is similar to a method that can return normally or throw an exception.
It also may never terminate. To go back to the button click example, if we were modeling button clicks as a source of data, that will never end as long as your UI is there, and when your UI disappears, you are probably unsubscribing from that source of button clicks. It’s not actually completing.
This all amounts to an implementation of the traditional Observer pattern. We have something that can produce data, we have a contract for what that data looks like, and all we want to do is observe it. We want to add a listener to it and get notified when things happen.
Flowable vs. Observable (14:50)
The two main types that are going to represent this in RxJava 2 are Flowable and Observable. These both end up modeling the same types of data, which can be zero to n items. It can be empty, it can have a single item, or potentially many, and then they terminate either successfully or with an error. Why do we need two types to represent the same structure of data?
This comes down to something called “backpressure”. I don’t want to dive too deeply into what backpressure is, but it is there to allow you to slow down a source of data. We live in systems that have finite resources, and backpressure is a way to tell whoever is sending you data to slow down because you can’t process it as fast as they are sending it.
In RxJava 1, every type in the system had backpressure, but in RxJava 2, we have two types. In RxJava 1, while all types exposed the idea of backpressure, not all sources actually implemented it, so you would ultimately get a crash. This is because backpressure, like inheritance, must be designed and accounted for upfront. In RxJava 2, you can dictate in the type system whether backpressure is supported or not.
For example, if we have a source of data which is touch events, this is something we can’t really slow down: we can’t tell the user, “draw half of your curve, and then stop and wait while I catch up drawing, and then you can continue the rest.” We can potentially do this in other ways, such as disabling buttons or displaying other UI to try to slow them down, but the source of data itself is not one that can be slowed down.
You can compare that to something like a database, where we have a large result set, and maybe we only want to pull out certain rows at a time. A database can model this really well: it has the concept of cursors. But a stream of touch events can’t model this at all, because there’s no way to push back and slow down the user’s finger. In RxJava 1, you would see both of these types implemented as Observables, so at runtime you might try and apply backpressure and you ultimately get an exception, and your app crashes.
Observable<MotionEvent> events
= RxView.touches(paintView);
Observable<Row> rows
= db.createQuery("SELECT * …");
MissingBackpressureException
In RxJava 2 we model them as different types, because one fundamentally can support backpressure and one fundamentally cannot.
Observable<MotionEvent> events
= RxView.touches(paintView);
Flowable<Row> rows
= db.createQuery("SELECT * …");
Because these are two different types, they have to expose this backpressure in some way, but because they also model the same type of data, they also have to appear the same way in terms of how data gets pushed into your callbacks. The two interfaces for listening to events from these sources look fairly similar. The first method is called onNext, and this is the method where items are going to be delivered. As long as there are one or more items being produced by the Observable or Flowable, this method will be called for each one, allowing you to do whatever processing you want to do with the items.
Observable<MotionEvent>
interface Observer<T> {
  void onNext(T t);
  void onComplete();
  void onError(Throwable t);
  void onSubscribe(Disposable d);
}
interface Disposable {
  void dispose();
}
Flowable<Row>
interface Subscriber<T> {
  void onNext(T t);
  void onComplete();
  void onError(Throwable t);
  void onSubscribe(Subscription s);
}
interface Subscription {
  void cancel();
  void request(long r);
}
This could be infinite. If you are listening to a button click, this onNext method will be called every time the user clicks the button. For non-infinite sources, we have two terminal events. The source can either complete, where complete indicates success, or error, where error indicates that something failed, for example that processing the onNext callback resulted in an exception being thrown. Both onComplete and onError are called terminal events, which means you will never get another callback after you get either one of those. Where the two interfaces differ is in this last method called onSubscribe.
If you know RxJava 1, this is something that’s fairly new: the idea that when you subscribe to either an Observable or a Flowable, you are really creating a resource, and resources often need to be cleaned up when you’re done with them. This onSubscribe callback will be called immediately, as soon as you start listening to an Observable or Flowable, and it’s going to hand you one of these two types.
For Observable, this type allows you to call the dispose method, which essentially says, “I am done with this resource, I don’t want any more callbacks.” If we have a network request, this would potentially cancel it. If you are listening to that infinite stream of button clicks, this would say that you no longer want to receive those, and it would unset the listener on the view. The same is true for the type on Flowable. Even though it has a different name, the use is the same; it has this cancel method, which is essentially the same as Disposable’s dispose. The difference here is that it has a second method called request, and this is where backpressure shows itself in the API.
This request method is how you tell the Flowable that you want more items. I’m going to build up a little chart here of how these things relate to each other. We can represent any type of emissions. It could be zero, it could be one, it could be many, and it potentially could complete or potentially error. The only difference between the two is that one has backpressure and one does not.
Reactive Streams (22:10)
I want to touch on why the Disposable and Subscription types are named so differently, and why their methods are called dispose and cancel respectively, instead of one type extending the other and just adding the request method. The reason is the Reactive Streams specification. It’s an initiative where a bunch of companies got together and said, let’s make a standard set of interfaces for reactive libraries in Java, and they wound up with these four interfaces.
You will see the subscriber type and the subscription type in the middle there. These are actually part of the specification, and that’s why they have a name that is so different than the disposable type and the observer type.
interface Publisher<T> {
void subscribe(Subscriber<? super T> s);
}
interface Subscriber<T> {
void onNext(T t);
void onComplete();
void onError(Throwable t);
void onSubscribe(Subscription s);
}
interface Subscription {
void request(long n);
void cancel();
}
interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
}
They are different because they are part of a standard, and we can’t really change that. The advantage is that, because it is a standard, if you ever have to use two different streams libraries (this doesn’t happen frequently on Android) and both implement the standard, you can convert between them seamlessly. I’m going to change my left column here to be types that implement the Reactive Streams specification, which implies that backpressure is supported, and the types on the right are those which do not have backpressure.
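To make request concrete, here is a small sketch that uses the java.util.concurrent.Flow interfaces, which ship with JDK 9 and later and have the same shape as the four interfaces above. RangePublisher is a hypothetical, deliberately simplified source written for this example; it is not how RxJava implements Flowable, and it skips parts of the specification such as rejecting non-positive requests and guarding against re-entrant calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

final class BackpressureSketch {
    /** Hypothetical publisher that emits 1..count, but only as many items as were requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) { this.count = count; }

        @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private boolean done;

                @Override public void request(long n) {
                    // Emit at most n items; the subscriber controls the pace.
                    for (long i = 0; i < n && !done && next <= count; i++) {
                        subscriber.onNext(next++);
                    }
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override public void cancel() { done = true; }
            });
        }
    }

    /** Requests two items, then ten more, and records the order of events. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        Flow.Subscription[] held = new Flow.Subscription[1];
        new RangePublisher(5).subscribe(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { held[0] = s; }
            @Override public void onNext(Integer item) { log.add("next:" + item); }
            @Override public void onError(Throwable t) { log.add("error"); }
            @Override public void onComplete() { log.add("complete"); }
        });
        log.add("requesting 2");
        held[0].request(2);
        log.add("requesting 10");
        held[0].request(10);
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

Nothing is emitted until the subscriber asks for it, which is exactly what a stream of touch events cannot offer.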
interface UserManager {
User getUser();
void setName(String name);
void setAge(int age);
}
interface UserManager {
Observable<User> getUser();
void setName(String name);
void setAge(int age);
}
If we go back to our user manager, before we were pulling users out of this class and then displaying them when we thought it was most appropriate. What we can do now is model this as an Observable of User instead. It’s a source of User objects: whenever the user changes, we will be notified of that change and can react by displaying it, instead of trying to guess when the most appropriate time is based on other events happening in the system.
There are a few specialized sources in RxJava, each of which models a subset of Observable; there are three of them. The first one is called Single. This either has a single item or an error, so it’s less of a stream and more like a potentially asynchronous source of a single item. It also does not have backpressure. The way to think about this is like a scalar. You call a method that has a return type, and you either get a return value or that method throws an exception. Single essentially models the same concept. You subscribe to a Single, and you either get back your item or you get an error. The difference is that it’s reactive.
Completable is similar to a method that declares a void return type. It completes with no data, or it throws an exception and you get an error. Think of this as a reactive Runnable. It’s a set of code that you can run, and it will either complete successfully or fail.
A new type in RxJava 2, compared to RxJava 1, is called Maybe. This either has an item, errors, or potentially has no items. The way to think of this is that it’s like an Optional. A method that returns an optional value will always return something if it doesn’t throw an exception, but that optional may or may not have a value. We will see where these are going to be used in a second, but this is similar to the optional concept, except reactive. There are no types in RxJava 2 that model these three and are Reactive Streams compliant; they are only modeled on the Observable, backpressure-free side.
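The three analogies can be written down as the plain imperative methods they correspond to. This is only a sketch of the shapes being compared; the method names and values are made up for illustration, and none of it is reactive.

```java
import java.util.Optional;

// Hypothetical imperative counterparts of the three specialized sources.
final class ScalarAnalogies {
    // Like Single: returns exactly one value, or throws.
    static String loadName(boolean fail) {
        if (fail) {
            throw new IllegalStateException("lookup failed");
        }
        return "Jane Doe";
    }

    // Like Completable: returns nothing; it either finishes or throws.
    static void saveName(String name) {
        if (name.isEmpty()) {
            throw new IllegalArgumentException("name must not be empty");
        }
    }

    // Like Maybe: may produce a value, produce nothing, or throw.
    static Optional<String> findNickname(String name) {
        if (name == null) {
            throw new IllegalArgumentException("name must not be null");
        }
        return name.equals("Jane Doe") ? Optional.of("JD") : Optional.empty();
    }

    public static void main(String[] args) {
        System.out.println(loadName(false));
        saveName("Jane Doe");
        System.out.println(findNickname("Jane Doe"));
        System.out.println(findNickname("John Doe"));
    }
}
```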
interface UserManager {
Observable<User> getUser();
void setName(String name);
void setAge(int age);
}
interface UserManager {
Observable<User> getUser();
Completable setName(String name);
Completable setAge(int age);
}
If our setName and setAge calls are asynchronous, they either complete or fail; they don’t really return data, so what we want to do is model those as Completable.
Creating Sources (26:23)
This example is to show how the sources are created, and how you can wrap the things that you are already using in reactive sources:
Flowable.just("Hello");
Flowable.just("Hello", "World");
Observable.just("Hello");
Observable.just("Hello", "World");
Maybe.just("Hello");
Single.just("Hello");
All of the types have static methods that allow you to create them from scalar values. You can also create them from things like arrays or anything that’s iterable, but there are two methods that are really useful, and that I think are going to be the most used, for adapting the methods and actions that you are already using, in either a synchronous or asynchronous way.
OkHttpClient client = // …
Request request = // …
Observable.fromCallable(new Callable<Response>() {
  @Override public Response call() throws Exception {
    return client.newCall(request).execute();
  }
});
The first one is called fromCallable. fromCallable is essentially modeling some synchronous behavior that returns a single value. This could be as simple as delegating to some hypothetical method called getName. The nice thing about fromCallable is that you are allowed to throw an exception from a Callable, which is a standard Java interface, and this means that we can model things that would potentially fail using a checked exception. If we have an HTTP request that we want to make, which could throw an IOException, we can now put that inside a fromCallable. The returned Observable, when subscribed to, will execute that request, and if that request throws an exception we will get an onError. If that request completes, we will get the response in an onNext.
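The behavior described here, nothing runs until you subscribe and a thrown exception becomes an error callback, can be sketched without RxJava. CallableSource and SimpleObserver below are hypothetical types written for this example; they only imitate the idea and are not the library’s implementation.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class FromCallableSketch {
    /** Hypothetical listener with the three notification callbacks. */
    interface SimpleObserver<T> {
        void onNext(T item);
        void onComplete();
        void onError(Throwable t);
    }

    /** A deferred source: the callable is not invoked until subscribe() is called. */
    static final class CallableSource<T> {
        private final Callable<T> callable;

        CallableSource(Callable<T> callable) { this.callable = callable; }

        void subscribe(SimpleObserver<? super T> observer) {
            T value;
            try {
                value = callable.call(); // may throw a checked exception
            } catch (Exception e) {
                observer.onError(e);     // failure is a terminal event
                return;
            }
            observer.onNext(value);
            observer.onComplete();       // success is a terminal event too
        }
    }

    /** Wraps the given work, subscribes once, and records what was delivered. */
    static List<String> run(Callable<String> work) {
        List<String> log = new ArrayList<>();
        CallableSource<String> source = new CallableSource<>(work);
        log.add("created"); // the work has not run yet
        source.subscribe(new SimpleObserver<String>() {
            @Override public void onNext(String item) { log.add("next:" + item); }
            @Override public void onComplete() { log.add("complete"); }
            @Override public void onError(Throwable t) { log.add("error:" + t.getMessage()); }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "Hello"));
        System.out.println(run(() -> { throw new IOException("offline"); }));
    }
}
```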
Flowable.fromCallable(() -> "Hello");
Observable.fromCallable(() -> "Hello");
Maybe.fromCallable(() -> "Hello");
Maybe.fromAction(() -> System.out.println("Hello"));
Maybe.fromRunnable(() -> System.out.println("Hello"));
Single.fromCallable(() -> "Hello");
Completable.fromCallable(() -> "Ignored!");
Completable.fromAction(() -> System.out.println("Hello"));
Completable.fromRunnable(() -> System.out.println("Hello"));
fromCallable is available on all five types. These are for modeling synchronous sources of a single piece of data. It’s what you would use a method for in an imperative world, a method that returns a value. In the reactive world, fromCallable is what you use to model that.
There are also two additional methods on both Maybe and Completable, fromAction and fromRunnable, and those allow you to model things that don’t return a value. Like I said, just a Runnable, except that Runnable is now reactive.
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("Hello");
e.onComplete();
}
});
The most powerful method for creating observables is aptly named create. We create it with a callback which is called whenever there’s a new subscriber. We are given this thing called an “emitter”, and the emitter is the person that is listening. We can take data, and we can send it to the emitter. In this example, I’m sending a piece of data synchronously and then completing the stream, because it completed successfully.
I’m going to convert this to a lambda, to clean up some of the boilerplate here. I can send more than one piece of data.
Observable.create(e -> {
e.onNext("Hello");
e.onNext("World");
e.onComplete();
});
Unlike fromCallable, I have the ability to call onNext multiple times. The other advantage of this is that we can now model asynchronous pieces of data. If I take an HTTP request and, instead of executing it synchronously, execute it asynchronously, I can call that onNext method on the emitter from the HTTP callback.
Another nice advantage of this create method is it allows you to do things when the person unsubscribes.
OkHttpClient client = // …
Request request = // …
Observable.create(e -> {
  Call call = client.newCall(request);
  e.setCancellable(() -> call.cancel());
  call.enqueue(new Callback() {
    @Override public void onResponse(Response r) throws IOException {
      e.onNext(r.body().string());
      e.onComplete();
    }
    @Override public void onFailure(IOException ex) {
      e.onError(ex);
    }
  });
});
If someone stops listening to the HTTP request, there’s no reason for it to continue executing. We can now add a cancellation action, which cancels the HTTP request, and cleans up the resources.
This is also super useful for Android because it’s how we model interactions with the UI. When you subscribe to an observable, we want to say start listening to button clicks, and then when you unsubscribe, we want to remove that listener, so we don’t end up leaking that reference to the view.
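The subscribe-registers, dispose-unregisters idea can be sketched with plain Java. FakeButton stands in for an Android view and Disposable is a local one-method interface; both are assumptions made so the example runs on its own, not the real Android or RxJava types.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ClickSourceSketch {
    /** Hypothetical stand-in for a view that holds at most one click listener. */
    static final class FakeButton {
        private Runnable listener;

        void setOnClickListener(Runnable l) { listener = l; }
        boolean hasListener() { return listener != null; }
        void click() { if (listener != null) listener.run(); }
    }

    /** Handle returned when listening starts; dispose() undoes the registration. */
    interface Disposable {
        void dispose();
    }

    /** Starts listening for clicks and returns the action that stops listening. */
    static Disposable clicks(FakeButton button, Consumer<String> onNext) {
        button.setOnClickListener(() -> onNext.accept("click"));
        // Clearing the listener is what prevents the view reference from leaking.
        return () -> button.setOnClickListener(null);
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        FakeButton button = new FakeButton();
        Disposable d = clicks(button, log::add);
        button.click();
        button.click();
        d.dispose();
        button.click(); // nobody is listening any more
        log.add("listener attached: " + button.hasListener());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```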
Flowable.create(e -> { … });
Observable.create(e -> { … });
Maybe.create(e -> { … });
Single.create(e -> { … });
Completable.create(e -> { … });
Creating Observables with this emitter works on all five types.
Observing Sources (30:44)
Flowable<String>
interface Subscriber<T> {
void onNext(T t);
void onComplete();
void onError(Throwable t);
void onSubscribe(Subscription s);
}
interface Subscription {
void cancel();
void request(long r);
}
Observable<String>
interface Observer<T> {
void onNext(T t);
void onComplete();
void onError(Throwable t);
void onSubscribe(Disposable d);
}
interface Disposable {
void dispose();
}
You don’t use these interfaces directly when you subscribe to an Observable, subscribe being the method that starts listening.
Because of that fourth method, you are put in a weird place. What do I do with this object, and how do I unsubscribe?
Observable<String> o = Observable.just("Hello");
o.subscribe(new DisposableObserver<String>() {
@Override public void onNext(String s) { … }
@Override public void onComplete() { … }
@Override public void onError(Throwable t) { … }
});
Instead, we have a type called DisposableObserver, and this will automatically handle that fourth method for you and allow you to only be concerned about the notifications from the Observable itself. But how do we dispose here?
One thing you could do is hold onto that observer. It implements Disposable, so you can call its dispose method, and it will take care of forwarding that up the chain. But there’s a new method in RxJava 2 called subscribeWith.
This allows you to use it in a way similar to RxJava 1: it returns the thing that you can then call dispose on. In RxJava 1, this is called a Subscription. In the Observable world, it’s a Disposable.
For those that know CompositeSubscription, there’s a CompositeDisposable, and this allows you to subscribe to multiple streams, take the returned disposables, and add them to what is essentially a list of disposables, so you can unsubscribe from multiple streams at once. You will see this a lot on Android, where you have a single CompositeDisposable for an activity or a fragment, and you unsubscribe in onDestroy or whatever lifecycle callback is most appropriate.
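The “list of disposables” idea is small enough to sketch directly. DisposableBag below is a hypothetical, single-threaded stand-in written for illustration; it is not RxJava’s CompositeDisposable, and the choice to dispose late additions immediately is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

final class CompositeSketch {
    interface Disposable {
        void dispose();
    }

    /** Hypothetical container that disposes everything it holds in one call. */
    static final class DisposableBag implements Disposable {
        private final List<Disposable> items = new ArrayList<>();
        private boolean disposed;

        void add(Disposable d) {
            if (disposed) {
                d.dispose(); // too late to hold it, so clean it up right away
                return;
            }
            items.add(d);
        }

        @Override public void dispose() {
            if (disposed) {
                return; // disposing twice is a no-op
            }
            disposed = true;
            for (Disposable d : items) {
                d.dispose();
            }
            items.clear();
        }
    }

    /** Returns how many individual disposables ended up disposed. */
    static int demo() {
        int[] disposedCount = {0};
        DisposableBag bag = new DisposableBag();
        bag.add(() -> disposedCount[0]++);
        bag.add(() -> disposedCount[0]++);
        bag.dispose();                     // e.g. from a lifecycle callback
        bag.dispose();                     // no effect the second time
        bag.add(() -> disposedCount[0]++); // disposed immediately
        return disposedCount[0];
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```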
Observable<String> o = Observable.just("Hello");
o.subscribeWith(new DisposableObserver<String>() { … });
Maybe<String> m = Maybe.just("Hello");
m.subscribeWith(new DisposableMaybeObserver<String>() { … });
Single<String> s = Single.just("Hello");
s.subscribeWith(new DisposableSingleObserver<String>() { … });
Completable c = Completable.complete();
c.subscribeWith(new DisposableCompletableObserver() { … });
You have these on all four non-backpressure types, the ones that use an observer, and there actually is one on Flowable, even though Flowable uses the subscription callback, not the disposable one. The type we provide in RxJava 2 allows you to model it in the same way.
Flowable<String> f = Flowable.just("Hello");
Disposable d1 = f.subscribeWith(new DisposableSubscriber<String>() { … });
Observable<String> o = Observable.just("Hello");
Disposable d2 = o.subscribeWith(new DisposableObserver<String>() { … });
Maybe<String> m = Maybe.just("Hello");
Disposable d3 = m.subscribeWith(new DisposableMaybeObserver<String>() { … });
Single<String> s = Single.just("Hello");
Disposable d4 = s.subscribeWith(new DisposableSingleObserver<String>() { … });
Completable c = Completable.complete();
Disposable d5 = c.subscribeWith(new DisposableCompletableObserver() { … });
You get a disposable back from all five types, even though Flowable is a little bit different. The way to think of this is like you would think of anything that is a resource, a file, a cursor on a database. You wouldn’t open a file without having some way to close it. You wouldn’t open a cursor on a database without eventually closing it. Never subscribe to an observable without managing the disposable, and ultimately, unsubscribing from it.
Operators (33:52)
Operators do three things:
- Manipulate or combine data in some way.
- Manipulate threading in some way.
- Manipulate emissions in some way.
Just like we took something that was imperative, like a synchronous method call, and turned it reactive, operators do the same thing. Here, we are applying an operation to a string, and getting back a new string.
Observable<String> greeting = Observable.just("Hello");
Observable<String> yelling = greeting.map(s -> s.toUpperCase());
In the reactive world, we would have an Observable and we would apply that operation via an operator. In this case, map is the operator, which allows us to take the data being emitted and apply some operation to it to create a new piece of data.
If we look at our User object, we said originally that the callbacks were coming back on a background thread and we had to explicitly move to the main thread. There’s actually a built-in operator that allows you to do that, and to do it in a much more declarative way.
Observable<User> user = um.getUser();
Observable<User> mainThreadUser =
user.observeOn(AndroidSchedulers.mainThread());
We can say, “I want to observe emissions from this observable on a different thread.” The items coming from user are delivered on the background thread, but the items coming out of mainThreadUser are delivered on the main thread. observeOn is the operator here.
Because we’re changing threads, the order in which you apply these operators really matters. Similarly to observeOn, we can change where the work of the Observable happens.
OkHttpClient client = // …
Request request = // …
Observable<Response> response = Observable.fromCallable(() -> {
return client.newCall(request).execute();
});
Observable<Response> backgroundResponse =
response.subscribeOn(Schedulers.io());
If we are doing a network request, that network request is still going to be done synchronously, but we don’t want it to happen on the main thread. We can apply an operator that changes where we subscribe to the Observable, which is where the work ultimately happens. When we subscribe to backgroundResponse, the work moves to a background thread. Schedulers.io() is just a pool of threads you can use; the work runs on that pool, and then the notification is sent to whoever is listening. subscribeOn is the operator here that changes where the work happens.
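To see that subscribeOn decides where the work runs, and that nothing runs until someone subscribes, here is a minimal sketch that works off Android. It assumes RxJava 2 on the classpath; `SubscribeOnSketch` and `blockingWork()` are hypothetical names, and the method simply reports its thread instead of making a real network call.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

public class SubscribeOnSketch {
    static final AtomicInteger calls = new AtomicInteger();

    /** Hypothetical stand-in for a blocking network call; returns the thread it ran on. */
    static String blockingWork() {
        calls.incrementAndGet();
        return Thread.currentThread().getName();
    }

    /** Describes the work and where it should run; does not start it. */
    static Observable<String> backgroundWork() {
        return Observable.fromCallable(SubscribeOnSketch::blockingWork)
            .subscribeOn(Schedulers.io());
    }

    public static void main(String[] args) {
        Observable<String> work = backgroundWork();
        System.out.println("calls before subscribing: " + calls.get());

        // blockingFirst() subscribes and waits for the item; convenient for demos and tests only.
        String worker = work.blockingFirst();
        System.out.println("work ran on: " + worker);
        System.out.println("caller was:  " + Thread.currentThread().getName());
    }
}
```

The caller's thread and the worker's thread differ, even though the callable itself is ordinary synchronous code.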
It’s nice that these all return a new Observable, because we can compose them and chain them together. What you normally see is that we don’t have intermediate variables for these. We just apply the operators in a certain order. We want the request to execute on the background thread. We want to observe the result of that request on the main thread. We want to change the response into a string. We want to read the string. Order here matters.
OkHttpClient client = // …
Request request = // …
// The chain ends in a String, so the variable is typed accordingly.
Observable<String> body = Observable.fromCallable(() -> {
return client.newCall(request).execute();
})
.subscribeOn(Schedulers.io())
.map(response -> response.body().string()) // Ok! Still on the background thread.
.observeOn(AndroidSchedulers.mainThread());
If I had applied the map operator after observeOn, it would run on Android’s main thread. We don’t want to be reading from an HTTP response on the main thread; we want that to happen before we change to the main thread, which is why map comes first here. The request is executed, and the response is emitted down the Observable chain. We map that into the result string, and then we change threads to the main thread, where we can ultimately subscribe and show it in the UI.
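The effect of operator order can be checked on a plain JVM with a rough sketch like the following. It assumes RxJava 2 on the classpath. Because AndroidSchedulers is not available off Android, a single-threaded executor named "fake-main" stands in for the main thread; that scheduler and the class name `OperatorOrderSketch` are assumptions made for illustration.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class OperatorOrderSketch {
    // Hypothetical stand-in for AndroidSchedulers.mainThread(): one daemon thread named "fake-main".
    static final ExecutorService FAKE_MAIN_EXECUTOR = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "fake-main");
        t.setDaemon(true);
        return t;
    });
    static final Scheduler FAKE_MAIN = Schedulers.from(FAKE_MAIN_EXECUTOR);

    /** map sits before observeOn, so it runs on the thread chosen by subscribeOn. */
    static String mapBeforeObserveOn() {
        return Observable.fromCallable(() -> "response")
            .subscribeOn(Schedulers.io())
            .map(ignored -> Thread.currentThread().getName())
            .observeOn(FAKE_MAIN)
            .blockingFirst();
    }

    /** map sits after observeOn, so it runs on the thread observeOn switched to. */
    static String mapAfterObserveOn() {
        return Observable.fromCallable(() -> "response")
            .subscribeOn(Schedulers.io())
            .observeOn(FAKE_MAIN)
            .map(ignored -> Thread.currentThread().getName())
            .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("map before observeOn ran on: " + mapBeforeObserveOn());
        System.out.println("map after observeOn ran on:  " + mapAfterObserveOn());
    }
}
```

The two chains differ only in the position of map, yet the mapping function runs on a different thread in each, which is the whole reason the body should be read before switching to the main thread.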
Operator Specialization (37:28)
There are other operators which take an Observable and turn it into a different type. An operator like first() is going to take the first element that’s emitted from a stream and return it to you. In RxJava 1, we got back an Observable that only emitted one item. This is kind of weird, because if you have a list of items and you call get on it to get the first item, you don’t get back a list with only one item. What you get back is the scalar. In RxJava 2, when you call this first operator, which is guaranteed to only return one element, you get back a Single.
If the observable is empty, this will result in an error, because we know a Single either has an item or errors. So there are other operators like firstElement(), which returns a Maybe. When the observable is empty, Maybe can model that by completing without an error. There are also ones that return Completable: if you are just ignoring the elements and all you care about is whether the stream completes or fails, you get a Completable, and that’s exactly what Completable models.
All these exist on Flowable as well. It has the same operators, and they return the same specialized types. This is a chart that shows some of the operators. The upper right-hand corner of this is where the types narrow: when you call something like count to get the number of items in a stream, a count is always a single value, so you get a narrower type, like Single. Then we also have operators which do the opposite, which is take a type and turn it into something more broad. You can take a Single and turn it into an Observable.
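A short sketch of the narrowing and widening operators, assuming a released RxJava 2.x on the classpath. One naming caveat: as far as I know, in the released 2.x API the no-argument operator that errors on an empty source is called `firstOrError()`, while `first(defaultItem)` takes a fallback value, so the sketch uses `firstOrError()` where the talk says first(). The class and helper names are illustrative.

```java
import io.reactivex.Completable;
import io.reactivex.Maybe;
import io.reactivex.Observable;
import io.reactivex.Single;

public class SpecializationSketch {
    // Narrowing: each operator returns the most specific type that can model its result.
    static Single<String> firstOf(Observable<String> source) { return source.firstOrError(); }
    static Maybe<String> firstIfAny(Observable<String> source) { return source.firstElement(); }
    static Completable completionOf(Observable<String> source) { return source.ignoreElements(); }
    static Single<Long> countOf(Observable<String> source) { return source.count(); }

    // Widening: a Single can always be viewed as an Observable of one item.
    static Observable<String> widen(Single<String> single) { return single.toObservable(); }

    public static void main(String[] args) {
        Observable<String> letters = Observable.just("a", "b", "c");
        Observable<String> none = Observable.empty();

        System.out.println(firstOf(letters).blockingGet());   // a
        System.out.println(firstIfAny(none).blockingGet());   // null: the Maybe completed empty
        System.out.println(countOf(letters).blockingGet());   // 3
        completionOf(letters).blockingAwait();                 // returns once the source completes
        System.out.println(widen(firstOf(letters)).toList().blockingGet()); // [a]
    }
}
```

Calling `firstOf(none).blockingGet()` would throw a `NoSuchElementException`, which is the "Single either has an item or errors" case described above.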
Being Reactive (39:33)
If we want to be reactive with our original example, we can subscribe to our user and say, “I want notifications on the main thread, and then I want to shove that into the UI and display that user.” Anytime the user changes, this code is automatically going to run. You will automatically see updates and we no longer have to worry about managing this ourselves.
// onCreate
disposables.add(um.getUser()
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableObserver<User>() {
@Override public void onNext(User user) {
tv.setText(user.toString());
}
@Override public void onComplete() { /* ignored */ }
@Override public void onError(Throwable t) { /* crash or show */ }
}));
// onDestroy
disposables.dispose();
However, we do have to remember to manage the disposable that gets returned, because we are in the Android world, and when our activity disappears we want this code to stop running. In onDestroy we would dispose of the disposables.
Similarly, when we ultimately make an asynchronous request to change data, we want that to happen on a background thread. We want to observe the result on the main thread, whether it succeeds or fails, and in the success callback we could essentially re-enable the text box.
disposables.add(um.setName("Jane Doe")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(new DisposableCompletableObserver() {
@Override public void onComplete() {
// success! re-enable editing
}
@Override public void onError(Throwable t) {
// retry or show
}
}));
Again, because you wouldn’t open a file without closing it, you wouldn’t subscribe without managing the disposable, so we add this to our disposable list.
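The lifecycle pattern from both snippets can be exercised off Android with a sketch like this one. It assumes RxJava 2 on the classpath and that the `disposables` field in the talk is a `CompositeDisposable`. The `LifecycleSketch` class, the `PublishSubject` standing in for um.getUser(), and `Completable.never()` standing in for an in-flight um.setName(...) call are all hypothetical.

```java
import io.reactivex.Completable;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.observers.DisposableCompletableObserver;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {
    // Assumption: the talk's `disposables` field is a CompositeDisposable.
    final CompositeDisposable disposables = new CompositeDisposable();
    // Hypothetical stand-ins for um.getUser() and an in-flight um.setName(...).
    final PublishSubject<String> users = PublishSubject.create();
    final Completable pendingSave = Completable.never();
    final List<String> shown = new ArrayList<>();

    void onCreate() {
        disposables.add(users.subscribeWith(new DisposableObserver<String>() {
            @Override public void onNext(String user) { shown.add(user); }
            @Override public void onComplete() { /* ignored */ }
            @Override public void onError(Throwable t) { /* crash or show */ }
        }));
        disposables.add(pendingSave.subscribeWith(new DisposableCompletableObserver() {
            @Override public void onComplete() { /* success */ }
            @Override public void onError(Throwable t) { /* retry or show */ }
        }));
    }

    void onDestroy() {
        disposables.dispose(); // disposes both subscriptions in one call
    }

    public static void main(String[] args) {
        LifecycleSketch screen = new LifecycleSketch();
        screen.onCreate();
        screen.users.onNext("Jane Doe");
        screen.onDestroy();
        screen.users.onNext("John Doe");  // ignored: the subscription is gone
        System.out.println(screen.shown); // prints [Jane Doe]
    }
}
```

One design point worth knowing: once `dispose()` has been called, a `CompositeDisposable` immediately disposes anything added to it later, so a container that must be reused across lifecycle cycles would call `clear()` instead.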
I’m going to skip this, just because I’m a little low on time, but a nice thing about RxJava 2 compared to RxJava 1 is that there’s a fundamental architecture shift. What that affords, on Android specifically, is that there are fewer intermediate objects being created. When you create these streams, every operator you call has to return a new observable that implements that behavior. When you call map, you get a new observable that takes the old one, runs a function, and emits the new data.
That requires a bunch of intermediate objects to be allocated in order to model that stream. RxJava 2 changed how this works so that fewer intermediate objects are created. You get less allocation when you create the stream, which is when you call the operators: each one results in one less object being created. There’s also less overhead when subscribing to a stream, because less method dispatch has to happen. Ultimately, what we get is a faster version of the library that causes less GC, without any compromise in the API.
Conclusion (42:02)
RxJava 2 is this idea that we want to take the things which are fundamentally asynchronous in Android, whether it be the network, Android itself, a database, or even the UI, and write code that reacts to changes in these sources instead of trying to cope with changes and manage state ourselves. Right now, it’s in a developer preview release, so we are finalizing the API. In about a month, it will have its final release so that things like libraries can start using it and exposing these types automatically.
class RxJavaInterop {
static <T> Flowable<T> toV2Flowable(rx.Observable<T> o) { … }
static <T> Observable<T> toV2Observable(rx.Observable<T> o) { … }
static <T> Maybe<T> toV2Maybe(rx.Single<T> s) { … }
static <T> Maybe<T> toV2Maybe(rx.Completable c) { … }
static <T> Single<T> toV2Single(rx.Single<T> s) { … }
static Completable toV2Completable(rx.Completable c) { … }
static <T> rx.Observable<T> toV1Observable(Publisher<T> p) { … }
static <T> rx.Observable<T> toV1Observable(Observable<T> o, …) { … }
static <T> rx.Single<T> toV1Single(Single<T> o) { … }
static <T> rx.Single<T> toV1Single(Maybe<T> m) { … }
static rx.Completable toV1Completable(Completable c) { … }
static <T> rx.Completable toV1Completable(Maybe<T> m) { … }
}
If you do use RxJava 1, there’s an interop project which allows you to convert between the types, and this will allow you to incrementally update your app.
github.com/akarnokd/RxJava2Interop
Your dependencies for that will ultimately look like this:
dependencies {
compile 'io.reactivex.rxjava2:rxjava:2.0.0-RC3'
compile 'io.reactivex.rxjava2:rxandroid:2.0.0-RC1'
// Optionally...
compile 'com.github.akarnokd:rxjava2-interop:0.3.0'
}
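With those dependencies in place, an incremental migration might convert at the boundary between old and new code, roughly as sketched below. This assumes RxJava 1, RxJava 2, and the interop library are all on the classpath, and that the interop class lives in the `hu.akarnokd.rxjava.interop` package. `InteropSketch` and `legacyGreeting()` are hypothetical names, and fully qualified type names are used because both libraries define an `Observable`.

```java
import hu.akarnokd.rxjava.interop.RxJavaInterop;

public class InteropSketch {
    /** Hypothetical legacy API that still returns an RxJava 1 Observable. */
    static rx.Observable<String> legacyGreeting() {
        return rx.Observable.just("Hello");
    }

    /** New code consumes the legacy source as an RxJava 2 type and keeps chaining v2 operators. */
    static io.reactivex.Observable<String> greetingV2() {
        return RxJavaInterop.toV2Observable(legacyGreeting())
            .map(String::toUpperCase);
    }

    /** A v2 source handed back to a caller that still expects an RxJava 1 Single. */
    static rx.Single<String> greetingV1Single() {
        return RxJavaInterop.toV1Single(io.reactivex.Single.just("Hello"));
    }

    public static void main(String[] args) {
        System.out.println(greetingV2().blockingFirst());
        System.out.println(greetingV1Single().toBlocking().value());
    }
}
```

Keeping the conversions in one place like this lets each module move to the v2 types on its own schedule.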
RxJava 2 is not something new. Reactive programming is not new by any stretch, but Android itself is a highly reactive world that we’ve been taught to model in a very imperative, stateful fashion.
Reactive programming allows us to model it in the proper way: asynchronously. Embrace the asynchronicity of the sources, and instead of trying to manage all the state ourselves, compose them together such that our apps become truly reactive.