I have created a simple Kafka consumer that returns a Flux of objects (the received messages) and I'm trying to test it using the StepVerifier.
In my test, I do something like that:
Flux<Pojo> flux = consumer.start();
StepVerifier.create(flux)
.expectNextMatches(p -> p.getList().size() == 3)
.verifyComplete();
The assertion works ok (if I change the value from 3 to something else, the test fails). But, if the assertion passes, than the test never exit.
I have also tried to use the verify method like so:
StepVerifier.create(flux)
.expectNextMatches(f -> f.getEntitlements().size() == 3)
.expectComplete()
.verify(Duration.ofSeconds(3));
In this case, I get this error:
java.lang.AssertionError: VerifySubscriber timed out on false
Any idea what I'm doing wrong?