
The abstract problem

Is there any way to consume the result of a mergeMap in the original order of the outer observable, while still allowing the inner observables to run in parallel?


More detailed explanation

Let's look at two merge-mapping operators:

  • mergeMap

    ...which takes a mapping callback and, optionally, the maximum number of inner observables that may run concurrently:

      of(1, 2, 3, 4, 5, 6).pipe(
          mergeMap(number => api.get('/double', { number }), 3)
      );
    

    See it here in action: https://codepen.io/JosephSilber/pen/YzwVYNb?editors=1010

    This will fire off 3 parallel requests for 1, 2 and 3, respectively. As soon as one of those requests completes, it'll fire off another request for 4. And so on and so forth, always maintaining 3 concurrent requests, until all of the values have been processed.

    However, since later requests may complete before earlier ones, the values may be emitted out of order. So instead of:

      [2, 4, 6, 8, 10, 12]
    

    ...we may actually get:

      [4, 2, 8, 10, 6, 12] // or some other permutation
    
  • concatMap

    ...enter concatMap. This operator ensures that the observables are all concatenated in the original order, so that:

      of(1, 2, 3, 4, 5, 6).pipe(
          concatMap(number => api.get('/double', { number }))
      );
    

    ...will always produce:

      [2, 4, 6, 8, 10, 12]
    

    See it here in action: https://codepen.io/JosephSilber/pen/OJMmzpy?editors=1010

    This is what we want, but now the requests won't run in parallel. As the documentation says:

    concatMap is equivalent to mergeMap with concurrency parameter set to 1.
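The difference between the two operators comes down to scheduling. As a rough, framework-free sketch (not RxJS's actual implementation), the hypothetical `mergeMapLike` helper below starts at most `concurrency` tasks at once and records each result the moment its task settles. `fakeDouble` and its latencies are made-up stand-ins for `api.get('/double', ...)`. With a concurrency of 3, the request for 2 finishes first, so 4 is collected before 2; with a concurrency of 1, which is what concatMap amounts to, the results follow the input order.

```typescript
type MergeResult<O> = { results: O[]; maxInFlight: number };

// Runs `task` for each input with at most `concurrency` tasks in flight and
// collects results in completion order (as mergeMap emits them), not input order.
function mergeMapLike<I, O>(
  inputs: I[],
  task: (input: I) => Promise<O>,
  concurrency: number,
): Promise<MergeResult<O>> {
  return new Promise<MergeResult<O>>((resolve, reject) => {
    const results: O[] = [];
    let next = 0; // index of the next input to start
    let inFlight = 0; // tasks currently running
    let maxInFlight = 0;

    const startNext = (): void => {
      if (next >= inputs.length) return;
      const input = inputs[next++];
      inFlight++;
      maxInFlight = Math.max(maxInFlight, inFlight);
      task(input).then(result => {
        inFlight--;
        results.push(result); // recorded when it finishes, whatever its position
        if (results.length === inputs.length) resolve({ results, maxInFlight });
        else startNext(); // refill the freed slot
      }, reject);
    };

    if (inputs.length === 0) resolve({ results, maxInFlight });
    for (let i = 0; i < concurrency; i++) startNext();
  });
}

// Hypothetical stand-in for api.get('/double', { number }) with varying latency (ms).
const latency: Record<number, number> = { 1: 80, 2: 20, 3: 140, 4: 40, 5: 100, 6: 10 };
const fakeDouble = (n: number): Promise<number> =>
  new Promise<number>(resolve => setTimeout(() => resolve(n * 2), latency[n]));

const merged = mergeMapLike([1, 2, 3, 4, 5, 6], fakeDouble, 3); // like mergeMap(..., 3)
const concatenated = mergeMapLike([1, 2, 3, 4, 5, 6], fakeDouble, 1); // like concatMap

merged.then(r => console.log("concurrency 3:", r.results));
concatenated.then(r => console.log("concurrency 1:", r.results));
```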

So back to the question: is it possible to get the benefits of mergeMap, whereby a given number of requests can run in parallel, while still having the mapped values emitted in the original order?


My concrete problem

The above describes the problem in the abstract. It is sometimes easier to reason about a problem when you know the actual problem at hand, so here goes:

  1. I have a list of orders that have to be shipped:

     const orderNumbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
    
  2. I have a shipOrder method that actually ships the orders. It returns a Promise:

     const shipOrder = orderNumber => api.shipOrder(orderNumber);
    
  3. The API can only process up to 5 order shipments simultaneously, so I'm using mergeMap to handle that:

     from(orderNumbers).pipe(
         mergeMap(orderNumber => shipOrder(orderNumber), 5)
     );
    
  4. After an order is shipped, we need to print its shipping label. I have a printShippingLabel function that, given the order number of a shipped order, will print its shipping label. So I subscribe to our observable, and print the shipping labels as the values come in:

     from(orderNumbers)
         .pipe(mergeMap(orderNumber => shipOrder(orderNumber), 5))
         .subscribe(orderNumber => printShippingLabel(orderNumber));
    
  5. This works, but now the shipping labels are printed out of order, since mergeMap emits values based on when shipOrder completes its request. What I want is for the labels to print in the same order as the original list.

Is that possible?


Visualization

See here for a visualization of the problem: https://codepen.io/JosephSilber/pen/YzwVYZb?editors=1010

You can see that labels for later orders are printed before earlier orders have even shipped.


3 Answers

Answer 1

I did manage to partially solve it, so I'm posting it here as an answer to my own question.

I still very much want to know the canonical way to handle this situation.


A convoluted solution

  1. Create a custom operator that takes values that have an index key ({ index: number } in TypeScript parlance) and keeps a buffer of the values, emitting them only in index order.

  2. Map the original list into a list of objects with their index embedded.

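  3. Run them through mergeMap as before, carrying each value's index along with its result, and pass the results on to our custom sortByIndex operator.

  4. Map the results back to plain values, dropping the index.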

Here's what that sortByIndex would look like:

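import { Observable } from 'rxjs';

// Re-emits values tagged with { index } strictly in index order (0, 1, 2, ...),
// buffering any value that arrives before its predecessors.
function sortByIndex() {
    return source => new Observable(subscriber => {
        const buffer = new Map();
        let current = 0; // the next index we're waiting for
        return source.subscribe({
            next: value => {
                if (value.index !== current) {
                    // Arrived early: hold it until every earlier index has been emitted.
                    buffer.set(value.index, value);
                    return;
                }
                subscriber.next(value);
                // Flush any buffered values that are now next in line.
                while (buffer.has(++current)) {
                    subscriber.next(buffer.get(current));
                    buffer.delete(current);
                }
            },
            error: err => subscriber.error(err),
            complete: () => subscriber.complete(),
        });
    });
}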
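The buffering rule at the heart of sortByIndex doesn't depend on RxJS at all. Here is a minimal plain-TypeScript sketch of the same idea, with `ReorderBuffer` and `push` as hypothetical names: each incoming item carries an index, and items are released only once every lower index has been released. The arrival order in the example is made up to mimic results coming back from mergeMap out of order.

```typescript
interface Indexed<T> {
  index: number;
  value: T;
}

// Holds out-of-order items and releases them strictly by index (0, 1, 2, ...).
class ReorderBuffer<T> {
  private readonly pending = new Map<number, T>();
  private current = 0; // the next index we are allowed to release

  // Accepts one item and returns every value that can now be released, in order.
  push(item: Indexed<T>): T[] {
    this.pending.set(item.index, item.value);
    const released: T[] = [];
    while (this.pending.has(this.current)) {
      released.push(this.pending.get(this.current) as T);
      this.pending.delete(this.current);
      this.current++;
    }
    return released;
  }

  // Number of items still waiting for an earlier index.
  get size(): number {
    return this.pending.size;
  }
}

// Results arriving in completion order, as mergeMap might deliver them.
const arrivals: Indexed<number>[] = [
  { index: 1, value: 4 },
  { index: 3, value: 8 },
  { index: 0, value: 2 },
  { index: 2, value: 6 },
];

const buffer = new ReorderBuffer<number>();
const emitted: number[][] = arrivals.map(item => buffer.push(item));
console.log(emitted); // [ [], [], [ 2, 4 ], [ 6, 8 ] ]
```

Each `push` returns the (possibly empty) batch that just became emittable, which corresponds to what sortByIndex forwards to its subscriber one value at a time.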

With the sortByIndex operator in place, we can now complete our whole pipeline:

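import { of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

of(1, 2, 3, 4, 5, 6).pipe(
    map((number, index) => ({ number, index })),  // tag each value with its position
    mergeMap(async ({ number, index }) => {
        const doubled = await api.get('/double', { number });
        return { index, number: doubled };         // carry the index through
    }, 3),
    sortByIndex(),                                  // restore the original order
    map(({ number }) => number)                     // drop the index again
);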

See it here in action: https://codepen.io/JosephSilber/pen/zYrwpNj?editors=1010

Creating a concurrentConcat operator

In fact, with this sortByIndex operator in place, we can now create a general concurrentConcat operator, which will do the transformations to and from the { index: number, value: T } type internally:

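import { from } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

function concurrentConcat(mapper, parallel) {
    return observable => observable.pipe(
        // Tag each result with the index of the source value that produced it.
        mergeMap(
            (value, index) => from(mapper(value, index)).pipe(
                map(result => ({ value: result, index }))
            ),
            parallel
        ),
        sortByIndex(),
        map(({ value }) => value)
    );
}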
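concurrentConcat is essentially mergeMap's bounded scheduling combined with sortByIndex's reordering. For comparison, here's a hedged Promise-only sketch of that combination outside RxJS; `orderedConcurrentMap` and its `onValue` callback are hypothetical names, and the `double` task with its delays is simulated. Like concurrentConcat, it assumes exactly one result per input.

```typescript
// Calls `task` for every input with at most `concurrency` calls in flight and
// passes results to `onValue` strictly in the original input order.
function orderedConcurrentMap<I, O>(
  inputs: I[],
  task: (input: I) => Promise<O>,
  concurrency: number,
  onValue: (value: O, index: number) => void,
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    const done = new Map<number, O>(); // finished, waiting for earlier results
    let nextToStart = 0;
    let nextToEmit = 0;

    const startNext = (): void => {
      if (nextToStart >= inputs.length) return;
      const index = nextToStart++;
      task(inputs[index]).then(result => {
        done.set(index, result);
        // Emit every consecutive result that is now ready, in order.
        while (done.has(nextToEmit)) {
          onValue(done.get(nextToEmit) as O, nextToEmit);
          done.delete(nextToEmit);
          nextToEmit++;
        }
        if (nextToEmit === inputs.length) resolve();
        else startNext(); // a slot is free again
      }, reject);
    };

    if (inputs.length === 0) resolve();
    for (let i = 0; i < concurrency; i++) startNext();
  });
}

// Simulated task: doubles n after a made-up delay, tracking how many run at once.
const delays = [80, 20, 140, 40, 100, 10];
const emitted: number[] = [];
let inFlight = 0;
let maxInFlight = 0;

const double = (n: number): Promise<number> => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  return new Promise<number>(resolve =>
    setTimeout(() => {
      inFlight--;
      resolve(n * 2);
    }, delays[n - 1]),
  );
};

const run = orderedConcurrentMap([1, 2, 3, 4, 5, 6], double, 3, value => {
  emitted.push(value);
});
run.then(() => console.log(emitted, maxInFlight)); // [ 2, 4, 6, 8, 10, 12 ] 3
```

In this sketch a slot is freed as soon as a task finishes, even if its result has to wait in `done` for an earlier, slower task, so one slow request delays the output but doesn't stop new requests from starting.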

We can then use this concurrentConcat operator instead of mergeMap, and it will now emit the values in their original order:

of(1, 2, 3, 4, 5, 6).pipe(
    concurrentConcat(number => api.get('/double', { number }), 3),
);

See it here in action: https://codepen.io/JosephSilber/pen/pogPpRP?editors=1010

So to solve my original problem with the order shipments:

const maxConcurrent = 5; // the API handles at most 5 shipments at once

from(orderNumbers)
    .pipe(concurrentConcat(orderNumber => shipOrder(orderNumber), maxConcurrent))
    .subscribe(orderNumber => printShippingLabel(orderNumber));

See it here in action: https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010

You can see that even though later orders might end up being shipped before earlier ones, the labels are always printed in their original order.


Conclusion

This solution isn't even complete (it doesn't handle inner observables that emit more than one value), yet it already requires a bunch of custom code. This is such a common problem that I feel there has to be an easier (built-in) way to solve it :|

Answer 2

You could use this operator, sortedMergeMap:

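import { Observable, ObservableInput, concat, of } from 'rxjs';
import { map, mergeMap, scan } from 'rxjs/operators';

// Marker appended after each inner observable, so we know when its index is finished.
const DONE = Symbol("DONE");
const DONE$ = of(DONE);
const sortedMergeMap = <I, O>(
  mapper: (i: I) => ObservableInput<O>,
  concurrent = 1
) => (source$: Observable<I>) =>
  source$.pipe(
    // Run up to `concurrent` inner observables at once, tagging every
    // emission (including the DONE marker) with its source index.
    mergeMap(
      (value, idx) =>
        concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
      concurrent
    ),
    scan(
      (acc, [value, idx]) => {
        if (idx === acc.currentIdx) {
          if (value === DONE) {
            // The current index finished: move on, draining the buffers of the
            // following indices for as long as those have also finished.
            let currentIdx = idx;
            const valuesToEmit = [];
            do {
              currentIdx++;
              const nextValues = acc.buffer.get(currentIdx);
              if (!nextValues) {
                break;
              }
              valuesToEmit.push(...nextValues);
              acc.buffer.delete(currentIdx);
            } while (valuesToEmit[valuesToEmit.length - 1] === DONE);
            return {
              ...acc,
              currentIdx,
              valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
            };
          } else {
            // A value from the index we're waiting on: emit it right away.
            return {
              ...acc,
              valuesToEmit: [value]
            };
          }
        } else {
          // A value from a later index: buffer it until its turn comes.
          if (!acc.buffer.has(idx)) {
            acc.buffer.set(idx, []);
          }
          acc.buffer.get(idx)!.push(value);
          if (acc.valuesToEmit.length > 0) {
            acc.valuesToEmit = [];
          }
          return acc;
        }
      },
      {
        currentIdx: 0,
        valuesToEmit: [] as O[],
        buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
      }
    ),
    // Flatten each batch of ready values into individual emissions.
    mergeMap(scannedValues => scannedValues.valuesToEmit)
  );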
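The scan in sortedMergeMap is compact but dense. Stripped of RxJS, the bookkeeping it performs looks roughly like the following sketch; `MultiReorderBuffer`, `value()` and `complete()` are hypothetical names. One difference: where sortedMergeMap pushes a DONE marker into the per-index buffer, this version records completed indices in a `Set`. Values for the index currently being emitted pass straight through, values for later indices wait, and completing the current index flushes the ones that follow.

```typescript
// Reorders values where each source index may emit many values and then completes.
class MultiReorderBuffer<T> {
  private readonly buffered = new Map<number, T[]>();
  private readonly completed = new Set<number>();
  private current = 0; // the index whose values may be emitted right now

  // Records a value emitted by source `index`; returns what may be emitted now.
  value(index: number, v: T): T[] {
    if (index === this.current) return [v];
    const list = this.buffered.get(index) ?? [];
    list.push(v);
    this.buffered.set(index, list);
    return [];
  }

  // Records that source `index` has completed; returns what may be emitted now.
  complete(index: number): T[] {
    this.completed.add(index);
    const out: T[] = [];
    // Advance past every completed index, flushing the next index's buffer each time.
    while (this.completed.has(this.current)) {
      this.completed.delete(this.current);
      this.current++;
      out.push(...(this.buffered.get(this.current) ?? []));
      this.buffered.delete(this.current);
    }
    return out;
  }
}

// Made-up event sequence: sources 0, 1 and 2 interleave and finish out of order.
const reorder = new MultiReorderBuffer<string>();
const out: string[] = [
  ...reorder.value(1, "b1"),
  ...reorder.value(0, "a1"),
  ...reorder.value(1, "b2"),
  ...reorder.complete(1),
  ...reorder.value(2, "c1"),
  ...reorder.value(0, "a2"),
  ...reorder.complete(0),
  ...reorder.complete(2),
];
console.log(out); // [ 'a1', 'a2', 'b1', 'b2', 'c1' ]
```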
Answer 3

What you want is this:

import { from } from 'rxjs';
import { concatAll, map } from 'rxjs/operators';

from(orderNumbers)
  .pipe(map(shipOrder), concatAll())
  .subscribe(printShippingLabel)

Explanation:

The first operator in the pipe is map. It calls shipOrder for each value as soon as it arrives, so all of the requests start right away and run in parallel.

The second operator, concatAll, subscribes to the resulting promises one at a time, in the original order, so the resolved values come out in the proper sequence.

(I simplified the code; concatAll() is equivalent to concatMap(identity).)
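The map + concatAll pattern starts every shipment at once. Here is a hedged plain-TypeScript sketch of how the same "start eagerly, consume in order" idea could be combined with the question's limit of 5: the hypothetical `createLimiter` helper queues calls beyond the limit, all shipment promises are created up front in order, and the labels are printed by awaiting those promises one by one. `shipOrder` and `printShippingLabel` here are simulated stand-ins with made-up delays.

```typescript
// Hypothetical helper: lets at most `max` wrapped calls run at once;
// further calls wait in FIFO order until a running one settles.
function createLimiter(max: number) {
  let active = 0;
  const queue: Array<() => void> = [];

  const release = (): void => {
    active--;
    const next = queue.shift();
    if (next) next();
  };

  return function limit<T>(fn: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      const run = (): void => {
        active++;
        fn().then(
          value => { release(); resolve(value); },
          error => { release(); reject(error); },
        );
      };
      if (active < max) run();
      else queue.push(run);
    });
  };
}

// Simulated API with made-up, varying delays; tracks peak concurrency.
let inFlight = 0;
let maxInFlight = 0;
const shipOrder = (orderNumber: number): Promise<number> => {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  const ms = ((orderNumber * 37) % 7) * 10 + 5;
  return new Promise<number>(resolve =>
    setTimeout(() => {
      inFlight--;
      resolve(orderNumber);
    }, ms),
  );
};

const printed: number[] = [];
const printShippingLabel = (orderNumber: number): void => {
  printed.push(orderNumber);
  console.log(`printing label for order ${orderNumber}`);
};

async function shipAll(orderNumbers: number[]): Promise<void> {
  const limit = createLimiter(5);
  // Every shipment promise is created up front, in order; the limiter decides when each starts.
  // Caveat: if a later shipment rejects before the loop awaits it, Node may report an
  // unhandled rejection; a real version should attach error handlers up front.
  const shipments = orderNumbers.map(n => limit(() => shipOrder(n)));
  for (const shipment of shipments) {
    printShippingLabel(await shipment); // consumed strictly in the original order
  }
}

const done = shipAll([1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);
```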