I have an application based on Spring Integration and a part of that application has the following config.
It worked fine with version 4.3.10.RELEASE. Now I am trying to upgrade to 5.1.5.RELEASE, and the application fails to start with the exception mentioned below.
I was able to resolve the issue by changing the config, but I am wondering why this behavior changed.
Old config working in 4.3.10.RELEASE:
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
        StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
        AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
    return flow -> flow.channel(inboundChannel)
            .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
            .publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
                            flow1.handle(statusUpdater, "handle"))
                    .subscribe(flow2 -> flow2.transform(myTransformer)
                            .filter(myFilter, "filter")
                            .route(messageRouter)
                            .handle(inlineHandler, "handle")
                    ).errorHandler(new MessagePublishingErrorHandler())
            );
}
Exception in the logs with the above config on version 5.1.5.RELEASE:
Caused by: org.springframework.beans.factory.BeanCreationException: The 'currentComponent' (messageRouter) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'. This is the end of the integration flow.
    at org.springframework.integration.dsl.IntegrationFlowDefinition.registerOutputChannelIfCan(IntegrationFlowDefinition.java:3088) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.register(IntegrationFlowDefinition.java:3033) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1185) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:1001) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.handle(IntegrationFlowDefinition.java:979) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.config.IntegrationConfig.lambda$null$1(IntegrationConfig.java:42) ~[classes/:na]
    at org.springframework.integration.dsl.PublishSubscribeSpec.subscribe(PublishSubscribeSpec.java:58) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.config.IntegrationConfig.lambda$null$2(IntegrationConfig.java:38) ~[classes/:na]
    at org.springframework.integration.dsl.IntegrationFlowDefinition.publishSubscribeChannel(IntegrationFlowDefinition.java:255) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at com.config.IntegrationConfig.lambda$mainFlow$3(IntegrationConfig.java:36) ~[classes/:na]
    at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.processIntegrationFlowImpl(IntegrationFlowBeanPostProcessor.java:309) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.integration.dsl.context.IntegrationFlowBeanPostProcessor.postProcessBeforeInitialization(IntegrationFlowBeanPostProcessor.java:117) ~[spring-integration-core-5.1.5.RELEASE.jar:5.1.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:414) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1770) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:593) ~[spring-beans-5.1.7.RELEASE.jar:5.1.7.RELEASE]
Updated config working for 5.1.5.RELEASE:
The processing done by the inlineHandler bean now happens in a channel/flow of its own instead of inline. To make that work, I had to change the messageRouter bean to route the message to the default flow; previously that step happened inline after the router.
@Bean
public IntegrationFlow mainFlow(@Qualifier("initChannel") PublishSubscribeChannel inboundChannel, TaskExecutor taskExecutor,
        StatusUpdater statusUpdater, MyTransformer myTransformer, MyFilter myFilter,
        AbstractMessageRouter messageRouter, FinalHandler finalHandler, InlineHandler inlineHandler) {
    return flow -> flow.channel(inboundChannel)
            .channel(new ExecutorSubscribableChannel(Executors.newFixedThreadPool(2)))
            .publishSubscribeChannel(taskExecutor, spec -> spec.subscribe(flow1 ->
                            flow1.handle(statusUpdater, "handle"))
                    .subscribe(flow2 -> flow2.transform(myTransformer)
                            .filter(myFilter, "filter")
                            .route(messageRouter)
                            //.handle(inlineHandler, "handle") // removed this
                    ).errorHandler(new MessagePublishingErrorHandler())
            );
}
@Bean
public AbstractMessageRouter messageRouter(@Qualifier("swift") MessageChannel messageChannel, MessageChannel defaultFlow) {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            System.out.println(">>>>>> Determining channel");
            // The actual condition is omitted here; it decides whether the message goes to the default flow.
            if (/* some condition */) {
                return Collections.singleton(defaultFlow);
            }
            // Otherwise route to the "swift" channel.
            return Collections.singleton(messageChannel);
        }
    };
}
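To illustrate the shape of the updated routing without any framework involved, here is a minimal plain-Java sketch. It is not Spring Integration code and does not reflect how the DSL is implemented: `Channel`, `Router`, the handler names in the log, and the `SWIFT` prefix condition are all hypothetical stand-ins (the real condition is omitted above). It only models the idea that the router is one-way, so anything that used to follow it inline has to subscribe to one of the router's target channels instead.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

public class RouterSketch {

    /** Hypothetical stand-in for a subscribable message channel. */
    static final class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
        }
    }

    /** One-way router: forwards to exactly one channel and returns nothing to chain on. */
    static final class Router {
        private final Predicate<String> useDefault;
        private final Channel defaultFlow;
        private final Channel swift;

        Router(Predicate<String> useDefault, Channel defaultFlow, Channel swift) {
            this.useDefault = useDefault;
            this.defaultFlow = defaultFlow;
            this.swift = swift;
        }

        void route(String payload) {
            Channel target = useDefault.test(payload) ? defaultFlow : swift;
            target.send(payload);
        }
    }

    /** Routes each payload and returns a log of which handler received it. */
    public static List<String> run(List<String> payloads) {
        List<String> log = new ArrayList<>();
        Channel defaultFlow = new Channel();
        Channel swift = new Channel();

        // The formerly inline handler now subscribes to the default channel.
        defaultFlow.subscribe(p -> log.add("inlineHandler:" + p));
        swift.subscribe(p -> log.add("swiftHandler:" + p));

        // Placeholder condition (an assumption); the real one is not shown in the question.
        Router router = new Router(p -> !p.startsWith("SWIFT"), defaultFlow, swift);
        for (String payload : payloads) {
            router.route(payload);
        }
        return log;
    }

    public static void main(String[] args) {
        // prints [inlineHandler:order-1, swiftHandler:SWIFT-2]
        System.out.println(run(Arrays.asList("order-1", "SWIFT-2")));
    }
}
```

Because `route` returns `void`, there is nothing for a following step to consume, which is roughly what the exception message seems to be saying about `messageRouter` being the end of the flow.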