0
votes

I'm in a situation where I want to subscribe as soon as possible to two different observables, but wait for the first one to emit before the second one emits.

For instance, I'm developing a chat application where I want to retrieve all previous messages stored in a DB, but also subscribe to a stream which emits new ones.

I want something like this:

readonly messages$ = this.service.getStoredMessages().pipe(
    switchMapTo(this.service.getNewMessages())
)

The problem with this is that I may miss some messages because of a late subscription to getNewMessages(). What I need to do is subscribe as soon as possible to both observables, but the second one (getNewMessages()) should emit only after getStoredMessages() emits.

4
If you want to subscribe to both immediately, but not emit until the first one emits, what exactly do you want to do with emissions from the second observable that arrive before the first one has emitted? It seems like switchMap(To) does what you want. Can you provide example observables and expected output? - Ingo Bürk
@IngoBürk I think what the OP is saying is that getStoredMessages is a pull-style request that returns a snapshot of the "historical" messages. However, it's possible that other messages get created while the request is being served. I've been in that situation, and what I did was buffer the messages that were received before the historical data arrived, then "merge" the historical messages with the buffered ones (ensuring that there are no duplicates), emit those, and afterwards emit the new messages. I could be wrong, but I think that's what the OP is looking for. - Josep
That sounds reasonable, but should still come from the OP. In that case I'd go for combineLatest (forkJoin won't work, as I assume getNewMessages doesn't complete) with operators to combine the results. - Ingo Bürk
Yes, sorry for the late reply. What @Josep says is correct. Would you mind sharing a solution with combineLatest? I don't quite get how it could actually solve my problem. Again, sorry for the late reply. - Marco Ripamonti
@MarcoRipamonti I disagree with @IngoBürk... I don't think that combineLatest is going to help you with what you are trying to do here, but now that you've confirmed that this is what you are looking for, I will share a way to solve this. Although I'm genuinely curious to see if I'm missing something and combineLatest can help here. - Josep

4 Answers

1
votes

Assuming that:

  • storedMessages$ type is Observable<Message[]>
  • newMessages$ type is Observable<Message>
  • and messages$ type is supposed to be Observable<Message>

I think what you want to do is something more or less like this:

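import { concat, defer, merge } from 'rxjs'
import { ignoreElements, mergeAll, share, takeUntil, toArray } from 'rxjs/operators'

// Class member:
readonly messages$ = defer(() => {
  // Share both sources so that each one is subscribed to only once.
  const storedMessages$ = this.service.getStoredMessages().pipe(share())
  const newMessages$ = this.service.getNewMessages().pipe(share())

  // Collect the new messages that arrive before the stored messages do.
  const missedMessages$ = newMessages$.pipe(
    takeUntil(storedMessages$),
    toArray(),
  )

  // Flatten both arrays into individual messages.
  const initialMessages$ = merge(
    storedMessages$,
    missedMessages$
  ).pipe(mergeAll()) // instead of `mergeAll` maybe you want to `scan` and make sure that there are no duplicate values...

  return merge(
    newMessages$.pipe(ignoreElements()), // ensures that we don't unsubscribe from newMessages$ after storedMessages$ emits/completes.
    concat(initialMessages$, newMessages$),
  )
})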
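
The comment next to `mergeAll` mentions guarding against duplicate values. A minimal sketch of that step, assuming every message carries a unique `id` (the `Message` shape and the `mergeWithoutDuplicates` helper are hypothetical names, not part of the original code):

```typescript
// Hypothetical message shape; the `id` field is an assumption for illustration.
interface Message {
  id: string;
  text: string;
}

// Returns the stored messages followed by the buffered ones,
// skipping any message whose id has already been seen.
function mergeWithoutDuplicates(stored: Message[], buffered: Message[]): Message[] {
  const seen = new Set<string>();
  const result: Message[] = [];
  for (const message of [...stored, ...buffered]) {
    if (!seen.has(message.id)) {
      seen.add(message.id);
      result.push(message);
    }
  }
  return result;
}
```

One option would be to pair the stored array with the missed array (for instance with `zip`) and pass the pair through this function before flattening, though that wiring is only a suggestion.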
0
votes

You can use this kind of structure:

readonly messages$ = this.service.getStoredMessages().pipe(
    withLatestFrom(this.service.getNewMessages())
)
0
votes

You can try this:

readonly messages$ = this.service.getStoredMessages().subscribe(res => {
    this.service.getNewMessages()
})
0
votes

From your question, what I understand is that there is an array of messages which you want to use in your view.

Let's call it viewMessages. Now, if we want to push messages into viewMessages, I recommend subscribing to both streams, i.e. the stream of previous messages and the stream of new chat messages, and when either of them emits, prepending or appending the message/messages to viewMessages.

viewMessages:Message[] = [];
.
.
.
// Stored messages go in front of whatever has already arrived.
this.service.getStoredMessages().subscribe(messages => {
    this.viewMessages = [...messages, ...this.viewMessages];
});
// Each new message is appended to the end.
this.service.getNewMessages().subscribe(message => {
    this.viewMessages = [...this.viewMessages, message];
});

Please note that the order is maintained by both subscriptions. Previous messages will appear first and new messages will appear afterwards.
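
To illustrate the ordering claim, here is a small sketch without RxJS. `MessageList`, `onStored`, and `onNew` are hypothetical names standing in for the component and its two subscription callbacks, and the `Message` fields are assumed:

```typescript
// Hypothetical message shape; the fields are assumptions for illustration.
interface Message {
  id: string;
  text: string;
}

// Stand-in for the component state: stored messages are prepended, new ones appended.
class MessageList {
  viewMessages: Message[] = [];

  // Callback for the stored-messages stream.
  onStored(messages: Message[]): void {
    this.viewMessages = [...messages, ...this.viewMessages];
  }

  // Callback for the new-messages stream.
  onNew(message: Message): void {
    this.viewMessages = [...this.viewMessages, message];
  }
}

// A new message arrives before the stored snapshot does.
const list = new MessageList();
list.onNew({ id: '3', text: 'just sent' });
list.onStored([
  { id: '1', text: 'first stored' },
  { id: '2', text: 'second stored' },
]);
// list.viewMessages now holds ids 1, 2, 3 in that order.
```

This sketch does not remove a message that appears in both the stored snapshot and the live stream, so a duplicate check may still be needed.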