1
votes

I am using reactor in a project, and one of the features calls a blocking service, which connects to a device and gets an infinite stream of events.

I am trying to do a load test to see how many calls can I make to the blocking service.

I am generating around 1000 requests to the blocking service


Flux.just("ip1", "ip2", "ip3", "ip4")
                .repeat(250)

The problem is that reactor is only processing the first 256 requests, after that it isn't making any more requests.

When I added the .log("preConnect") I can see that it is logging only one request(256) from the downstream subscriber.

I don't understand what I am doing wrong.

I am attaching simplified example which can reproduce the issue.

package test.reactor;

import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

public class ReactorTest {

    @Test
    void testLoad() throws InterruptedException {
        AtomicInteger id = new AtomicInteger(0);
        Flux.just("ip1", "ip2", "ip3", "ip4")
                .repeat(250) // will create a total of 1004 messages
                .map(str -> str + " id: " + id.incrementAndGet())
                .log("preConnect")
                .flatMap(this::blocking)
                .log()
                .subscribeOn(Schedulers.parallel())
                .subscribe();

        new CountDownLatch(1).await();
    }

    private Flux<String> blocking(String ip) {
        Mono<String> connectMono = Mono.fromCallable(this::connect)
                .subscribeOn(Schedulers.boundedElastic())
                .map(msg -> "Connected: "+ip + msg);

        Flux<String> streamFlux = Mono.fromCallable(this::infiniteNetworkStream)
                .subscribeOn(Schedulers.boundedElastic())
                .flatMapMany(Flux::fromStream)
                .map(msg -> ip + msg);

        return connectMono.concatWith(streamFlux);
    }

    private Stream<String> infiniteNetworkStream() {
        return Stream.generate(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                return "Hello";
            }
        });
    }

    private String connect() throws Exception{
        Thread.sleep(100);
        return "success";
    }
    
}
1

1 Answers

4
votes

Figured out the issue, it has to do with flatmap, the default concurrency for flatmap is 256. It will not request more items from the upstream publisher until the current subscriptions go below 256.

In my case since my flux is infinite, it wasn't processing any after 256.

The solution I found was to increase the concurrency

   Flux.just("ip1", "ip2", "ip3", "ip4")
                .repeat(250) // will create a total of 1004 messages
                .map(str -> str + " id: " + id.incrementAndGet())
                .log("preConnect")
                .flatMap(this::blocking, 1000) // added 1000 here to increase concurrency
                .log()
                .subscribeOn(Schedulers.parallel())
                .subscribe();