1
votes

So I have an Observable Data store with a private BehaviorSubject emitting items every time the store changes.

I also have a public method that returns an observable created from the BehaviorSubject that filters items and skip items based on the state.

I want to know how can I return an observable that completes itself after a certain condition is met without giving the responsibility to the consumer?

EDIT: The solution in this case is to have the consumers to use the .take(1) operator so that it completes after it emits the first time.

Here's some code relating to my question:

class Item {

  public id: string;
  public state: number;
};


@Injectable()
export class SomeDataService(){

  private let items   = [];                          //The store of items
  private let stream$ = new BehaviorSubject<Item[]>; //The endless stream of items.

  constructor( someService: SomeOtherService ){

    this.someService.items$.subscribe( items => {

      this.items = items;
      this.stream$.next( items );
    });
  };


  //
  public getObservable( filterID: string ): Observable<Item> {

    return this.$stream.asObservable().map( items => {

      //Find the item in the list and return it
      return items.find( item => {
        return item.id === filterID;
      });
    }).flatMap( item => {

      if( item && item.state === 3 ) { //arbitrary number
        throw Observable.throw( item );
      }

      //Transform the item and such...

      return Observable.of( item );
    }).skipWhile( item => {

      return item && item.state !== 1;
    });
  };
};

//Some other file to consume the service

this.someDataService.getObservable( 'uniqueID' ).finally( () => {
  console.log("Now this gets printed when it emits the first time and on errors.");
}).subscribe();

/*
  //Previous attempt
  this.someDataService.getObservable( 'uniqueID' ).finally( () => {

    console.log("I will never print this as the observable never completes.");
  }).subscribe();
*/
2
you can try piping the takeUntil operator. - Chau Tran
Something to note, your getObservable() is spelled getObervable() -> while that may just be a typo from copying/paste you can clean it up to improve your question. - ShellNinja

2 Answers

1
votes

No clue why you are using a BehaviorSubject when you have your items stored in private const items = []; And I personally don't like when a service 'do the logic'. Anyway the problem is in the skipWhile.

If you have for example this items assigned to your item property :

const itemsO: Item[] = [
    {
        id: '0',
        state: 1
    },
    {
        id: '1',
        state: 1
    },
    {
        id: '2',
        state: 1
    },
    {
        id: '0',
        state: 1
    },
];

and call the getObservable function with '0' as param, the code will works fine, because the skipWhile will find the predicate as true, so it will do the work.

I've done this to your code and tried out, and it works fine :

Service :

import { Injectable } from '@angular/core';
import { Observable } from 'rxjs/Observable';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import 'rxjs/add/observable/of';
import 'rxjs/add/operator/mergeMap';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/skipWhile';
class Item {

    public id: string;
    public state: number;
}

const itemsO: Item[] = [
    {
        id: '0',
        state: 1
    },
    {
        id: '1',
        state: 1
    },
    {
        id: '2',
        state: 1
    },
    {
        id: '0',
        state: 1
    },
];

@Injectable()
export class SomeDataService {

    private items = [];
    private stream$: BehaviorSubject<Item[]> = new BehaviorSubject<Item[]>(null);

    constructor() {
        this.items = itemsO;
        this.stream$.next(itemsO);
    }

    public getObservable(filterID: string): Observable<Item> {
        return this.stream$.asObservable().map(items => {

            // Find the item in the list and return it
            return items.find(item => {
                return item.id === filterID;
            });
        }).flatMap(item => {

            if (item && item.state === 3) { // arbitrary number
                throw Observable.throw(item);
            }

            // Transform the item and such...

            return Observable.of(item);
        }).skipWhile(item => {
            return item.state !== 1;
        });
    }
}

On main component :

constructor(a: SomeDataService) {
        a.getObservable('0')
            .subscribe(evt => {
                console.log(evt);
                // You will see something in console
            });
    }

I don't know what you are trying to achieve, but the problem is only on that skipWhile. If you want to discard data or emit an empty array if the status !== 1 just use another operator

0
votes

So the solution here is to have the consumers use the .take(1) operator in order to complete after the first item has been emitted.

this.someDataService.getObservable( 'uniqueID' ).finally( () => {
  console.log("Now this gets printed when it emits the first time and on errors.");
}).subscribe();

/*
  //Previous attempt
  this.someDataService.getObservable( 'uniqueID' ).finally( () => {

    console.log("I will never print this as the observable never completes.");
  }).subscribe();
*/