Recently I realized that I don't understand how RxJava2 backpressure works.

I wrote a small test that I expected to fail with a MissingBackpressureException:

@Test
public void testBackpressureWillFail() {
    Observable.<Integer>create(e -> {
        for (int i = 0; i < 10000; i++) {
            System.out.println("Emit: " + i);
            e.onNext(i);
        }
        e.onComplete();
    })
    .subscribeOn(Schedulers.newThread())
    .observeOn(Schedulers.computation())
    .doOnNext(i -> {
        Thread.sleep(100);
        System.out.println("Processed:" + i);
    })
    .blockingSubscribe();
}

The output is:

Emit: 0
Emit: 1
Emit: 2
...
Emit: 9999

Processed:0
Processed:1
Processed:2
...
Processed:9999

Why doesn't it produce a MissingBackpressureException?

I expected e.onNext(i) to put each item into the buffer of ObservableObserveOn, and a MissingBackpressureException to be thrown once the buffer grew beyond its configured size:

static final int BUFFER_SIZE = Math.max(16,Integer.getInteger("rx2.buffer-size",128).intValue());

That doesn't happen. Does the buffer grow automatically? If not, where are the items stored?

Observables in RxJava2 don't support backpressure, only Flowables do - Tassos Bassoukos
I know that they don't support backpressure, but I thought "doesn't support" meant that a MissingBackpressureException would be thrown, not that the buffer would grow automatically. - Rostyslav Roshak
@RostyslavRoshak "Observable doesn't support backpressure" means that when a source emits items faster than the consumer can handle them, the items are buffered without bound until an OutOfMemoryError is thrown because memory runs out. For a Flowable, however, the buffer is bounded, and a MissingBackpressureException is thrown once that buffer is full. - HiddenDroid

1 Answer

That's because in RxJava2 backpressure was moved to Flowable only, see here.
If you switch to a Flowable with BackpressureStrategy.MISSING, you will get the exception.
That also means that in your case you do indeed have a buffer that grows automatically. From the observeOn docs:

Modifies an ObservableSource to perform its emissions and notifications on a specified Scheduler, asynchronously with an unbounded buffer...
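
To make the difference concrete, here is a minimal sketch that uses only java.util, not RxJava itself. It is a simplified model, not the library's actual implementation: the class BufferModel and its methods are hypothetical names, the capacity of 128 is taken from the default quoted in the question, and the consumer is assumed to be so slow that it drains nothing while the producer runs. The unbounded queue stands in for Observable.observeOn, the bounded one for a Flowable whose source ignores backpressure.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical model class; not part of RxJava.
class BufferModel {
    // Assumed capacity, mirroring the default of 128 quoted in the question.
    static final int BUFFER_SIZE = 128;

    // Unbounded buffering: every offer succeeds, so the queue simply grows.
    static int fillUnbounded(int items) {
        Queue<Integer> buffer = new ArrayDeque<>();
        for (int i = 0; i < items; i++) {
            buffer.offer(i);
        }
        return buffer.size();
    }

    // Bounded buffering: the first rejected offer is turned into an error,
    // which is the role MissingBackpressureException plays for a Flowable.
    static int fillBounded(int items) {
        Queue<Integer> buffer = new ArrayBlockingQueue<>(BUFFER_SIZE);
        for (int i = 0; i < items; i++) {
            if (!buffer.offer(i)) {
                throw new IllegalStateException("Queue is full at item " + i);
            }
        }
        return buffer.size();
    }

    public static void main(String[] args) {
        System.out.println("Unbounded buffer holds " + fillUnbounded(10000) + " items");
        try {
            fillBounded(10000);
        } catch (IllegalStateException ex) {
            System.out.println("Bounded buffer failed: " + ex.getMessage());
        }
    }
}
```

In this model the unbounded variant ends up holding all 10000 items, which matches the test in the question completing without an exception, while the bounded variant fails as soon as the 129th item arrives.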