I am trying to build a Flowable with backpressure. The idea is that a new item won't be emitted until one of the items currently in flight finishes its processing. I am using a ResourceSubscriber and the subscribeWith() method to achieve this.

Each element of the Flowable is processed asynchronously on a separate thread pool (which I achieve with flatMap/subscribeOn).

I expect each element after the second to be emitted only AFTER the subscriber's onNext method has been called. However, when I run this code, the Flowable emits elements uncontrollably. The backpressure doesn't work.

Here is the code to reproduce the issue:

import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;
import io.reactivex.subscribers.ResourceSubscriber;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicInteger;

public class RxTest {

    private static final Logger log = LoggerFactory.getLogger(RxTest.class);

    static AtomicInteger integer = new AtomicInteger();

    public static void main(String[] args) {
        // Synchronous source: the generator is called once per item requested downstream
        Flowable.generate(emitter -> {
            final int i1 = integer.incrementAndGet();
            if (i1 >= 20) {
                // Crude shutdown for the repro: give in-flight work time to finish, then exit
                Thread.sleep(10000);
                System.exit(0);
            }
            emitter.onNext(i1);
        })
                .doOnNext(i -> log.info("Published: " + i))
                // Each item is processed on the computation scheduler via subscribeOn
                .flatMap(i -> Flowable.defer(() -> {
                    log.info("Starting consuming {}", i);
                    Thread.sleep(100); // simulated work
                    log.info("Finished consuming {}", i);
                    return Flowable.just(i);
                }).subscribeOn(Schedulers.computation()))
                .doOnNext(i -> log.info("Consuming finished, result: " + i))
                .subscribeWith(new BackpressureSubscriber(2));
    }
}

class BackpressureSubscriber extends ResourceSubscriber<Object> {

    private static final Logger log = LoggerFactory.getLogger(BackpressureSubscriber.class);

    private final long initialRequest;

    public BackpressureSubscriber(final long initialRequest) {
        this.initialRequest = initialRequest;
    }

    @Override
    protected void onStart() {
        super.onStart();
        log.info("Starting execution with {} initial requests", initialRequest);
        request(initialRequest);
    }

    @Override
    public void onNext(final Object message) {
        log.info("On next for {}", message);
        request(1);
    }

    @Override
    public void onError(final Throwable throwable) {
        log.error("Unhandled error: ", throwable);
    }

    @Override
    public void onComplete() {
        log.info("On Complete");
    }
}

Expected output (something like):

[main] INFO RxTest - Published: 1
[main] INFO RxTest - Published: 2
[RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
[RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
[RxComputationThreadPool-1] INFO RxTest -  On next for 1
[main] INFO RxTest - Published: 3
[RxComputationThreadPool-1] INFO RxTest - Finished consuming 2

Actual output:

11:30:32.166 [main] INFO BackpressureSubscriber - Starting execution with 2 initial requests
11:30:32.170 [main] INFO RxTest - Published: 1
11:30:32.189 [main] INFO RxTest - Published: 2
11:30:32.189 [RxComputationThreadPool-1] INFO RxTest - Starting consuming 1
11:30:32.189 [RxComputationThreadPool-2] INFO RxTest - Starting consuming 2
11:30:32.189 [main] INFO RxTest - Published: 3
11:30:32.190 [main] INFO RxTest - Published: 4
11:30:32.190 [RxComputationThreadPool-3] INFO RxTest - Starting consuming 3
11:30:32.190 [main] INFO RxTest - Published: 5
11:30:32.190 [RxComputationThreadPool-4] INFO RxTest - Starting consuming 4
11:30:32.190 [main] INFO RxTest - Published: 6
11:30:32.190 [RxComputationThreadPool-5] INFO RxTest - Starting consuming 5
11:30:32.190 [main] INFO RxTest - Published: 7
11:30:32.191 [RxComputationThreadPool-6] INFO RxTest - Starting consuming 6
11:30:32.191 [main] INFO RxTest - Published: 8
11:30:32.191 [RxComputationThreadPool-7] INFO RxTest - Starting consuming 7
11:30:32.191 [main] INFO RxTest - Published: 9
11:30:32.191 [RxComputationThreadPool-8] INFO RxTest - Starting consuming 8
11:30:32.191 [main] INFO RxTest - Published: 10
11:30:32.191 [RxComputationThreadPool-9] INFO RxTest - Starting consuming 9
11:30:32.191 [main] INFO RxTest - Published: 11
11:30:32.191 [RxComputationThreadPool-10] INFO RxTest - Starting consuming 10
11:30:32.192 [main] INFO RxTest - Published: 12
11:30:32.192 [RxComputationThreadPool-11] INFO RxTest - Starting consuming 11
11:30:32.192 [main] INFO RxTest - Published: 13
11:30:32.192 [main] INFO RxTest - Published: 14
11:30:32.192 [RxComputationThreadPool-12] INFO RxTest - Starting consuming 12
11:30:32.192 [main] INFO RxTest - Published: 15
11:30:32.192 [main] INFO RxTest - Published: 16
11:30:32.192 [main] INFO RxTest - Published: 17
11:30:32.192 [main] INFO RxTest - Published: 18
11:30:32.192 [main] INFO RxTest - Published: 19
11:30:32.294 [RxComputationThreadPool-2] INFO RxTest - Finished consuming 2
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Finished consuming 1
11:30:32.294 [RxComputationThreadPool-1] INFO RxTest - Consuming finished, result: 1
11:30:32.294 [RxComputationThreadPool-1] INFO BackpressureSubscriber - On next for 1

Tested with RxJava versions 2.2.19 and 2.1.2.

As far as I understand the ReactiveX documentation, I think this is an RxJava bug. However, I might be wrong, and I would be grateful if you could point out where I went wrong.

Answer

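flatMap actually requests items from upstream in batches and buffers results until downstream requests them. That fact is sufficient to explain the behaviour you are seeing: with the default settings, flatMap immediately requests a whole batch of items from upstream when it is subscribed, independently of how many items your subscriber has requested. The size of that upstream request (and therefore the number of inner flows running at once) is controlled by the maxConcurrency parameter, while bufferSize controls how many items flatMap prefetches from each inner flow. If you set maxConcurrency to a small value such as 2, you should see behaviour close to what you expected. Overloads such as flatMap(mapper, maxConcurrency) and flatMap(mapper, delayErrors, maxConcurrency, bufferSize) let you set these values.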
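A minimal sketch of bounding the pipeline with the flatMap(mapper, maxConcurrency) overload, assuming RxJava 2.x (the io.reactivex package used in the question). The class BoundedFlatMap and its maxInFlight helper are hypothetical names for illustration; Flowable.range stands in for the question's generator and Thread.sleep(20) for the real work. The helper counts how many items are inside the processing step at the same time and reports the peak.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

import java.util.concurrent.atomic.AtomicInteger;

public class BoundedFlatMap {

    // Runs 20 items through flatMap with at most maxConcurrency inner flows active
    // and returns the highest number of items observed in the processing step at once.
    static int maxInFlight(int maxConcurrency) {
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();

        Flowable.range(1, 20)
                .flatMap(i -> Flowable.fromCallable(() -> {
                    int now = inFlight.incrementAndGet();
                    peak.accumulateAndGet(now, Math::max);
                    Thread.sleep(20);            // simulated work (assumption)
                    inFlight.decrementAndGet();  // done before the inner flow completes
                    return i;
                }).subscribeOn(Schedulers.computation()), maxConcurrency)
                .blockingSubscribe();            // block until every item has been processed

        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak in-flight with maxConcurrency=2: " + maxInFlight(2));
    }
}
```

With maxConcurrency = 2, flatMap requests only 2 items from upstream at first and asks for another one only after an inner flow has completed and its value has been passed downstream, so the peak should never exceed 2 (it may be 1 on a machine with a single computation thread). This replenishes on inner completion rather than literally on the subscriber's onNext, which is close to, but not exactly, the behaviour described in the question.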

In addition, flatMap has a maxConcurrency parameter, which is easier to understand once you realize that flatMap is effectively a map followed by a merge applied to the resulting stream of streams. The merge can realistically subscribe to only a limited number of inner sources at a time, and that limit is maxConcurrency. In RxJava 2 the default for both bufferSize and maxConcurrency is 128.
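To make the "map, then merge" view concrete, here is a small sketch, again assuming RxJava 2.x, that builds the same result twice: once with flatMap(mapper, maxConcurrency) and once by mapping each value to a Flowable and passing the resulting stream of streams to Flowable.merge(sources, maxConcurrency). FlatMapAsMerge, inner, viaFlatMap and viaMapThenMerge are hypothetical names chosen for the example.

```java
import io.reactivex.Flowable;

import java.util.List;

public class FlatMapAsMerge {

    // Hypothetical inner stream for each upstream value i: emits i*10 and i*10 + 1.
    static Flowable<Integer> inner(int i) {
        return Flowable.just(i * 10, i * 10 + 1);
    }

    // flatMap with an explicit maxConcurrency.
    static List<Integer> viaFlatMap(int maxConcurrency) {
        return Flowable.range(1, 3)
                .flatMap(FlatMapAsMerge::inner, maxConcurrency)
                .toList()
                .blockingGet();
    }

    // The same pipeline spelled out as "map to a stream of streams, then merge";
    // merge subscribes to at most maxConcurrency inner streams at a time.
    static List<Integer> viaMapThenMerge(int maxConcurrency) {
        Flowable<Flowable<Integer>> streams = Flowable.range(1, 3).map(FlatMapAsMerge::inner);
        return Flowable.merge(streams, maxConcurrency)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(viaFlatMap(2));
        System.out.println(viaMapThenMerge(2));
    }
}
```

Both pipelines use synchronous inner sources here, so they produce the same list in the same order; with asynchronous inner sources (as in the question) the interleaving would depend on timing.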

Bear in mind that when the merge step receives a request from downstream, it has no idea how many inner streams (remember, we are dealing with a stream of streams) it will need to subscribe to in order to fulfill that request! The first 10 streams might emit no values at all. If the first stream emits nothing and doesn't complete for an hour, and maxConcurrency=1, we will receive no events at all for that hour, even though streams 2 and 3 were ready to send us items. For reasons like these, the library has to choose general-purpose defaults for bufferSize and maxConcurrency; the values are normally chosen to optimize performance in certain benchmark cases while minimizing problems in many edge cases.
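The maxConcurrency=1 scenario above can be reproduced with a sketch like the following, assuming RxJava 2.x and its built-in TestSubscriber (obtained via test()). Flowable.never() plays the role of the first stream that stays silent, while the values 2 and 3 map to streams that are ready immediately. MaxConcurrencyStarvation and run are hypothetical names.

```java
import io.reactivex.Flowable;
import io.reactivex.subscribers.TestSubscriber;

public class MaxConcurrencyStarvation {

    // Upstream value 1 maps to a stream that never emits and never completes;
    // 2 and 3 map to streams that emit immediately.
    static TestSubscriber<Integer> run(int maxConcurrency) {
        return Flowable.just(1, 2, 3)
                .flatMap(i -> i == 1 ? Flowable.<Integer>never() : Flowable.just(i),
                        maxConcurrency)
                .test();
    }

    public static void main(String[] args) {
        // maxConcurrency = 1: stuck on the silent first stream, so 2 and 3 are never requested.
        run(1).assertNoValues().assertNotComplete();
        // maxConcurrency = 2: one slot is blocked by never(), the other lets 2 and then 3 through.
        run(2).assertValues(2, 3).assertNotComplete();
        System.out.println("both scenarios behaved as described");
    }
}
```

With maxConcurrency = 1, flatMap subscribes to the silent stream and never requests 2 or 3 from upstream, so no values arrive. Neither run completes, because the never() stream never finishes.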