1
votes

I'm a bit of an RxJava newbie. I'm getting the hang of it, but there's one scenario that I'm not quite satisfied with. Say I have a pair of Observables of different types that are designed to work together. For example:

val list: Observable<List<MyClass>>
val indexIntoList: Observable<Int>

So, I need to observe these as a pair, so that the observer gets emitted to when either list or indexIntoList gets an update. Right now my solution is something like this:

var lastListUsed: List<MyClass> = ArrayList()
var lastIndexUsed = -1
val syncObject = Object()

init {
    list.subscribe(object: Observer<List<MyClass>>{
        .
        .
        .
        override fun onNext(t: List<MyClass>)
        {
             FunctionOnMyClass(t, lastIndexUsed)
        }
    })

    indexIntoList.subscribe(object: Observer<Int>{
        .
        .
        .
        override fun onNext(t: Int)
        {
            FunctionOnMyClass(lastListUsed, t)
        }
    })
}

fun FunctionOnMyClass(list: List<MyClass>, index: Int)
{
    synchronized(syncObject)
    {
        lastListUsed = list
        lastIndexUsed = index
        .
        .
        .
    }
}

I had the thought to do something like this:

var lastMyClass = DefaultMyClass()

list.doOnNext{listVersion ->
    indexIntoMap.map{ index ->
          if(index in listVersion.indices)
              listVersion[index]
          else
              lastMyClass
     }.subscribe(object: Observer<MyClass>{
        .
        .
        .
        override fun onNext(t: MyClass)
        {
            lastMyClass = t
            .
            .
            .
        }
     })
}

But if I'm understanding correctly, to use this method, I would have to dispose the Disposable on the inner Observable every time the outer Observable updates. I'd rather not do that. Is there a preferred way to handle this scenario? Should I be open to disposing the Disposable on every update of the outer Observable?

2
Hey I’m not that familiar with Kotlin and I am also sort of a newbie in RxJava but I did use it before to link multiple observables, not sure if that’s what you were looking for but you can link the outputs of observables is that what you want?Mr.O
If you wanted to pair them you can use the zip operator like thisMr.O

2 Answers

3
votes

I think you are looking for combineLastest operator:

when an item is emitted by either of two Observables, combine the latest item emitted by each Observable via a specified function and emit items based on the results of this function

enter image description here

Here an example of implementation:

val myListObservable = ...
val myIndexObservable = ...

disposable = Observable.combineLatest(myListObservable, myIndexObservable, BiFunction { list: List<YourType>, index: Int ->
    list to index
})

Then you can work directly with the last list and the las index emitted by each observable as a Pair<List<YourType>,Int>


In the case you need to wait to each observable to emit an element to pair with each other, you should use zip

Combine the emissions of multiple Observables together via a specified function and emit single items for each combination based on the results of this function

enter image description here

As you can observe in the diagram, the second observable emits C and D at last. But the first observable take a time to emit 3 and then 4. The resulting pair will wait until the other observable emits an element to pair with.

The implementation is the same as above, replacing combineLastest with zip.

1
votes

if you want to listen to the most recent events from both observables then take a look at combineLatest operator: http://rxmarbles.com/#combineLatest

It emits every time one of the observables emit a new value BUT it will start as soon as each of the sources has emitted at least one event.

val result = Observable.combineLatest<String, Int, String>(sourceObservable1, sourceObservable2) { s, integer -> s + ": " + integer }