1
votes

reduce operator emits value at the end of the observable (when completed).

I'm looking for a way to use reduce inside a switchmap. I want the sum of the infinite internal observable values when outer observable emits values or complete.

@Test
public void emit_value_when_switchmap() throws InterruptedException {

    Observable.interval(0, 2000, TimeUnit.MILLISECONDS)
            .switchMapMaybe(
                    l -> Observable.interval(0, 500, TimeUnit.MILLISECONDS)
                            .reduce(Long::sum)
                            .map(a -> a + ": Final")
            )
            .subscribe(e -> System.out.println(e));


    Thread.sleep(10000);
}

This diagram illustrates the wanted behavior :

//events: --------x-----1----2---1---x-----3--0--------x-1---1----|  
//result: ---------------------------4-----------------3----------2  
1

1 Answers

1
votes

This is probably not the best way to do it, but it does the job for now, until someone comes up with a fancier method to resolve your use-case.

Please have a look at my test, I think it resolves your question:

Environment: (gradle -- groovy)

implementation "io.reactivex.rxjava2:rxjava:2.2.8"
testImplementation "org.junit.jupiter:junit-jupiter-api:5.7.0"
testImplementation "org.junit.jupiter:junit-jupiter-params:5.7.0"
testRuntimeOnly "org.junit.jupiter:junit-jupiter-engine:5.7.0"

Test 3 emits are being made from the source observable. Each time a new value is emitted, the inner observable is subscribed to. When a new value is emitted, the inner observable completes and pushes the value downstream. Then the newly emitted value will be processed by subscribing to a new inner-stream.

  @Test
  public void takeWhileReduce() {
    TestScheduler scheduler = new TestScheduler();
    PublishSubject<Integer> source = PublishSubject.create();

    Observable<Long> publish = source.publish(
        multicast -> {
          return multicast.flatMap(
              o -> {
                return Observable.interval(0, 500, TimeUnit.MILLISECONDS, scheduler) //
                    .takeUntil(multicast)
                    .reduce(Long::sum)
                    .toObservable();
              },
              1);
        });

    TestObserver<Long> test = publish.test();

    source.onNext(42);

    scheduler.advanceTimeBy(1500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);
    // assert - values emitted: 0,1,2,3
    test.assertValuesOnly(6L);

    // next value is flatMapped
    scheduler.advanceTimeBy(1000, TimeUnit.MILLISECONDS);
    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1,2
    test.assertValuesOnly(6L, 3L);

    scheduler.advanceTimeBy(500, TimeUnit.MILLISECONDS);

    // action - push next value - flatMapped value will complete and push value
    source.onNext(42);

    // assert - values emitted: 0,1
    test.assertValuesOnly(6L, 3L, 1L);
  }