1
votes

I am trying to create an RxJava BlockingObservable that will emit the value of a variable every X milliseconds until (condition == true) or a timeout occurs.

The code below seems close to what I want, but it always emits ONCE and then exits. What's odd is that I have a condition in takeUntil() which will NEVER be true -- I'd expect this observable to emit continuously and eventually time out, but it doesn't.

What am I missing/doing wrong here?

Observable.fromCallable(() -> getSendWindow())
        .sample(10, TimeUnit.MILLISECONDS)
        .timeout(30, TimeUnit.SECONDS)
        .takeUntil(sendWindow -> 1==2)
        .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
        .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
        .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());
1
.sample does not do what you think it does. Sample rate limits the above Observable to (at most) once every 10 seconds. - Aron
According to the documentation: "The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling." introtorx.com/Content/v1.0.10621.0/… Does that not imply that it's going to "sample" at a given interval and emit what it sees at that time? If not, what's the correct approach? - bitstream
That is correct. However, during the sampling window, no item has been emitted. Therefore there is no "most recently emitted" item. - Aron

1 Answers

2
votes

.sample does not do what you think it does. Sample rate limits the above Observable to (at most) once every 10 seconds.

Observable.fromCallable() only emits an event once, then completes.

.sample() waits 10 seconds and emits the last event (if there is one), every 10 seconds. Therefore it only emits one event, when you attach it to an Observable that only has one event. Then it completes.

What you probably actually want (I'm a .net programmer, so excuse my casing etc) is this.

Edit: Thanks for @akanokd for telling me that java uses interval for repeated events.

Observable.interval(10, timeUnit.MILLISECONDS)
    .map(x -> getSendWindow())
    .takeUntil(sendWindow -> 1==2)
    .doOnError(throwable -> log.warn("Timed out waiting for send window to clear. Giving up."))
     .doOnCompleted(() -> {
            log.info("Send window cleared");
        })
    .toBlocking().forEach(sendWindow -> log.info("sendWindow={}, getSendWindow());

Feel free to edit this answer with the API calls to the JAVA specific version...