1
votes

I am trying to implement routing slip (EI Pattern) using Spring integration. The configuration I have done is

 @Bean
    @Transformer(inputChannel = "routingServiceChannel")
    public HeaderEnricher headerEnricher() {
        return new HeaderEnricher(Collections.singletonMap(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
                new RoutingSlipHeaderValueMessageProcessor("routingChannel2",                   
                    "routingChannel1")));
    }

So when it reaches routingServiceChannel, it goes to routingChannel2 and then routingChannel1. But in my case it always throws an exception that it does not have a reply channel. when I set output channel, like

@Transformer(inputChannel = "routingServiceChannel", outputChannel=""xyzChannel)

Then then routing happens to xyzChannel instead of going to routingChannel2, and routingChannel1.

When I debugged the code in spring integration core, I stumbled with this code, In

class AbstractReplyProducingMessageHandler

protected final void handleMessageInternal(Message<?> message) {
        Object result;
        if (this.advisedRequestHandler == null) {
            result = handleRequestMessage(message);
        } else {
            result = doInvokeAdvisedRequestHandler(message);
        }
        if (result != null) {
            sendOutputs(result, message);
        } else if (this.requiresReply) {
            throw new ReplyRequiredException(message, "No reply produced by handler '" + getComponentName()
                    + "', and its 'requiresReply' property is set to true.");
        } else if (logger.isDebugEnabled()) {
            logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
        }
    }

Here in handle message method they obtain the routingSlip map and assigning it to the result. And

protected void produceOutput(Object reply, Message<?> requestMessage) {
        MessageHeaders requestHeaders = requestMessage.getHeaders();

        Object replyChannel = null;
        if (getOutputChannel() == null) {
            Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
            if (routingSlipHeader != null) {
                Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
                Object key = routingSlipHeader.keySet().iterator().next();
                Object value = routingSlipHeader.values().iterator().next();
                Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
                Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be Integer");
                List<?> routingSlip = (List<?>) key;
                AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
                replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
                if (replyChannel != null) {
                    // TODO Migrate to the SF MessageBuilder
                    AbstractIntegrationMessageBuilder<?> builder = null;
                    if (reply instanceof Message) {
                        builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
                    } else if (reply instanceof AbstractIntegrationMessageBuilder) {
                        builder = (AbstractIntegrationMessageBuilder<?>) reply;
                    } else {
                        builder = this.getMessageBuilderFactory().withPayload(reply);
                    }
                    builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
                            Collections.singletonMap(routingSlip, routingSlipIndex.get()));
                    reply = builder;
                }
            }

            if (replyChannel == null) {
                replyChannel = requestHeaders.getReplyChannel();
            }
        }

        Message<?> replyMessage = createOutputMessage(reply, requestHeaders);
        sendOutput(replyMessage, replyChannel);
    }

Instead of getting the routingSlip configuration from the reply, they are trying to get from the requestMessage. Am I missing something here? Are there any additional configuration that I need to set?

1

1 Answers

1
votes

Thank you for the attention to the subject, BTW! :)

Everything looks good, but you have missed the point of the Routing Slip.

First of all you should configure it for the message. And since Routing Slip is a header you should use HeaderEnricher to add it to the headers of the message.

The routing is really caused in the downstream from and exactly for the requestMessage, not the reply. The Routing Slip is out of HeaderEnricher.

Although it may happen for the HeaderEnricher's requestMessage, of course.

If you would like to consult Routing Slip exactly after the HeaderEnricher, you should configure something like:

@BridgeTo
@Bean
public MessageChannel xyzChannel() {
    return new DirectChannel();
}

Note: no one new header is available during HeaderEnricher logic. Only in the downstream. And Routing Slip is one of them.