RxJava’s growing popularity has developers looking for ways to incorporate its capabilities into their apps. However, the combination of a steep learning curve and a lack of practical examples can make it challenging to leverage the library’s advanced functions. This talk from the San Francisco Android User Group uses a complex, practical example to demonstrate how RxJava can simplify the process of uploading files and persisting their state within Android. It also explores some of RxJava’s more advanced constructs, such as subjects, handling and retrying on errors, and publishing events to the UI.
Intro (0:00)
Hello, my name is Jag Saund, and I’ve been an Android engineer since Cupcake (version 1.5).
The focus of this talk is RxJava. RxJava makes our lives easier with concurrency and brings us into the world of functional reactive programming. It is a powerful framework with many useful operators, but because of certain nuances, it’s easy to get into trouble quickly.
I hope to bring back and realign those concepts of RxJava to something that is more relevant to what you’re working on, so you can see how it fits directly into your apps.
The Goal (2:10)
We want to build a file uploader, but it’s not just about uploading files. In particular, we want to be able to manage the life cycle of all those upload operations. We want to be able to control what states they go through: queued, sending, completed, and failed. We want to be able to persist that state back to disk, so that if the application were to restart, it would be able to restore that state and show it back to the user.
Secondly, we want to be able to communicate that progress back to the UI, and finally, be able to handle failures. Essentially, we want to build out a library that uploads photos to 500px, tracks the progress percentage, and communicates it back to the UI.
Architecting the Uploader with RxJava (3:05)
Starting with the end goal in mind, we want to be able to upload a file to a remote endpoint. In order to do that, we need an uploader component that connects to the backend and reads content out of that file. As we’re getting that content out, we want to get progress updates as well, convert them into status objects, and publish them to a status stream. Once the operation completes, we publish the completed status to our status stream; if it fails, we publish a failed status.
We’re basically taking status objects, which are the events in our system, and pushing them onto this status stream. Now we can consume that stream either from the UI, or have our job repository consume it to persist and update that state. We have a job repository that subscribes to the stream, reads each status, and updates the job’s status as it comes in. The other thing we need to do is expose some way of queuing new jobs into this system.
A job object gets published to the job stream. We have our job repository that’s going to subscribe to that, save it to disk, and then from there, we’re going to publish that event back to the status stream.
At this point, we’ve figured out how to queue up new jobs into that stream and how to publish progress updates to the status stream, but there’s no way to kick off the upload operation yet. To do that, we need our uploader to consume status events from the status stream, filtering for the ones that are queued. That is basically what we’re going to build out, focusing mostly on the uploader side and seeing how it connects back to the UI.
There are two things that I learned by doing this. One is that I highly encourage you to go through the exercise of architecting something like this before jumping into the code; doing so saves a lot of trouble down the road. Two, I went through several iterations before arriving at this design. Previous ones were a lot more complicated, and although I think this is fairly simple, there’s still some stuff that could be simplified further.
In the functional reactive world, the goal is to think of things as event streams. In this case, we have statuses and jobs, and how they interact with the different components. Those events dictate how the different components react.
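To keep the later snippets concrete, here is a minimal sketch of the model types the examples assume. The exact fields and factory methods are assumptions for illustration; the talk never shows them.

// Hypothetical model types assumed by the examples that follow.
enum StatusType { QUEUED, SENDING, COMPLETED, FAILED }

// An immutable event describing the state of a single upload job.
final class Status {
    private final String jobId;
    private final StatusType statusType;
    private final int progress; // 0-100, only meaningful while SENDING

    private Status(String jobId, StatusType statusType, int progress) {
        this.jobId = jobId;
        this.statusType = statusType;
        this.progress = progress;
    }

    static Status createQueued(String jobId) {
        return new Status(jobId, StatusType.QUEUED, 0);
    }

    static Status createSending(String jobId, int progress) {
        return new Status(jobId, StatusType.SENDING, progress);
    }

    // createCompleted and createFailed factories omitted for brevity.

    String id() { return jobId; }
    StatusType statusType() { return statusType; }
}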
The Uploader (6:05)
Let’s start with the core component. It’s going to connect to some remote endpoint, read content out of the file, and as it’s doing that, emit progress updates.
For networking, I’ll use OkHttp. OkHttp provides us with RequestBody, which we can extend to provide progress updates. The abstract method that we’re most interested in here is the writeTo method.
public abstract class RequestBody {
    /** Returns the Content-Type header for this body. */
    public abstract MediaType contentType();

    /**
     * Returns the number of bytes that will be written to {@code out} in a
     * call to {@link #writeTo}, or -1 if that count is unknown.
     */
    public long contentLength() throws IOException {
        return -1;
    }

    /** Writes the content of this request to {@code out}. */
    public abstract void writeTo(BufferedSink sink) throws IOException;
}
Assume there’s a callback that exposes three methods: onProgress, onComplete, and onError. From there, we’re going to figure out how to bring this imperative approach back into the reactive world.
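As a sketch, such a callback might look like the following; the interface name and signatures are assumptions, since the talk doesn’t show them.

// Hypothetical callback interface; names and signatures are illustrative.
interface UploadCallback {
    void onProgress(int percent);   // called as bytes are written, 0-100
    void onComplete();              // called once the file has been fully written
    void onError(IOException e);    // called if the write fails
}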
final RequestBody requestBody = new RequestBody() {
    ...
    @Override
    public void writeTo(@NonNull BufferedSink sink) throws IOException {
        try (final Source source = Okio.source(file)) {
            final long total = contentLength();
            long remaining = total;
            long size = Math.min(BUFFER_SIZE, remaining);
            while (remaining > 0) {
                sink.write(source, size);
                remaining -= size;
                size = Math.min(BUFFER_SIZE, remaining);
                final int progress = (int) (100 - ((float) remaining / total) * 100);
                // TODO emit progress
            }
            // TODO emit complete
        } catch (@NonNull IOException e) {
            // TODO emit error
            throw e;
        }
    }
};
Essentially, at this point, our HTTP client has established a connection to some remote endpoint. We have a socket, and the client provides us this BufferedSink to write into that socket. That’s where writeTo comes into play.
We want to write our file to this sink, so we construct a source by wrapping the file. From there, we read a buffer-sized amount each time and write it back into the sink, and then we can figure out how much progress we’ve completed. That’s our first opportunity, if we were using a callback, to call onProgress. Once we’ve written everything, we can use our callback to call onComplete, or onError if there was an exception.
Starting with a conceptual overview, what we’re trying to do is create a component that is able to publish progress events onto a stream. To accomplish this, we’ll need an observable and an observer.
If we were to turn this RequestBody into a reactive one using a subject, first we declare it and construct it in our constructor. From there, it’s really easy: we just need to call onNext to publish the progress, and once it’s done reading all the content out of that file, we can call onCompleted to terminate the stream, or onError if there was an exception.
public class ProgressRequestBody extends RequestBody {
    ...
    @NonNull private final BehaviorSubject<Integer> uploadProgressSubject;
    ...
    @NonNull
    public Observable<Integer> getProgress() {
        return uploadProgressSubject.asObservable();
    }
    ...
    @Override
    public void writeTo(@NonNull BufferedSink sink) throws IOException {
        try (final Source source = Okio.source(file)) {
            ...
            while (remaining > 0) {
                ...
                uploadProgressSubject.onNext(progress);
                ...
            }
            uploadProgressSubject.onCompleted();
        } catch (@NonNull IOException e) {
            uploadProgressSubject.onError(e);
            throw e;
        }
    }
}
Something to note: we don’t actually want to risk exposing the subject. It’s very easy to publish something onto that stream; we just have to call onNext. If we expose this subject to another layer, it makes it very easy for that client to also publish events onto the stream.
To solve this issue, what we need is a plain observable. One way is to create it ourselves with Observable.create:
Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> observer) {
        try {
            if (!observer.isUnsubscribed()) {
                for (int i = 1; i < 5; i++) {
                    observer.onNext(i);
                }
                observer.onCompleted();
            }
        } catch (Exception e) {
            observer.onError(e);
        }
    }
});
The first thing is that I have to be very careful not to emit events when no observer is subscribed. To make sure of that, I need to place this guard around the code that writes, or publishes, events to the stream. I also need to explicitly handle exceptions; if I don’t, this entire thing collapses. If I do get an exception, I should propagate it downstream.
The other thing is that, although it’s not explicitly shown here, if we did have callbacks registered within this observable, we need to make sure that we unregister them. Let’s say we have a view and register for onClick events. If I no longer have any observers subscribed to this observable, I need to make sure I’ve unregistered that listener, and if I don’t, I risk leaking memory.
Now that we know about these three things we need to worry about, although they’re kind of a nuisance, we can handle them. The big problem, the deal-breaker, is that we also need to handle backpressure ourselves. For that main reason, we want to avoid this approach.
The Best Solution (12:08)
That brings us to the third way, which is probably the best way to do this. We still use the create method, but we’re provided an emitter. The emitter basically wraps the nasty stuff that we would’ve had to deal with in the OnSubscribe version.
Observable.create(Action1<Emitter<T>> emitter,
                  Emitter.BackpressureMode backpressure)
This bridges the reactive world with the callback world. Let’s assume that we have some asynchronous API. When we invoke it, in order to get events back, we pass a callback along to it. When those events fire, we can simply call emitter.onNext, or to terminate the stream, onCompleted or onError.
Observable.<Event>create(emitter -> {
    Callback listener = new Callback() {
        @Override
        public void onEvent(Event e) {
            emitter.onNext(e);
            if (e.isLast()) {
                emitter.onCompleted();
            }
        }

        @Override
        public void onFailure(Exception e) {
            emitter.onError(e);
        }
    };
    AutoCloseable c = api.someMethod(listener);
    emitter.setCancellation(c::close);
}, Emitter.BackpressureMode.BUFFER);
The emitter provides us with a method, setCancellation, which is an opportunity for us to define how we want to clean up resources when there are no longer any subscribers subscribed to the observable. We also simply define what type of backpressure strategy we want to use.
public class RxRequestBody extends RequestBody {
    ...
    @NonNull private final Emitter<Status> progressEmitter;

    public RxRequestBody(@NonNull Emitter<Status> progressEmitter, ...) {
        ...
        this.progressEmitter = progressEmitter;
    }
    ...
    @Override
    public void writeTo(@NonNull BufferedSink sink) throws IOException {
        try (final Source source = Okio.source(inputStream)) {
            ...
            progressEmitter.onNext(Status.createSending(jobId, 0));
            while (remaining > 0) {
                sink.write(source, size);
                remaining = Math.max(0, remaining - size);
                size = Math.min(BUFFER_SIZE, remaining);
                final int progress = (int) (100 - ((float) remaining / total) * 100);
                progressEmitter.onNext(Status.createSending(jobId, progress));
            }
        }
    }
}
The first difference is that in the subject version, we were constructing the subject inside the RequestBody. In this case, we inject the emitter, and the reason is that this RequestBody is part of a higher-level component, which is constructed by the uploader. The uploader constructs the observable, and the observable has that emitter. For that reason, we construct the RequestBody and inject the emitter in. From there, it’s really straightforward: we just call onNext on that progressEmitter.
Note that we’re now publishing status objects onto the stream. We’re no longer explicitly handling exceptions, and we’re no longer calling onCompleted on this stream. We don’t handle the exceptions explicitly because we want them to propagate higher up the chain.
The reason we don’t call onCompleted on this progressEmitter is that we have to look at this from the request-response life cycle. I kick off a request to the remote endpoint and start writing content onto that socket, but if I simply mark it as complete once I’m done writing the entire contents of the file, it’s not necessarily complete. I still need to wait for the response to come back from the remote endpoint, saying that this succeeded with a 200, or failed with a 400 or a 500.
Tying it All Together (15:30)
Let’s see how this all ties together. Inside our uploader is where we construct our observable. We have our emitter, and we create a RequestBody, passing in that emitter. That RequestBody is part of the actual request itself, which is a multipart request. We pass that in, and now here’s where it gets interesting.
return Observable.create(emitter -> {
    ...
    fileBody = RxRequestBody.create(emitter, jobId, file, job.mimeType());
    ...
    final MultipartBody.Part body = MultipartBody.Part.createFormData(..., fileBody);
    final Subscription subscription = uploadService.upload(job.metadata(), body)
            .subscribe(new Subscriber<Object>() {
                @Override
                public void onCompleted() {
                    emitter.onCompleted();
                }

                @Override
                public void onError(@NonNull Throwable e) {
                    final ErrorType errorType = errorAdapter.fromThrowable(e);
                    emitter.onNext(Status.createFailed(jobId, errorType));
                    emitter.onCompleted();
                }

                @Override
                public void onNext(@NonNull Object response) {
                    emitter.onNext(Status.createCompleted(jobId, response));
                }
            });
    emitter.setSubscription(subscription);
}, Emitter.BackpressureMode.LATEST);
Let’s say we have this upload service that exposes an upload method. That upload method returns an observable of some response body object. Because it’s an observable, in order to activate it we need to subscribe to it, and we get back a subscription.
Let’s take a look at the subscribe implementation, starting with the last part first, which is the onNext.
At this point, we’ve received that response back from the web server, saying that everything worked out okay, possibly with some response body. We create this new completed status object that we then publish to the emitter. Then, we can mark it as complete.
The onError is kind of interesting, though. Typically, you would call emitter.onError and propagate that error downstream, but in this case, we don’t actually want to propagate the error further downstream. We want to transform it into a failed status type so we can communicate it back to the UI, and also persist it back to disk. So we still call onNext inside our onError implementation, followed by onCompleted.
One of the things that’s really good about this emitter version is the subscription handling: the emitter is going to manage the life cycle of that subscription for us. If there’s no longer anybody subscribed to this observable, the emitter will unsubscribe from that subscription.
Backpressure (17:29)
What exactly is backpressure, and how do we deal with it?
In the ideal case, we have some observable that is generating events and some observer that’s consuming them, and things are flying along nicely. By default in the Android world, each observable has a bounded, 16-element buffer associated with it. In our case, because we’re sending out progress updates, and especially for smaller files, they’re going to be generated really fast.
If that buffer fills up, in other words, if the observer is not able to drain the buffer as fast as we’re filling it, we’re going to hit a MissingBackpressureException.
How do you deal with it?
The first option is the NONE/ERROR mode, which doesn’t make sense for us because it’s going to result in the same exception. The next option is the BUFFER mode. Rather than a bounded 16-element buffer, it starts at 128 elements, but the buffer is no longer bounded. If our observer isn’t draining the buffer and we have an infinite stream of events being generated, it’s going to grow until we hit an out-of-memory error.
There is another mode, DROP, where we have a fixed buffer size of one. In this case, the first item emitted onto the stream goes directly into the buffer, and everything else from there on gets dropped until the observer consumes the buffer. In our case, if we had progress updates five, ten, and fifteen, five would go into the buffer; ten and fifteen would get dropped. The observer consumes five, then twenty comes in and goes into the buffer.
Essentially, what’s happening here is that we’re getting stale progress updates. We actually want to be able to get the latest progress update. We don’t really care about what happened a couple seconds ago. We want to know what happened now.
That’s exactly what the last option gives us. The LATEST mode still has a buffer size of one, but whatever is in the buffer at the moment gets replaced with whatever was most recently emitted onto the stream. For that reason, we end up using the backpressure mode of latest.
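As a rough sketch of the difference, here’s a fast synchronous producer bridged with the emitter API. AndroidSchedulers (from RxAndroid) and the progressBar consumer are assumptions for illustration.

// With BackpressureMode.NONE or ERROR, a slow observer behind observeOn
// would hit MissingBackpressureException once its bounded buffer fills.
// LATEST keeps only the most recent value when the consumer falls behind.
Observable<Integer> progress = Observable.create(emitter -> {
    for (int i = 0; i <= 100; i++) {
        emitter.onNext(i); // emitted far faster than the UI can draw
    }
    emitter.onCompleted();
}, Emitter.BackpressureMode.LATEST);

progress.observeOn(AndroidSchedulers.mainThread())
        .subscribe(p -> progressBar.setProgress(p)); // progressBar: assumed widget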
Status Updates in UI (20:50)
Now, how do we kick off those upload jobs and essentially get this entire system flowing?
First, we’re going to look at how to queue up new jobs into this system. We have two streams: status and jobs. We take a job object and publish it onto the job stream. Our job repository is going to read from that, and from there it’s going to emit a new status object of type queued, which gets published to the status stream.
In order to understand how we build this out, we first need to look at hot and cold observables. Once we go through this, it’ll become clearer why it’s important. Starting with an analogy, a hot observable is like watching a live presentation. If somebody joins the room right now, they will not know what happened in the past, but they’ll receive all the same information from that moment onward. A cold observable is like watching a recording of the same presentation, in the sense that observers can come in and start receiving content at any time, but they will always start from the beginning, and they can transform that information in whichever way they want.
To relate it back to what we’re working on, a hot observable is much like UI touch events, motion sensor events, or in our case, progress updates. Cold observables are a lot like reading from a file, database query results, or web requests.
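A quick sketch of the difference in code (TAG is an assumed logging tag):

// Cold: every subscriber restarts the sequence from the beginning.
Observable<Integer> cold = Observable.just(1, 2, 3);
cold.subscribe(v -> Log.v(TAG, "A: " + v)); // A: 1, 2, 3
cold.subscribe(v -> Log.v(TAG, "B: " + v)); // B: 1, 2, 3 (replayed from the start)

// Hot: subscribers only see what is emitted after they subscribe.
PublishSubject<Integer> hot = PublishSubject.create();
hot.subscribe(v -> Log.v(TAG, "C: " + v));
hot.onNext(1);                              // C: 1
hot.subscribe(v -> Log.v(TAG, "D: " + v));
hot.onNext(2);                              // C: 2 and D: 2 (D missed 1)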
The thing with hot observables is that we can’t really control the rate of flow. In the case of UI touch events, if the user is hammering away on the screen and generating touch events, the hot observable can’t tell the user to slow down. In that case, we have to employ things like backpressure strategies to decide how to drop or buffer the events being emitted.
A cold observable, on the other hand, always starts from the beginning when you subscribe to it, and you will always receive the same information. You can also control the rate of flow by using a push-pull mechanism: the observer gives the observable a hint, saying, “I can take up to 10 events,” and the observable won’t exceed that threshold.
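That push-pull hint is expressed through request(n). A minimal sketch:

// The subscriber bounds demand up front; the cold observable honors it.
Observable.range(1, 1000).subscribe(new Subscriber<Integer>() {
    @Override
    public void onStart() {
        request(10); // "I can take up to 10 events"
    }

    @Override
    public void onNext(Integer i) {
        Log.v(TAG, "got " + i);
        // calling request(1) here would pull the next item when we're ready
    }

    @Override
    public void onCompleted() { }

    @Override
    public void onError(Throwable e) { }
});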
The other thing about hot observables is that they’re always running. They don’t care whether anybody’s subscribed; they’re just going to fire those events. Cold observables, on the other hand, only start publishing notifications when you subscribe. That means that when you subscribe, they always start from the beginning and always emit the same information, whereas with hot observables, as soon as you subscribe, you get information from that moment forward.
The reason this is important is that it dictates how to build out the status and job streams. The status and job streams are essentially hot observables, a continuously flowing set of information, and the best way to build them is with subjects.
Subjects Revisited (24:47)
Let’s revisit the topic of subjects. Remember that a subject is both an observable and an observer. The great thing about subjects is that they allow multiple sources to generate notifications and write them onto the stream, while on the other side you can have multiple consumers, or observers, reading from that stream.
Let’s take a look at the four different versions of subjects and see how they relate to what we’re trying to build out.
The first one is the AsyncSubject. It will emit only the last item it receives before a terminal event. For example, it could drop the first three items and emit the fourth one. That’s not exactly what we’re looking for, because we need all those events to get published through.
The next one is the PublishSubject, which is essentially what I described for hot observables. If we have a subscriber attached to this stream, it will receive events. When a new subscriber comes along, it won’t receive any of the previous events, but it’ll start receiving everything from that moment onward. This is essentially what we’re looking for, although the other two can also apply to what we’re trying to build, and they have some really interesting properties as well.
The BehaviorSubject builds on top of the PublishSubject, with one extra nice feature: it keeps track of the last item that got emitted. For example, say we have an observer subscribed to it, and the last thing that was emitted was a green object. When a new observer comes along, it will receive that last item immediately, and then whatever else comes on the stream.
This is nice because, if the UI is subscribed to these status updates and the user backgrounds the application, the UI gets unsubscribed from that stream. When the user foregrounds the application again, the UI would want to display the last thing that was emitted immediately. This is a nice option, but we’ll see how we get around it using the PublishSubject.
The last version is the ReplaySubject, which is an extension of the BehaviorSubject, but rather than having a buffer size of one, it keeps track of everything that was emitted. When a new subscriber attaches to the stream, it starts to receive everything that was emitted before. It also has another nice property: you can put certain conditions on that buffer.
Let’s say that I want to cap it at 10 items, or maybe I want to place a time limit on those items, so that anything that has been sitting around for more than 10 seconds is considered stale and gets dropped.
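For example, RxJava 1.x exposes these bounded variants directly:

// Replay at most the last 10 items to new subscribers.
ReplaySubject<Status> lastTen = ReplaySubject.createWithSize(10);

// Replay only items emitted within the last 10 seconds.
ReplaySubject<Status> fresh =
        ReplaySubject.createWithTime(10, TimeUnit.SECONDS, Schedulers.computation());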
We’re going to use a PublishSubject to define the status and job streams. Let’s see how this translates into code. There’s quite a bit happening here, but implementing it doesn’t take a lot of code.
public class UploadManager {
    @NonNull private final PublishSubject<Job> jobSubject;
    @NonNull private final PublishSubject<Status> statusSubject;
    @NonNull private final Observable<Status> statusObservable;
    ...
    public UploadManager() {
        jobSubject = PublishSubject.create();
        statusSubject = PublishSubject.create();
        ...
        final Observable<Status> jobQueue = jobSubject.asObservable()
                .filter(job -> job.status().statusType() == StatusType.QUEUED)
                .flatMap(job -> uploadJobRepository.save(job))
                .map(Job::status);
        ...
        subscriptions.add(jobQueue.subscribe(statusSubject::onNext));
    }
}
Here, the first thing defined is the pair of PublishSubjects. From there, we take our jobSubject and get its observable counterpart. As a precaution, we apply a filter to it as well, so that we make sure we’re only getting the queued jobs out of the stream. The next thing we do is save the job to disk: our uploadJobRepository’s save method returns an observable of type Job.
From there, we want to get the status back out of it, so we map it and extract the status. To activate this entire flow, we need to subscribe to the jobQueue observable. Anything that is emitted gets published directly onto the statusSubject: we call jobQueue.subscribe and push each new status directly onto the statusSubject.
public void enqueue(@NonNull Job job) {
    jobSubject.onNext(job);
}
The last thing we need to do is expose some way of queuing up new jobs, so we expose this enqueue method. It just calls jobSubject.onNext and publishes the job onto the stream.
Now the job is guaranteed to already be saved to disk, and the result is what we share with the clients and the UI. On the other side, out of the job repository we apply a filter to get just the jobs that are queued and pass them down to the uploader. From there, we kick off the upload operation, which starts to emit the sending, completed, or failed status objects.
It seems like a lot is going on here, but what’s kind of nice is that you can really see what’s happening; it flows nicely throughout the system.
public class UploadManager {
    @NonNull private final PublishSubject<Job> jobSubject;
    @NonNull private final PublishSubject<Status> statusSubject;
    @NonNull private final Observable<Status> statusObservable;
    ...
    public UploadManager() {
        final Observable<Status> status = statusSubject.asObservable().share();
        final Observable<Status> sending = status
                .filter(s -> s.statusType() == StatusType.SENDING)
                .onBackpressureLatest();
        final Observable<Status> statusUpdates = status
                .filter(this::canUpdateStatus)
                .flatMap(uploadJobRepository::update)
                .map(Job::status)
                .share();
        final Observable<Status> uploadJobs = statusUpdates
                .filter(s -> s.statusType() == StatusType.QUEUED)
                .flatMap(s -> uploader.upload(s.id()));
        statusObservable = statusUpdates.mergeWith(sending).share();
    }
    ...
}
The first thing is that we take the status stream, get the observable, and share it. From there, we create the sending observable stream by filtering for just the ones with status type sending, and because we can afford to drop intermediate progress updates, we apply onBackpressureLatest. On the other side, we want to persist those status updates back to disk. We filter for just the ones that are queued, completed, or failed, and pass them to the uploadJobRepository to update those statuses, which returns a Job observable. We then map out the status and share that stream. One part of it goes to our status stream, and the other part we use to feed jobs to the uploader.
On one side, we apply a filter for queued to get just the queued items out, and pass them to the uploader, calling its upload method. That gives us back an observable of type Status, but at this point, it hasn’t started the upload.
On the other side, we use the mergeWith operator to take the sending stream and combine it with our status updates. An interesting property of mergeWith is that it doesn’t respect ordering; it interleaves the objects as they come in. Then, we finally use the share operator on the result. The reason is that the client is going to subscribe to and unsubscribe from that status observable multiple times as it goes in and out of foreground and background.
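A small sketch of that interleaving behavior (ordering between the two sources varies from run to run):

// mergeWith interleaves emissions from both sources as they arrive.
Observable<String> sending = Observable.just("sending 10%", "sending 50%")
        .subscribeOn(Schedulers.io());
Observable<String> updates = Observable.just("queued", "completed")
        .subscribeOn(Schedulers.io());
updates.mergeWith(sending)
        .subscribe(v -> Log.v(TAG, v)); // e.g. queued, sending 10%, sending 50%, completed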
The last part is to kick off the uploads: we subscribe to the uploadJobs stream, and whatever comes out of it gets published back into our statusSubject by calling onNext. We’ve basically built out the component that uploads files and shares the status observable with clients.
Share vs. Publish (35:23)
The reason you would want to use share or publish is that you want to multicast a stream: you want to share the results from one source with multiple downstream consumers, perhaps because you’ve done some heavy lifting just before that point.
Let’s say there was a network call upstream; each time you resubscribe, you don’t want to perform that operation again. In this case, the share or publish operators come in handy. Here’s the catch with the share operator. Say we have a stream that emits the integers one through 10. As soon as I subscribe to that stream, what I would expect on one observer is to see 10, 20, 30, 40 emitted, while on another I would see 11, 12, 13, 14, and so on. However, the second observer can actually lose early emissions. The reason is that with the share operator, as soon as the first subscription happens, the stream starts to emit. If you need to guarantee that all subscribers receive the same information, that’s where the publish operator comes into play.
With publish, when we call subscribe on the first and second observers, nothing gets emitted until connect is called.
final ConnectableObservable<Integer> o = Observable.range(1, 10)
.publish();
o.map(i -> i * 10).subscribe(v -> Log.v(TAG, "Value [A]: " + v), logError);
o.map(i -> i + 10).subscribe(v -> Log.v(TAG, "Value [B]: " + v), logError);
o.connect();
In our case, it doesn’t really apply, because we’re building out these streams inside our constructor and nothing is active on those streams at that moment. It’s only when we call enqueue, retry, or retryAll that things start to get published onto those streams.
public class UploadManager {
    @NonNull private final PublishSubject<Job> jobSubject;
    @NonNull private final PublishSubject<Status> statusSubject;
    @NonNull private final Observable<Status> statusObservable;
    ...
    public UploadManager() {
        final Observable<Status> status = statusSubject.asObservable().share();
        final Observable<Status> sending = ...
        final Observable<Status> statusUpdates = ...
        statusObservable = statusUpdates.mergeWith(sending).share();
        subscriptions.add(uploadJobs.subscribe(statusSubject::onNext));
    }
    ...
    @NonNull
    public Observable<Status> status() {
        final Observable<Status> persistedStatus = uploadJobRepository
                .getAll()
                .map(Job::status);
        return persistedStatus.concatWith(statusObservable);
    }
}
Fixing Some Problems (37:50)
At this point, we’ve actually built out that system. It will work but it has some problems.
public class UploadManager {
    @NonNull private final PublishSubject<Job> jobSubject;
    @NonNull private final PublishSubject<Status> statusSubject;
    @NonNull private final Observable<Status> statusObservable;
    ...
    public UploadManager() {
        jobSubject = PublishSubject.create();
        statusSubject = PublishSubject.create();
        ...
    }
    ...
    public void enqueue(@NonNull Job job) {
        jobSubject.onNext(job);
    }
    ...
    @NonNull
    public Observable<Status> status() {
        ...
        return persistedStatus.concatWith(statusObservable);
    }
}
The first problem is with the enqueue method. I should be able to call it from multiple threads, and things should just work out fine; you would expect the jobs to simply get serialized onto the same stream. But subjects are not thread-safe, which means that when I call onNext from separate threads, the items actually get pushed onto the stream in parallel.
That ends up breaking two parts of the observable contract. One is that you can’t publish objects in parallel and have them processed by the operators; emissions must be serialized. The second is that we need to ensure a happens-before relationship between emissions. There is an easy fix, though.
The first thing is that we can no longer define the field as a PublishSubject; we need to declare it as a generic Subject. We still create a PublishSubject, with the same semantics, except for one distinction: we call toSerialized, which guarantees that we now have a serialized subject that is thread-safe.
public class UploadManager {
    @NonNull private final Subject<Job, Job> jobSubject;
    @NonNull private final Subject<Status, Status> statusSubject;
    @NonNull private final Observable<Status> statusObservable;
    ...
    public UploadManager() {
        jobSubject = PublishSubject.<Job>create().toSerialized();
        statusSubject = PublishSubject.<Status>create().toSerialized();
        final Observable<Status> jobQueue = jobSubject.asObservable()
                .filter(job -> job.status().statusType() == StatusType.QUEUED)
                .flatMap(job -> uploadJobRepository.save(job))
                .map(Job::status);
        ...
        subscriptions.add(jobQueue.subscribe(statusSubject::onNext,
                this::handleJobError));
    }
    ...
    public void enqueue(@NonNull Job job) {
        jobSubject.onNext(job);
    }
    ...
    @NonNull
    public Observable<Status> status() {
        ...
        return persistedStatus.concatWith(statusObservable);
    }
}
If you have a situation where you’re not sure whether the items you’re publishing onto a stream are coming in from separate threads, you probably do need to call toSerialized to protect yourself. That fixes the thread-safety problem with the subjects.
There are two subtle problems with the error handling, though. Let’s assume that when we call the save method, the uploadJobRepository emits an error notification. Since I haven’t implemented error handling inside that subscribe call, the whole thing collapses. The solution is pretty easy: I implement the error action, and from there, maybe log it or perform some other operations.
There’s still another problem here. We fixed the crash, but remember that the jobQueue was getting information from the jobSubject, a live stream of events that are constantly being published. As soon as the uploadJobRepository emits an error, the entire stream breaks, and if that stream breaks, we can no longer process any new jobs. The subscription essentially becomes unsubscribed, because errors are terminal events. As soon as an error notification is emitted, we stop receiving any more events and can no longer process any more jobs.
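You can see the terminal nature of errors with a bare subject (a sketch):

// After onError, a subject's stream is terminated: later onNext calls go nowhere.
PublishSubject<Integer> subject = PublishSubject.create();
subject.subscribe(
        v -> Log.v(TAG, "got " + v),
        e -> Log.v(TAG, "stream died"));
subject.onNext(1);                        // got 1
subject.onError(new RuntimeException());  // stream died
subject.onNext(2);                        // dropped: nobody will ever see this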
How do we fix this? RxJava provides an operator called onErrorResumeNext. If it receives an error notification, that’s a chance for us to transform it back into a regular observable. From there onwards, the rest of the stream won’t see it as an error.
final Observable<Status> jobQueue = jobSubject.asObservable()
        .filter(job -> job.status().statusType() == StatusType.QUEUED)
        .flatMap(job -> uploadJobRepository.save(job))
        .onErrorResumeNext(error -> {
            final String jobId = job.id();
            final ErrorType errorType = errorAdapter.fromThrowable(error);
            final Status status = Status.createFailed(jobId, errorType);
            return Observable.just(job.withStatus(status));
        })
        .map(Job::status);
...
subscriptions.add(jobQueue.subscribe(statusSubject::onNext,
        this::handleJobError));
In this case, we get the error notification and convert it into a new status of the failed type; we publish that, and it gets picked up by our map operator. We get the status out and publish it back to the statusSubject, which is great, except that this only fixes the downstream. The upstream is still broken.
We’re no longer going to receive any more job objects to process. The reason is that each of these operators returns an observable, and RxJava is all about immutability, so each one is a brand-new stream. That means that when I call filter, flatMap, and onErrorResumeNext, they all give me brand-new observables, and each observable subscribes back to the one before it. We have a chain, and as soon as that flatMap emits the error, everything above it has already seen the error notification, so it all terminates. Downstream, however, onErrorResumeNext captures the error and turns it back into a regular observable, so it repairs the downstream.
What’s the fix? All we need to do is nest it inside the flatMap.
final Observable<Status> jobQueue = jobSubject.asObservable()
        .filter(job -> job.status().statusType() == StatusType.QUEUED)
        .flatMap(job -> uploadJobRepository
                .save(job)
                .onErrorResumeNext(error -> {
                    final String jobId = job.id();
                    final ErrorType errorType = errorAdapter.fromThrowable(error);
                    final Status status = Status.createFailed(jobId, errorType);
                    return Observable.just(job.withStatus(status));
                }))
        .map(Job::status);
...
subscriptions.add(jobQueue.subscribe(statusSubject::onNext,
        this::handleJobError));
What’s happened here is that save still generates the error notification, but that error is never seen outside the flatMap, because everything is contained inside the flatMap operator. The end result of the flatMap operation is a brand-new observable that is repaired and simply carries a failed status. It’s really subtle, but something to be aware of if you have observables that are continuously reading from an infinite stream.
Questions (45:37)
Why would you use subjects, or when would you use them?
First of all, subjects tend to get a bad rap; a lot of people will say to avoid them, and I do agree, because they have a lot of subtle issues that can wreak havoc in your system. In this case, the reason the subject fit nicely into this scenario is that there are multiple sources writing into these streams. Maybe it is possible to do it exclusively with observables, but the way I saw it, if I wanted to create a new stream by combining two other streams, that new stream would be a brand-new stream. It’s immutable, which means I’d have to save it somewhere, and if I then added a new job, that would result in yet another brand-new stream, which becomes a lot more difficult to manage. So if I have multiple sources generating events, I think it makes sense to keep writing them into a subject.
If you can avoid subjects, I highly encourage you to do so; it’ll save you a lot of time and pain debugging. Another way to look at a subject is as a replacement for an event bus. The downside with event buses is that we tend to throw everything into them.
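As a sketch of that idea, a minimal event-bus replacement built on a serialized subject might look like this (RxBus is a hypothetical name, not from the talk):

// A minimal, thread-safe event bus built on a serialized PublishSubject.
public class RxBus {
    private final Subject<Object, Object> bus =
            PublishSubject.create().toSerialized();

    // Publish an event; safe to call from any thread.
    public void post(Object event) {
        bus.onNext(event);
    }

    // Subscribe to events of a particular type only.
    public <T> Observable<T> events(Class<T> type) {
        return bus.ofType(type);
    }
}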
What are your thoughts on RxJava 2?
They’ve already released a stable version. The great thing about RxJava 2 is that the whole notion of backpressure is fully baked into the framework, whereas in RxJava 1 backpressure was an afterthought.
There are a lot of other great features as well. The fact that it now conforms to an actual Reactive Streams standard is a lot better, and a lot of the concepts here carry over into RxJava 2.
The emitter version that we used there, the Observable.create using the emitter, is a hot observable in the end. The reason is that there’s really no mechanism in place to control the rate at which those progress updates are emitted, unless we decide to throttle how fast we upload the file.
Suppose we wanted to convert accelerometer events into an observable; you’d use the emitter variant. You have a callback that’s registered, but you can’t tell the sensor to slow down. There are a couple of different modes that can give it a hint, but from that standpoint, it’s still considered hot.
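A sketch of that accelerometer case with the emitter variant; the context reference and scheduling details are assumptions for illustration:

// Bridging Android's SensorEventListener into a hot observable.
Observable<SensorEvent> accelerometer = Observable.create(emitter -> {
    final SensorManager manager =
            (SensorManager) context.getSystemService(Context.SENSOR_SERVICE);
    final Sensor sensor = manager.getDefaultSensor(Sensor.TYPE_ACCELEROMETER);
    final SensorEventListener listener = new SensorEventListener() {
        @Override
        public void onSensorChanged(SensorEvent event) {
            emitter.onNext(event); // fires at the sensor's rate; we can't slow it down
        }

        @Override
        public void onAccuracyChanged(Sensor s, int accuracy) {
            // not needed for this sketch
        }
    };
    manager.registerListener(listener, sensor, SensorManager.SENSOR_DELAY_NORMAL);
    emitter.setCancellation(() -> manager.unregisterListener(listener));
}, Emitter.BackpressureMode.LATEST);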