I'm trying to unit test an advice on the poller that blocks the MongoDB channel adapter from polling until a certain condition is met (namely, all messages from the current batch have been processed).

The flow looks as follows:

IntegrationFlows.from(MongoDb.reactiveInboundChannelAdapter(mongoDbFactory,
            new Query().with(Sort.by(Sort.Direction.DESC, "modifiedDate")).limit(1))
                    .collectionName("metadata")
                    .entityClass(Metadata.class)
                    .expectSingleResult(true),
            e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(pollingIntervalSeconds))
                    .advice(this.advices.waitUntilCompletedAdvice())))
            .handle((p, h) -> {
                this.advices.waitUntilCompletedAdvice().setWait(true);
                return p;
            })
            .handle(doSomething())
            .channel(Channels.DOCUMENT_HEADER.name())
            .get();

And the following advice bean:

@Bean
public WaitUntilCompletedAdvice waitUntilCompletedAdvice() {
    DynamicPeriodicTrigger trigger = new DynamicPeriodicTrigger(Duration.ofSeconds(1));
    return new WaitUntilCompletedAdvice(trigger);
}

And the advice itself:

public class WaitUntilCompletedAdvice extends SimpleActiveIdleMessageSourceAdvice {

    AtomicBoolean wait = new AtomicBoolean(false);

    public WaitUntilCompletedAdvice(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        // Skip this poll cycle while the wait flag is set.
        return !getWait();
    }

    public boolean getWait() {
        return wait.get();
    }

    public void setWait(boolean newWait) {
        // A plain atomic write suffices: the flag has no intermediate state to protect,
        // and it cannot spin if another thread writes the same value concurrently.
        wait.set(newWait);
    }
}

I test the flow with the following test:

    @Test
    public void testClaimPoollingAdapterFlow() throws Exception {
        // given
        ArgumentCaptor<Message<?>> captor = messageArgumentCaptor();
        CountDownLatch receiveLatch = new CountDownLatch(1);
        MessageHandler mockMessageHandler = mockMessageHandler(captor).handleNext(m -> receiveLatch.countDown());
        this.mockIntegrationContext.substituteMessageHandlerFor("retrieveDocumentHeader", mockMessageHandler);
        LocalDateTime modifiedDate = LocalDateTime.now();
        ProcessingMetadata data = Metadata.builder()
                .modifiedDate(modifiedDate)
                .build();
        assert !this.advices.waitUntilCompletedAdvice().getWait();

        // when
        itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

        // then
        assertThat(receiveLatch.await(10, TimeUnit.SECONDS)).isTrue();
        verify(mockMessageHandler).handleMessage(any());
        assertThat(captor.getValue().getPayload()).isEqualTo(modifiedDate);
        assert this.advices.waitUntilCompletedAdvice().getWait();
    }

This works fine, but when I send another message to the input channel, that message is still processed and the advice is not respected.

Is this the intended behaviour? If so, how can I verify in a unit test that the poller really waits for this advice?

2 Answers

2
votes

itf.getInputChannel().send(new GenericMessage<>(Mono.just(data)));

That bypasses the poller and sends the message directly.
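To make the point concrete, here is a minimal plain-Java sketch of why a direct send never reaches the advice. It is only a model under stated assumptions: `Gate`, `InputChannel`, and `pollOnce` are hypothetical stand-ins for the advice, the flow's input channel, and one poller cycle, and none of them are Spring Integration classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

// Plain-Java model for illustration only; these are NOT Spring Integration types.
class PollerBypassSketch {

    // Hypothetical stand-in for the advice: only the poller consults it.
    static final class Gate {
        private final AtomicBoolean wait = new AtomicBoolean(false);

        boolean beforeReceive() {
            return !wait.get();
        }

        void setWait(boolean value) {
            wait.set(value);
        }
    }

    // Hypothetical stand-in for the input channel: it delivers whatever it is handed.
    static final class InputChannel {
        final List<String> delivered = new ArrayList<>();

        void send(String payload) {
            delivered.add(payload);
        }
    }

    // One poller cycle: ask the gate first, then read the source and send.
    static boolean pollOnce(Gate gate, Supplier<String> source, InputChannel channel) {
        if (!gate.beforeReceive()) {
            return false; // gate closed: the source is not read in this cycle
        }
        channel.send(source.get());
        return true;
    }

    // Returns {deliveries via the poller, deliveries via direct send} with the gate closed.
    static int[] deliveriesWhileGateClosed() {
        Gate gate = new Gate();
        gate.setWait(true);

        InputChannel viaPoller = new InputChannel();
        for (int i = 0; i < 3; i++) {
            pollOnce(gate, () -> "polled", viaPoller);
        }

        InputChannel viaDirectSend = new InputChannel();
        for (int i = 0; i < 3; i++) {
            viaDirectSend.send("direct"); // never touches the gate
        }

        return new int[] { viaPoller.delivered.size(), viaDirectSend.delivered.size() };
    }

    public static void main(String[] args) {
        int[] result = deliveriesWhileGateClosed();
        System.out.println("via poller: " + result[0] + ", via direct send: " + result[1]);
    }
}
```

In this model the closed gate stops every poll cycle, while all three direct sends are delivered, which matches what the test in the question observes.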

You can unit test that the advice has been configured by calling beforeReceive() from your test.

Or you can create a dummy test flow with the same advice:

IntegrationFlows.from(() -> "foo", e -> e.poller(...))
       ...

And verify that just one message is sent.
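The "just one message" check can be sketched without Spring, assuming the gate closes as soon as the first message is handed over. In this rough model a `ScheduledExecutorService` plays the poller and a `BlockingQueue` plays the test channel; the names `SingleMessageSketch` and `receiveTwice` are hypothetical and the timeouts are arbitrary.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java model for illustration only; these are NOT Spring Integration types.
class SingleMessageSketch {

    // Returns {first, second}: what a consumer sees on two consecutive timed receives.
    static String[] receiveTwice() {
        AtomicBoolean wait = new AtomicBoolean(false);
        BlockingQueue<String> testChannel = new LinkedBlockingQueue<>();
        ScheduledExecutorService poller = Executors.newSingleThreadScheduledExecutor();
        try {
            // Fixed-delay poller: consult the gate, emit "foo", then close the gate.
            poller.scheduleWithFixedDelay(() -> {
                if (wait.get()) {
                    return; // gate closed: skip this poll cycle
                }
                testChannel.add("foo");
                wait.set(true);
            }, 0, 10, TimeUnit.MILLISECONDS);

            String first = testChannel.poll(2, TimeUnit.SECONDS);
            String second = testChannel.poll(200, TimeUnit.MILLISECONDS);
            return new String[] { first, second };
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a message", e);
        } finally {
            poller.shutdownNow();
        }
    }

    public static void main(String[] args) {
        String[] result = receiveTwice();
        System.out.println("first=" + result[0] + ", second=" + result[1]);
    }
}
```

Because the gate is checked and closed on the single poller thread, the first receive gets "foo" and the second one times out with null, however many poll cycles run in between.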

0
votes

Example implementation:

@Test
public void testWaitingActivate() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(true);

    // when
    Message<?> receive = testChannel.receive(3000);

    // then
    assertThat(receive).isNull();
}

@Test
public void testWaitingInactive() {
    // given
    this.advices.waitUntilCompletedAdvice().setWait(false);

    // when
    Message<?> receive = testChannel.receive(3000);

    // then
    assertThat(receive).isNotNull();
}

@Configuration
@EnableIntegration
public static class Config {

    @Autowired
    private Advices advices;

    @Bean
    public PollableChannel testChannel() {
        return new QueueChannel();
    }

    @Bean
    public IntegrationFlow fakeFlow() {
        this.advices.waitUntilCompletedAdvice().setWait(true);
        return IntegrationFlows.from(() -> "foo", e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(1))
                .advice(this.advices.waitUntilCompletedAdvice()))).channel("testChannel").get();
    }
}