I'm creating an offline-first app as a side project using RxKotlin, MVVM and Clean Architecture, and yesterday I decided to get rid of the boilerplate subscribeOn and observeOn calls by using transformers. I quickly realized that the transformers' apply function seems to be ignored.

Here is the code of my base completable use case (interactor):

abstract class CompletableUseCase(private val transformer: CompletableTransformer) {

    abstract fun createCompletable(data: Map<String, Any>? = null) : Completable

    fun completable(data: Map<String, Any>? = null) : Completable {
        return createCompletable(data).compose(transformer)
    }
}

And here is the implementation of a specific interactor:

class SaveRouteInteractor(
    transformer: CompletableTransformer,
    private val routeRepository: RouteRepository
) : CompletableUseCase(transformer) {

    companion object {
        private const val PARAM_ROUTE = "param_route"
    }

    fun saveRoute(route: Route) : Completable {
        val data = HashMap<String, Route>()
        data[PARAM_ROUTE] = route
        return completable(data)
    }

    override fun createCompletable(data: Map<String, Any>?): Completable {
        val routeEntity = data?.get(PARAM_ROUTE)

        routeEntity?.let {
            return routeRepository.saveRoute(routeEntity as Route)
        } ?: return Completable.error(IllegalArgumentException("Argument @route must be provided."))
    }
}

My custom transformer that is passed to the constructor of SaveRouteInteractor:

class IOCompletableTransformer(private val mainThreadScheduler: Scheduler) : CompletableTransformer {

    override fun apply(upstream: Completable): CompletableSource {
        return upstream.subscribeOn(Schedulers.io()).observeOn(mainThreadScheduler)
    }
}

And the implementation of the RouteRepository method:

override fun saveRoute(route: Route): Completable {
    return localRouteSource.saveRoute(route)
        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }
        .flatMapCompletable { localRouteSource.updateRouteID(route.routeId, it) }
}

I'm using Room as my local source, so after calling the save interactor in my ViewModel I get an IllegalStateException telling me that I'm not allowed to access the database on the main thread.

Maybe I'm missing something, but it seems that the transformer's apply function is ignored. I debugged this method, and it does apply subscribeOn and observeOn to the upstream.

Thanks in advance for your help, Pace!

1 Answer

It's hard to tell you where the issue is because the code is partial.

For example here:

    return localRouteSource.saveRoute(route)
        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }
        .flatMapCompletable { localRouteSource.updateRouteID(route.routeId, it) }

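I suppose localRouteSource.saveRoute() is using the interactor you showed us, but it is not clear how remoteRouteSource.saveRoute() or localRouteSource.updateRouteID() are implemented.

They also need to be subscribed on the IO scheduler. A subscribeOn applied once at the end of the chain only decides where the subscription starts; if one of the inner sources emits on another thread, the steps after it run on that thread.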

As a rule of thumb, you should switch threads only where you KNOW you need to.

In other words, use subscribeOn() in the places where you know you are doing IO, as close as possible to the actual work. observeOn(), on the other hand, is for when you know you need the results on the UI thread and they might arrive on some other thread.

In your example there's absolutely no need to keep using observeOn(MAIN_THREAD); the only time you do need it (I suppose) is when you want to show the result.
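To make the rule of thumb concrete, here is a minimal sketch, not your actual code. It assumes RxJava 2 (the io.reactivex package), and FakeLocalSource, fakeRemoteSave and saveEverywhere are hypothetical stand-ins for your Room source, remote source and repository method. Each blocking call carries its own subscribeOn(Schedulers.io()), so it stays off the caller's thread even though the fake remote source delivers its result on a different scheduler.

```kotlin
import io.reactivex.Completable
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for a Room-backed source; the names are illustrative only.
class FakeLocalSource {
    // Name of the thread that executed the most recent "blocking" call.
    @Volatile
    var lastThread: String = ""

    fun saveRoute(name: String): Single<Long> =
        Single.fromCallable {
            lastThread = Thread.currentThread().name // the blocking insert would go here
            1L
        }.subscribeOn(Schedulers.io()) // scheduler chosen right next to the IO work

    fun updateRouteId(localId: Long, remoteId: String): Completable =
        Completable.fromAction {
            lastThread = Thread.currentThread().name // the blocking update would go here
        }.subscribeOn(Schedulers.io())
}

// Hypothetical remote source that delivers its result on another scheduler,
// the way a callback-based network client might.
fun fakeRemoteSave(localId: Long): Single<String> =
    Single.just("remote-$localId").observeOn(Schedulers.single())

// Chains the three steps; no subscribeOn is needed here because each source brings its own.
fun saveEverywhere(local: FakeLocalSource, name: String): Completable =
    local.saveRoute(name)
        .flatMap { localId -> fakeRemoteSave(localId).map { remoteId -> localId to remoteId } }
        .flatMapCompletable { pair -> local.updateRouteId(pair.first, pair.second) }
```

With this layout the ViewModel only needs a single observeOn for the main thread right before it touches the UI, and whether the remote source switches threads no longer matters to the database calls.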

A couple of other things:

This code

override fun createCompletable(data: Map<String, Any>?): Completable {
    val routeEntity = data?.get(PARAM_ROUTE)

    routeEntity?.let {
        return routeRepository.saveRoute(routeEntity as Route)
    } ?: return Completable.error(IllegalArgumentException("Argument @route must be provided."))
}

is evaluated when the method is called rather than when the Completable is subscribed to.

In other words, it breaks the Rx contract and computes data?.get(PARAM_ROUTE) when you call the method. If the data is immutable there is not much difference, but if its value can change during execution, the body should be wrapped in Completable.defer { }.
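A small sketch of the difference, assuming RxJava 2. The functions eagerLookup and deferredLookup are hypothetical and only mirror the shape of createCompletable: the first reads the map when it is called, the second reads it when someone subscribes.

```kotlin
import io.reactivex.Completable

// Hypothetical key, mirroring PARAM_ROUTE from the question.
const val PARAM_ROUTE = "param_route"

// The lookup runs immediately, when eagerLookup() is called.
fun eagerLookup(data: Map<String, Any>?): Completable {
    val value = data?.get(PARAM_ROUTE)
    return if (value != null) {
        Completable.complete() // the real code would call the repository here
    } else {
        Completable.error(IllegalArgumentException("Argument @route must be provided."))
    }
}

// The same lookup, postponed until the returned Completable is subscribed to.
fun deferredLookup(data: Map<String, Any>?): Completable =
    Completable.defer { eagerLookup(data) }
```

If the map is filled in after the Completable is created but before it is subscribed to, the eager version has already settled on the error, while the deferred version sees the new value.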

Finally, here

        .flatMap { localID ->
            route.routeId = localID
            remoteRouteSource.saveRoute(route)
        }

you are modifying something outside the chain (route.routeId = localID); this is called a side effect.

Be careful with this kind of thing: Rx is built in a way that makes it safer to use with immutable objects.

I personally wouldn't mind too much as long as you understand what's going on and when it could create issues.
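If you do want to avoid the mutation, one possible shape is to pass a copy down the chain instead. This is only a sketch, assuming RxJava 2; the Route data class and the saveLocal, saveRemote and save functions here are hypothetical simplifications, not your real types.

```kotlin
import io.reactivex.Single

// Hypothetical immutable version of Route; routeId defaults to 0 until it is saved.
data class Route(val name: String, val routeId: Long = 0L)

// Hypothetical sources that return fixed values instead of touching a database or network.
fun saveLocal(route: Route): Single<Long> = Single.just(7L)
fun saveRemote(route: Route): Single<String> = Single.just("remote-${route.routeId}")

// Each step receives a new Route value; the caller's instance is never modified.
fun save(route: Route): Single<Pair<Route, String>> =
    saveLocal(route)
        .map { localId -> route.copy(routeId = localId) }
        .flatMap { saved -> saveRemote(saved).map { remoteId -> saved to remoteId } }
```

The copy carrying the local ID travels through the chain as a value, so nothing outside the chain changes while it runs.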