I receive an observable containing an array of objects. I want to partition the array into two based on a property of each object. I'd like to do this in such a way that the resulting arrays are in observables that can use the async pipe in the template, rather than having to manually manage subscriptions. I'm using RxJS 6 with Angular 9.

The trouble I'm having is in trying to partition based on individual elements while returning arrays to the destructuring assignment. I have tried variations of

public groupA: Observable<IItem[]>;
public groupB: Observable<IItem[]>;

...

// assigned values are not arrays
[this.groupA, this.groupB] = partition(
    this.service.item$.pipe(
        // split array into stream of its elements
        concatAll(),
    ),
    (item: IItem) => belongsInGroupA(item.myGroup)
);

and

[this.groupA, this.groupB] = partition(
    this.service.item$,
    (items: IItem[]) => {
        return ???;
        // How do I partition each item without first splitting into individual elements?
    }
);

StackBlitz demo

I know I could build the two arrays using something like map (or, more appropriately, tap), but I don't know how to auto-(un)subscribe using async with that, if it's even possible.

partition seems intended to divide streams of one item at a time, but my need feels like something well within the domain of Angular+RxJS and I just lack the understanding. Can you help?

What is the data here, and what is the expected output? - Aakash Garg
Input and output are all observables of arrays of the same type of object. Output is the input split on one of the input objects' properties. Check the StackBlitz demo. - nshew13
Can you give a data sample? - Aakash Garg
When the component is destroyed the async pipe is automatically unsubscribed, so I believe you could use the tap strategy safely, should you choose to. - robbieAreBest
My approach to tap is that I loop through the source array once and build the two output arrays as I go. I can't stream each individual element to *ngFor, and waiting until the arrays are built to use of() or from() seems foolish. In other words, I don't know how async applies. Am I missing something? - nshew13

5 Answers

0
votes
private version1() {
    [this.groupCabbages, this.groupKings] = partition(
      // re-emit each array element on its own so partition can test items one at a time
      new Observable((observer) => {
        // return the inner subscription so unsubscribing also tears down the source
        return source.subscribe((res: IItem[]) => {
          for (let i = 0; i < res.length; i++) {
            observer.next(res[i]);
          }
          // complete so that toArray() below can emit
          observer.complete();
        });
      }),
      (item: IItem) => {
        return item.myGroup === 'cabbages';
      }
    );
    // toArray() collects each partition back into a single array, emitted on completion
    this.groupCabbages = this.groupCabbages.pipe(toArray());
    this.groupKings = this.groupKings.pipe(toArray());
    this.groupCabbages.subscribe(res => {
      console.log('v1 cabbages', res);
    });
    this.groupKings.subscribe(res => {
      console.log('v1 kings', res);
    });
  }

Working StackBlitz: https://stackblitz.com/edit/rxjs-cazbuc?devtoolsheight=33&file=index.ts

0
votes

You can turn the array elements into individual emissions with mergeMap, group them by the property with groupBy, and then emit one array for each group.

Something like this perhaps:

private versionZ() {
   source
     .pipe(
       mergeMap((items: IItem[]) => from(items)),
       groupBy((item: IItem) => item.myGroup),
       mergeMap((group: Observable<any>) => group.pipe(toArray()))
     )
     .subscribe(t => console.log(t));
}

0
votes

You just have to apply the toArray() operator:

 this.groupCabbagesForView = this.groupCabbages.pipe(toArray())
 // in view 
 groupCabbagesForView | async

The result can then be consumed by the async pipe in the view, which receives the whole list of items instead of one item at a time. Note that toArray() emits only once the source observable completes.

0
votes

RxJS partition is used to split observable streams. You, on the other hand, need to create two different streams based on a condition applied to the elements of each notification of the stream. If the number of elements in each emission is fairly limited and you only need a limited number of observables, I'd say it'd be quicker to create each observable individually using the map operator.

Controller

ngOnInit() {
  this.cabbages$ = this.source$.pipe(
    map((items: IItem[]) =>
      items.filter((item: IItem) => item.myGroup === 'cabbages')
    )
  );

  this.kings$ = this.source$.pipe(
    map((items: IItem[]) =>
      items.filter((item: IItem) => item.myGroup === 'kings')
    )
  );
}

Template

<ng-container *ngIf="(cabbages$ | async) as cabbages">
  Cabbages:
  <div *ngFor="let cabbage of cabbages">
    {{ cabbage | json }}
  </div>
</ng-container>
<br>

<ng-container *ngIf="(kings$ | async) as kings">
  Kings:
  <div *ngFor="let king of kings">
    {{ king | json }}
  </div>
</ng-container>

Working example: Stackblitz

Note: Each async pipe would trigger a separate subscription to the source$ observable stream. If too many async pipes are needed it might lead to performance issues.
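
One way to avoid the repeated subscriptions mentioned in the note is to multicast the source before deriving the two streams. The following is only a sketch under assumptions, not part of the answer's demo: the counting source$ stands in for whatever the real source is (an HTTP call, for instance), and shareReplay(1) is one possible multicasting operator among several.

```typescript
import { Observable } from 'rxjs';
import { map, shareReplay } from 'rxjs/operators';

interface IItem {
  myGroup: string;
}

// Hypothetical cold source that counts how often it is subscribed to.
let subscriptions = 0;
const source$ = new Observable<IItem[]>((observer) => {
  subscriptions++;
  observer.next([
    { myGroup: 'cabbages' },
    { myGroup: 'kings' },
    { myGroup: 'cabbages' },
  ]);
  observer.complete();
});

// Share one subscription and replay the latest array to late subscribers.
const shared$ = source$.pipe(shareReplay(1));

const cabbages$ = shared$.pipe(
  map((items) => items.filter((item) => item.myGroup === 'cabbages'))
);
const kings$ = shared$.pipe(
  map((items) => items.filter((item) => item.myGroup === 'kings'))
);

// Two subscribers, standing in for two async pipes in the template.
let cabbages: IItem[] = [];
let kings: IItem[] = [];
cabbages$.subscribe((value) => (cabbages = value));
kings$.subscribe((value) => (kings = value));
```

With this arrangement the source is subscribed to once no matter how many derived streams the template consumes. The replayed value also reaches a pipe that subscribes after the first emission.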

0
votes

If I understand your issue correctly, then the essence is that you want to obtain 2 observables with values derived from a single observable. The general way to do this would be to multicast the original, then use that to create the derived observables.

temp$ = obs1$.pipe(share());
obs2$ = temp$.pipe(map(value => ...));
obs3$ = temp$.pipe(map(value => ...));

So to apply this to your example:

const temp$ = this.service.item$.pipe(
   // splitArr returns [arrA, arrB]
   map(arr => splitArr(arr, belongsInGroupA)),
   // multicast so both derived observables share one subscription
   share()
);


this.groupA = temp$.pipe(map(value => value[0]));
this.groupB = temp$.pipe(map(value => value[1]));

The remaining question is how to split the array efficiently, but that is ordinary array manipulation. If a single pass is important, use reduce.
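
A single-pass split with reduce might look like the sketch below. splitArr is the hypothetical helper named in the answer, and its signature here is an assumption: it takes a predicate on the whole item, so a group-based check such as the question's belongsInGroupA would be wrapped as item => belongsInGroupA(item.myGroup).

```typescript
interface IItem {
  myGroup: string;
}

// Hypothetical helper: returns [matching, nonMatching], visiting each item once.
function splitArr<T>(items: T[], predicate: (item: T) => boolean): [T[], T[]] {
  return items.reduce<[T[], T[]]>(
    (groups, item) => {
      // Index 0 collects items that satisfy the predicate, index 1 the rest.
      groups[predicate(item) ? 0 : 1].push(item);
      return groups;
    },
    [[], []]
  );
}

// Sample data, made up for illustration.
const items: IItem[] = [
  { myGroup: 'cabbages' },
  { myGroup: 'kings' },
  { myGroup: 'cabbages' },
];

const [groupA, groupB] = splitArr(items, (item) => item.myGroup === 'cabbages');
```

The original order of items is preserved within each group, and the input array is not modified.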