3
votes

I'm trying to chain a Completable into my Rx chain, but when I do, the chain never reaches onError or onComplete.

When I step through the code, my Completable's code is executed. I can even add logging and see it log in its own doOnComplete().

The code below logs "I COMPLETED" but never reaches the error or complete callback.

 profileRepo.getLocalProfileIfAvailableElseRemote()
                .flatMapCompletable { profile ->
                    userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
                            .doOnComplete {
                                Log.i("I COMPLETED", "I COMPLETED")
                            }
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(
                        onError = { error ->
                            //do error
                        },
                        onComplete = {
                           //do success
                        }
                ).addTo(disposable)

If I instead use flatMap with andThen to return a Boolean Observable, it works:

 profileRepo.getLocalProfileIfAvailableElseRemote()
                .flatMap { profile ->
                    userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
                            .doOnComplete {
                                Log.i("I COMPLETED", "I COMPLETED")
                            }.andThen(Observable.just(true))
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(
                        onError = { error ->
                         //do error
                        },
                        onNext = {
                           //do next
                        }
                ).addTo(disposable)

I've also tried adding an andThen that returns Completable.complete() to the flatMapCompletable version, but that doesn't work either.

I can't figure out why my Completable completes, yet the chain refuses to finish with flatMapCompletable.

EDIT: Here is my complete attempt, which does not work.

Note: userRoutingService.disableRule(accountUid, ruleId) is the Retrofit interface.

 profileRepo.getLocalProfileIfAvailableElseRemote()
                .flatMapCompletable { profile ->
                    userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
                            .andThen(Completable.complete())
                }
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeBy(
                        onError = { error ->
                          Log.i("TAG", "ERROR")
                        },
                        onComplete = {
                            Log.i("TAG", "COMPLETE")
                        }
                ).addTo(disposable)

 override fun disableRule(accountUid: String, ruleId: String): Completable {
        return activeStateToggler(userRoutingSourceApi.disableRule(accountUid, ruleId),
                ruleId,
                false)
    }



override fun disableRule(accountUid: String, ruleId: String): Completable {
        return userRoutingService.disableRule(accountUid, ruleId)
                .doOnError { error ->
                    authenticationValidator.handleAuthenticationExceptions(error)
                }
    }

    private fun activeStateToggler(completable: Completable,
                                   ruleId: String,
                                   stateOnSuccess: Boolean
    ): Completable {
        return completable
                .doOnSubscribe {
                    stateTogglingInProgress.add(ruleId)
                }
                .doOnComplete {
                    stateTogglingInProgress.remove(ruleId)
                    getLocalUserRule(ruleId)?.active = stateOnSuccess
                    stateTogglingInProgressPublishSubject.onNext(UserRoutingStateToggleSubjectType.Success)
                }
                .doOnError {
                    stateTogglingInProgress.remove(ruleId)
                    stateTogglingInProgressPublishSubject.onNext(UserRoutingStateToggleSubjectType.Error(
                            it))
                }
    }

2 Answers

1
votes

This is what flatMapCompletable does:

Maps each element of the upstream Observable into CompletableSources, subscribes to them and waits until the upstream and all CompletableSources complete.

With flatMapCompletable, the resulting Completable completes only after the upstream Observable has emitted its terminal event (onComplete) and every inner Completable has completed.

So use flatMapCompletable only if you are sure that everything upstream in the chain completes.

In your case it doesn't work because your source Observable is hot and never completes.
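
To illustrate the point, here is a minimal sketch, assuming RxJava 2 on the classpath. The PublishSubject and `disableRuleStub` are hypothetical stand-ins for the profile source and `userRoutingRepo.disableRule(...)`, not the asker's real code. It shows that the inner Completable finishing is not enough: the downstream onComplete only fires once the upstream is bounded, here with take(1).

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.subjects.PublishSubject

// Hypothetical stand-in for userRoutingRepo.disableRule(...):
// an inner Completable that completes immediately.
fun disableRuleStub(id: Int): Completable = Completable.complete()

// Returns true if the downstream onComplete callback fired.
fun completesAfterOneItem(limitToFirst: Boolean): Boolean {
    var completed = false
    // Hot source: emits items but never calls onComplete on its own.
    val source = PublishSubject.create<Int>()
    // take(1) turns the open-ended source into one that completes after the first item.
    val upstream: Observable<Int> = if (limitToFirst) source.take(1) else source

    upstream
        .flatMapCompletable { disableRuleStub(it) }
        .subscribe({ completed = true }, { it.printStackTrace() })

    source.onNext(1)
    return completed
}

fun main() {
    println(completesAfterOneItem(limitToFirst = false)) // false: upstream is still open
    println(completesAfterOneItem(limitToFirst = true))  // true: take(1) completed the upstream
}
```

Whether take(1) is appropriate depends on whether later emissions from the profile source should also trigger the rule change; that is an assumption about the asker's intent.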

0
votes

When using flatMapCompletable, you need to return Completable.complete() yourself.

edit:

 profileRepo.getLocalProfileIfAvailableElseRemote()
     .flatMap { profile ->
         userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
             .doOnComplete { Log.i("I COMPLETED", "I COMPLETED") }
             // flatMap needs an ObservableSource, so emit one item once the Completable finishes
             .andThen(Observable.just(true))
     }
     .flatMapCompletable { Completable.complete() }
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribeBy(
         onError = { error ->
             //do error
         },
         onComplete = {
             //do success
         }
    ).addTo(disposable)

edit 2: since disableRule is a Completable

 profileRepo.getLocalProfileIfAvailableElseRemote()
     .flatMapCompletable { profile ->
         userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
             .doOnComplete { Log.i("I COMPLETED", "I COMPLETED") }
             .andThen(Completable.complete().doOnComplete { Log.i("comp2", "comp2") })
     }
     .subscribeOn(Schedulers.io())
     .observeOn(AndroidSchedulers.mainThread())
     .subscribeBy(
         onError = { error ->
             //do error
         },
         onComplete = {
             //do success
         }
    ).addTo(disposable)

edit 3: working sample

Observable.just(1)
    .flatMapCompletable { profile ->
        Completable.complete()
            .doOnComplete { Log.i("I COMPLETED", "I COMPLETED") }
            .andThen(Completable.complete().doOnComplete { Log.i("I COMPLETED", "I COMPLETED 2") })}
    .subscribeBy(
        onError = { error ->
        },
        onComplete = {
            Log.d("I COMPLETED", "I COMPLETED 3")
        })
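
The sample above reaches onComplete because Observable.just(1) itself completes after its single item. If the real source stays open, one option is to convert it to a Single with firstOrError() before calling flatMapCompletable. The following is only a sketch under that assumption (that only the first profile matters); the BehaviorSubject and the string values are hypothetical placeholders, not the asker's types.

```kotlin
import io.reactivex.Completable
import io.reactivex.subjects.BehaviorSubject

// Records the order of events so the behaviour can be inspected.
fun firstItemThenComplete(): List<String> {
    val events = mutableListOf<String>()
    // Hypothetical profile source that replays a value and never completes.
    val profiles = BehaviorSubject.createDefault("profile-1")

    profiles
        .firstOrError() // Observable<String> -> Single<String>, ends after the first item
        .flatMapCompletable { profile ->
            // Placeholder for userRoutingRepo.disableRule(...)
            Completable.fromAction { events.add("disabled rule for $profile") }
        }
        .subscribe({ events.add("onComplete") }, { events.add("onError") })

    return events
}

fun main() {
    println(firstItemThenComplete()) // [disabled rule for profile-1, onComplete]
}
```

Because a Single has exactly one terminal outcome, Single.flatMapCompletable completes as soon as the inner Completable does, with no dependence on the original source ever calling onComplete.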