I am trying to implement a Camel route that reads a request message from a remote system's out queue (System.A.out), inspects the message body, and dynamically routes the message to another system's in queue (System.B.in). At that point the route should be complete and wait for the next message on its from queue. Currently it blocks and waits for a response on a temporary queue.

System.B reads its in queue (System.B.in; the reader is not always a Camel route), processes the message, and drops a response on its out queue (System.B.out). System.B uses the JMSMessageID from the request message as the JMSCorrelationID on its response; that is all it keeps from the request.

A second Camel route (similar to the one on System.A.out, but listening on System.B.out) picks up the response message. Using the JMSCorrelationID, it finds the request's JMSReplyTo queue (System.A.in) and drops the response there for System.A to process. (The request itself has no JMSCorrelationID, which is why it is routed by message body.)
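To make the lookup in the last step concrete, here is a minimal sketch of the bookkeeping it implies: remember the reply queue under the request's JMSMessageID, then resolve it from the response's JMSCorrelationID. The class and method names (ReplyToRegistry, register, resolve) are hypothetical and not part of Camel or JMS, and the in-memory map is an assumption; a real deployment would probably need persistent storage and expiry.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper: remembers where the reply to each forwarded request should go. */
final class ReplyToRegistry {
    // Key: JMSMessageID of the request as seen by System.B.
    // Value: the queue the response must be delivered to (e.g. System.A.in).
    private final Map<String, String> replyToByMessageId = new ConcurrentHashMap<>();

    /** Called when a request is forwarded to the target system. */
    void register(String requestMessageId, String replyToQueue) {
        replyToByMessageId.put(requestMessageId, replyToQueue);
    }

    /** Called when a response arrives; removes the entry because one reply is expected. */
    Optional<String> resolve(String correlationId) {
        if (correlationId == null) {
            return Optional.empty(); // a message without a correlation ID is a request
        }
        return Optional.ofNullable(replyToByMessageId.remove(correlationId));
    }
}
```

The sketch only works if the key stored at send time is the same JMSMessageID that System.B sees, which is the ID assigned when the message is put on System.B.in.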

I am using Spring Boot and Camel 2.18.3; the message queue is IBM MQ version 8.

My route looks like this:

@Override
public void configure() throws Exception {

    //@formatter:off
    Predicate validRoute = header("route-valid").isEqualTo(true);
    Predicate inValidRoute = header("route-valid").isEqualTo(false);
    Predicate splitRoute = header("route-split").isEqualTo(true);
    Predicate singleRoute = header("route-split").isEqualTo(false);
    Predicate validSplitRoute = PredicateBuilder.and(validRoute, splitRoute);
    Predicate validSingelRoute = PredicateBuilder.and(validRoute, singleRoute);

    from(endpoint(incomingURI)).routeId(routeId)
        .process(exchange -> {
                exchange.getIn().setHeader("route-source", format("%s-%s", incomingURI, routeId));
            })
            .to(endpoint(format("bean:evaluateIncomingMessageService?method=routeMessage(*, %s)", replyToURI)))
            .choice()
                .when(validSingelRoute)
                    .log(DEBUG, "Creating a Single route")
                    .to(endpoint("bean:messageCoalitionService?method=saveInstruction(*)"))
                    .setExchangePattern(ExchangePattern.InOut)
                    .toD("${header.route-recipients}")
                .when(inValidRoute)
                    .log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
                    .to(endpoint(deadLetterURI))
                    .routeId(format("%s-%s", incomingURI, routeId))
                .when(validSplitRoute)
                    .log(DEBUG, "Creating a Split route")
                    .to(endpoint("bean:messageCoalitionService?method=saveInstructions(*)"))
                    .setExchangePattern(ExchangePattern.InOut)
                    .multicast()
                    .toD("${header.route-recipients}").endChoice()
                .otherwise()
                    .log(DEBUG, "a.b.test", format("Incoming message [%s] failed evaluation: %s", incomingURI, body()))
                    .to(endpoint(deadLetterURI))
                    .routeId(format("%s-%s", incomingURI, routeId));
}

The Spring bean evaluateIncomingMessageService decides whether the message is a request (no correlation ID) or a response, and sets the routing headers for a request. I hoped Camel would automatically route responses to the request's JMSReplyTo queue. If it does not, how can this be done?

replyToURI is configured in the Camel route builder: if the route listens on System.A.out, its replyToURI is always System.A.in.

evaluateIncomingMessageService.routeMessage looks like this:

 public void routeMessage(final Exchange exchange, final String replyToURI) {
    String correlationId = exchange.getIn().getHeader("JMSCorrelationID", String.class);

    if (correlationId != null) {
        log.debug("Processing Message Response with JMSCorrelationID [{}]", correlationId);
        exchange.getIn().setHeader("JMSReplyTo", replyToURI);
    } else {
        // Request messages have no correlation ID
        log.debug("Processing Message Request with MessageID [{}] and JMSMessageID: [{}]",
                exchange.getIn().getMessageId(),
                exchange.getIn().getHeader("JMSMessageID") != null ? exchange.getIn().getHeader("JMSMessageID").toString() : exchange.getIn().getMessageId());
        String message = exchange.getIn().getBody(String.class);
        Set<ContentBasedRoute> validRoutes = contentBasedRouting
                .stream().filter(
                        routeEntity -> Pattern.compile(
                                routeEntity.getRegularExpression(), DOTALL).matcher(message).matches()).collect(Collectors.toSet());

        if (validRoutes.isEmpty()) {
            log.warn("No valid routes found for message: [{}] ", message);
            exchange.getIn().setHeader("route-valid", false);

        } else {
            HashMap<String, ContentBasedRoute> uniqueRoutes = new HashMap<>();
            validRoutes.stream().forEach(route -> uniqueRoutes.putIfAbsent(route.getDestination(), route));

            exchange.getIn().setHeader("route-valid", true);
            exchange.getIn().setHeader("route-count", uniqueRoutes.size());
            exchange.getIn().setHeader("JMSReplyTo", replyToURI);
            //if (exchange.getIn().getHeader("JMSMessageID") == null) {
             //   exchange.getIn().setHeader("JMSMessageID", exchange.getIn().getMessageId());
            //}
            if (uniqueRoutes.size() > 1) {
                log.debug("Building a split route");
                StringBuilder routes = new StringBuilder();
                StringBuilder routeIds = new StringBuilder();
                StringBuilder routeRegex = new StringBuilder();
                uniqueRoutes.keySet().stream().forEach(i -> routes.append(i).append(","));
                uniqueRoutes.values().stream().forEach(j -> routeIds.append(j.getRouteId()).append(","));
                uniqueRoutes.values().stream().forEach(k -> routeRegex.append(k.getRegularExpression()).append(","));
                routes.deleteCharAt(routes.length() - 1);
                routeIds.deleteCharAt(routeIds.length() - 1);
                routeRegex.deleteCharAt(routeRegex.length() - 1);

                exchange.getIn().setHeader("route-split", true);
                exchange.getIn().setHeader("route-uuid", routeIds.toString());
                exchange.getIn().setHeader("route-regex", routeRegex.toString());
                exchange.getIn().setHeader("route-recipients", routes.toString());
            } else {
                exchange.getIn().setHeader("route-split", false);
                exchange.getIn().setHeader("route-uuid", uniqueRoutes.values().iterator().next().getRouteId());
                exchange.getIn().setHeader("route-regex", uniqueRoutes.values().iterator().next().getRegularExpression());
                exchange.getIn().setHeader("route-recipients", uniqueRoutes.values().iterator().next().getDestination());
            }
        }
    }
}
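The recipient-selection part of routeMessage can be isolated from Camel, which makes it easier to test. The following is a sketch only: Route is a hypothetical stand-in for ContentBasedRoute (whose real definition is not shown above), and RecipientSelector and its method names are invented for illustration. It uses a LinkedHashMap so the resulting header values have a stable order, which the HashMap in the original does not guarantee.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

final class RecipientSelector {

    /** Hypothetical stand-in for ContentBasedRoute. */
    static final class Route {
        final String routeId;
        final String regex;
        final String destination;

        Route(String routeId, String regex, String destination) {
            this.routeId = routeId;
            this.regex = regex;
            this.destination = destination;
        }
    }

    /** Keeps the first matching route per destination, in encounter order. */
    static Map<String, Route> uniqueByDestination(Collection<Route> candidates, String body) {
        Map<String, Route> unique = new LinkedHashMap<>();
        for (Route route : candidates) {
            // matches() requires the whole body to match, as in the original code
            if (Pattern.compile(route.regex, Pattern.DOTALL).matcher(body).matches()) {
                unique.putIfAbsent(route.destination, route);
            }
        }
        return unique;
    }

    /** Comma-separated destinations, suitable for a header such as route-recipients. */
    static String recipientsHeader(Map<String, Route> unique) {
        return String.join(",", unique.keySet());
    }

    /** Comma-separated route IDs, suitable for a header such as route-uuid. */
    static String routeIdsHeader(Map<String, Route> unique) {
        return unique.values().stream()
                .map(route -> route.routeId)
                .collect(Collectors.joining(","));
    }
}
```

Joining with String.join or Collectors.joining avoids the append-then-deleteCharAt pattern, and an empty map simply yields an empty string rather than an exception.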

The bean messageCoalitionService simply saves the message body and headers so that messages can be reproduced and the system can be audited.

I am not sure whether I have gone about this incorrectly. Should I be using the Camel Async API, or do I need pipes to implement this? The Asynchronous Request Reply pattern at http://camel.apache.org/async.html looks close to what I need. Any help would be great, thanks.

1 Answer

In the end I implemented the above using Spring Integration. I could not find a way to retrieve the message ID of the sent message once the Camel route had sent it on, which meant I had no way of matching the correlation ID when a response came back. Using Camel's InOut exchange pattern caused Camel to block and wait for a response, which is also not what I wanted.

Thanks to lutalex for this solution: http://forum.spring.io/forum/other-spring-related/remoting/30397-jmsmessageid-after-message-is-sent?p=745127#post745127