How can you move away from traditional synchronous state management with variables to asynchronous streams of data? Try functional reactive programming! In this talk from Droidcon NYC, Juan Gomez explains why you should use FRP, and covers three main topics: reactive extensions, observers & descriptions, and intermediate topics around RxJava. These powerful, yet easy to use abstractions will let you write asynchronous code in a straightforward, declarative fashion—making the task of writing great Android apps much easier.
Wait, you mean Reactive Programming? (00:29)
My name is Juan Gomez. I am here today to discuss functional reactive programming (controversial term! some times referred to as reactive programming or reactive extensions, and even used in the past in a different manner - check out André Staltz’s post to learn more about the name controversies). I will introduce three concepts: “reactive extensions” (and convince you why do you want to use them), Observables and the observable pattern on reactive extensions, observers and subscriptions; and lastly I will cover some intermediate topics around RxJava.
What is functional reactive programming? (02:22)
Erik Meijer popularized the term functional reactive programming: reactive programming, with the concept of functional programming associated with it. He did this when he created the reactive extension (Rx) library for .NET while at Microsoft. I insist on calling it functional reactive programming (FRP) because reactive programming is a term that has been around for a long time, but reactive extensions are not the only way to do reactive programming. You can do reactive programming to actors (Scala): they have functional programming but they are not natively a functional concept. Functional reactive programming and reactive extensions allow you to use functional concepts and go further than what actors let you do.
What are reactive extensions? (03:53)
Rx, initiated by Erik Meijer (C# and .Net, then adapted to many different languages), is a library that allows you to apply FRP to any language. Using “schedule operations”, Rx also allows you to parameterize the concurrency in asynchronous data streams (e.g. when events are going to be listened to, or emitted in different threads). In turn, that helps abstract away concepts of low-level threading, synchronization, thread safety, and concurrent data structures.
Rx Family (05:57)
Rx has been ported to many languages: C# (original), JavaScript, RxJava (a JVM library, hence any language that runs on the JVM can take advantage of RxJava), Ruby, RxPY for Python (by Microsoft) and many more.
Why functional reactive programming? (06:45)
Writing concurrent programs is difficult. FRP helps you think about asynchronous programs (high-level abstractions), makes the flow of your application easier, and improves standard error handling (data structure = less code, less bugs). That is the reactive part. The functional part is the reactive extensions. Rx allow you to manipulate and combine streams of events. Together, that is really the power of functional reactive programming: the ability to combine functions, operate, and transform the stream of events.
Multithreading is hard! (08:21)
When you think you really know multithreading, think of Dianne Hackborn’s quote:
“Once you think you now understand it and are an expert, you are heading soon to another painful lesson that multithreading is hard.”
Push vs Pull (08:43)
The essence of functional reactive programming is the duality of our usual imperative model: we pull and process things synchronously, using iterators (e.g. getting tweets from a timeline - we are pulling iterators, processing them synchronously and then emitting a result). At the core of functional reactive programming, you take that model, and then create a push model. We have a stream of events (collection of events), we take the pull model, and we push the same things that we used to have in a collection. We push them asynchronously into an observer.
for (Iterator<Tweet> iterator = timeline.iterator(); iterator.hasNext();)
{
Tweet tweet = iterator.next();
Log.i(TAG, tweet.getTitle());
}
Observables (10:06)
An Observable depicts a concept of duality (check Erik Meijer’s online videos for details). You can derive the Observable type (push-based method) from an Iterable (pull-model) using duality. Here, the Observable is a mirror image and the Iterable gives you the next element. On an Observable you have onNext
: a new event is introduced. That one is pull: you pull the next element; the other one is push, you get the event and then you process it.
The Observable has three methods: onNext
, onError
, and onComplete
. When the Iterable fails, you throw an exception. If we retrieve a collection, there are not many chances of failure. If you are creating the collection and it goes wrong, you throw an exception and the iterable fails. On an observable this is the onError
. When something happens on the Observable, we send the error event and the observer or the processor has to stop and handle the error. On the iterable, hasNext
will return folds. When the collection is over and the Observable finishes emitting events, you can process the onComplete
event.
The “Subscribe” method (12:57)
You must hook onNext
, onError
, and onComplete
together through a subscription. On the subscription you give the observable an observer that processes events as they come.
Observable<Tweet> tweetObservable =
Observable.create(new Observable.OnSubscribe<Tweet>() {
@Override
public void call(Subscriber<? super Tweet> subscriber) {
if (!subscriber.isUnsubscribed()){
try {
Tweet favorite = TwitterService.getNewFavorites();
subscriber.onNext(favorite);
Tweet mention = TwitterService.getNewMentions();
subscriber.onNext(mention);
subscriber.onCompleted();
} catch (JsonParseException e) {
subscriber.onError(e);
}
}
}
});
In these lines of code—the same iteration I was doing on a collection that I had already retrieved—I am processing it here as a stream of events. I have an Observable and I am giving it an observer to a subscription . Then the observer is processing each of the tweets as they come. It is then the observer’s job to process the events as they get emitted. This is a very simple example using an iterable and an observer to process some tweets. As you can see, we have a catch at the bottom. When there is an exception JSON, we call onError
to let the observer know that we are not going to be emitting events.
No Magic! (15:03)
Uncle Bob Martin posted “Make the Magic go away”. He claims Rx is a concept that has been around for a long time—nothing new, and without value. Although I disagree with this point, I agree with the end: “any library that you use, it should not be magic to you. You need to understand how the library works to avoid issues. I am including a link from the reactive IO website if you want to learn how to build a very basic Rx framework, which will give you: A, a better understanding of reactive extensions, and B, it is going to make the magic go away.
RxJava and Retrolambda (18:11)
RxJava is a JVM implementation of Rx.
aObservable.filter(new Func1<Integer, Boolean>() {
public Boolean call(Integer n) {
return n % 2 == 0;
}
})
.map(new Func1<Integer, Integer>() {
public Integer call(Integer n) {
return n * n;
}
})
.subscribe(new Action1<Integer>() {
public void call(Integer n) {
System.out.println(n);
}
});
This an operation on an observable (Java 7): very verbose. You can simplify the code using using Retrolambda(Gradle plugin; Java 8).
aObservable.filter(n -> n % 2 == 0)
.map(n -> n * n)
.subscribe(System.out::println);
Operators in RxJava (20:32)
The power of Rx is that everything is a stream of events - you will need to decide how to combine operators for efficiency.
aObservable.map(x -> x*x) //Square
.reduce((a, b) -> a+b) //Sum
.subscribe(x -> println(x)); //Show
The operators on this piece of code are map, reduce and subscribe. Here we are just squaring a number, making a sum of all the squares, and printing the result. I will use “marble diagrams”.
Marble diagrams (20:59)
In marble diagram you are processing some marbles. You have time, and time flows from left to right. Where you see the line on the arrow is where time stops: the stream of events gets completed. The operator (map) is in the middle. It is transforming those marbles into diamonds. In this case, this is a meta stream. There is the first stream (straight) and a diagonal stream (diamond). In this case, an observable of observables has the transform events (again, check Andre Staltz’s website to go through more interactive marble diagrams).
Creating an Observable (22:36)
List<String> aList = …;
ob = Observable.create(subscriber -> {
try {
for (String s : aList) {
if (subscriber.isUnsubscribed())
return;
subscriber.onNext(s);
}
subscriber.onCompleted();
} catch (Exception e) {
subscriber.onError(e);
}
});
The observer has to implement at least a combination of the three basic methods. In this case we are not implementing onError
, we are just calling onNext
with two marbles and then calling onComplete
(call, when the time stops). We also have “unsubscribing”. You can check for the subscription and see if you have an observer that is subscribed, and then you emit events.
List<String> aList = …;
Observable<String> ob = Observable.from(aList);
If you have a collection of things, you can just call Observable.from
, which will take each element on the collection, create an event out of that element and start emitting them one by one. We can also use Observable.just
, which will take any object and then emit it as an event:
Observable<List<String>> ob = Observable.just(aList); Observable<String> ob2 = Observable.just(“Some String”);
I am doing Observable.just
and then giving it a collection. Be aware: Observable.from
, the observable is going to emit the elements of the collection one by one; Observable.just
will take the whole collection. But there are many other creation operators that you can use.
Transforming Observables (25:50)
Map is probably one of the best known higher-order functions on functional programming, and it allows you to transform an input, emit, transform output, and, in this case, apply it to every element.
This is the typical use case on the marble diagram. You have the marbles on top, flowing through time, and map transforms them. All the marbles get transformed into diamonds on the end of the marble diagram.
Observable.range(0, 5)
.map(x -> toBinaryString(x*x))
.subscribe(s -> println(s),
err -> err.printStackTrace(),
() -> println(“done”));
Here we are emitting numbers and then transforming them from decimal representation to a binary representation, and printing them on the console. It is an asynchronous operation: every time I emit a number, I get it, transform it, and then emit the transformation on the result observable.
Observable.range(1, 3)
.flatMap(x -> Observable.just(x).repeat(x)) .subscribe(System.out::println);
There is also flatMap
: it takes an observable of observable, and then flattens it into one observable with the elements of all the observables that it had inside. Same thing: we have numbers from one to three. Observable.Range
creates an observable that emits the numbers. We are just putting them on an observable again by using .just
and .repeat
, creating that meta observable of observables, and using flatMap
to flatten that out and then get back just the numbers as events.
Filtering Observables (29:01)
When filtering Observables, you take a stream of events and choose only the ones that you need. Here we only care about even numbers—we just pass it all numbers from zero to 10 and then it filters each even and then puts it on the resulting observable:
Observable.range(0, 10)
.filter(x -> (x % 2) == 0)
.subscribe(System.out::println);
There are many other transforming observables, which you can check following this link.
Aggregate Operators (30:15)
You can sum or aggregate everything that you have on a stream of events, and then emit one single output (an observable of just one element, or the element itself). Here we are multiplying all the numbers from one to 10 and then reducing them back into just the result.
Observable.range(1, 10)
.reduce((a, b) -> a*b)
.subscribe(System.out::println);
Combining Observables (30:42)
You can use combining observables to combine two different events from different observables into one observable that has the combination of the two. For that we use zip
. For instance, you can be combining things that you are reading from disk with things that you have from RAM to create one single observable with elements. That is how it looks in code:
Observable<String> lower =
Observable.from(new String[]{“a”, “b”, “c”});
Observable<String> upper =
Observable.from(new String[]{“A”, “B”, “C”});
Observable.zip(lower, upper, Pair::create)
.map(pair -> pair.first +”_” +pair.second)
.subscribe(System.out::println);
You take two observables and match them together. This creates a problem where one observable is emitting events really fast versus another really slow. You do a zip
to combine them - you have to wait to get an event from the first and then this one is emitting a bunch of events, and you have to wait for the second event.
Schedulers (32:05)
Using schedulers you can decide, on different threads, where the emitting of the events, the subscription, or the processing of the events will take place. You use observeOn
and subscribeOn
, and then you give it a scheduler that is the one that encapsulates the thread where you want the processing or the emitting to happen. These events happen asynchronously—you do not have to do the threading manually. Just concentrate on combining those operators and processing your stream of events.
Backpressure (33:24)
When you have two observables emitting events of different speeds, you are going to end up with backpressure. RxJava allows you to handle that to a specific buffer, which handles a few events. If the buffer fills up, you get a backpressure error.
Hot and Cold Observables (34:10)
Cold observables only emit events when you subscribe to them, whereas Hot observables emit events regardless of subscription.
RxAndroid (35:06)
Android specific bindings for RxJava, RxAndroid, allow you to schedule things on a handler thread. There are two new libraries, and you can just pull the one you need. This is just sample code from RxAndroid. I am just taking a collection of Strings, and emitting them. The difference is that I am subscribing on a new thread, the normal RxJava scheduler, but I am observing on the main thread. The processing of the events needs to happen on the main thread, which you can specify it using the Android maintenance scheduler.
Summary (37:12)
Embracing the functional part is essential. Go review the functional programming concepts, higher-order functions, and play around with a functional programming language (e.g. Scala). Then think about the power of manipulating streams of data, to combine those streams and how that makes your code better.
Useful links (37:54)
- Android apps: Grokking RxJava, by Dan Lew
- Introduction to Reactive Programming by André Staltz
- Guide to Rx Operators
- RxJava docs
Receive news and updates from Realm straight to your inbox