0
votes

I have a list that is being updated from an external source. What I want is an observable that emits individual items with a fixed delay as more items are added to the list. I want to do this with RxJava. This is the sample pseudocode I've come up with:

class ItemDispatcher {

  private List<Item> items = new ArrayList();
  private Observable<Item> observable = <some observable>

  private void addItems(List<Item> newItems) {
      items.addAll(newItems);
      // dispatch new items to the observable and remove items as they are dispatched
      // (items need to be emitted with a 500 millisecond delay)
  }

  public Observable<Item> getObservable() {
      return observable;
  }
}


//caller
ItemDispatcher dispatcher = new ItemDispatcher();

// when someone calls dispatcher.addItems(...), those items need to be emitted to the subscriber with a 500 millisecond delay

dispatcher.getObservable().subscribe(item -> {
  System.out.println("I'm getting items one by one as they are added to the list, with a 500 millisecond delay");
});
1
Why do you want that list if you keep removing from it? Why not dispatch directly? - akarnokd
Yes @akarnokd, I agree with you. The list is redundant. - Nilu

1 Answer

1
votes

Try this:

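// Package names assume RxJava 2; RxJava 3 uses io.reactivex.rxjava3.core and io.reactivex.rxjava3.subjects.
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;

class ItemDispatcher {

    // Synchronized because addItems() and the delayed removal run on different threads
    private final List<Item> items = Collections.synchronizedList(new ArrayList<>());
    // Each onNext carries one batch of newly added items
    private final PublishSubject<List<Item>> itemsSubject = PublishSubject.create();

    // public so that callers can push new items, as in the question's usage
    public void addItems(List<Item> newItems) {
        items.addAll(newItems);
        itemsSubject.onNext(newItems);
    }

    public Observable<Item> getObservable() {
        return itemsSubject.delay(500, TimeUnit.MILLISECONDS)    // hold each batch for 500 ms
                .doOnNext(newItems -> items.removeAll(newItems)) // drop dispatched items from the list
                .flatMapIterable(newItems -> newItems);          // emit the batch's items one by one
    }
}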

Also, if you don't need items elsewhere, you can remove the items field.
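
Note that delay(...) followed by flatMapIterable(...) holds each batch back for 500 ms and then emits all of its items together. If the goal is instead to space the individual items 500 ms apart, one possible approach is to push items one at a time and use concatMap with a delayed inner Observable. The following is only a sketch under some assumptions: it uses RxJava 2 package names, drops the intermediate list as suggested in the comments, and the class name SpacedItemDispatcher is made up for illustration.

```java
// Assumes RxJava 2 on the classpath; for RxJava 3 change the io.reactivex imports accordingly.
import io.reactivex.Observable;
import io.reactivex.subjects.PublishSubject;
import io.reactivex.subjects.Subject;

import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical name, not from the original answer
class SpacedItemDispatcher<T> {

    // toSerialized() makes onNext safe to call from several threads
    private final Subject<T> subject = PublishSubject.<T>create().toSerialized();

    public void addItems(List<T> newItems) {
        // Push items individually instead of as one batch
        for (T item : newItems) {
            subject.onNext(item);
        }
    }

    public Observable<T> getObservable() {
        // concatMap runs one inner Observable at a time and queues the rest,
        // so each item waits for the previous item's 500 ms delay to finish
        return subject.concatMap(item ->
                Observable.just(item).delay(500, TimeUnit.MILLISECONDS));
    }
}
```

Since PublishSubject is hot, items added before anyone subscribes are not delivered, so subscribe before calling addItems(...), or consider a replaying subject if earlier items matter.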