
I'm racking my brain over how to do this in Rx.

The actual use case is mapping LowerLevelEvent(val userId: String) to HigherLevelEvent(val user: User), where the User is provided by an observable and can therefore be emitted n times. Example output:

LowerLevelEvent1(abc) -> HigherLevelEvent1(userAbc(nameVariation1))
LowerLevelEvent2(abc) -> HigherLevelEvent2(userAbc(nameVariation1))
LowerLevelEvent3(abc) -> HigherLevelEvent3(userAbc(nameVariation1))
LowerLevelEvent4(abc) -> HigherLevelEvent4(userAbc(nameVariation1))
                         HigherLevelEvent4(userAbc(nameVariation2))
                         HigherLevelEvent4(userAbc(nameVariation3))

My naive solution was to use combineLatest, so that the user observable stays subscribed while the userId is unchanged, i.e. it is not resubscribed when a new lower-level event arrives with the same userId:

val _lowerLevelEventObservable: Observable<LowerLevelEvent> = lowerLevelEventObservable
    .replayingShare()

val _higherLevelEventObservable: Observable<HigherLevelEvent> = Observables
    .combineLatest(
        _lowerLevelEventObservable,
        _lowerLevelEventObservable
            .map { it.userId }
            .distinctUntilChanged()
            .switchMap { userRepository.findByIdObservable(it) }
    ) { lowerLevelEvent, user -> createHigherLevelInstance... }

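However, this has glitch issues, since both sources in combineLatest originate from the same observable: when an event arrives with a different userId, it can briefly be paired with the previous, stale user before the new user arrives.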
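A minimal sketch of the glitch, assuming RxJava 2 (package io.reactivex) is on the classpath. The string "users" and the `Observable.just(...)` lookup are hypothetical stand-ins for User and userRepository.findByIdObservable, and `glitchDemo` is a name made up for this illustration:

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction
import io.reactivex.subjects.PublishSubject

// Hypothetical stand-in for the real event type.
data class LowerLevelEvent(val name: String, val userId: String)

fun glitchDemo(): List<String> {
    val out = mutableListOf<String>()
    val events = PublishSubject.create<LowerLevelEvent>()

    // Stand-in for userRepository.findByIdObservable: one synchronous "user" per id.
    val users: Observable<String> = events
        .map { it.userId }
        .distinctUntilChanged()
        .switchMap { id -> Observable.just("user-$id") }

    // Both inputs derive from `events`, and `events` is subscribed first.
    Observable
        .combineLatest(
            events,
            users,
            BiFunction<LowerLevelEvent, String, String> { e, u -> "${e.name}($u)" }
        )
        .subscribe { out.add(it) }

    events.onNext(LowerLevelEvent("E1", "abc"))
    events.onNext(LowerLevelEvent("E2", "xyz")) // userId changes here
    return out
}

fun main() {
    println(glitchDemo())
}
```

With this subscription order, E2 is first combined with the stale user-abc and only then with user-xyz, so the intermediate E2(user-abc) is the glitch.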

Then I thought about switchMap:

lowerLevelObservable
    .switchMap { lowerLevelEvent ->
        userRepository.findByIdObservable(lowerLevelEvent.userId)
            .map { user -> createHigherLevelInstance... }
    }

However, this can break if lowerLevelObservable emits quickly: since the user observable can take some time, a given lowerLevelX event can be skipped, which I cannot have. It also resubscribes to the user observable on every emission, which is wasteful since the user most likely won't change.
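A small sketch of the skipped event, assuming RxJava 2. The `slowUser` subject is a hypothetical stand-in for a user query that has not emitted yet, and plain strings stand in for the event and user types:

```kotlin
import io.reactivex.subjects.PublishSubject

fun switchMapDemo(): List<String> {
    val out = mutableListOf<String>()
    val events = PublishSubject.create<String>()
    // Hypothetical stand-in for a slow userRepository.findByIdObservable(...).
    val slowUser = PublishSubject.create<String>()

    events
        .switchMap { event -> slowUser.map { user -> "$event($user)" } }
        .subscribe { out.add(it) }

    events.onNext("E1")        // inner for E1 is subscribed, no user yet
    events.onNext("E2")        // switchMap disposes E1's inner before it emitted
    slowUser.onNext("userAbc") // only E2's inner is still listening
    return out
}

fun main() {
    println(switchMapDemo())
}
```

Only E2(userAbc) comes out; E1 is never mapped because its inner subscription was replaced before the user arrived.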

So, maybe concatMap? The problem there is that the user observable doesn't complete, so concatMap would never move on to the next event.
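The same kind of sketch for concatMap, again assuming RxJava 2, with a never-completing subject as a hypothetical stand-in for the user query:

```kotlin
import io.reactivex.subjects.PublishSubject

fun concatMapDemo(): List<String> {
    val out = mutableListOf<String>()
    val events = PublishSubject.create<String>()
    // Hypothetical stand-in for a user query that re-emits and never completes.
    val user = PublishSubject.create<String>()

    events
        .concatMap { event -> user.map { u -> "$event($u)" } }
        .subscribe { out.add(it) }

    events.onNext("E1")
    events.onNext("E2")  // queued until E1's inner completes, which never happens
    user.onNext("v1")
    user.onNext("v2")
    return out
}

fun main() {
    println(concatMapDemo())
}
```

Every user update is attached to E1, and E2 stays queued indefinitely.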

Does anyone have a clue?

Thanks a lot

// Clarification: basically, this maps A variants (A1, A2, ...) to A' variants (A1', A2', ...) while attaching a queried object to each one. The query is an observable, so it might re-emit after the mapping was made, in which case AX' needs to be re-emitted with the new query result. But the query is cold and doesn't complete.

Example: A1(1) -> A1'(user1), A2(1) -> A2'(user1), A3(1) -> A3'(user1). Now somebody changes user1 somewhere else in the app, so the next emission is A3'(user1').

It's very confusing for me what you want to do. Maybe you could use concatMap and inside that userRepository.findByIdObservable(...).take(1)? – martin
@martin I'd like to map a lower-level event to a higher-level event, where the mapping requires querying the user via the userId from the lower-level event. However, during this whole process the database might change, so I need to re-emit the higher-level event with the new User value. – urSus
I especially don't understand this bit, "... given lowerLevelX event can be skipped, which I cannot have." Why would you want events that have been superseded? – Daniel T.
@DanielT. Because lower-level events are an algebraic data type and I need to see all the types that are coming. – urSus
Basically, it's mapping A variants (A1, A2, ...) to A' variants (A1', A2', ...) while attaching a queried object to each one, where the query is an observable, so it might re-emit after the mapping was made, and AX' then needs to be re-emitted with the new query result. – urSus

1 Answer


Based on the comments you have made, the below would work in RxSwift. I have no idea how to translate it to RxJava. Honestly though, I think there is a fundamental misuse of Rx here. Good luck.

How it works: if it's allowed to subscribe, it will; otherwise it adds the event to a buffer for later use. It is allowed to subscribe if it isn't currently subscribed to an inner Observable, or if the inner Observable it's currently subscribed to has emitted an element (or completed).

WARNING: It doesn't handle completions properly as it stands. I'll leave that to you as an exercise.

func example(lowerLevelEventObservable: Observable<LowerLevelEvent>, userRepository: UserRepository) {
    let higherLevelEventObservable = lowerLevelEventObservable
        .flatMapAtLeastOnce { event in // RxSwift's switchLatest I think.
            Observable.combineLatest(
                Observable.just(event),
                userRepository.findByIdObservable(event.userId),
                resultSelector: { (lowLevelEvent: $0, user: $1) }
            )
        }
        .map { createHigherLevelInstance($0.lowLevelEvent, $0.user) }

    // use higherLevelEventObservable
}

extension ObservableType {
    func flatMapAtLeastOnce<U>(from fn: @escaping (E) -> Observable<U>) -> Observable<U> {
        return Observable.create { observer in
            let disposables = CompositeDisposable()
            var nexts: [E] = []
            var disposeKey: CompositeDisposable.DisposeKey?
            var isAllowedToSubscribe = true
            let lock = NSRecursiveLock()
            func nextSubscription() {
                isAllowedToSubscribe = true
                if !nexts.isEmpty {
                    let e = nexts[0]
                    nexts.remove(at: 0)
                    subscribeToInner(e)
                }
            }

            func subscribeToInner(_ element: E) {
                isAllowedToSubscribe = false
                if let key = disposeKey {
                    disposables.remove(for: key)
                }
                let disposable = fn(element).subscribe { innerEvent in
                    lock.lock(); defer { lock.unlock() }
                    switch innerEvent {
                    case .next:
                        observer.on(innerEvent)
                        nextSubscription()
                    case .error:
                        observer.on(innerEvent)
                    case .completed:
                        nextSubscription()
                    }
                }
                disposeKey = disposables.insert(disposable)
            }

            let disposable = self.subscribe { event in
                lock.lock(); defer { lock.unlock() }
                switch event {
                case let .next(element):
                    if isAllowedToSubscribe == true {
                        subscribeToInner(element)
                    }
                    else {
                        nexts.append(element)
                    }
                case let .error(error):
                    observer.onError(error)
                case .completed:
                    observer.onCompleted()
                }
            }
            _ = disposables.insert(disposable)
            return disposables
        }
    }
}
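Since the RxJava translation is left open above, here is a rough Kotlin sketch of the same idea, assuming RxJava 2 (package io.reactivex). It is not a line-by-line port: the `generation` counter and `draining` flag are additions of this sketch to cope with inner Observables that emit synchronously, and the operator name is simply carried over from the Swift code. Like the Swift version, it completes as soon as the outer Observable completes, so completion handling is still unfinished.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.SerialDisposable
import java.util.ArrayDeque

fun <T : Any, R : Any> Observable<T>.flatMapAtLeastOnce(
    fn: (T) -> Observable<R>
): Observable<R> = Observable.create<R> { emitter ->
    val lock = Any()
    val pending = ArrayDeque<T>()    // outer events waiting for their turn
    val current = SerialDisposable() // subscription to the active inner Observable
    var generation = 0               // id of the active inner; stale callbacks are ignored
    var mayAdvance = true            // false until the active inner has emitted or completed
    var draining = false             // guards against re-entrant drain() calls

    // Must be called while holding `lock`.
    fun drain() {
        if (draining) return
        draining = true
        try {
            while (mayAdvance && pending.isNotEmpty()) {
                val element = pending.poll()
                mayAdvance = false
                val id = ++generation
                // set() disposes the previous inner subscription.
                current.set(fn(element).subscribe(
                    { value ->
                        synchronized(lock) {
                            if (id == generation) {
                                emitter.onNext(value)
                                mayAdvance = true // this event got its value; others may follow
                                drain()
                            }
                        }
                    },
                    { error -> synchronized(lock) { if (id == generation) emitter.onError(error) } },
                    { synchronized(lock) { if (id == generation) { mayAdvance = true; drain() } } }
                ))
            }
        } finally {
            draining = false
        }
    }

    val outer = this@flatMapAtLeastOnce.subscribe(
        { element -> synchronized(lock) { pending.add(element); drain() } },
        { error -> synchronized(lock) { emitter.onError(error) } },
        { synchronized(lock) { emitter.onComplete() } } // sketch: ignores buffered events
    )
    emitter.setDisposable(CompositeDisposable(outer, current))
}
```

Usage would mirror the Swift example, e.g. lowerLevelEventObservable.flatMapAtLeastOnce { event -> userRepository.findByIdObservable(event.userId).map { user -> event to user } }. Note that callbacks run while holding a lock, as in the Swift code, which is simple but may not suit every threading setup.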