40
votes

I have two source observables from which I need to compute some data as soon as either one emits. I'm trying to use the combineLatest() operator, but it only emits a value once each of the source observables has emitted at least once.

Is there any operator similar to combineLatest() that emits as soon as any of the source observables emits for the first time? If not, what's the clearest way of doing it?

What I've tried:

import { combineLatest } from 'rxjs';
import { map } from 'rxjs/operators';

const source1$ = service.getSomeData();
const source2$ = service.getOtherData();

combineLatest(
  source1$,
  source2$
).pipe(
  // the destructured parameter needs its own parentheses
  map(([source1Data, source2Data]) => {
    // this code only gets executed once both observables have emitted at least once
    return source1Data + source2Data;
  })
)
Your example indicates you need both values to perform the operation. What are you going to do with only 1 value (the first to be emitted)? - JJWesterkamp
Yes, I need both values to perform the operation. But until I have both values, combineLatest emits nothing. What I want is to emit the starting value of the first observable (an empty array, for example) until the second one arrives. Then I can perform the operation and return another list based on the second observable. - Elias Garcia
I'm not sure I understand what you're trying to achieve. combineLatest does not emit anything before all input observables have emitted at least once. If you want to receive a compound value immediately regardless of input state - e.g. [null, 'source2Data'] - there is no other way than to explicitly push that initial null value through each input observable. - JJWesterkamp
My guess is that you need withLatestFrom instead of combineLatest. This way you can have one observable that determines when something should happen, and withLatestFrom will just give you the last emitted value of some other observables. You can pass multiple observables into withLatestFrom, and it will give you an array of values, one value for each observable. You might still need startWith though. - Davy
"Be aware that combineLatest will not emit an initial value until each observable emits at least one value. This is the same behavior as withLatestFrom and can be a gotcha as there will be no output and no error but one (or more) of your inner observables is likely not functioning as intended, or a subscription is late." - learnrxjs.io/learn-rxjs/operators/combination/combinelatest - Pranaya

2 Answers

38
votes

If I understand correctly you want a pattern like the following diagram:

stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------

result$  => ------ 1 ------ 12 ------ 42 --------------

If one value is available, emit that. If both are available, emit the combination of both, a simple sum in this case (12 + 30 = 42).

First the input streams. I've made them subjects for the sake of this example, so we can push data in manually:

const stream1$ = new Subject();
const stream2$ = new Subject();

Next we'll combine the inputs, first piped through the startWith operator. This makes sure that combineLatest produces an observable that emits immediately - [null, null] to be precise.

const combined$ = combineLatest(
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
);

Now you have an observable that always emits arrays of length 2, containing any combination of your data (numbers in this example) and null, like the following diagram:

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------

combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------

Finally you can inspect and map this output to your desired format: the sum of 2 numbers if both are available, or the first value to be available:

const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

Result:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
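
The mapping step above can also be pulled out into a plain function, which makes it easy to check on its own. This is only a sketch: sumAvailable is a hypothetical name, and it assumes null is used purely as the "has not emitted yet" marker. Note that the strict === null comparison matters, because a truthiness check would treat a legitimate 0 as missing.

```typescript
// Hypothetical helper, not part of RxJS: combines two possibly-missing numbers.
// `null` means "this source has not emitted yet".
function sumAvailable(a: number | null, b: number | null): number | null {
  if (a === null) return b; // also covers the case where both are null
  if (b === null) return a;
  return a + b;
}

console.log(sumAvailable(1, null)); // 1
console.log(sumAvailable(12, 30)); // 42
console.log(sumAvailable(0, null)); // 0 (a real value, not "missing")
```

With a helper like this, the map callback could become map(([data1, data2]) => sumAvailable(data1, data2)).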

One problem remains: the first value emitted from combined$ is [null, null], causing processedCombinations$ to emit null initially. One way to fix this is to chain another pipe using skipWhile onto processedCombinations$:

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));

Result:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

Another - imo better - way is to filter the combined$ stream before processedCombinations$ (now actually final$) is created from it:

const combinedFiltered$ = combined$.pipe(
  filter(([first, second]) => first !== null || second !== null),
);

const final$ = combinedFiltered$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

A corresponding diagram shows nicely how irrelevant values are eliminated as early in the stream hierarchy as possible:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

The above diagrams can be produced with this code:

final$.subscribe(console.log);

stream1$.next(1);
// logs: 1

stream1$.next(12);
// logs: 12

stream2$.next(30);
// logs: 42

Imports used:

import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';
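
If you want to sanity-check the emission order without installing RxJS, the filtered pipeline can be modelled in plain TypeScript. This is a rough model under stated assumptions, not RxJS itself: simulate and its [sourceIndex, value] event format are hypothetical, and the model assumes both inputs are seeded with null and that no source ever emits null as real data.

```typescript
type Pair = [number | null, number | null];

// Hypothetical model: each event is [sourceIndex, value], in arrival order.
function simulate(events: Array<[0 | 1, number]>): number[] {
  // Models startWith(null) on both inputs.
  const latest: Pair = [null, null];
  // Models combineLatest emitting the seeded pair once on subscription.
  const combined: Pair[] = [[null, null]];

  for (const [index, value] of events) {
    latest[index] = value;
    combined.push([latest[0], latest[1]]); // copy, so later updates don't leak in
  }

  return combined
    // Models the filter step: drop the pair where nothing has arrived yet.
    .filter(([first, second]) => first !== null || second !== null)
    // Models the map step: single value if only one is present, else the sum.
    .map(([data1, data2]): number => {
      if (data1 === null) return data2 as number; // filter guarantees data2 is set
      if (data2 === null) return data1;
      return data1 + data2;
    });
}

console.log(simulate([[0, 1], [0, 12], [1, 30]])); // [ 1, 12, 42 ]
```

The event list mirrors the stream1$.next(1), stream1$.next(12), stream2$.next(30) sequence from the answer.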
36
votes

One way is prefixing all sources with startWith:

combineLatest([
  source1$.pipe(startWith(?)),
  source2$.pipe(startWith(?)),
])

"that emits as soon as any of the source observables emits for the first time?"

Going by that sentence, you might be looking for the race(source1$, source2$) Observable creation method, or maybe just merge(source1$, source2$).pipe(take(1)). But it really depends on what you want to do.
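
The two options behave differently after the first emission: race keeps mirroring whichever source emitted first, while merge followed by take(1) passes on a single value and then completes. Here is a rough plain TypeScript model of that difference. It is not RxJS code: raceModel, mergeTakeOneModel and the [sourceIndex, value] event format are hypothetical, and completion and errors are ignored.

```typescript
// Hypothetical model: each event is [sourceIndex, value], in arrival order.
type SourceEvent = [number, number];

// Models race: the first source to emit "wins"; only its values are passed on.
function raceModel(events: SourceEvent[]): number[] {
  const first = events[0];
  if (first === undefined) return [];
  const winner = first[0];
  return events
    .filter(([index]) => index === winner)
    .map(([, value]) => value);
}

// Models merge + take(1): only the very first value from any source is passed on.
function mergeTakeOneModel(events: SourceEvent[]): number[] {
  return events.slice(0, 1).map(([, value]) => value);
}

const events: SourceEvent[] = [[0, 1], [1, 30], [0, 12]];
console.log(raceModel(events)); // [ 1, 12 ]
console.log(mergeTakeOneModel(events)); // [ 1 ]
```

Neither option combines values from both sources, so if you need the sum once both have emitted, the startWith approach is probably closer to what you want.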