I'm trying to understand how to apply backpressure in Spring WebFlux. I understand the theory of backpressure, but I can't reproduce it, so I don't fully understand it.

Let's take the following example:

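import java.util.function.Consumer;
import reactor.core.publisher.EmitterProcessor;

// Flag polled by the producer thread (not declared in the original snippet).
private volatile boolean runThread = true;

public void test() throws InterruptedException {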
    EmitterProcessor<String> processor = EmitterProcessor.create();

    new Thread(() -> {
        int i = 0;
        while(runThread) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException ignored) {
            }
            processor.onNext("Value: " + i);
            i++;
        }
        processor.onComplete();
    }).start();

    processor
            .subscribe(makeSubscriber("FIRST - "), Throwable::printStackTrace);
}

private Consumer<String> makeSubscriber(String label) {
    return v -> {
        System.out.println(label + v);
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignored) {
        }
    };
}

I have created a hot Flux in the form of an EmitterProcessor, and in a separate thread I start producing data for it. Right after that, I subscribe to it. The subscriber is slower than the rate at which elements are produced, so problems should start to occur, right? But the subscriber logic runs on the producer thread: when I call processor.onNext(), it synchronously calls all the subscribers, so if the subscribers are slow, the publisher is slowed down as well. In that case, backpressure doesn't even seem useful.

I have also tried making two Spring Boot WebFlux applications, one with a Flux endpoint and one that consumes the endpoint, so I can be certain the consumer runs on a separate thread. But then, any attempt I make at backpressure in the consumer does nothing. There is no buffer being filled, there is nothing being dropped or anything!

Can anyone give me a concrete example of backpressure? Preferably in Spring WebFlux but I'll take any reactive Java library.

@dfritsi Yes, I've seen that question and its answer, but I still don't understand it, to be honest. I couldn't replicate the extended subscriber example. The GET method already does some backpressure, but as the publisher, isn't that the wrong place? Shouldn't the subscriber say how much it wants? - Fenrir

1 Answer

The documentation for the variant of the subscribe method you have chosen reads:

The subscription will request an unbounded demand (Long.MAX_VALUE).

That is, you switched off backpressure yourself.

To use backpressure, subscribe with Flux.subscribe(Subscriber), so that your subscriber receives the Subscription and decides how many elements to request.
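
Since the question accepts any reactive Java library, here is a minimal sketch of bounded demand using only the JDK's java.util.concurrent.Flow API (Java 9+), which mirrors the Reactive Streams interfaces that Reactor implements. It is an illustration, not the Reactor solution itself: the class name BackpressureDemo, the 20 ms delay, the buffer size of 4 and the count of 100 are all assumptions chosen for the demo. The subscriber runs on its own thread and requests one element at a time, while the producer offers elements as fast as it can and counts the ones that are dropped once the buffer is full.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BackpressureDemo {

    /** Slow consumer that signals demand one element at a time. */
    static final class SlowSubscriber implements Flow.Subscriber<Integer> {
        final AtomicInteger delivered = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // bounded demand instead of Long.MAX_VALUE
        }

        @Override
        public void onNext(Integer item) {
            delivered.incrementAndGet();
            try {
                Thread.sleep(20); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            subscription.request(1); // ask for the next element only when ready
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Offers `total` items as fast as possible; returns {delivered, dropped}. */
    static int[] run(int total, int bufferSize) {
        // The subscriber is invoked on this executor, not on the producer thread.
        ExecutorService consumerThread = Executors.newSingleThreadExecutor();
        SlowSubscriber subscriber = new SlowSubscriber();
        AtomicInteger dropped = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(consumerThread, bufferSize)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < total; i++) {
                // offer() does not block: when the subscriber's buffer is full,
                // the handler runs, and returning false drops the item.
                publisher.offer(i, (sub, item) -> {
                    dropped.incrementAndGet();
                    return false;
                });
            }
        } // close() signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumerThread.shutdown();
        }
        return new int[] {subscriber.delivered.get(), dropped.get()};
    }

    public static void main(String[] args) {
        int[] result = run(100, 4);
        System.out.println("delivered=" + result[0] + ", dropped=" + result[1]);
    }
}
```

Running main should show only a handful of elements delivered and the rest dropped; the exact split depends on timing. In Reactor, the corresponding approach would be to pass a BaseSubscriber to Flux.subscribe(Subscriber) and call request(n) from hookOnSubscribe and hookOnNext, typically combined with an operator such as onBackpressureDrop or onBackpressureBuffer on the hot source so that it has a defined overflow strategy.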