2
votes

We have a need to "clock" one hot observable based on another one, e.g.:

 main:            --(1)-----(2)-----(3)-----(4)-------(5)----(6)------------|
 clock:           -------(X)------------(X)------(X)--------------(X)-------|
______________________________________________________________________________

 expected result: -------(1)(2)-----(3)-(3)-(4)--(4)--(5)----(6)--(6)-------|

More specifically, whenever the "clock" emits an item, the last item from the other observable is emitted again. Item emission is delayed until both observables have emitted at least one item. Right now, we achieve this in the following fashion:

  <T, U> Observable<T> clock(Observable<T> main, Observable<U> clock) {
    return Observable.combineLatest(main, clock, (mainItem, clockItem) -> mainItem);
  }

This seems somewhat silly, as we are applying a transformation and then just discard one of the inputs. Also, we are having occasional back pressure troubles with this sort of solution. It seems there would be an existing Rx operator that performs this operation, but so far I have not found the right API method that does exactly this. Is there a better, more idiomatic way of solving this problem with RxJava?

1

1 Answers

2
votes

Looks fine to me. It's not that the clock input hasn't been used because though the value has not been used the timing has.

For handling backpressure on a hot source you might want to apply one of the onBackpressureXXX operators. For example if you don't want to miss outputs then use .onBackpressureBuffer(). Another strategy is to use .throttle or .sample.