RxKotlin Multiple Observations


If you’ve been doing Android development with RxJava or RxKotlin, you know the learning curve is steep. Still, RxJava is a skill every Android developer should have in their toolbox, because it makes writing Android apps much easier. I’m constantly learning new things about RxJava and applying what I learn to my Android applications. For example, this last week I wanted to do some work on the UI thread at the start of a subscription. I thought I had lucked into an easy solution with doOnSubscribe, which according to the docs is supposed to do just that:

Modifies the source Observable so that it invokes the given action when it is subscribed from its subscribers. Each subscription will result in an invocation of the given action except when the source Observable is reference counted, in which case the source Observable will invoke the given action for the first subscription.

My code looked something like the following:

Observable
       .just(1)
       .doOnSubscribe {
           toast("Subscription, Started")
       }
       .subscribeOn(Schedulers.io())
       .subscribe {
       }

Unfortunately, when I went to use the operator, I got a crash with the following exception:

Caused by: java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()

However, after a little more experimenting, I found that this actually worked:

Observable
       .just(1)
       .subscribeOn(Schedulers.io())
       .doOnSubscribe {
           toast("Subscription, Started")
       }
       .subscribe {
       }

Observation 1

We can have doOnSubscribe run on a different thread from the rest of the work. This is super useful, because we can now display a UI message from doOnSubscribe and then do the work in the background.

At first the working code didn’t make much sense: how was doOnSubscribe being called on the main thread? My hunch was that it was simply running on the thread where the observable was created and subscribed to, i.e. the calling thread. So I decided to wrap it in another observable that subscribes from a background thread and test that assumption:

Observable.just(1).observeOn(Schedulers.io()).subscribe {
   Observable
           .just(1)
           .subscribeOn(Schedulers.io())
           .doOnSubscribe {
               toast("Subscription, Started")
           }

           .subscribe {

           }
}

Sure enough, this failed, because the toast was no longer implicitly running on the main thread. In Android Studio I was seeing the following error again:

Caused by: java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
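The same effect can be reproduced without a device. The following is a minimal sketch, assuming RxJava 2 package names (io.reactivex) on a plain JVM; the function name doOnSubscribeThread is made up for illustration. It records which thread doOnSubscribe runs on when it is placed before or after subscribeOn:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns the name of the thread on which doOnSubscribe ran.
// beforeSubscribeOn = true puts doOnSubscribe above subscribeOn, false puts it below.
fun doOnSubscribeThread(beforeSubscribeOn: Boolean): String {
    var threadName = ""
    val source = Observable.just(1)
    val chain = if (beforeSubscribeOn) {
        source
            .doOnSubscribe { threadName = Thread.currentThread().name }
            .subscribeOn(Schedulers.io())
    } else {
        source
            .subscribeOn(Schedulers.io())
            .doOnSubscribe { threadName = Thread.currentThread().name }
    }
    // Subscribes from the calling thread and blocks until the chain completes.
    chain.blockingSubscribe()
    return threadName
}

fun main() {
    println("caller thread: " + Thread.currentThread().name)
    println("doOnSubscribe before subscribeOn ran on: " + doOnSubscribeThread(true))
    println("doOnSubscribe after subscribeOn ran on: " + doOnSubscribeThread(false))
}
```

Placed after subscribeOn, the action should run on whichever thread calls subscribe, which is why the wrapped version above crashes once that caller is a background thread.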

Observation 2

We should be explicit about where we want things to run, because implicit threading isn’t obvious and can lead to bugs later. Specifically, this function behaves differently depending on which thread doWork() is called from:

fun doWork() {
   Observable
           .just(1)
           .subscribeOn(Schedulers.io())
           .doOnSubscribe {
               toast("Subscription, Started")
           }
           .subscribe {

           }
}

However, this function behaves the same regardless of which thread doWork() is called from:

fun doWork() {
   Observable
           .just(1)
           .subscribeOn(Schedulers.io())
           .doOnSubscribe {
               toast("Subscription, Started")
           }
           .subscribeOn(AndroidSchedulers.mainThread())
           .subscribe {

           }
}

I didn’t quite believe this was correct, so I redid the test in a slightly different form to verify it again:

Observable
       .just(1)
       .map {
           Log.i("Log", "2) showing progress on:" + Thread.currentThread().name)
           it
       }
       .subscribeOn(Schedulers.io())
       .doOnSubscribe {
           Log.i("Log", "1) doOnSubscribe with thread:" + Thread.currentThread().name)
       }
       .subscribeOn(AndroidSchedulers.mainThread())
       .subscribe {

       }

This yielded the following output:

I/Log: 1) doOnSubscribe with thread:main
I/Log: 2) showing progress on:RxCachedThreadScheduler-1

Observation 3

We can have multiple doOnSubscribe calls and have them run on different schedulers. It is worth pointing out, though, that subscribeOn can’t be used multiple times to move other operators onto different threads. For example:

Observable
       .just(1)
       .map {
           Log.i("Log", "2) showing progress on:" + Thread.currentThread().name)
           it
       }
       .subscribeOn(Schedulers.io())
       .map{
           Log.i("Log", "1) doOnSubscribe with thread:" + Thread.currentThread().name)
       }
       .subscribeOn(AndroidSchedulers.mainThread())
       .subscribe {

       }

This doesn’t cause the two map calls to run on different threads. Both simply use the first subscribeOn, the one closest to the source. It generates the following output:

I/Log: 2) showing progress on:RxCachedThreadScheduler-1
I/Log: 1) doOnSubscribe with thread:RxCachedThreadScheduler-1
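To check this off-device, here is a rough sketch under a few assumptions: RxJava 2 package names, a single-thread executor named "fake-main" standing in for AndroidSchedulers.mainThread(), and a made-up function name mapThreads. It collects the thread each map step ran on:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.Collections
import java.util.concurrent.Executors

// Hypothetical helper: returns the thread name seen by each map step.
fun mapThreads(): List<String> {
    // Assumption: a named single-thread executor plays the role of the main thread.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    val fakeMain = Schedulers.from(executor)
    val seen = Collections.synchronizedList(mutableListOf<String>())
    try {
        Observable.just(1)
            .map { seen.add("first map: " + Thread.currentThread().name); it }
            .subscribeOn(Schedulers.io())
            .map { seen.add("second map: " + Thread.currentThread().name); it }
            .subscribeOn(fakeMain) // only moves the act of subscribing, not the emissions
            .blockingSubscribe()
    } finally {
        executor.shutdown()
    }
    return seen.toList()
}

fun main() {
    mapThreads().forEach { println(it) }
}
```

Both entries should name an io scheduler thread, since the item is emitted on the thread chosen by the subscribeOn nearest the source and nothing downstream switches threads.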

Oftentimes we have to do some processing in the background, and it would be nice to occasionally update the UI between operations in our chain. For that we can use observeOn:

Observable
       .just(1)
       .map {
           Log.i("Log","1) doing work on:" + Thread.currentThread().name)
       }
       .subscribeOn(Schedulers.io())
       .observeOn(AndroidSchedulers.mainThread())
       .map {
           Log.i("Log","2) showing progress on:" + Thread.currentThread().name)
       }
       .observeOn(Schedulers.io())
       .map {
           Log.i("Log","3) more work on:" + Thread.currentThread().name)
       }
       .observeOn(AndroidSchedulers.mainThread())
       .subscribe {

       }

This yielded the following in the log:

Log: 1) doing work on:RxCachedThreadScheduler-2
Log: 2) showing progress on:main
Log: 3) more work on:RxCachedThreadScheduler-1

Observation 4

We can use observeOn multiple times to interleave UI updates with background work.

Revisiting the documentation:

ObserveOn, on the other hand, affects the thread that the Observable will use below where that operator appears. For this reason, you may call ObserveOn multiple times at various points during the chain of Observable operators in order to change on which threads certain of those operators operate.
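The interleaving can also be sketched on a plain JVM. As before, this assumes RxJava 2 package names and uses a single-thread executor named "fake-main" as a stand-in for the Android main thread; interleavedThreads is a made-up name:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.Collections
import java.util.concurrent.Executors

// Hypothetical helper: returns the thread name seen by each step, in order.
fun interleavedThreads(): List<String> {
    // Assumption: a named single-thread executor plays the role of the main thread.
    val executor = Executors.newSingleThreadExecutor { r -> Thread(r, "fake-main") }
    val fakeMain = Schedulers.from(executor)
    val seen = Collections.synchronizedList(mutableListOf<String>())
    try {
        Observable.just(1)
            .map { seen.add("1) doing work on " + Thread.currentThread().name); it }
            .subscribeOn(Schedulers.io())
            .observeOn(fakeMain) // everything below runs on fake-main
            .map { seen.add("2) showing progress on " + Thread.currentThread().name); it }
            .observeOn(Schedulers.io()) // switch back to a background thread
            .map { seen.add("3) more work on " + Thread.currentThread().name); it }
            .blockingSubscribe()
    } finally {
        executor.shutdown()
    }
    return seen.toList()
}

fun main() {
    interleavedThreads().forEach { println(it) }
}
```

Each observeOn only affects the operators below it, so the middle step lands on the stand-in main thread while the first and last steps stay on io threads.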

These are a few useful observations I made while tackling this problem, and they show once again how concise Rx can be.
