I have two observable streams, each emitting a series of items over an unbounded period of time (similar to how a DOM-based click Observable would behave). I know an item from Observable A (a$) will match an item from Observable B (b$), but I need some custom logic to determine which items match.
I tried to make this work, but only the first pair is matched; after that, no further pairs are ever emitted.
This is an extract from the code:
a$.pipe(
  mergeMap(a => {
    return b$.pipe(
      filter(b => b.key.includes(a.subKey)), // custom matching logic goes here
      take(1),
      map(b => ({ a, b }))
    );
  })
)
.subscribe(({ a, b }) => {
  console.log("do something with a and b", a, b);
});
Note that neither Observable ever completes, so when some item a is emitted from a$, its "pair" might not have been emitted from b$ yet. That's why I used filter rather than find above. Once a matching item is found, I can complete the inner observable, since that pair has been matched and handled.
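For anyone trying to reproduce this, here is a minimal self-contained sketch of the extract above. It rests on assumptions that are not part of the real code: both streams are modelled as hot sources using Subject, and the item shapes (A with subKey, B with key) and the sample values are hypothetical. It only illustrates how the ordering of emissions affects this pattern when the sources are hot; it may or may not be what happens in the real application.

```typescript
import { Subject } from "rxjs";
import { filter, map, mergeMap, take } from "rxjs/operators";

// Hypothetical item shapes; the real ones are not shown in the extract.
interface A { subKey: string; }
interface B { key: string; }

// Assumption: both sources are hot, so late subscribers miss earlier items.
const a$ = new Subject<A>();
const b$ = new Subject<B>();

const pairs: Array<{ a: A; b: B }> = [];

a$.pipe(
  mergeMap(a =>
    b$.pipe(
      filter(b => b.key.includes(a.subKey)), // custom matching logic
      take(1),                               // complete once the pair is found
      map(b => ({ a, b }))
    )
  )
).subscribe(pair => pairs.push(pair));

// Ordering 1: a is emitted first, its match from b$ afterwards.
// The inner subscription to b$ already exists, so the pair is matched.
a$.next({ subKey: "1" });
b$.next({ key: "order-1" });

// Ordering 2: the match from b$ is emitted before a.
// No inner subscription exists yet, so a hot b$ never replays this item.
b$.next({ key: "order-2" });
a$.next({ subKey: "2" });

console.log(pairs.length); // 1: only the first ordering produced a pair
```

In this sketch only the pair where a arrives before its b is matched. Whether the real b$ behaves like a hot Subject here is an assumption worth checking.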
Please advise, what am I missing?