0
votes

I am trying to subscribe to an observable whose final result is built by applying the map operator several times.

Say I have an outer function, functionA(), which subscribes to functionB(). functionB() returns an observable whose data comes from two observables, and the second observable depends on the result of the first. How can I return the final mapped observable? The code below shows what I mean:

functionB() : observable<any>{
    return this.datastream1().map((objectA)=>{
        this.datastream2(objectA).map((objectB)=>{
            return [...objectA,...objectB]
        })
    })
}


functionA(){
    this.functionB().subscribe((data)=>{
        console.log(data);
    })
}

With this example, when I subscribe I get an observable logged to the console instead of the spread data from both observables.

How can I make the subscription wait for the nested, dependent observables to resolve first? In this scenario both observables should resolve, and only then should the data be returned to the subscribing function, functionA().

2
Are the dataStream functions async? – Manish Gowardipe
If yes, then try resolving the callback in a promise object instead of returning it. – Manish Gowardipe
@Manish Yes, they are asynchronous and return an observable stream. – Aditya Vashishtha
@manish Even if I subscribe to the nested datastream2 inside, it won't work, because that subscribe callback runs later, after the functionA() subscribe has already done its work. That's why I am getting an observable in the console. – Aditya Vashishtha
There are a few ways in the Rx module to make the outer function wait for one callback to finish: map a function that returns an observable for the inner one, and use it like a chain of functions. See this answer (stackoverflow.com/questions/30519645/…) – Manish Gowardipe

2 Answers

2
votes

A flatMap (or one of its siblings) will return a new stream, with one common .subscribe() at the end of the chain getting the business going. There probably is a neater method of including objectA in the resulting stream, but this should do the job:

functionB(): Observable<any> {
    return this.datastream1().pipe(
        // Emits the pair [objectA, objectB] once datastream2 has emitted.
        flatMap(objectA => combineLatest(of(objectA), this.datastream2(objectA)))
    );
}
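
The flattening idea in this answer can also be sketched as a self-contained script. This is only an illustration under assumptions: `datastream1` and `datastream2` here are hypothetical stand-ins that emit number arrays, `mergeMap` is used in place of `flatMap` (in RxJS 6 `flatMap` is an alias of `mergeMap`), and an inner `map` spreads both arrays into one, which is what the question asks for.

```typescript
import { Observable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for the question's datastream1/datastream2.
const datastream1 = (): Observable<number[]> => of([1, 2, 3]);
const datastream2 = (objectA: number[]): Observable<number[]> =>
  of(objectA.map(n => n + 100));

// mergeMap subscribes to the inner observable and forwards its values,
// so the subscriber receives data rather than a nested Observable.
function functionB(): Observable<number[]> {
  return datastream1().pipe(
    mergeMap(objectA =>
      datastream2(objectA).pipe(
        // objectA is still in scope here, so both results can be combined.
        map(objectB => [...objectA, ...objectB])
      )
    )
  );
}

// Plays the role of functionA(): a single subscribe at the end of the chain.
const received: number[][] = [];
functionB().subscribe(data => {
  received.push(data);
  console.log(data);
});
```

With these stand-in streams the subscriber is called once, with the single array `[1, 2, 3, 101, 102, 103]`. Nesting the `map` inside the `mergeMap` callback is one way to keep `objectA` available without `combineLatest`.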
0
votes

I think you can use the concatMap operator, which internally does not subscribe to the next observable until the previous is completed.


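import { of } from 'rxjs';
import { concatMap, delay, map } from 'rxjs/operators';

// Emits one array of three objects after a 3-second delay.
const datastream1 = () =>
  of([{ value: 1 }, { value: 2 }, { value: 3 }]).pipe(delay(3000));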


const datastream2 = dependentData => {
  const result = [
    { value: dependentData[0].value + 100 },
    { value: dependentData[1].value + 100 },
    { value: dependentData[2].value + 100 }
  ];
  return of(result);
};

const example = datastream1().pipe(
  concatMap(val1 => datastream2(val1).pipe(map(val2 => [...val1, ...val2])))
);

example.subscribe(val => {
  val.forEach(v => console.log("Value: " + v.value));
});

In the example above we delay the emission of datastream1 by 3 seconds. datastream2 is not subscribed to until datastream1 has emitted its value, so it only starts emitting after that delay. You can try it here.