0
votes

So, problem is with router. When the router try send message to channel I'l got the error: Dispatcher has no subscribers for channel 'newTypingNotificationHandler.input'. But I have integrationFlow defenition for this channel name.

@Bean
public IntegrationFlow routeEcmIncomeRq(AbstractMessageRouter typeNotificationRouter) {
    return IntegrationFlows.from(FROM_CHANNEL)
            .routeToRecipients(r -> r
                    .recipientFlow(p -> p instanceof TypingNotificationDto,
                            f -> f.route(typeNotificationRouter)
                    )
                    .defaultOutputChannel(DEFAULT_SERVICE_CHANNEL)
            ).get();
}

@Bean
public AbstractMessageRouter typeNotificationRouter(IncomeRepository incomeRepository) {
    return new AbstractMessageRouter() {
        @Override
        protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            TypingNotificationDto messagePayload = (TypingNotificationDto) message.getPayload();

            if (!incomeRepository.existsById(StringUtil.ecdFileIdToUuid(messagePayload.getEcdDocumentImage().getDocumentSource()))) {
                return Collections.singleton(MessageChannels.direct("newTypingNotificationHandler.input").get());
            } else {
                return Collections.singleton(MessageChannels.direct("existsTypingNotificationHandler.input").get());
            }
        }
    };
}

@Bean
public IntegrationFlow newTypingNotificationHandler() {
    return f -> f.log("need's create new Income");
}

@Bean
public IntegrationFlow existsTypingNotificationHandler() {
    return f -> f.log("exist income process");
}

stackTrace cause:

org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:461) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.6.RELEASE.jar:5.2.6.RELEASE] at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206) ~[spring-integration-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]

1

1 Answers

1
votes

You are returning a new channel each time, not the one managed by Spring

MessageChannels.direct("newTypingNotificationHandler.input").get();

Use

return Collections.singleton(getChannelResolver().resolveDestination("newTypingNotificationHandler.input"));

instead. But, it would be better to cache after resolving rather than returning a new collection for each message.