I have implemented a RecipientListRouter shown below -
@Bean
public RecipientListRouter routerFlow() {
RecipientListRouter router = new RecipientListRouter();
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channelChkn", "headers.get('eventSubType').contains('CHKN')");
router.addRecipient("channelBkd", "headers.get('eventSubType').contains('BKD')");
router.addRecipient("channelBrd", "headers.get('eventSubType').contains('BRD')");
router.addRecipient("channelAciRecCncl", "headers.get('eventSubType').contains('ACI-REC-CNCL')");
router.addRecipient("channelSeatAsgn", "headers.get('eventSubType').contains('SEATNBR-')");
router.addRecipient("channelDeboard", "headers.get('isDeBoarded') == true");
router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
return router;
}
My event flow is
@Bean
public IntegrationFlow baseEventFlow() {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
.filter(filterMessage, "rejectPastData")
.transform(aciMessageTransformer, "parserXMLMessage")
.route(routerFlow())
// executor used to parallelise the multiple subscribe execution
.publishSubscribeChannel(Executors.newCachedThreadPool(),
pubsub -> pubsub.subscribe(flow -> flow.channel("pubCountEvntChannel"))
.subscribe(flow -> flow.channel("pubTravlerEventChannel")))
.get();
}
I am getting the Bean Creation exception - The 'currentComponent' (routerFlow) is a one-way 'MessageHandler' and it isn't appropriate to configure 'outputChannel'
It seems that after route we can't invoke any handler method in the parent flow. But in my scenario the handler in the route processes the data and sets a value in the java object that needs to be used in the main flow to publish to the down-line channels.
Please suggest how to achieve this
##Edited##
If I add the below recipient in the router list then I believe the the channel will always be among the list of recipients and I can do the processing after the event is processed by the handlers. Please confirm if the below recipient will be added at the end of the router list.
router.addRecipient("pubSubChannel");
Also, if the message header doesn't match with any of the values then we are sending it to the default NULL channel, discarding the message.
If we add the above "pubSubChannel" recipient then what will be the impact of the un-matched messages. Do I have to handle the unmatched messages in the pubSubEventChannel?
Please suggest
###EDITED
@Bean
public IntegrationFlow baseEventFlow() {
return IntegrationFlows.from(Kafka.messageDrivenChannelAdapter(kafkaMessageListenerContainer))
.filter(filterMessage, "rejectPastData")
.transform(aciMessageTransformer, "parserXMLMessage")
.gateway(eventFlow())
.handle (test())
.get()
}
@Bean
public IntegrationFlow eventFlow() {
return f -> f.route(routerFlow());
}
@Bean
public RecipientListRouter routerFlow() {
RecipientListRouter router = new RecipientListRouter();
router.setIgnoreSendFailures(true);
router.setApplySequence(true);
router.addRecipient("channelChkn", "headers.get('eventSubType').contains('CHKN')");
router.addRecipient("channelBkd", "headers.get('eventSubType').contains('BKD')");
router.addRecipient("channelBrd", "headers.get('eventSubType').contains('BRD')");
router.addRecipient("channelAciRecCncl", "headers.get('eventSubType').contains('ACI-REC-CNCL')");
router.addRecipient("channelSeatAsgn", "headers.get('eventSubType').contains('SEATNBR-')");
// router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-ASSIGN')");
// router.addRecipient("channelSeatAsgn","headers.get('eventSubType').contains('SEATNBR-CHG')");
router.addRecipient("channelDeboard", "headers.get('isDeBoarded') == true");
//router.setDefaultOutputChannelName(IntegrationContextUtils.NULL_CHANNEL_BEAN_NAME);
LOGGER.info("********************* RecipientListRouter *********************" + router.getRecipients());
return router;
}