0
votes

I have a hot observable that does its work through a system process, and I'd like an interval to run alongside it until the process observable reaches onComplete.

I see the interval operator: http://reactivex.io/documentation/operators/interval.html

How can I combine these two to get the behavior I want (in particular, cancel the interval when the other one hits onComplete)?

1
Show your code please - shmakova

1 Answer

1
votes

You can use the takeUntil() operator to cancel (unsubscribe from) the interval. takeUntil() takes an Observable as input and stops the source when that input Observable emits an item.

What is left is to transform the hot Observable into an Observable that emits an item when it completes, so that it can be passed to takeUntil(). The materialize() operator makes this possible: it emits a Notification object for each Observable event (onNext(), onError(), onCompleted()), and filter() then keeps only the onCompleted() notifications.

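// Emits a single item (the onCompleted notification) when the hot Observable completes.
Observable<Notification<Object>> hotOnCompleteObservable =
        hot.materialize()
           .filter(notification -> notification.isOnCompleted());

Observable<Long> interval = ...
// takeUntil() returns a new Observable; subscribe to that one, not to the original interval.
interval.takeUntil(hotOnCompleteObservable);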
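For a version you can run end to end, here is a minimal sketch assuming RxJava 1.x (the onCompleted() naming above suggests that version). Everything specific to it is an assumption for illustration: a PublishSubject stands in for the hot process Observable, a TestScheduler drives the interval in virtual time so the outcome is deterministic, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

import rx.Notification;
import rx.Observable;
import rx.schedulers.TestScheduler;
import rx.subjects.PublishSubject;

public class IntervalUntilComplete {

    // Returns the ticks the interval emitted before the hot source completed.
    static List<Long> run() {
        // Virtual-time scheduler, so no real waiting is involved (assumption for the demo).
        TestScheduler scheduler = new TestScheduler();

        // Stand-in for the hot Observable backed by the system process.
        PublishSubject<String> hot = PublishSubject.create();

        // Turn the completion event into an ordinary item.
        Observable<Notification<String>> hotOnComplete =
                hot.materialize()
                   .filter(Notification::isOnCompleted);

        List<Long> ticks = new ArrayList<>();
        Observable.interval(1, TimeUnit.SECONDS, scheduler)
                  .takeUntil(hotOnComplete)
                  .subscribe(ticks::add);

        scheduler.advanceTimeBy(3, TimeUnit.SECONDS); // three ticks fire
        hot.onNext("process output");                 // a regular item does not stop the interval
        hot.onCompleted();                            // completion stops it
        scheduler.advanceTimeBy(3, TimeUnit.SECONDS); // no further ticks arrive

        return ticks;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In real code you would drop the TestScheduler and let interval() use its default scheduler; the important part is subscribing to the Observable returned by takeUntil().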