
I have a fairly complicated reactive behavior that I would like to implement with RxJS, but I haven't found a proper solution yet.

My Angular app, which can be found on Plnkr, runs a series of sync, async and parallel async steps.

  1. First, the user clicks a button, which calls next on my RxJS Subject userClick$ = new Subject<void>();.

    <button (click)="model.on = !model.on; userClick$.next()"> {{ model.on ? 'Stop' : 'Start' }} </button>

  2. Then I generate a random number using getRandomNumber.
  3. Then I calculate the exponential of that random number using getExpNumber.
  4. Finally, I need to run getFloor and getCeil in parallel.

What do I need?

  • Be able to use forkJoin on the two parallel executions in step 4.
  • Be able to express the dependency of step 3 on step 2, and of step 4 on step 3:
    • getExpNumber depends on getRandomNumber and runs after it.
    • getFloor runs in parallel with getCeil, and both depend on getExpNumber.
  • Be able to use the value emitted at each stage of the stream, not just the last one. Currently I only get the last value, but I need to display the random value (from the first flatMap) to the user using ngFor and the async pipe.

Dependency: I can achieve that by checking the type of the current observable value in each flatMap, but I think there must be a better way.

Full code:

import { Component, NgModule, VERSION } from '@angular/core';
import { BrowserModule } from '@angular/platform-browser';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/operator/filter';
import 'rxjs/add/operator/do';
import 'rxjs/add/operator/switchMap';

@Component({
  selector: 'my-app',
  template: `
    <button (click)="model.on = !model.on; userClick$.next()">{{ model.on ? 'Stop' : 'Start' }}</button>
    <h1>Numbers stream</h1>
    <div *ngFor="let numberValue of (numbers$ | async)">
      <h2>{{ numberValue }}</h2>
    </div>
    <label>{{ model.log }}</label>
  `,
})
export class App {
  model = {
    on: false,
    log: ''
  }
  userClick$ = new Subject<void>();
  numbers$: Observable<number[]> = this.userClick$
    .filter(() => !!this.model.on)
    .do(() => this.model.log = '')
    .switchMap(() => this.getRandomNumber())
    .switchMap((num) => this.getExpNumber(num))
    .switchMap((num) => this.getFloor(num))
    .switchMap((num) => this.getCeil(num));
  constructor() {
  }
  /**
   * Runs after user click
   */
  getRandomNumber(): Observable<number[]> {
    return new Observable<number[]>(observer => {
      this.model.log += ' getRandomNumber';
      observer.next([Math.floor(Math.random() * 10) + 1]);

      return () => {}
    });
  }
  /**
   * Depends on getRandomNumber and run after it
   */
  getExpNumber(n: number): Observable<number[]> {
    return new Observable<number[]>(observer => {
      this.model.log += ' getExpNumber';
      observer.next([Math.floor(Math.exp(n))]);

      return () => {}
    });
  }
  /**
   * Runs in parallel with getCeil
   */
  getFloor(n: number): Observable<number[]> {
    return new Observable<number[]>(observer => {
      this.model.log += ' getFloor';
      observer.next([Math.floor(n)]);

      return () => {}
    });
  }
  /**
   * Runs in parallel with getFloor
   */
  getCeil(n: number): Observable<number[]> {
    return new Observable<number[]>(observer => {
      this.model.log += ' getCeil';
      observer.next([Math.ceil(n)]);

      return () => {}
    });
  }
}
@NgModule({
  imports: [ BrowserModule ],
  declarations: [ App ],
  bootstrap: [ App ]
})
export class AppModule {}

2 Answers

1
votes

The way you create the observables seems strange to me, so I'll use "of" instead, and each function will emit a number rather than an array. The code then looks like this:

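// Imports assume RxJS 5.5, to match the import style used in the question
import { Component } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import { forkJoin } from 'rxjs/observable/forkJoin';
import { of } from 'rxjs/observable/of';
import 'rxjs/add/operator/switchMap';

@Component({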
  selector: 'my-app',
  template: `
    <button (click)="model.on = !model.on; userClick$.next()">{{ model.on ? 'Stop' : 'Start' }}</button>
    <h1>Numbers stream</h1>
    <h2 *ngFor="let numberValue of (numbers$ |async)">{{numberValue}}</h2>
    <label>{{ model.log }}</label>
  `,
})
export class HomeComponent {
  model = {
    on: false,
    log: ''
  }
  userClick$ = new Subject<void>();
  numbers$: Observable<number[]> = this.userClick$
    .switchMap(() => {
      return this.getRandomNumber().switchMap((num: number) => {
        this.model.log += num;
        return this.getExpNumber(num).switchMap((num2: number) => {
          this.model.log += num2;
          return forkJoin(this.getFloor(num2), this.getCeil(num2))
        })
      })
    })


  getRandomNumber(): Observable<number> {
    this.model.log += ' getRandomNumber';
    return of((Math.random() * 10) + 1);
  }
  getExpNumber(n: number): Observable<number> {
    this.model.log += ' getExpNumber';
    return of(Math.exp(n));
  }
  getFloor(n: number): Observable<number> {
    this.model.log += ' getFloor';
    return of((Math.floor(n)));
  }

  getCeil(n: number): Observable<number> {
    this.model.log += ' getCeil';
    return of(Math.ceil(n));
  }
}
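  1. Don't use "do" to drive the flow; "do" is meant for side effects such as inspecting a response.
  2. If I create the observables the way you do, forkJoin doesn't work for me: the functions are called, but the subscription never receives a value. This is most likely because those observables never call observer.complete(), and forkJoin only emits once every source has completed.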
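
The point about forkJoin, and the question's wish to keep the intermediate values, can be sketched together. This is a minimal sketch, assuming RxJS 6 or later with pipeable operators (not the patch-style imports used in the question). The names `once` and `numbersFor` are hypothetical helpers introduced only for illustration. Each observable calls complete() after emitting, which is what allows forkJoin to emit, and the nested switchMap calls keep the earlier values in scope so that they can all be returned at the end.

```typescript
import { Observable, forkJoin } from 'rxjs';
import { map, switchMap } from 'rxjs/operators';

// Hypothetical helper: wraps a synchronous computation in an Observable that
// emits one value and then completes. Without complete(), forkJoin never emits.
function once<T>(compute: () => T): Observable<T> {
  return new Observable<T>(observer => {
    observer.next(compute());
    observer.complete();
  });
}

const getRandomNumber = () => once(() => Math.floor(Math.random() * 10) + 1);
const getExpNumber = (n: number) => once(() => Math.exp(n));
const getFloor = (n: number) => once(() => Math.floor(n));
const getCeil = (n: number) => once(() => Math.ceil(n));

// Hypothetical helper: given a stream of random numbers, emits
// [random, exp, floor, ceil] for each one.
// - getExpNumber runs only after a random number has been emitted.
// - getFloor and getCeil are both started by forkJoin once exp is known.
function numbersFor(random$: Observable<number>): Observable<number[]> {
  return random$.pipe(
    switchMap(random =>
      getExpNumber(random).pipe(
        switchMap(exp =>
          forkJoin([getFloor(exp), getCeil(exp)]).pipe(
            // random and exp are still in scope thanks to the nesting
            map(([floor, ceil]) => [random, exp, floor, ceil])
          )
        )
      )
    )
  );
}
```

Under the same assumptions, the component stream could then be written as `userClick$.pipe(switchMap(() => numbersFor(getRandomNumber())))`, and the resulting array can be rendered with ngFor and the async pipe.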
1
votes

If you only want the value from getRandomNumber, you can subscribe to getRandomNumber alone.

this.getRandomNumber().subscribe((value:number)=>{console.log(value)}) 

NOTE: when we write

<div *ngFor="let value of (numbers$ | async)">{{value}}</div>

it is roughly equivalent to

//in .html
<div *ngFor="let value of numberList">{{value}}</div>
//in .ts
numberList: number[];
subscription: Subscription;

ngOnInit() {
  this.subscription = this.numbers$.subscribe((values: number[]) => {
    this.numberList = values;
  });
}

// the async pipe unsubscribes when the component is destroyed
ngOnDestroy() {
  this.subscription.unsubscribe();
}