I have successfully implemented several integration flows with the DSL, and everything is running fine in PROD. For almost all of my flows (about 10), I just have a message source and a handler, plus some extra features that are always the same for every flow:

  • create an error flow that sends an email
  • add a circuit breaker
  • add an HTTP inbound endpoint to force a run
  • ...

I would like to know whether there is an elegant way to factor this out, such as an abstract configuration class or a template (maybe with IntegrationFlowAdapter?).

With this level of abstraction, each flow configuration class would only need to provide or override two methods:

  • a MessageSource
  • a handler

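// Imports assume the Spring Integration 5.x package layout;
// project-specific types (Service, TaskExecutorFactory, ErrorFlowFactory and so on) are omitted.
import java.util.concurrent.Executor;
import java.util.function.Consumer;

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessageHeaders;

@Configuration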
@ConfigurationProperties("app.flows.sample")
public class SampleFlowConfiguration {

    public static final String FLOW_NAME = "SampleFlow";
    public static final String POLLER = "poller";

    private final Service service;
    private final TaskExecutorFactory taskExecutorFactory;
    private final ErrorFlowFactory errorFlowFactory;

    public SampleFlowConfiguration(Service service,
                                   TaskExecutorFactory taskExecutorFactory,
                                   ErrorFlowFactory errorFlowFactory) {
        this.service = service;
        this.taskExecutorFactory = taskExecutorFactory;
        this.errorFlowFactory = errorFlowFactory;
    }

    @Bean
    public IntegrationFlow sampleFlow() {
        return IntegrationFlows
                .from(objectToTreatsSource(), sampleProducer())
                .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, sampleErrorChannel()))
                .channel(MessageChannels.executor(sampleConsumerTaskExecutor()))
                .handle(objectHandler())
                .get();
    }

    @Bean
    public MessageSource<?> objectToTreatsSource() {
        return service.getObjectToTreat();
    }

    @Bean
    public Consumer<SourcePollingChannelAdapterSpec> sampleProducer() {
        return c -> c.poller(Pollers.cron("* * * * * *")
                .maxMessagesPerPoll(10)
                .errorChannel(sampleErrorChannel())
                .taskExecutor(samplePollerTaskExecutor()))
                .autoStartup(false)
                .id(POLLER);
    }

    @Bean
    public MessageHandler objectHandler() {
        return new AbstractReplyProducingMessageHandler() {
            @Override
            protected Object handleRequestMessage(Message<?> message) {
                service.handle(message.getPayload());
                return message;
            }
        };
    }

    @Bean
    public Executor samplePollerTaskExecutor() {
        return taskExecutorFactory.getTaskExecutor(10, "sampleProducerExec");
    }

    @Bean
    public Executor sampleConsumerTaskExecutor() {
        return taskExecutorFactory.getTaskExecutor(10, "sampleConsumerExec");
    }

    @Bean
    public DirectChannel sampleErrorChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow samplesExpirationErrorFlow() {
        return errorFlowFactory.getDefaultErrorFlow(
                sampleErrorChannel(),
                m -> POLLER,
                mailTransformer());
    }

    @Bean
    public MessagingExceptionToMailTransformer mailTransformer() {
        FlowErrorMessageBuilder flowErrorMessageBuilder = messageException ->
                "Error while processing sample";
        return new MessagingExceptionToMailTransformer(FLOW_NAME, flowErrorMessageBuilder, true);
    }

}

Thanks

1 Answer

IntegrationFlows is a factory, and it can indeed be used from a generic method:

@Bean
public IntegrationFlow sampleFlow() {
    return myFlowBuilder(objectToTreatsSource(), objectHandler());
}

// Shared wiring: only the source and the handler differ between flows.
private IntegrationFlow myFlowBuilder(MessageSource<?> messageSource, MessageHandler handler) {
    return IntegrationFlows
            .from(messageSource, sampleProducer())
            .enrichHeaders(h -> h.header(MessageHeaders.ERROR_CHANNEL, sampleErrorChannel()))
            .channel(MessageChannels.executor(sampleConsumerTaskExecutor()))
            .handle(handler)
            .get();
}
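
The generic method above can be taken one step further with the template-method pattern, which matches the "override only two methods" shape asked for in the question. What follows is only a minimal plain-Java sketch, deliberately free of Spring types so that it runs on its own. FlowTemplate, SampleFlow, runOnce and errors are hypothetical names, and the in-memory error list merely stands in for the e-mail error flow. In a real configuration class, the body of the template method would presumably be the IntegrationFlows chain from myFlowBuilder.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical base class: owns the shared wiring; subclasses supply two pieces.
abstract class FlowTemplate<T> {

    // Stand-in for the shared error flow (assumption: a list instead of an e-mail).
    private final List<String> errors = new ArrayList<>();

    // The only two extension points, mirroring "a MessageSource" and "a handler".
    protected abstract Supplier<T> source();   // yields null when nothing is left to poll

    protected abstract Consumer<T> handler();

    // Template method: one poll cycle with the common error handling applied.
    public final int runOnce(int maxMessagesPerPoll) {
        Supplier<T> source = source();
        Consumer<T> handler = handler();
        int handled = 0;
        for (int i = 0; i < maxMessagesPerPoll; i++) {
            T payload = source.get();
            if (payload == null) {
                break;                          // source is drained
            }
            try {
                handler.accept(payload);
                handled++;
            } catch (RuntimeException e) {
                errors.add(e.getMessage());     // shared behavior, written once
            }
        }
        return handled;
    }

    public List<String> errors() {
        return errors;
    }
}

// A concrete flow only overrides the two methods.
final class SampleFlow extends FlowTemplate<String> {

    private final Deque<String> pending = new ArrayDeque<>(Arrays.asList("a", "bad", "c"));
    final List<String> processed = new ArrayList<>();

    @Override
    protected Supplier<String> source() {
        return pending::poll;                   // poll() returns null on an empty deque
    }

    @Override
    protected Consumer<String> handler() {
        return payload -> {
            if (payload.equals("bad")) {
                throw new IllegalStateException("cannot process " + payload);
            }
            processed.add(payload);
        };
    }
}
```

With this sketch, new SampleFlow().runOnce(10) handles "a" and "c" and records a single error for "bad". Each additional flow is then just another small subclass, while the error handling stays in one place.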

On the other hand, you can extract the common part of those flows into an independent flow and use a gateway() to send requests into that flow and wait for the replies.

Remember: the MessageChannel abstraction is a first-class citizen in the framework, so you can always send a message to a channel from anywhere.