0
votes

I am still struggling with some of the basics of RxJava and would greatly appreciate some help.

I have an infinite, hot observable which regularly emits tagged events (captured in a simple class having a name (tag) and some properties). The set of tags is finite (in this case about 10 distinct tags), but the event specifics differ every time (a timestamp, say).

What I am trying to do now is to create a HashMap with the tags as keys and the latest event for each tag as the value, such that the HashMap itself becomes an infinite observable which emits the updated HashMap with every change.

So far, I have used a Subject to subscribe to the original observable and to emit the HashMap, but I also saw the ".toMap" method. However, I cannot figure out how to use that method with an infinite observable source and emit with every change. From the documentation it is not even clear to me whether this would be possible at all.

If it's not possible, is there another way, aside from using Subjects, to achieve the same? I want to keep this lean, and Subject seems rather heavy.

1
You want a Map or an Observable? If you want Observable, you can use groupBy. If you want Map, you can collect the GroupedObservable to a Map. - Dean Xu

1 Answer

1
votes

Here is my code to convert it to a Map or an Observable<Map>. But I don't know why you need to do this.

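// Package names below assume RxJava 2; use rx.* for 1.x
// or io.reactivex.rxjava3.* for 3.x.
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.ConcurrentHashMap

// Returns a live map from key to the grouped stream for that key.
// The map is filled asynchronously, so it may still be empty when this returns.
fun <T, K> Observable<T>.toInfiniteMap(keySelector: (T) -> K): Map<K, Observable<T>> {
    val map = ConcurrentHashMap<K, Observable<T>>()
    this.subscribeOn(Schedulers.newThread())
            .doOnNext { println(it) } // debug logging of every event
            .groupBy(keySelector)
            .doOnNext { map.put(it.getKey(), it) } // one entry per newly seen key
            .subscribe()
    return map
}

// Emits the same mutable map instance each time a new key shows up
// (once per new group, not once per event).
fun <T, K> Observable<T>.toInfiniteMapObservable(keySelector: (T) -> K):
        Observable<Map<K, Observable<T>>> {
    val map = ConcurrentHashMap<K, Observable<T>>()
    return this.subscribeOn(Schedulers.newThread())
            .doOnNext { println(it) } // debug logging of every event
            .groupBy(keySelector)
            .doOnNext { map.put(it.getKey(), it) }
            .map { map }
}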
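
If what you actually need is a map from each tag to its latest event, re-emitted on every event and without a Subject, scan with a seed might be a better fit than groupBy or toMap (toMap only emits once the source completes, so it never fires on an infinite source). The following is only a sketch under some assumptions: RxJava 2 package names, and a made-up Event class and latestByKey extension name that are not from the question.

```kotlin
import io.reactivex.Observable

// Hypothetical event type standing in for the asker's "simple class".
data class Event(val tag: String, val timestamp: Long)

// Emits an immutable snapshot of "latest event per key" after every upstream event.
// latestByKey is an illustrative name, not an RxJava operator.
fun <T : Any, K : Any> Observable<T>.latestByKey(keySelector: (T) -> K): Observable<Map<K, T>> =
    scan(emptyMap<K, T>()) { acc, event -> acc + (keySelector(event) to event) }
        .skip(1) // scan emits the empty seed first; drop it

// Small finite demo; a real hot source would be used the same way.
fun demo(): List<Map<String, Event>> =
    Observable.just(Event("a", 1L), Event("b", 2L), Event("a", 3L))
        .latestByKey(Event::tag)
        .toList()
        .blockingGet()

fun main() {
    demo().forEach { println(it) }
}
```

Each emission is a fresh copy of the map, so subscribers never see it change underneath them. Copying costs time proportional to the number of tags, which should be negligible for about 10 tags.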