0
votes

I'm new to RxSwift and here's my problem:

Suppose I have an observable of actions: Observable.of("do1", "do2", "do3")

Now each element of this observable is mapped to a function that returns an observable:

let actions = Observable.of("do1", "do2", "do3")

func do(action: String) -> Observable<Result> {
       // do something
       // returns observable<Result>
}

let something = actions.map { action in return do(action) } ??? 

How can I wait for do1 to complete first, then execute do2, then do3?

Edit: Basically I want to achieve sequential execution of actions. do3 waits for do2's result, do2 waits for do1's result.

Edit2: I've tried using flatMap and subscribe, but all actions run in parallel.

4
Add some more details. What are you exactly trying to do? - staticVoidMan

4 Answers

7
votes

How can I wait for do1 to complete first, then execute do2, then do3?

I think concatMap solves the problem.

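Let's say we have some service and some actions we need to perform on it, for instance a backend against which we'd like to authenticate and store some data. The actions are login and store. We can't store any data if we aren't logged in, so we need to wait for login to complete before processing any store action.

flatMap subscribes to every inner observable as soon as its element arrives, so the inner observables run in parallel. flatMapLatest disposes the previous inner observable when a new element arrives, and flatMapFirst ignores new elements while an inner observable is still active. concatMap waits for each inner observable to complete before subscribing to the next one.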

import Foundation
import RxSwift
import PlaygroundSupport

PlaygroundPage.current.needsIndefiniteExecution = true

let service: [String:Observable<String>] = [
    "login": Observable.create({
        observer in
        observer.onNext("login begins")
        DispatchQueue.main.asyncAfter(deadline: .now() + 1.0, execute: {
            observer.onNext("login completed")
            observer.onCompleted()
        })
        return Disposables.create()
    }),
    "store": Observable.create({
        observer in
        observer.onNext("store begins")
        DispatchQueue.main.asyncAfter(deadline: .now() + 0.2, execute: {
            observer.onNext("store completed")
            observer.onCompleted()
        })
        return Disposables.create()
    }),
]

// flatMap example
let observeWithFlatMap = Observable.of("login", "store")
    .flatMap {
        action in
        service[action] ?? .empty()
    }

// flatMapFirst example
let observeWithFlatMapFirst = Observable.of("login", "store")
    .flatMapFirst {
        action in
        service[action] ?? .empty()
    }

// flatMapLatest example
let observeWithFlatMapLatest = Observable.of("login", "store")
    .flatMapLatest {
        action in
        service[action] ?? .empty()
    }

// concatMap example
let observeWithConcatMap = Observable.of("login", "store")
    .concatMap {
        action in
        service[action] ?? .empty()
    }

// change assignment to try different solutions
//
// flatMap: login begins / store begins / store completed / login completed
// flatMapFirst: login begins / login completed
// flatMapLatest: login begins / store begins / store completed
// concatMap: login begins / login completed / store begins / store completed

let observable = observeWithConcatMap

observable.subscribe(onNext: {
    print($0)
})
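
Applied to the shape of the original question, the same idea can be sketched as below. This is only an illustration under assumptions: perform(action:) is a hypothetical stand-in for the question's do(action:) (do is a reserved word in Swift), and the PublishSubjects exist only so the example can decide when each action "finishes". Note that concatMap calls the mapping closure as soon as an element arrives, so the sketch uses Observable.deferred to log at subscription time, which is when the work would actually start.

```swift
import RxSwift

// Returns the order in which things happened, for inspection.
func runSequentially() -> [String] {
    var log: [String] = []

    // Hypothetical: one subject per action, so the example controls completion.
    let finish: [String: PublishSubject<String>] = [
        "do1": PublishSubject(),
        "do2": PublishSubject()
    ]

    // Hypothetical stand-in for the question's do(action:).
    func perform(action: String) -> Observable<String> {
        return Observable<String>.deferred {
            // Runs when concatMap subscribes, not when perform is called.
            log.append("\(action) started")
            return finish[action]?.asObservable() ?? .empty()
        }
    }

    let subscription = Observable.of("do1", "do2")
        .concatMap { perform(action: $0) }
        .subscribe(onNext: { log.append($0) })

    // At this point only do1 has been subscribed to.
    finish["do1"]?.onNext("do1 result")
    finish["do1"]?.onCompleted()   // concatMap now subscribes to do2

    finish["do2"]?.onNext("do2 result")
    finish["do2"]?.onCompleted()

    subscription.dispose()
    return log
}

print(runSequentially())
```

With flatMap in place of concatMap, "do2 started" would be logged before "do1 result", because flatMap subscribes to both inner observables right away.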
0
votes

I just faced the same problem and finally found a solution. I want my devices to disconnect one after another, so I did the following.

I created a function like this:

func disconnect(position: WearingPosition) -> Completable{
    print("test run")
    return Completable.create { observer in
          print("test run 2")    
          // Async process{
          //       observer(.completed)
          // }
                 
          return Disposables.create() 
    }
}

And use like:

self.disconnect(position: .left_wrist)
    .andThen(Completable.deferred {
        // Called only after the left-wrist disconnect completes
        return self.disconnect(position: .right_wrist)
    })
    .subscribe(onCompleted: {
        // do some things
    }, onError: { error in
        print(error)
    })
    .disposed(by: self.disposeBag)

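The key is the use of Completable.deferred: it delays the call to disconnect(position: .right_wrist) until the first Completable has completed. Without it, the function is called while the chain is being built.

I verified the order with the "test run" print statements.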
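
The effect of Completable.deferred can be seen in a small sketch. It assumes a hypothetical step(_:) function in place of disconnect(position:) that completes synchronously, and it records a "call" entry when the function body runs and a "run" entry when the Completable is subscribed to.

```swift
import RxSwift

func compareEagerAndDeferred() -> (eager: [String], deferred: [String]) {
    var log: [String] = []

    // Hypothetical stand-in for disconnect(position:).
    func step(_ name: String) -> Completable {
        log.append("call \(name)")        // runs when step(_:) is called
        return Completable.create { observer in
            log.append("run \(name)")     // runs when the Completable is subscribed to
            observer(.completed)
            return Disposables.create()
        }
    }

    // Without deferred: step("b") is called while the chain is built.
    _ = step("a").andThen(step("b")).subscribe()
    let eager = log

    // With deferred: step("b") is called only after "a" has completed.
    log = []
    _ = step("a").andThen(Completable.deferred { step("b") }).subscribe()
    let deferred = log

    return (eager, deferred)
}

let result = compareEagerAndDeferred()
print(result.eager)
print(result.deferred)
```

In both cases the work inside Completable.create runs in order. The difference is only in when the function body itself (the "test run" print in the answer above) is executed.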

-1
votes

Use flatMap or flatMapLatest. You can find more about them at reactivex.io.

-1
votes

You can use flatMap and then subscribe to the resulting Result observable sequence by calling subscribe(onNext:) on the final output.

actions
    .flatMap { (action) -> Observable<Result> in
        return self.doAction(for: action)
    }
    .subscribe(onNext: { (result) in
        print(result)
    })

func doAction(for action: String) -> Observable<Result> {
    //...
}
