1
votes

I have an app where users can receive notifications based on topics they subscribe to. The workflow is the following:

  1. User logs in
  2. The app registers to the notification server
  3. The user chooses to subscribe to or unsubscribe from various topics

I want all the network requests to be serialized. If I knew at initialization exactly which topics were going to be subscribed to or unsubscribed from, I could write a stream like the one below:

    loginObservable.subscribeOn(Schedulers.io())
            .flatMap(user -> registerApp(appId))
            .flatMap(o -> subscribeToTopic("topic1"))
            .flatMap(o -> unsubscribeFromTopic("topic2"))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe()

The thing is that users can subscribe/unsubscribe at any point in the app lifecycle, possibly even before registration has succeeded. I could maintain a list of observables and serialize all the requests by hand in onComplete(), but that doesn't sound very Rx-ish. Is there any chance I could do it in a more concise way? Something like:

    observable = loginObservable.subscribeOn(Schedulers.io())
            .flatMap(user -> registerApp(appId))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe()

    //later
    observable.flatMap(o -> subscribeToTopic("topic1"))
            .subscribe()

    //even later
    observable.flatMap(o -> unsubscribeFromTopic("topic2"))
            .subscribe()

2 Answers

0
votes

What about using doOnSubscribe? There you can register your app before your topics are published.

Look at this example:

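    // Flag flipped by doOnSubscribe; stands in for "the app is registered"
    boolean onSubscribe = false;

    @Test
    public void observableDoOnSubscribe() {
        String val = "test";
        Observable.just(val)
                  // Runs when the subscription starts, before any item is emitted
                  .doOnSubscribe(() -> onSubscribe = true)
                  // Items only pass once the flag has been set
                  .filter(s -> onSubscribe)
                  // println rather than printf, so the value is not treated as a format string
                  .subscribe(s -> System.out.println(s));
    }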

You can see more examples here: https://github.com/politrons/reactive

0
votes

You can introduce new topics into the observable via a PublishSubject. I haven't understood your use of switchMap in your example, but this approach might be useful to you:

    PublishSubject<Topic> newTopics = PublishSubject.create();
    Observable<Topic> topics = ...;
    newTopics
        .mergeWith(topics)
        .flatMap(topic -> subscribeToTopic(topic))
        ...
        .subscribe(subscriber);

UI interaction:

    newTopics.onNext(topic);
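
One caveat on ordering: flatMap may run the inner requests concurrently, so if the requests really must go out one at a time, concatMap is probably the operator to reach for in the snippet above. To make the "push requests in at any time, run them strictly in order" idea concrete without tying it to a particular RxJava version, here is a minimal plain-JDK sketch using CompletableFuture. It is not RxJava and not the asker's code: SerialRequestQueue, enqueue, fakeRequest and the request names are all hypothetical, and the fake requests only sleep and record their name.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

// Hypothetical helper (not part of RxJava): chains each request onto the previous one.
class SerialRequestQueue {
    // Tail of the chain; a newly enqueued request starts only after this completes.
    private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

    // Can be called at any time, from any thread. The supplier is invoked only once
    // every previously enqueued request has completed. If an earlier request fails,
    // later ones are not started, much like an error terminating an Rx stream.
    synchronized <T> CompletableFuture<T> enqueue(Supplier<CompletableFuture<T>> request) {
        CompletableFuture<T> next = tail.thenCompose(ignored -> request.get());
        tail = next.thenRun(() -> { });
        return next;
    }
}

class SerialRequestQueueDemo {
    // Fake network call (assumption): waits delayMs, then records its name in the log.
    static CompletableFuture<String> fakeRequest(String name, long delayMs, List<String> log) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(delayMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add(name);
            return name;
        });
    }

    // Enqueues three requests back to back and returns the order in which they ran.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        SerialRequestQueue queue = new SerialRequestQueue();
        // The slowest request is enqueued first; the later ones still wait for it.
        queue.enqueue(() -> fakeRequest("register", 50, log));
        queue.enqueue(() -> fakeRequest("subscribe topic1", 10, log));
        CompletableFuture<String> last =
                queue.enqueue(() -> fakeRequest("unsubscribe topic2", 0, log));
        last.join(); // block until the whole chain has finished
        return log;
    }
}
```

In the app, the UI would call something like queue.enqueue(() -> subscribeToTopic("topic1")) whenever the user taps, assuming the request methods were adapted to return a CompletableFuture. With RxJava the subject plays the role of the queue and concatMap the role of the chaining.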