This is probably not the best way to do it, but it does the job for now, until someone comes up with a fancier method to resolve your use-case.
Please have a look at my test, I think it resolves your question:
Environment: (gradle -- groovy)
implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"
Test
3 emits are being made from the source observable. Each time a new value is emitted, the inner observable is subscribed to. When a new value is emitted, the inner observable completes and pushes the value downstream. Then the newly emitted value will be processed by subscribing to a new inner-stream.
@Test
public void takeWhileReduce() {
TestScheduler scheduler = new TestScheduler();
PublishSubject<Integer> source = PublishSubject.create();
Observable<Long> publish = source.publish(
multicast -> {
return multicast.flatMap(
o -> {
return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
.takeUntil(multicast)
.reduce(Long::sum)
.toObservable();
},
1);
});
TestObserver<Long> test = publish.test();
source.onNext(42);
scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1,2,3
test.assertValuesOnly(6L);
// next value is flatMapped
scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1,2
test.assertValuesOnly(6L, 3L);
scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);
// action - push next value - flatMapped value will complete and push value
source.onNext(42);
// assert - values emitted: 0,1
test.assertValuesOnly(6L, 3L, 1L);
}