I am learning to use RxJS. In this scenario, I am chaining a few async requests with RxJS. In the last mergeMap, I'd like to have access to the first mergeMap's parameter. I have explored using a global variable or withLatestFrom, but neither option seems to be the right fit here.

const arraySrc$ = from(gauges).pipe(
    mergeMap(gauge => {
      return readCSVFile(gauge.id);
    }),
    mergeMap((csvStr: any) => readStringToArray(csvStr.data)),
    map((array: string[][]) => transposeArray(array)),
    mergeMap((array: number[][]) => forkJoin(uploadToDB(array, gauge.id))),
    catchError(error => of(`Bad Promise: ${error}`))
  );

readCSVFile is an async request that returns an observable; it reads a CSV file from a remote server.

readStringToArray is another async request that returns an observable; it converts the string to arrays.

transposeArray just transposes the array.

uploadToDB is an async DB request, which needs gauge.id from the first mergeMap.

How do I get that? I would also appreciate some advice on why the way I am doing it is bad.

For now, I am just passing the ID along layer by layer, but that doesn't feel right.

const arraySrc$ = from(gauges).pipe(
    mergeMap(gauge => readCSVFile(gauge.id)),
    mergeMap(({ data, gaugeId }: any) => readStringToArray(data, gaugeId)),
    map(({ data, gaugeId }) => transposeArray(data, gaugeId)),
    mergeMap(({ data, gaugeId }) => uploadToDB(data, gaugeId)),
    catchError(error => of(`Bad Promise: ${error}`))
  );
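A possible variation on passing the ID along, sketched under assumptions: instead of changing every helper to accept and return gaugeId, the ID can be attached once with an inner map and then carried as a plain object. The helper bodies below are hypothetical synchronous stand-ins (built with of) for the real async requests, and the result strings are invented for illustration only.

```typescript
import { from, of, Observable } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for the real requests; they emit synchronously via of().
const readCSVFile = (id: number): Observable<{ data: string }> =>
  of({ data: `a${id},b${id}` });
const uploadToDB = (cells: string[], gaugeId: number): Observable<string> =>
  of(`${gaugeId}:${cells.join('|')}`);

const results: string[] = [];

from([{ id: 7 }, { id: 8 }]).pipe(
  // Attach the ID once, while `gauge` is still in scope.
  mergeMap(gauge =>
    readCSVFile(gauge.id).pipe(
      map(csv => ({ gaugeId: gauge.id, data: csv.data }))
    )
  ),
  // Later steps transform `data` and pass `gaugeId` through untouched.
  map(({ gaugeId, data }) => ({ gaugeId, data: data.split(',') })),
  mergeMap(({ gaugeId, data }) => uploadToDB(data, gaugeId))
).subscribe(r => results.push(r));
```

This keeps the helpers unaware of the ID, at the cost of every intermediate step having to forward it.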
@picci thanks, but I need to get the ID from a list first. I guess I could wrap it in another pipe, but that seems a bit tedious. - leogoesger

1 Answer

Why not simply nest the rest of the chain inside the first mergeMap? That keeps gauge in scope for the later operators:

const arraySrc$ = from(gauges).pipe(
    mergeMap(gauge => readCSVFile(gauge.id).pipe(
         mergeMap((csvStr: any) => readStringToArray(csvStr.data)),
         map((array: string[][]) => transposeArray(array)),
         mergeMap((array: number[][]) => forkJoin(uploadToDB(array, gauge.id)))
    )),
    catchError(error => of(`Bad Promise: ${error}`))
);
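The nesting works because of ordinary lexical scoping: the inner pipe is written inside the arrow function that receives gauge, so every operator in it can see gauge. A minimal runnable sketch of that idea follows. The three helpers are hypothetical synchronous stand-ins (built with of) for the real async requests, the Gauge interface is assumed, and the transpose step is left out for brevity.

```typescript
import { from, of, Observable } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

interface Gauge { id: number; } // assumed shape, for illustration only

// Hypothetical stand-ins for the real requests; they emit synchronously via of().
const readCSVFile = (id: number): Observable<{ data: string }> =>
  of({ data: `${id},${id + 1}` });
const readStringToArray = (csv: string): Observable<string[][]> =>
  of([csv.split(',')]);
const uploadToDB = (rows: string[][], gaugeId: number): Observable<string> =>
  of(`gauge ${gaugeId}: ${rows[0].length} values`);

const gauges: Gauge[] = [{ id: 1 }, { id: 2 }];
const uploaded: string[] = [];

from(gauges).pipe(
  mergeMap(gauge =>
    readCSVFile(gauge.id).pipe(
      mergeMap(csv => readStringToArray(csv.data)),
      // `gauge` is visible here because this pipe sits inside the arrow function above.
      mergeMap(rows => uploadToDB(rows, gauge.id))
    )
  )
).subscribe(msg => uploaded.push(msg));
```

In the flat chain from the question, each operator callback is a sibling of the first one, so gauge is simply not in scope by the time uploadToDB is called.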

You can also wrap the inner observable in a function:

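// Reads, parses, transposes, and uploads the CSV for a single gauge.
// gaugeID stays in scope for every operator because the pipe is inside the function.
uploadCSVFilesFromGaugeID(gaugeID): Observable<void> {
     return readCSVFile(gaugeID).pipe(
         mergeMap((csvStr: any) => readStringToArray(csvStr.data)),
         map((array: string[][]) => transposeArray(array)),
         mergeMap((array: number[][]) => forkJoin(uploadToDB(array, gaugeID)))
     );
}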

The outer pipeline then reduces to:

const arraySrc$ = from(gauges).pipe(
    mergeMap(gauge => uploadCSVFilesFromGaugeID(gauge.id)),
    catchError(error => of(`Bad Promise: ${error}`))
);