
I have created a simple Kafka consumer that returns a Flux of objects (the received messages) and I'm trying to test it using the StepVerifier.

In my test, I do something like this:

Flux<Pojo> flux = consumer.start();
StepVerifier.create(flux)
  .expectNextMatches(p -> p.getList().size() == 3)
  .verifyComplete();

The assertion itself works (if I change the value from 3 to something else, the test fails). But if the assertion passes, the test never exits.

I have also tried to use the verify method like so:

StepVerifier.create(flux)
  .expectNextMatches(f -> f.getEntitlements().size() == 3)
  .expectComplete()
  .verify(Duration.ofSeconds(3));

In this case, I get this error:

java.lang.AssertionError: VerifySubscriber timed out on false

Any idea what I'm doing wrong?


1 Answer


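The Kafka Flux is probably infinite, so it never emits the onComplete signal that both verifyComplete() and expectComplete() wait for. That is why the first test hangs and the second one times out. If you're only interested in testing that first value, you can call .thenCancel().verify() instead.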
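
Here is a minimal sketch of that approach. It assumes reactor-core and reactor-test are on the classpath. The Pojo class and the neverCompleting() method are hypothetical stand-ins for the question's message type and for consumer.start(), since the real consumer isn't shown; the stand-in emits one message and then stays open, which is how an infinite Kafka Flux would behave.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class InfiniteFluxExample {

    // Hypothetical stand-in for the message type from the question.
    static final class Pojo {
        private final List<String> list;

        Pojo(List<String> list) {
            this.list = list;
        }

        List<String> getList() {
            return list;
        }
    }

    // Hypothetical stand-in for consumer.start(): emits one message,
    // then never signals onComplete (like an open Kafka subscription).
    static Flux<Pojo> neverCompleting() {
        return Flux.just(new Pojo(List.of("a", "b", "c")))
                   .concatWith(Flux.never());
    }

    // Asserts on the first element, then cancels the subscription
    // instead of waiting for a completion signal that never arrives.
    static Duration verifyFirstThenCancel() {
        return StepVerifier.create(neverCompleting())
                .expectNextMatches(p -> p.getList().size() == 3)
                .thenCancel()
                // The timeout is only a safety net in case no element arrives.
                .verify(Duration.ofSeconds(3));
    }

    public static void main(String[] args) {
        Duration elapsed = verifyFirstThenCancel();
        System.out.println("Verified in " + elapsed.toMillis() + " ms");
    }
}
```

Keeping a timeout on verify() is still worthwhile: if the consumer never emits anything, the test fails after three seconds rather than hanging.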