0
votes

I have two Observable streams, each emitting items indefinitely (similar to how a DOM-based click Observable behaves). I know an item from Observable A (a$) will match an item from Observable B (b$), but I need some custom logic to determine which items match. I tried to make this work, but I could only get the first pair to match; subsequent items are never emitted.

This is an extract from the code:

a$.pipe(
  mergeMap(a => {
    return b$.pipe(
      filter(b => b.key.includes(a.subKey)), // custom matching logic goes here
      take(1),
      map(b => ({ a, b }))
    );
  })
)
.subscribe(({ a, b }) => {
  console.log("do something with a and b", a, b);
});

Note that neither Observable ever completes, so when some item a is emitted from a$, its "pair" might not have been emitted from b$ yet. That's why I used filter rather than find above. Once I find a matching item, I can complete the inner observable, since that pair has been matched and handled.

Please advise, what am I missing?


3 Answers

0
votes

Have you looked at combineLatest? Once both streams have emitted at least once, it emits the latest value from each stream every time either one emits.

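import { combineLatest } from "rxjs";
import { filter } from "rxjs/operators";

// The array form is used here; passing the sources as separate arguments is deprecated.
combineLatest([a$, b$]).pipe(filter(([a, b]) => b.key.includes(a.subKey)))
  .subscribe(([a, b]) => {
    // Do stuff with a and b here
  });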
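
One thing worth checking with this approach is which combinations actually get compared, since combineLatest only keeps the most recent value per source. The sketch below is a hand-rolled model of that rule in plain JavaScript, not RxJS itself; the `latestCombinations` helper and the sample events are hypothetical, and `b.key` is assumed to be an array.

```javascript
// Hand-rolled model of combineLatest's "latest value per source" rule.
// This is NOT RxJS; it only mimics which [a, b] combinations get compared.
function latestCombinations(events) {
  let latestA; // most recent item seen from "a"
  let latestB; // most recent item seen from "b"
  const combos = [];
  for (const { source, value } of events) {
    if (source === "a") latestA = value;
    else latestB = value;
    // Like combineLatest, emit only once both sides have produced a value.
    if (latestA !== undefined && latestB !== undefined) {
      combos.push([latestA, latestB]);
    }
  }
  return combos;
}

// Hypothetical sample data: a1 pairs with b1, a2 pairs with b2.
const events = [
  { source: "a", value: { subKey: "k1" } },
  { source: "a", value: { subKey: "k2" } }, // replaces a1 as "latest"
  { source: "b", value: { key: ["k1"] } },  // a1's partner arrives too late
  { source: "b", value: { key: ["k2"] } },
];

const matches = latestCombinations(events).filter(([a, b]) =>
  b.key.includes(a.subKey)
);
console.log(matches.map(([a]) => a.subKey)); // only the k2 pair is found
```

In this model the k1 pair is never compared, because a1 has already been replaced by a2 when b1 arrives. If pairs can arrive out of step like this, the earlier items probably need to be buffered somewhere.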
0
votes

I think one way to solve this is to first accumulate the values of each observable in a map, then to use the combineLatest operator so you can check for pairs on every emission.

const aMap$ = a$.pipe(scan((acc, crt) => (acc[crt.id] = crt, acc), Object.create(null)));
const bMap$ = b$.pipe(scan((acc, crt) => (acc[crt.id] = crt, acc), Object.create(null)));

combineLatest([aMap$, bMap$])
    .pipe(
        map(([aMap, bMap]) => {
            let pair = null;

            for (const bKey in bMap) {
                const bVal = bMap[bKey];

                const aPairKey = bVal.keys.find(k => !!aMap[k]);

                if (aPairKey) {
                    pair = { a: aMap[aPairKey], b: bVal };

                    delete aMap[aPairKey];
                    delete bMap[bKey];

                    break;
                }
            }

            return pair;
        }),
        filter(v => !!v)
    )
    .subscribe(pair => console.log("matched pair", pair.a, pair.b));
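
The same buffer-then-pair idea can be sketched without any RxJS at all, which may make the matching logic easier to test on its own. This is only an illustration under assumptions: `createPairMatcher`, `pushA`, and `pushB` are hypothetical names, and the predicate reuses the `b.key.includes(a.subKey)` check from the question with `b.key` assumed to be an array.

```javascript
// Buffers unmatched items from both sides and pairs them as soon as possible.
// `isMatch(a, b)` is the caller's custom matching logic.
function createPairMatcher(isMatch) {
  const pendingA = []; // a items still waiting for a partner
  const pendingB = []; // b items still waiting for a partner

  return {
    // Returns { a, b } if `a` completes a pair, otherwise buffers it and returns null.
    pushA(a) {
      const i = pendingB.findIndex(b => isMatch(a, b));
      if (i === -1) { pendingA.push(a); return null; }
      const [b] = pendingB.splice(i, 1); // a matched item is removed for good
      return { a, b };
    },
    // Mirror image of pushA for items coming from the b side.
    pushB(b) {
      const i = pendingA.findIndex(a => isMatch(a, b));
      if (i === -1) { pendingB.push(b); return null; }
      const [a] = pendingA.splice(i, 1);
      return { a, b };
    },
  };
}

const matcher = createPairMatcher((a, b) => b.key.includes(a.subKey));
const results = [
  matcher.pushB({ key: ["k2"] }),  // no partner yet
  matcher.pushA({ subKey: "k1" }), // no partner yet
  matcher.pushA({ subKey: "k2" }), // pairs with the buffered b
  matcher.pushB({ key: ["k1"] }),  // pairs with the buffered a
];
console.log(results);
```

With RxJS, one option would be to tag the items of each stream, merge the two streams, route every event through pushA or pushB, and keep only the non-null results. Unmatched items stay buffered indefinitely here, so long-lived streams might need some eviction rule.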
0
votes

I would accumulate the values from A and B to track which keys have occurred so far. Then you can compute the intersection of those two arrays.

import { combineLatest } from "rxjs";
import { map, scan } from "rxjs/operators";
import { intersection } from "lodash";

// All subKeys emitted by a$ so far
const keysOccurredInA$ = a$.pipe(
    map(a => a.subKey),
    scan((acc, curr) => [...acc, curr], []),
);

// All keys emitted by b$ so far
const keysOccurredInB$ = b$.pipe(
    map(b => b.key), // b.key is an array, right?
    scan((acc, curr) => [...acc, ...curr], []),
);

const keysOccurredInBoth$ = combineLatest([keysOccurredInA$, keysOccurredInB$]).pipe(
    map(([keysInA, keysInB]) => intersection(keysInA, keysInB)), // lodash intersection
);
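
To see what the accumulated arrays and their intersection look like, here is a small trace in plain JavaScript. It is a sketch under assumptions: the reducers have the same shape as the scan callbacks above, `Array.prototype.reduce` stands in for scan (it yields only the final accumulated value), the local `intersection` function is a dependency-free stand-in for lodash's, and the sample keys are made up.

```javascript
// Reducers with the same shape as the scan callbacks above.
const collectKey = (acc, subKey) => [...acc, subKey];
const collectKeys = (acc, keys) => [...acc, ...keys];

// Stand-in for lodash's intersection: unique values of xs that also appear in ys.
function intersection(xs, ys) {
  const inYs = new Set(ys);
  return [...new Set(xs)].filter(x => inYs.has(x));
}

// Hypothetical emissions so far: three subKeys from a$, two key arrays from b$.
const keysFromA = ["k1", "k2", "k3"].reduce(collectKey, []);
const keysFromB = [["k2"], ["k9", "k3"]].reduce(collectKeys, []);

const keysInBoth = intersection(keysFromA, keysFromB);
console.log(keysInBoth); // [ 'k2', 'k3' ]
```

Note that the result contains keys rather than the original items, and both arrays keep growing for as long as the streams run. To get back to the items, the accumulators could store the objects keyed by subKey instead of the bare keys.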