0
votes

I think this is handled similarly in most reactive implementations, so I am not naming a specific library or language here.

What is the proper way to perform reactive side effects on streams? (I am not sure what the correct terminology is.)

Let's say we have a stream of Players and want to transform it into a stream of just their ids. In addition, we want to do some further reactive processing based on each id.

Here is one way to do it, though not an elegant one. What is the idiomatic way to achieve this?

Observable<Player> playerStream = getPlayerStream();
return playerStream
      .map(p -> p.id)
      .flatMap(id -> {
           Observable<Result> weDontCare = process(id);
           // Now the hacky part: discard the result and emit the id again
           return weDontCare.map(__ -> id);
      });

Is that OK? It does not seem very elegant.

2
Btw, there exists an extension operator for RxJava that lets you map asynchronously, which is more concise than the otherwise reasonable flatMap approach: github.com/akarnokd/… - akarnokd

2 Answers

0
votes

I don't know RxJava, but you also tagged Project Reactor, and there are two ways to do this with Reactor, depending on how you want the side effects to affect your stream. If you want to wait for the side effect to complete, so that you can handle errors etc. (my preferred way), use delayUntil:

return getPlayerStream()
  .map(p -> p.id)
  .delayUntil(id -> process(id));

If you want the id to pass straight through without waiting, and run the side effect in fire-and-forget style instead, you could use doOnNext:

return getPlayerStream()
  .map(p -> p.id)
  .doOnNext(id -> process(id).subscribe());
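
To make the difference between the two variants concrete, here is a rough, library-free sketch that uses the JDK's CompletableFuture as a stand-in for a single-element reactive type. It is not Reactor or RxJava code. The class SideEffectSketch, the methods waitThenEmit and fireAndForget, and this version of process are hypothetical names made up for illustration only.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

class SideEffectSketch {

    // Hypothetical side effect: records the id on a background thread.
    static final List<Integer> processed = new CopyOnWriteArrayList<>();

    static CompletableFuture<Void> process(int id) {
        return CompletableFuture.runAsync(() -> processed.add(id));
    }

    // "Wait" semantics, analogous to delayUntil: the id is only emitted
    // once the side effect has completed, and a failure of the side
    // effect fails the returned future.
    static CompletableFuture<Integer> waitThenEmit(
            int id, Function<Integer, CompletableFuture<Void>> sideEffect) {
        return sideEffect.apply(id).thenApply(ignored -> id);
    }

    // Fire-and-forget semantics, analogous to doOnNext + subscribe():
    // the side effect is started, but the id is returned immediately and
    // the caller never sees whether the side effect succeeded.
    static int fireAndForget(
            int id, Function<Integer, CompletableFuture<Void>> sideEffect) {
        sideEffect.apply(id);
        return id;
    }

    public static void main(String[] args) {
        int waited = waitThenEmit(1, SideEffectSketch::process).join();
        System.out.println("side effect finished, then got id " + waited);

        // The side effect for id 2 may still be running (or may never
        // finish if the JVM exits first), which is the trade-off of
        // fire-and-forget.
        int immediate = fireAndForget(2, SideEffectSketch::process);
        System.out.println("got id " + immediate + " without waiting");
    }
}
```

Note that waitThenEmit is essentially the map(__ -> id) trick from the question; as far as I can tell, delayUntil just packages that pattern behind an operator name.
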
-1
votes
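Observable<Player> playerStream = getPlayerStream();
return playerStream
      .map(p -> p.id)
      // doOnNext receives the id itself; doOnEach would receive a Notification wrapper
      .doOnNext(id -> {
        // side effect with id: subscribe, otherwise process(id) never runs
        Observable<Result> weDontCare = process(id);
        weDontCare.subscribe();
      });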