How do you integrate a Spring Boot HealthIndicator with an IntegrationFlow that polls email over IMAP?

I can get exceptions from the IntegrationFlow via the errorChannel, but how do I "clear" the exception once the IntegrationFlow starts working again, e.g. after a network outage?

@SpringBootApplication
public class MyApplication {

    @Bean
    IntegrationFlow mailFlow() {
        return IntegrationFlows
                .from(Mail.imapInboundAdapter(receiver()).get(),
                        e -> e.autoStartup(true)
                                .poller(Pollers.fixedRate(5000)))
                .channel(mailChannel()).get();
    }
    
    @Bean
    public ImapMailReceiver receiver() {
        String mailServerPath = format("imaps://%s:%s@%s/INBOX", mailUser,
                encode(mailPassword), mailServer);
        ImapMailReceiver result = new ImapMailReceiver(mailServerPath);
        return result;
    }

    @Bean
    DirectChannel mailChannel() {
        return new DirectChannel();
    }

    @Autowired
    @Qualifier("errorChannel")
    private PublishSubscribeChannel errorChannel;

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(errorChannel).handle(message -> {
            MessagingException ex = (MessagingException) message.getPayload();
            log.error("", ex);
        }).get();
    }

    @Bean
    HealthIndicator mailReceiverHealthIndicator() {
        return () -> {
            /*
             * How to error check the imap polling ???
             */
            return Health.up().build();
        };
    }

}

1 Answer

I would go with an AtomicReference<Exception> bean and set its value in that errorHandlingFlow. The HealthIndicator implementation would then consult that AtomicReference and report down() whenever it holds a value.

The PollerSpec for the Mail.imapInboundAdapter() could be configured with a ReceiveMessageAdvice:

/**
 * Specify AOP {@link Advice}s for the {@code pollingTask}.
 * @param advice the {@link Advice}s to use.
 * @return the spec.
 */
public PollerSpec advice(Advice... advice) {

Its afterReceive() implementation could simply reset that AtomicReference to null, so your HealthIndicator would return up() again.

The point is that afterReceive() is called only when invocation.proceed() completes without an exception, and it is called whether or not there are new messages to process.
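
To make the bookkeeping concrete, here is a minimal sketch of the state that the error flow, the advice and the HealthIndicator would share. It uses only the JDK so it runs on its own. The names PollingHealthTracker, recordFailure(), recordSuccess() and isUp() are made up for illustration and are not part of Spring; the comments say which Spring hook I would expect to call each method.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

/** Remembers the most recent polling failure; empty means the last poll did not throw. */
final class PollingHealthTracker {

    private final AtomicReference<Exception> lastFailure = new AtomicReference<>();

    /** Intended to be called from the errorChannel handler (errorHandlingFlow in the question). */
    void recordFailure(Exception ex) {
        lastFailure.set(ex);
    }

    /** Intended to be called from ReceiveMessageAdvice.afterReceive(), i.e. after a poll that did not throw. */
    void recordSuccess() {
        lastFailure.set(null);
    }

    /** What the HealthIndicator would consult: present means down, empty means up. */
    Optional<Exception> lastFailure() {
        return Optional.ofNullable(lastFailure.get());
    }

    boolean isUp() {
        return lastFailure.get() == null;
    }
}

final class PollingHealthTrackerDemo {
    public static void main(String[] args) {
        PollingHealthTracker tracker = new PollingHealthTracker();
        System.out.println(tracker.isUp()); // true: nothing has failed yet

        // Simulates the error flow receiving an exception during an outage.
        tracker.recordFailure(new IllegalStateException("connection refused"));
        System.out.println(tracker.isUp()); // false

        // Simulates afterReceive() running once polling works again.
        tracker.recordSuccess();
        System.out.println(tracker.isUp()); // true
    }
}
```

In the application from the question, the tracker would be a bean: errorHandlingFlow calls recordFailure(ex), an advice registered through the poller's advice(...) method calls recordSuccess() from afterReceive() and returns the message unchanged, and the HealthIndicator returns down() with the stored exception if lastFailure() is present, otherwise up().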