Rx for the Ultimate Beginner – Part 1 (Kotlin)

Back to Blog Home

Rx for the Ultimate Beginner – Part 1 (Kotlin)

Rx (aka ReactiveX) is a library which provides all the utilities for you to write code using functional reactive programming (FRP) paradigm. If you have no idea what FRP means, don’t worry. This post is written for audiences like yourself. We will discuss what FRP is, why it is useful and what you achieve when you utilize it correctly. Since Rx in its entirety is a huge library, it is not possible to cover all the tools it provides in a single post. Therefore this guide will expand into multiple parts, each building on top of the one comes before.

We are going to use Rx in Kotlin, on the Android platform. If you are looking for the Swift version which runs on iOS, please refer to this mirror post. All the consecutive posts will follow the same format.

  1. Part 1 – Introduction
  2. Part 2 – Observable Contract

As always, you can find a working project at the bottom of this page.

What is FRP?

If this is not the first time you are hearing about it, chances are you are shown a description formed by a vast number of weird terminology. Here, we are approaching the topic on a pragmatic level instead of an academic one.

FRP is a paradigm to write highly concurrent code in a composable way. When used correctly, it solves the problem named callback hell when you are working on asynchronous logic.

In FRP, you describe your program as a series of transformations. At a pseudo code level, instead of writing code like,

DoTheFirstThing()
DoTheSecondThing()
loop for something
DoTheLastThing()

what you will do will look like,

initialData = FirstThingToDo()
transformedData = FirstTransformation(initialData)
loopedData = FirstLoop(transformedData)
extractedData = FirstExtraction(loopedData)

which is more functional in its core. The catch here is, the second pseudo code above will be maximally aynchronous, in other words it will not block the context you are working while it is doing its job.

To be able to achieve that, instead of directly transforming data types named A, BC etc to each other, you work with envelope objects which encapsulate your A, BC  like Observable<A>, Observable<B>, Observable<C>.

Observables are recipes of type transformations. They make no assumption about their encapsulated types, amount of data they will process or the time when the data arrives for processing. So in a sense, they are array like objects where each element is both seperated by time and space. Regular arrays only arrange their elements spatially, but they require all of them to be ready at the time they are defined. Thus you can’t literally have an array of infinite size, where each element is seperated with say 1 second long intervals. On the otherhand, when nested in an observable, your object collection becomes an abstract entity, where each element is transformed according to your description, as they arrive. The source of the data stream can be disk, network or memory. They will behave exactly the same.

If you are already an experienced Swift user, chances are you know how to use array operators like array.filter() and array.map(). These operators recieve anonymous functions (lambdas) to decide how to filter or convert each element. Observables work the same way. Actually, filter() and map() already exists in Rx. In order to achieve its asynchronity, Rx demands you to provide a final command to start executing its potentially infinite transformation pipeline, in which you are notified with the final result for each element via the provided callback.

Installation

Add the below lines into your gradle file. If you are looking for an earlier version, please refer to RxJava, RxAndroid, RxKotlin repositories.

implementation 'io.reactivex.rxjava2:rxjava:2.2.2'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.0'
implementation 'io.reactivex.rxjava2:rxkotlin:2.2.0'

Hello Observable

In order to visualize how to convert a native array processing code into its asynchronous Rx counterpart, we will start with the most basic example.

fun arrayExample() {
    val myArray = listOf(1, 2, 3)
    val transformation = myArray.map { e ->
        //Log.d("Rx","Map: " + e)
        e * 2
    }
    val filtered = transformation.filter { e ->
        //Log.d("Rx","Filter: " + e)
        e > 2
    }

    filtered.forEach { e ->
        Log.d("Rx","Result: " + e)
    }
    Log.d("Rx", "Done!")
}

// Output is
// Result: 4
// Result: 6
// Done!

Here is how the above code transforms into Rx.

fun observableExample() {
    val myObservable = Observable.fromArray(1, 2, 3)
    val transformation = myObservable.map { e ->
        //Log.d("Rx","Map: " + e)
        e * 2
    }
    val filtered = transformation.filter { e ->
        //Log.d("Rx","Filter: " + e)
        e > 2
    }

    filtered.subscribe(
        { e ->
            // onNext
            Log.d("Rx","Result: " + e)
        },
        { ex ->
            // onError
            Log.e("Rx", "Error", ex)
        },
        {
            // onComplete
            Log.d("Rx","Done!")
        },
        { disposable ->
            //onSubscribe
            Log.d("Rx","Subscribed!")
        }
    )
}

// Output is
// Result: 4
// Result: 6
// Done!
// Garbage collected!

At first glance, it looks like we increased the amount of boilerplate for such a simple task. Before explaining the reasons, let’s take a look at what we did.

We created an observable with 3 elements, all immediately defined. We told our observable to multiply all the values it emits by 2. We then discarded the ones equal or less then 2. Then to initiate the transformations, we subscribed to the observable and told it to print all the results one by one, or the error if any occurs. Optionally we told it to notify when the transformations complete and all temporary data is deleted.

These elements are called emissions because they are not limited to be defined in your code as hardcoded literals. They can arrive from an API call or user input at different times. We will show you how to create observables which emit values at differe times in the future.

To be able to understand what’s happening behind the scenes, please uncomment the Log.d(e) lines in the map() and filter() blocks on both examples. First example should print something like,

Map: 1
Map: 2
Map: 3
Filter: 2
Filter: 4
Filter: 6
Result: 4
Result: 6
Done!

As you can see, it didn’t start filtering before multiplying all the values with 2. Once 2, 4 and 6 is ready, it discarded the 2 and showed us the results. Let’s see what happens when we execute the observable example.

Map: 1
Filter: 2
Map: 2
Filter: 4
Result: 4
Map: 3
Filter: 6
Result: 6
Done!

It may look complicated but actually it isn’t. First thing you should notice is, at least the order of results is still preserved. Remember that we are both printing the transformation and the filtering. Since our first element is 1, it is multiplied by 2 just as usual. Then, 2 is printed again which tells us that our observable decided to filter it instead of multiplying the next element by 2. If we used a bigger list of numbers, the order of the mapping and filtering would also change on our each run. This happens because no element waits for its neighbours’ operations before continueing with its own transformations, thus giving us the opportunity to processes multiple data independently as long as they share the same type.

What is the benefit?

In most cases, when you have time/space ordered data, what you want to achieve is doing something with incoming data as it arrives and act accordingly without worrying about when it arrives. The input can be a UI input such as touch up events, results of an API call or rows of a database table. Similarly, the result can be an UI action, another api call or a database write.

Normally, you cannot store a user touch or a response of an API call before they happen, so the first example cannot be converted to asynchronous code without destroying its outlined logic. On the other hand, observables can pack the incoming values as if they arrived to apply the recipes describing the transformations with a guaranteed result ordering. In other words, the pseudo code below can translate directly to Rx just like an array transformation.

apiCallResultOfEachButtonClick = myView.clicks.map {
    return apiCall()
}

We will cover this kind of more advanced topics in the next posts.

Working Code

You can find the example repository here.

References