I'm new to RxJS. I'm trying to map a single-element stream to another stream that emits an array once all inner streams have finished. However, my inner observables never seem to be subscribed to. They just get passed along cold.

At a high level, I need to send HTTP POST requests to upload a list of files (two different arrays going to two different endpoints). Since the files are large, I emulate each upload with a delay of 5 seconds. The requests need to run in parallel, but with at most X running concurrently (here 2). All of this needs to happen inside a pipe, and the pipe should only let the stream continue after all posts are complete.

https://stackblitz.com/edit/rxjs-pnwa1b

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  map(claim => 
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)))),
      from(second).pipe(map(video => of(video).pipe(delay(5000))))
    )
    .pipe(
      mergeAll(2)
    )
    .pipe(tap(val => console.log(`emit:${val}`)))
    .pipe(toArray())
    .pipe(tap(val => console.log(`emit:${val}`)))
  )
)
.pipe(
catchError(error => {
  console.log("error");
  return Observable.throw(error);
})
)
.subscribe(val => console.log(`final:${val}`));

An inner subscribe would not wait until the uploads are complete, and forkJoin would not let me limit the number of concurrent uploads. How can I accomplish this?

Update:

The answer by @dmcgrandle was very helpful and led me to make the changes below, which seem to be working:

import { map, mapTo, mergeMap, mergeAll, delay, tap, catchError, toArray } from 'rxjs/operators';
import { interval, merge, forkJoin, of, from, range, Observable, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

of(single)
.pipe(tap(val => console.log(`emit:${val}`)))
.pipe(
  mergeMap(claim =>
    merge(
      from(first).pipe(map(photo => of(photo).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`))))),
      from(second).pipe(map(video => of(video).pipe(delay(5000)).pipe(tap(val => console.log(`emit:${val}`)))))
    )
  ),
  mergeAll(2),
  toArray()
)
.pipe(
catchError(error => {
  console.log("error");
  return throwError(error);
})
)
.subscribe(val => console.log(`final:${val}`));

1 Answer

If I understand you correctly, I think this is a solution. The issue was with the first map, which does not subscribe to the inner Observables. It only transforms the stream into an Observable of Observables, which doesn't seem to be what you wanted. I used mergeMap there instead.
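
To make the difference concrete, here is a minimal sketch. fakeUpload is a hypothetical stand-in for an upload (not something from your code), and it emits synchronously so the results can be inspected right away:

```typescript
import { Observable, isObservable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for an upload: emits the name with a suffix, synchronously.
const fakeUpload = (name: string): Observable<string> => of(`${name}:uploaded`);

const viaMap: unknown[] = [];
const viaMergeMap: unknown[] = [];

// map only transforms the value: the inner Observable is emitted as a value
// and nothing ever subscribes to it.
of('abc').pipe(map(fakeUpload)).subscribe(v => viaMap.push(v));

// mergeMap subscribes to the inner Observable and forwards what it emits.
of('abc').pipe(mergeMap(fakeUpload)).subscribe(v => viaMergeMap.push(v));

console.log(isObservable(viaMap[0])); // true
console.log(viaMergeMap);             // [ 'abc:uploaded' ]
```

With map, the subscriber receives the cold inner Observable itself, which matches what you were seeing.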

Inside each from(...) pipeline I used concatMap, so the emissions from first and second are each processed in order, with every post waiting for the previous one to complete before it starts. Because the two pipelines are merged, at most two posts (one per endpoint) run at the same time. I also set up postToEndpoint functions that return Observables, to be closer to what your actual code will probably look like.

code:

import { mergeMap, concatMap, delay, tap, catchError, toArray } from 'rxjs/operators';
import { merge, of, from, throwError } from 'rxjs';

const single = "name";

const first = ["abc", "def"];

const second = of("ghi", "jkl", "mno");

const postToEndpoint1$ = photo => of(photo).pipe(
  tap(data => console.log('start of postTo1 for photo:', photo)),
  delay(5000),
  tap(data => console.log('end of postTo1 for photo:', photo))
);

const postToEndpoint2$ = video => of(video).pipe(
  tap(data => console.log('start of postTo2 for video:', video)),
  delay(5000),
  tap(data => console.log('end of postTo2 for video:', video))
);

of(single).pipe(
  tap(val => console.log(`initial emit:${val}`)),
  mergeMap(claim => 
    merge(
      from(first).pipe(concatMap(postToEndpoint1$)),
      from(second).pipe(concatMap(postToEndpoint2$))
    )
  ),
  toArray(),
  catchError(error => {
    console.log("error");
    return throwError(error);
  })
).subscribe(val => console.log(`final:`, val));
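
If you need a single overall cap of X uploads across both lists, rather than one at a time per endpoint, one option is the optional concurrent argument of mergeMap. The following is only a rough sketch under some assumptions: fakePost, the Job shape, and the 'photos'/'videos' labels are hypothetical names I made up, and the VirtualTimeScheduler is there only so the 5-second delays elapse instantly when you run it. With real requests you would drop the clock entirely.

```typescript
import { Observable, VirtualTimeScheduler, from, merge, of } from 'rxjs';
import { delay, map, mergeMap, toArray } from 'rxjs/operators';

// Virtual clock so the 5 s delays run instantly. Illustration only.
const clock = new VirtualTimeScheduler();

const MAX_CONCURRENT = 2; // the "X" from the question

// Hypothetical shape for one upload job.
interface Job { endpoint: string; file: string; }

// Hypothetical upload: emits a label 5 s (virtual time) after it is subscribed to.
const fakePost = (job: Job): Observable<string> =>
  of(job).pipe(
    delay(5000, clock),
    map(j => `${j.endpoint}/${j.file} done at ${clock.now()}`)
  );

const first = ['abc', 'def'];
const second = of('ghi', 'jkl', 'mno');

let uploaded: string[] = [];

of('name').pipe(
  mergeMap(() =>
    merge(
      from(first).pipe(map((file): Job => ({ endpoint: 'photos', file }))),
      second.pipe(map((file): Job => ({ endpoint: 'videos', file })))
    ).pipe(
      mergeMap(fakePost, MAX_CONCURRENT), // at most 2 inner subscriptions at once
      toArray()                           // emit once, after every upload completes
    )
  )
).subscribe(result => { uploaded = result; });

clock.flush(); // run all scheduled virtual-time work
console.log(uploaded);
```

In this sketch the uploads finish in waves of two (virtual times 5000, 5000, 10000, 10000, 15000), and the outer stream only emits the final array after the last one completes.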

I hope this helps.