1
votes

I receive a stream of data as response from a service(grpc) like 5 streams in within 2 secs. Each stream is an observable object that I'm subscribing. The processing logic once I receive each stream is more, and each stream is having some heavy json object that contains like base64 encoded byte string, involves json to complex types conversion and more conditional logic. So by the time the processing logic for one stream is done i couldn't verify other streams as it getting processed very fast (close like concurrent) and getting missed. So I need to aggregate all these Y(e.g 5) streams after listening for X secs into an observable array. Below is code snippet.

MyTestService.ts:

   @Injectable()
     export class MyTestService {    
            client: GrpcMyTestServiceClient;
            public readonly coreData$: Observable<TestReply.AsObject>;

  constructor(private readonly http: HttpClient) {
        this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
        this.coreData$ = this.listCoreStream();
    }

listCoreStream(): Observable<TestReply.AsObject> {    
    return new Observable(obs => {       
     const req = new SomeRequest(); 
     //Get stream data from service(grpc)
     const stream = this.client.getCoreUpdates(req);
 
     stream.on('data', (message: any) => {        
        obs.next(message.toObject() as TestReply.AsObject);
      });
    });
}

MyComponent.ts

public testReply?: TestReply.AsObject;   
private _subscription: Subscription;
  constructor(private readonly _MyTestService: MyTestService) {     
  this._subscription = new Subscription();
}

  ngOnInit(): void {   
     this._subscription = this._MyTestService.coreData$.subscribe((data) => {
          if (data) {
                let obj = JSON.parse(data);
                //processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
            }
        });
    }

As the data is too fast in short span, not all the records or processed. Its looks like synchronization issue which is not possible in web world, where the last stream that satisfies some condition overwrites the previous stream. To avoid this and to process all the stream one at a time, I need to combine/merge all the streams into an array so that I can iterate each of them inside the component that subscribes the observable. Stream data ordering doesn't matter.

I was trying to use rxjs operators like timer, mergemap, concatmap, scan, merge. But still new to these couldn't figure out the right way to do. Below is one such try using scan but couldn't get the desired results and the array is having empty values and not sure how to use that array from console.log. Any pointers would be much helpful, please suggest.

Solution trying:

let temp: TestReply.AsObject[] = [];
let test = this._MyTestService.coreData$
  .pipe(
    mergeMap(_ => timer(5000)),
    scan<any>((allResponses, currentResponse) =>
      [...allResponses, currentResponse], this.temp),
  ).subscribe(console.log);
1

1 Answers

0
votes

This is my solution, push all the arrays together using a next block in the subscribe and then finally perform the actions in the complete block.

MyTestService.ts:

   @Injectable()
     export class MyTestService {    
            client: GrpcMyTestServiceClient;
            public readonly coreData$: Observable<TestReply.AsObject>;

  constructor(private readonly http: HttpClient) {
        this.client = new GrpcMyTestServiceClient(environment.apiProxyUri);
        this.coreData$ = this.listCoreStream();
    }

listCoreStream(): Observable<TestReply.AsObject> {    
    return new Observable(obs => {       
     const req = new SomeRequest(); 
     //Get stream data from service(grpc)
     const stream = this.client.getCoreUpdates(req);
 
     stream.on('data', (message: any) => {        
        obs.next(message.toObject() as TestReply.AsObject);
      });
    });

    stream.on('end', (message: any) => {        
        obs.complete();
      });
    });
}

MyComponent.ts

public testReply?: TestReply.AsObject;   
public dataArray = [];
private _subscription: Subscription;
  constructor(private readonly _MyTestService: MyTestService) {     
  this._subscription = new Subscription();
}

  ngOnInit(): void {   
     this._subscription = this._MyTestService.coreData$.subscribe({
             next: (data) => {
                 if (data) {
                     this.dataArray.push(JSON.parse(data));
                 } 
             },
             complete: () => {
                // this.dataArray
                // above will be your final data
                // processing logic: condition checks, filtering based on child types,dynamic binding of styles, etc..
             })
  }