I have an observable that emits a series of messages, say obs1. Then a second observable, obs2, that needs some data from the last message emitted by obs1 and emits another series of messages. I would like to "chain" these 2 observables to produce an observable obs3 that serially emits ALL messages from obs1 and obs2.

The solution I came up with so far is:

obs3 = concat(
  obs1,
  obs1.pipe(
    last(),
    concatMap(lastMessage => obs2(lastMessage)),
  ),
);

But this has the flaw that obs1 is executed (subscribed to) twice.

Is there a more direct way to achieve this? Something like a concatMapWithSelf() operator that would work like this:

obs3 = obs1.pipe(
  concatMapWithSelf(lastMessage => obs2(lastMessage)),
);

Thank you!

1 Answer

Sounds like you could use a ConnectableObservable. In RxJS 7 I believe it would be shorter and more readable with multicast(), but that operator is deprecated in RxJS 7 and slated for removal in RxJS 8, so the safest option is probably to wrap the source Observable with connectable() and then call connect() manually.

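import { connectable, defer, merge, of } from 'rxjs';
import { concatMap, last } from 'rxjs/operators';

// connectable() multicasts through a Subject and subscribes to the source only on connect().
const obs1 = connectable(
  defer(() => {
    console.log('new subscription'); // logged once per subscription to the source
    return of('v1', 'v2', 'v3', 'v4');
  })
);

const obs2 = msg => of(msg);

// merge() rather than concat(): both branches must be subscribed before connect() is called.
const obs3 = merge(
  obs1,
  obs1.pipe(
    last(),
    concatMap(lastMessage => obs2(lastMessage))
  )
);

obs3.subscribe(console.log);

// Start the single shared subscription to the source.
obs1.connect();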

Live demo: https://stackblitz.com/edit/rxjs-2uheg4?devtoolsheight=60
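
For comparison, here is a minimal sketch of the operator the question asks for, written without any multicasting. The name concatMapWithSelf is taken from the question and is hypothetical; RxJS ships no such operator. The sketch assumes it is acceptable to remember the most recent value with tap() and to subscribe to the projected Observable only after the source completes. Unlike last(), it emits nothing extra when the source is empty instead of raising an error.

```typescript
import { EMPTY, Observable, concat, defer, of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical operator named after the one in the question; not part of RxJS.
function concatMapWithSelf<T, R>(
  project: (lastMessage: T) => Observable<R>
): (source: Observable<T>) => Observable<T | R> {
  return source =>
    defer(() => {
      // State lives inside defer() so every subscription starts clean.
      let hasValue = false;
      let lastValue: T | undefined;
      return concat(
        // Pass every source value through while remembering the latest one.
        source.pipe(
          tap(value => {
            hasValue = true;
            lastValue = value;
          })
        ),
        // Evaluated only after the source completes.
        defer(() => (hasValue ? project(lastValue as T) : EMPTY))
      );
    });
}

// Usage with a source that counts how often it is subscribed to.
let sourceSubscriptions = 0;
const source$ = defer(() => {
  sourceSubscriptions++;
  return of('v1', 'v2', 'v3', 'v4');
});
const followUp = (msg: string) => of(`after ${msg}`);

const collected: string[] = [];
source$.pipe(concatMapWithSelf(followUp)).subscribe(value => collected.push(value));
```

Because concat() subscribes to its second argument only once the first has completed, the source is subscribed to a single time and no connect() call is needed.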

If obs1 is always asynchronous, you could probably use share() instead, but it behaves differently with synchronous sources, so a ConnectableObservable is the safer choice.
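
To illustrate the difference with a synchronous source, the following sketch swaps connectable() for share() and counts subscriptions. The counter and the array are names introduced here for illustration only. It assumes the default share() configuration, where the shared subscription is reset once the source completes.

```typescript
import { defer, merge, of } from 'rxjs';
import { concatMap, last, share } from 'rxjs/operators';

let sharedSubscriptions = 0;

// A synchronous source: it emits everything and completes during subscribe().
const shared$ = defer(() => {
  sharedSubscriptions++;
  return of('v1', 'v2', 'v3', 'v4');
}).pipe(share());

const sharedReceived: string[] = [];

merge(
  shared$,
  // By the time merge() subscribes to this branch, the first branch has
  // already completed, so share() subscribes to the source again.
  shared$.pipe(
    last(),
    concatMap(lastMessage => of(`after ${lastMessage}`))
  )
).subscribe(value => sharedReceived.push(value));

console.log(sharedSubscriptions); // 2: the source ran twice
```

The emitted values look the same as with connectable(), but the source is executed twice, which is exactly the flaw the question wants to avoid.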