I am trying to create a Flowable which emits events respecting backpressure to avoid memory issues, while running each stage of transformation in parallel for efficiency. I have created a simple test program to reason about the behavior of the different steps of my program and when events are being emitted vs. waiting on different stages.

My program is as follows:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
      .stream().map(i -> {
        System.out.println("emitting:" + i);
        return i;
      });

  Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
  System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));

  Long count = flowable.onBackpressureBuffer(10)
      .buffer(10)
      .flatMap(buf -> {
        System.out.println("Sleeping 500 for batch");
        Thread.sleep(500);
        System.out.println("Got batch of events");
        return Flowable.fromIterable(buf);
      }, 1)
      .map(x -> x + 1)
      .doOnNext(i -> {
        System.out.println(String.format("Sleeping : %d", i));
        Thread.sleep(100);
        System.out.println(i);
      })
      .count()
      .blockingGet();

  System.out.println("count: " + count);
}

When I run this, I get output that respects backpressure as expected: a batch of events is emitted up to the buffer size, the batch is flat-mapped, and finally each event is processed and printed one by one:

Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
Sleeping 500 for batch
Got batch of events
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
10
emitting:10
emitting:11
emitting:12
emitting:13
emitting:14
emitting:15
emitting:16
emitting:17
emitting:18
emitting:19
Sleeping 500 for batch
Got batch of events
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13

However, if I attempt to parallelize the different stages of operation by adding some calls to .observeOn(Schedulers.computation()), my program no longer seems to respect backpressure. My code now looks like this:

public static void main(String[] args) throws ExecutionException, InterruptedException {
  Stream<Integer> ints = IntStream.range(0, 1000).boxed().collect(Collectors.toList())
      .stream().map(i -> {
        System.out.println("emitting:" + i);
        return i;
      });

  Flowable<Integer> flowable = Flowable.fromIterable(() -> ints.iterator());
  System.out.println(String.format("Buffer size: %d", flowable.bufferSize()));

  Long count = flowable.onBackpressureBuffer(10)
      .buffer(10)
      .observeOn(Schedulers.computation())
      .flatMap(buf -> {
        System.out.println("Sleeping 500 for batch");
        Thread.sleep(500);
        System.out.println("Got batch of events");
        return Flowable.fromIterable(buf);
      }, 1)
      .map(x -> x + 1)
      .observeOn(Schedulers.computation())
      .doOnNext(i -> {
        System.out.println(String.format("Sleeping : %d", i));
        Thread.sleep(100);
        System.out.println(i);
      })
      .observeOn(Schedulers.computation())
      .count()
      .blockingGet();

  System.out.println("count: " + count);
}

And my output is the following, where all of my events are emitted upfront instead of respecting the backpressure and buffers specified by the various stages of execution:

Buffer size: 128
emitting:0
emitting:1
emitting:2
emitting:3
emitting:4
emitting:5
emitting:6
emitting:7
emitting:8
emitting:9
emitting:10
Sleeping 500 for batch
emitting:11
emitting:12
... everything else is emitted here ...
emitting:998
emitting:999
Got batch of events
Sleeping 500 for batch
Sleeping : 1
1
Sleeping : 2
2
Sleeping : 3
3
Sleeping : 4
4
Sleeping : 5
Got batch of events
Sleeping 500 for batch
5
Sleeping : 6
6
Sleeping : 7
7
Sleeping : 8
8
Sleeping : 9
9
Sleeping : 10
Got batch of events
Sleeping 500 for batch
10
Sleeping : 11
11
Sleeping : 12
12
Sleeping : 13
13
Sleeping : 14
14
Sleeping : 15
Got batch of events
Sleeping 500 for batch
15
Sleeping : 16
16
Sleeping : 17
17
Sleeping : 18
18
Sleeping : 19
19
Sleeping : 20
Got batch of events
Sleeping 500 for batch
20
Sleeping : 21
21
Sleeping : 22
22
Sleeping : 23
23
Sleeping : 24
24
Sleeping : 25
Got batch of events
Sleeping 500 for batch
25

Pretend my stages of batching are calling out to external services, but that I want them to run in parallel because of latency. I also want to have control of the number of items in memory at a given time because the number of items emitted initially could be highly variable, and the stages operating on batches run much slower than the initial emission of events.

How can I have my Flowable respect backpressure across a Scheduler? Why does it seem to only disrespect backpressure when I sprinkle in calls to observeOn?

1 Answer

How can I have my Flowable respect backpressure across a Scheduler?

Actually, applying onBackpressureBuffer disconnects the source above it from any backpressure applied by downstream, because it is an unbounded-in operator: it requests everything from upstream no matter how little downstream has asked for. You don't need it, because Flowable.fromIterable supports and honors backpressure. (By the way, RxJava also has a range operator.)
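The difference can be checked with a small probe. This is only a minimal sketch, assuming RxJava 2.x on the classpath (package io.reactivex; 3.x uses io.reactivex.rxjava3 instead). The class name BackpressureProbe and the method emittedFor are hypothetical names made up for illustration. It subscribes with a request for just 10 items and counts how many the source actually produced:

```java
import io.reactivex.Flowable;
import java.util.concurrent.atomic.AtomicInteger;

final class BackpressureProbe {

    // Returns how many items the source produced when downstream requested only 10.
    static int emittedFor(boolean withBackpressureBuffer) {
        AtomicInteger emitted = new AtomicInteger();
        Flowable<Integer> source = Flowable.range(0, 1000)
                .doOnNext(i -> emitted.incrementAndGet());
        if (withBackpressureBuffer) {
            // Unbounded-in: requests Long.MAX_VALUE from the source on subscribe.
            source = source.onBackpressureBuffer();
        }
        // test(10) subscribes a TestSubscriber whose initial request is 10 items.
        source.test(10);
        return emitted.get();
    }

    public static void main(String[] args) {
        System.out.println("without onBackpressureBuffer: " + emittedFor(false));
        System.out.println("with onBackpressureBuffer:    " + emittedFor(true));
    }
}
```

Because everything here runs synchronously on the calling thread, the first call should report 10 and the second 1000: with the operator in place, the source is drained completely even though the subscriber only asked for 10.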

Why does it seem to only disrespect backpressure when I sprinkle in calls to observeOn?

In the first example, there is a natural backpressure happening called call-stack blocking. RxJava is synchronous by default and most operators don't introduce asynchrony, just like none do in the first example.
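Call-stack blocking can be illustrated without RxJava at all. The following is a plain-Java analogy, not RxJava's actual implementation, and the names CallStackBlockingDemo, emit and run are hypothetical. The producer pushes each item by calling the consumer directly, so it cannot move on to the next item until that call returns:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

final class CallStackBlockingDemo {

    // Pushes items to the consumer on the caller's thread, one call per item.
    static void emit(int count, List<String> log, IntConsumer downstream) {
        for (int i = 0; i < count; i++) {
            log.add("emitting:" + i);
            downstream.accept(i); // does not return until downstream is done with i
        }
    }

    // Runs producer and consumer on the same thread and returns the event order.
    static List<String> run(int count) {
        List<String> log = new ArrayList<>();
        emit(count, log, i -> log.add("consumed:" + i));
        return log;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

The log strictly alternates between emitting and consuming, which is the same interleaving seen in the first program's output. A slow consumer slows the producer down simply by taking longer to return.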

observeOn introduces an asynchronous boundary, so in theory the stages can run in parallel with each other. It has a default prefetch buffer of 128 elements, which can be adjusted via one of its overloads. In your case, however, buffer(10) amplifies the prefetch amount: each of the 128 requested batches needs 10 source items, so 128 × 10 = 1280 items are requested upfront, which is enough to consume your entire 1000-element source in one go.
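Putting both points together, one possible shape for the pipeline is sketched below. It assumes RxJava 2.x (package io.reactivex; 3.x uses io.reactivex.rxjava3), drops onBackpressureBuffer, and uses the observeOn(Scheduler, boolean delayError, int bufferSize) overload to shrink the prefetch. The class name BoundedPipeline and the two delay parameters are hypothetical stand-ins for the slow external calls; the buffer sizes of 1 and 10 are example values, not recommendations.

```java
import io.reactivex.Flowable;
import io.reactivex.schedulers.Schedulers;

final class BoundedPipeline {

    // Processes `items` integers in batches of 10 and returns how many came out.
    static long run(int items, long batchDelayMs, long itemDelayMs) {
        return Flowable.range(0, items)          // honors downstream requests
                .doOnNext(i -> System.out.println("emitting:" + i))
                .buffer(10)
                // Prefetch 1 batch, so buffer(10) asks the source for 10 items at a time.
                .observeOn(Schedulers.computation(), false, 1)
                .flatMap(buf -> {
                    Thread.sleep(batchDelayMs);  // stands in for a slow batch call
                    return Flowable.fromIterable(buf);
                }, 1)
                .map(x -> x + 1)
                // Prefetch 10 items here instead of the default 128.
                .observeOn(Schedulers.computation(), false, 10)
                .doOnNext(i -> Thread.sleep(itemDelayMs)) // slow per-item work
                .count()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("count: " + run(50, 500, 100));
    }
}
```

With these settings the "emitting" lines should arrive in small groups interleaved with the processing instead of all upfront. How many items are in flight at any moment still depends on the internal queues of each operator, so treat the sizes as knobs to tune against your memory budget.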