I'm trying to chain a Completable into my Rx chain, but when I do, the chain never reaches onError or onComplete.
When I step through the code, my Completable's code is executed. I can even add logging and see it log in its own doOnComplete().
The code below logs "I COMPLETED" but never reaches the error or complete callback:
profileRepo.getLocalProfileIfAvailableElseRemote()
    .flatMapCompletable { profile ->
        userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
            .doOnComplete {
                Log.i("I COMPLETED", "I COMPLETED")
            }
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeBy(
        onError = { error ->
            //do error
        },
        onComplete = {
            //do success
        }
    ).addTo(disposable)
If I instead use flatMap with andThen to return a Boolean Observable, it works:
profileRepo.getLocalProfileIfAvailableElseRemote()
    .flatMap { profile ->
        userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
            .doOnComplete {
                Log.i("I COMPLETED", "I COMPLETED")
            }.andThen(Observable.just(true))
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeBy(
        onError = { error ->
            //do error
        },
        onNext = {
            //do next
        }
    ).addTo(disposable)
I've tried adding an andThen to the flatMapCompletable version and calling Completable.complete(), but that doesn't work either.
I can't figure out why my Completable completes, yet the chain refuses to finish when I use flatMapCompletable.
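One thing I have not ruled out: I haven't shown getLocalProfileIfAvailableElseRemote(), and as far as I understand, flatMapCompletable only signals onComplete once the upstream Observable has completed as well as every inner Completable. The sketch below is a minimal illustration of that behaviour, not my real code. It assumes RxJava 2 package names (io.reactivex), and completesWith is a hypothetical helper I made up for the demo; Completable.complete() stands in for disableRule(...).

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable

// Hypothetical helper: reports whether a flatMapCompletable chain built on
// `source` reaches onComplete. Everything here runs synchronously.
fun completesWith(source: Observable<String>): Boolean {
    var completed = false
    source
        // Stand-in for userRoutingRepo.disableRule(...): completes immediately.
        .flatMapCompletable { Completable.complete() }
        .subscribe({ completed = true }, { /* errors ignored in this sketch */ })
    return completed
}

fun main() {
    // Upstream emits one item and then completes, so the chain completes.
    println(completesWith(Observable.just("profile"))) // prints "true"

    // Upstream emits one item but never completes: the inner Completable
    // still finishes, yet the outer chain never reaches onComplete.
    val neverEnding = Observable.just("profile")
        .concatWith(Observable.never<String>())
    println(completesWith(neverEnding)) // prints "false"
}
```

If my profile Observable behaves like the second case, that would match what I see: the inner doOnComplete logs, the flatMap version still delivers onNext, but the flatMapCompletable version never reaches onComplete.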
EDIT: Here is my complete attempt, which still does not work.
Note that userRoutingService.disableRule(accountUid, ruleId) is the Retrofit interface call.
profileRepo.getLocalProfileIfAvailableElseRemote()
    .flatMapCompletable { profile ->
        userRoutingRepo.disableRule(profile.account_uid, userRoutingRule.id)
            .andThen(Completable.complete())
    }
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribeBy(
        onError = { error ->
            Log.i("TAG", "ERROR")
        },
        onComplete = {
            Log.i("TAG", "COMPLETE")
        }
    ).addTo(disposable)
override fun disableRule(accountUid: String, ruleId: String): Completable {
    return activeStateToggler(userRoutingSourceApi.disableRule(accountUid, ruleId),
            ruleId,
            false)
}

override fun disableRule(accountUid: String, ruleId: String): Completable {
    return userRoutingService.disableRule(accountUid, ruleId)
        .doOnError { error ->
            authenticationValidator.handleAuthenticationExceptions(error)
        }
}

private fun activeStateToggler(completable: Completable,
                               ruleId: String,
                               stateOnSuccess: Boolean
): Completable {
    return completable
        .doOnSubscribe {
            stateTogglingInProgress.add(ruleId)
        }
        .doOnComplete {
            stateTogglingInProgress.remove(ruleId)
            getLocalUserRule(ruleId)?.active = stateOnSuccess
            stateTogglingInProgressPublishSubject.onNext(UserRoutingStateToggleSubjectType.Success)
        }
        .doOnError {
            stateTogglingInProgress.remove(ruleId)
            stateTogglingInProgressPublishSubject.onNext(UserRoutingStateToggleSubjectType.Error(it))
        }
}