I'm trying to combine two Observables and emit an event based on the result of one or both.

Oversimplified code:

export class MyDirective {
  // Filter properties
  filter = new ReplaySubject<any>();
}

export class MyComponent implements AfterContentInit {
  // First observable (my own creation)
  @ContentChildren(MyDirective) columns: MyDirective[];
  filterChange = Observable.merge(...this.columns.map(c => c.filter));

  // Second observable (EventEmitter from a component library)
  @ViewChild(MdSort) sort: MdSort;

  ngAfterContentInit() {
    // Now merge them and call a function with the results from both
    Observable.xxx([ this.sort.sortChange, this.filterChange ]).subscribe(res => {
      this.readData(res.sort, res.filter);
    });
  }

  readData(sort?: any, filter?: any) {
    // This method should receive the results from the observables
  }
}

Solution attempts:

I've tried different things to merge these:

Observable.merge

Observable.merge(this.sort.sortChange, this.filterChange).subscribe(res => {
  // Gives me no clue as to which of the observables has been fired
  // and thus I cannot process properties 
});

Observable.zip

Observable.zip(this.sort.sortChange, this.filterChange, (sort, filter) => {
  // I never reach this spot. Why?
  this.readData(sort, filter);
});

Subscribing to both

This works, but I'm not happy with this. I would like this to be done using Observable merging somehow...

// On any changes, read data
let sort; let filter;
this.sort.sortChange.subscribe(s => sort = s);
this.filterChange.subscribe(f => filter = f);
Observable.merge(this.sort.sortChange, this.filterChange).subscribe(result => {
  // Fired whenever any of the two are changed
  this.readData(sort, filter);
});
Are you looking for combineLatest? I'd recommend looking at e.g. rxmarbles.com - jonrsharpe
Or maybe the .mergeMap operator is worth a look - fubbe
Doesn't combineLatest require both observables to have fired before it emits a result? If so, I cannot use that. But thanks for the suggestion. mergeMap is a transformer, right? So I need to merge the two observables first before I can run the transformer on the result. What should I use to combine the observables in the first place? - Øystein Amundsen

1 Answer


You can use distinctUntilChanged() with map() or mapTo() to tell which stream has emitted a value:

Observable.merge(
  this.sort.sortChange.distinctUntilChanged().mapTo('sort'),
  this.filterChange.distinctUntilChanged().mapTo('filter')
).subscribe(result => {
  // Fired whenever either of the two changes; result is 'sort' or 'filter'
  // sort and filter are the latest values, tracked as in the question
  this.readData(sort, filter);
});
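
Since mapTo() discards the emitted value, one variation is to tag each emission with its source using map() and fold the tagged events into a "latest state" object. The sketch below only illustrates that folding step on a plain array standing in for the merged stream, so it does not depend on any particular RxJS version. The names ChangeEvent, ViewState and applyChange, and the sample values, are hypothetical and not part of the question.

```typescript
// Hypothetical event shapes: each emission is tagged with the stream it came from.
type SortEvent = { source: 'sort'; value: string };
type FilterEvent = { source: 'filter'; value: string };
type ChangeEvent = SortEvent | FilterEvent;

// Latest known value of each stream; a field stays undefined until that stream emits.
interface ViewState {
  sort?: string;
  filter?: string;
}

// Reducer: fold one tagged event into the accumulated state.
function applyChange(state: ViewState, event: ChangeEvent): ViewState {
  return event.source === 'sort'
    ? { ...state, sort: event.value }
    : { ...state, filter: event.value };
}

// Stand-in for the merged stream: tagged events in arrival order (made-up values).
const events: ChangeEvent[] = [
  { source: 'sort', value: 'name asc' },
  { source: 'filter', value: 'age > 30' },
  { source: 'sort', value: 'name desc' },
];

// Collect the state after every event, the way a running fold would emit it.
const states: ViewState[] = [];
let current: ViewState = {};
for (const event of events) {
  current = applyChange(current, event);
  states.push(current);
}
```

With real streams, the same reducer could presumably be passed to scan() after merging this.sort.sortChange.map(...) and this.filterChange.map(...), with the subscriber calling this.readData(state.sort, state.filter). Unlike combineLatest, this fires on the first emission of either stream, with the other value still undefined.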

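Depending on whether sortChange and filterChange emit primitive values or objects, you might have to supply custom comparer logic for distinctUntilChanged to work, since by default values are compared by strict equality and two distinct objects with the same contents count as different. E.g.: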

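var source = Rx.Observable.of({value: 42}, {value: 42}, {value: 24}, {value: 24})
  // First argument selects the key to compare; the second (the comparer)
  // returns true when two keys should be treated as equal
  .distinctUntilChanged(function (x) { return x.value; }, function (a, b) {
    return a === b;
  });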
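
The comparer's contract (return true when two keys should be treated as equal) is easy to get backwards. The following is a minimal array-based sketch of that contract, not the operator itself: distinctUntilChangedArray is a hypothetical helper written for illustration, which keeps an item only when the comparer reports that its key differs from the last kept key.

```typescript
// Hypothetical array-based helper mirroring the operator's semantics:
// an item is kept only when the comparer says its key is NOT equal
// to the key of the previously kept item.
function distinctUntilChangedArray<T, K>(
  items: T[],
  keySelector: (item: T) => K,
  comparer: (a: K, b: K) => boolean = (a, b) => a === b
): T[] {
  const result: T[] = [];
  let hasPrevious = false;
  let previousKey: K | undefined;
  for (const item of items) {
    const key = keySelector(item);
    if (!hasPrevious || !comparer(previousKey as K, key)) {
      result.push(item);
      hasPrevious = true;
      previousKey = key;
    }
  }
  return result;
}

const input = [{ value: 42 }, { value: 42 }, { value: 24 }, { value: 24 }];

// Equality comparer: consecutive duplicates are dropped.
const equalityResult = distinctUntilChangedArray(input, x => x.value, (a, b) => a === b);

// Inverted comparer: identical keys count as "changed", differing keys as "equal".
const invertedResult = distinctUntilChangedArray(input, x => x.value, (a, b) => a !== b);
```

Here equalityResult holds the values 42 and 24, while invertedResult holds 42 and 42: with an inverted comparer the repeated value passes through and the real change is suppressed.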

Ref: https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/distinctuntilchanged.md