I had great difficulty implementing a WebSphere MQ (WMQ) connector with Apache CAMEL that could handle MQ Confirmation of Delivery (CoD) reports without exceptions and without side effects in the form of unwanted response datagrams. In the end I got it working the way I wanted, which is a standard and common way if you are accustomed to writing native MQ clients. I document the method in an answer to this same question, but I find the solution bloated with intricacies and would very much appreciate any advice or example that makes the implementation cleaner and more elegant.

I understand that the problem is rooted in the differences between the way MQ designed its request-reply Message Exchange Pattern (MEP), the way the JMS specification did, and the way CAMEL implements the Request-Reply MEP in its JMS Component. Three different philosophies!

  1. WMQ features a MsgType header field (see MQMD fields and constants) that takes the value 1 for a request, 2 for a reply, and 8 for a datagram (one-way MEP). In addition, the value 4 marks Report messages in the form of CoD (Confirmation of Delivery), PAN (Positive Action Notification) and NAN (Negative Action Notification), which, in terms of message flow, also constitute an additional Reply message. The CoD, PAN, and NAN acks can be requested for Request messages, Replies or Datagrams using another header field named 'Report', in which the flags for all report variants can be combined. Additional header fields 'ReplyToQ' and 'ReplyToQMgr' specify the Queue and Queue Manager on which the original sender expects Reports and Replies, and an optional, fixed 24-byte 'CorrelId' field can help correlate Reports and Replies with the original Datagram or Request message. To make it more complex, one can send back Replies and Reports with the same original Message ID and no CorrelId, or provide the original Message ID in the CorrelId, or return the CorrelId value when it was already specified in the original Request or Datagram. IBM provides a JMS API over WMQ that allows either tunnelling plain JMS exchanges over WMQ as transport (with help from an extra message header named MQRFH2), or mapping native MQ messages onto JMS messages and vice versa.
  2. On the other hand, the JMS specification provides the optional 'JMSReplyTo' and 'JMSCorrelationID' header fields but leaves the exact MEP semantics to client applications, stating: "A response may be optional; it is up to the client to decide."
  3. CAMEL features 'routes' in XML or Java DSL and an inner Exchange object model whose purpose is to support EIP patterns, among them the Request-Reply pattern. In its JMS Component, CAMEL assumes that if the JMSReplyTo field is set, the message is necessarily a Request expecting a Reply, which causes the Out part (or the modified In part if Out is empty) of the Exchange to be returned to the queue defined in JMSReplyTo.
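
To make the 'Report' field of point 1 more concrete, here is a minimal Java sketch of how report flags combine into a single value. The class name MqReportFlags and its methods are hypothetical; the constant names follow IBM's MQRO_* naming, and the numeric values are my assumption of the IBM definitions and should be checked against the MQMD constants reference.

```java
/** Hypothetical helper showing how MQMD Report option bits are combined. */
final class MqReportFlags {
    // MQMD Report option bits (assumed values, verify against the IBM constants).
    static final int MQRO_PAN = 0x0001;            // positive action notification
    static final int MQRO_NAN = 0x0002;            // negative action notification
    static final int MQRO_PASS_CORREL_ID = 0x0040; // 64: report carries the original CorrelId
    static final int MQRO_COD = 0x0800;            // 2048: confirmation of delivery

    /** Combines report options with a bitwise OR, as a single Report field value. */
    static int combine(int... options) {
        int report = 0;
        for (int option : options) {
            report |= option;
        }
        return report;
    }

    /** Returns true when the given option bit is set in the Report value. */
    static boolean has(int report, int option) {
        return (report & option) == option;
    }

    public static void main(String[] args) {
        int report = combine(MQRO_COD, MQRO_PASS_CORREL_ID);
        System.out.println("Report field value: " + report);
        System.out.println("CoD requested: " + has(report, MQRO_COD));
        System.out.println("PAN requested: " + has(report, MQRO_PAN));
    }
}
```

A sender that wants a CoD correlated through CorrelId would, under these assumptions, set Report to the combined value rather than to a single flag.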

I wanted to support native MQ message exchanges with Confirmation of Delivery (CoD) reports through a remote WebSphere Queue Manager so that, in addition to transactions and persistence (i.e. no loss, no duplicates), one can also track when a message is consumed and raise alerts in case of delays.

Inbound issue:

By default, the WebSphere Queue Manager generates CoD Reports when consumption of the message from the queue completes. Therefore, without any specific settings, a remote MQ client sending a datagram with the CoD flag (and the then compulsory ReplyToQ) gets a first reply back from the Queue Manager, in the form of an MQ Report, when the CAMEL endpoint consumes the message. A second, unexpected reply message follows, explicitly returned by CAMEL and containing whatever is left in the Exchange object at the end of the CAMEL route. This happens because CAMEL assumes a Request-Reply EIP whenever the JMSReplyTo field is present (it is mapped from MQ ReplyToQ and ReplyToQMgr, both being required to support the CoD return flow).

Outbound issues:

Without specific settings, CAMEL also assumes by default a Request-Reply EIP / MEP on the outbound connection. The CAMEL JMS/MQ endpoint then waits for one response. When the outbound message is JMS over MQ (hence with an MQRFH2 header), this works fine. When forcing plain vanilla MQ, i.e. removing the MQRFH2 header as described below, I was unable to get the endpoint listener to match the correlated incoming MQ Report, although the traced values all seem correct (I enforced 24-character correlation IDs so that truncation of longer CorrelId values or null padding by MQ cannot jeopardize the correlation filter). Has anyone been able to solve that issue?
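
For reference, the kind of 24-byte normalization I mean can be sketched as follows. The class CorrelIdSketch and its method names are hypothetical. The "ID:" plus hex rendering rests on my understanding that the IBM JMS classes treat a JMSCorrelationID of that form as raw CorrelId bytes; treat that as an assumption to verify, not as a documented guarantee.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

/** Hypothetical helper that normalizes a correlation ID to the 24-byte MQMD CorrelId size. */
final class CorrelIdSketch {
    static final int MQ_CORREL_ID_LENGTH = 24;

    /** Truncates or null-pads the UTF-8 bytes of the given ID to exactly 24 bytes. */
    static byte[] toCorrelIdBytes(String id) {
        byte[] raw = id.getBytes(StandardCharsets.UTF_8);
        // Arrays.copyOf truncates longer input and pads shorter input with 0x00.
        return Arrays.copyOf(raw, MQ_CORREL_ID_LENGTH);
    }

    /** Renders the bytes as "ID:" followed by two lowercase hex digits per byte. */
    static String toJmsCorrelationId(byte[] correlId) {
        StringBuilder sb = new StringBuilder("ID:");
        for (byte b : correlId) {
            sb.append(String.format("%02x", b & 0xff));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        byte[] correlId = toCorrelIdBytes("ORDER-42");
        System.out.println(correlId.length + " bytes");
        System.out.println(toJmsCorrelationId(correlId));
    }
}
```

Normalizing on the sending side means that the value compared by the listener has the same length as the one MQ stores, whatever the original ID looked like.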

Details: Although the IBM JMS API accepts the specific JMS property values WMQ_MESSAGE_BODY={1|0} / WMQ_TARGET_CLIENT={1|0} to control the presence of the MQRFH2 JMS header in the generated messages, these options have no effect when used through CAMEL. One must use the CamelJmsDestinationName header (as explained in the CAMEL JMS doc) to supply an IBM queue URL for the destination featuring the option "targetClient=1" in order to get rid of the MQRFH2 header. But without the MQRFH2 header, the CAMEL correlation on the CoD Report or MQ reply fails.
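
As an illustration of the queue URL format, a small Java sketch that builds such a destination name could look like this. The class MqQueueUrl and its parameters are hypothetical; only the "queue://<queue manager>/<queue>?targetClient=1" shape comes from what I actually use.

```java
/** Hypothetical builder for an IBM MQ queue URL usable as CamelJmsDestinationName. */
final class MqQueueUrl {

    /**
     * Builds "queue://QMGR/QUEUE", appending "?targetClient=1" when a plain MQ
     * message (no MQRFH2 header) is wanted.
     */
    static String build(String queueManager, String queue, boolean plainMq) {
        StringBuilder url = new StringBuilder("queue://")
                .append(queueManager)
                .append('/')
                .append(queue);
        if (plainMq) {
            url.append("?targetClient=1");
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // The resulting string is what would be set as the CamelJmsDestinationName header.
        System.out.println(build("MYQMGR", "TEST.Q2", true));
    }
}
```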

The solution to the above issue is to build a specific CAMEL route that handles the CoD Reports returned by the remote party (as well as correlated MQ Replies). CAMEL outbound messages must therefore be forced to the "InOnly" ExchangePattern so that they do not wait for any reply. But this also causes CAMEL to suppress all ReplyTo fields. Then, if an MQ CoD is requested on an outbound MQ datagram, a CAMEL exception ensues whose cause is MQException JMSCMQ0001: WebSphere MQ call failed with compcode '2' ('MQCC_FAILED') reason '2027' ('MQRC_MISSING_REPLY_TO_Q').

CAMEL documents a URI option 'disableReplyTo=true' that disables reply listening on exchange patterns yet keeps the ReplyTo fields, apparently on both inbound and outbound exchanges. But as far as I observed (I may be wrong), this option does not work on outbound JMS exchanges, and one must use the much less intuitive 'preserveMessageQos' option instead.

Elegant solutions to these issues are welcome.

1 Answer

The best I have been able to get is documented below, illustrated as a Spring XML application context that itself hosts the CAMEL context and routes. It works with the IBM native MQ JCA-compliant resource adapter v7.5, CAMEL 2.15, Spring core 4.2. I can deploy it to both Glassfish and Weblogic servers.

Of course, a real implementation uses the Java DSL, given the numerous variables. This example, based on the CAMEL XML DSL, is self-contained and easy to test.

We start with Spring & Camel declarations:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:util="http://www.springframework.org/schema/util" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xmlns:camel="http://camel.apache.org/schema/spring"
    xmlns:jee="http://www.springframework.org/schema/jee"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd   
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd">

The CAMEL context follows with 2 routes: MQ to JMS and JMS to MQ, here chained to form a bridge to ease testing.

<camel:camelContext id="mqBridgeCtxt">

  <camel:route id="mq2jms" autoStartup="true">

Weird: when using the native MQ Resource Adapter, the only way to get (e.g.) 3 listeners is to enforce 3 connections (with 3 camel:from statements in sequence) with at most 1 session each; otherwise an MQ error ensues: MQJCA1018: Only one session per connection is allowed. However, if you use the MQ client jars instead, the concurrentConsumers option in CAMEL JMS works fine.

    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 
    <camel:from uri="wmq:queue:TEST.Q1?concurrentConsumers=1&amp;disableReplyTo=true&amp;
            acknowledgementModeName=SESSION_TRANSACTED"/> 

The disableReplyTo option above ensures that CAMEL does not produce a reply before we can test whether the MQ message type is 1=Request(-reply) or 8=datagram (one way!). That test and the reply construction are not illustrated here.
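
As a rough idea of what that test could look like inside a processing bean, here is a minimal Java sketch. The class InboundMsgTypeCheck and the Action enum are hypothetical names; it assumes the MQ message type has already been read as an integer (for example from the JMS_IBM_MsgType property) and only encodes the type values listed in the question.

```java
/** Hypothetical classifier deciding what to do with an inbound MQ message. */
final class InboundMsgTypeCheck {

    enum Action { SEND_REPLY, NO_REPLY, HANDLE_REPORT }

    /** Maps the MQMD MsgType value to the action the route should take. */
    static Action classify(int msgType) {
        switch (msgType) {
            case 1:  // request: the sender expects an application reply
                return Action.SEND_REPLY;
            case 2:  // reply: nothing to send back
            case 8:  // datagram: one way, even if a ReplyToQ is set for reports
                return Action.NO_REPLY;
            case 4:  // report (CoD, PAN, NAN): correlate, never answer
                return Action.HANDLE_REPORT;
            default:
                throw new IllegalArgumentException("Unknown MQ message type: " + msgType);
        }
    }

    public static void main(String[] args) {
        System.out.println("Request  -> " + classify(1));
        System.out.println("Datagram -> " + classify(8));
    }
}
```

The point of the design is that the decision depends on the message type alone and not on whether a ReplyTo destination is present.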

Then we force the exchange pattern to InOnly for the next posting to plain JMS, to be consistent with the inbound MQ mode.

    <camel:setExchangePattern pattern="InOnly"/>
    <!-- camel:process ref="reference to your MQ message processing bean fits here" / -->
    <camel:to uri="ref:innerQueue" />
  </camel:route>

Next comes the jms-to-MQ route:

  <camel:route id="jms2mq"  autoStartup="true">
    <camel:from uri="ref:innerQueue" />
    <!-- remove inner message headers and properties to test without inbound side effects! -->
    <camel:removeHeaders pattern="*"/> 
    <camel:removeProperties pattern="*" />
    <!-- camel:process ref="reference to your MQ message preparation bean fits here" / -->

Now comes the flag requesting that the remote destination return an MQ CoD report. We also force the MQ message to be of Datagram type (value 8).

    <camel:setHeader headerName="JMS_IBM_Report_COD"><camel:simple resultType="java.lang.Integer">2048</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_Report_Pass_Correl_ID"><camel:simple resultType="java.lang.Integer">64</camel:simple></camel:setHeader>
    <camel:setHeader headerName="JMS_IBM_MsgType"><camel:simple resultType="java.lang.Integer">8</camel:simple></camel:setHeader>

The ReplyTo queue can be specified either via the replyTo URI option or as a header, as below.

Next we use the CamelJmsDestinationName header to suppress the MQRFH2 JMS message header (using the targetClient MQ URL option with value 1). In other words, we want to send a plain vanilla MQ binary message (i.e. only the MQMD message descriptor followed by the payload).

    <camel:setHeader headerName="JMSReplyTo"><camel:constant>TEST.REPLYTOQ</camel:constant></camel:setHeader>
    <camel:setHeader headerName="CamelJmsDestinationName"><camel:constant>queue://MYQMGR/TEST.Q2?targetClient=1</camel:constant></camel:setHeader>

More MQMD fields may be controlled through reserved JMS properties as illustrated below. See restrictions in IBM doc.

    <camel:setHeader headerName="JMS_IBM_Format"><camel:constant>MQSTR   </camel:constant></camel:setHeader>
    <camel:setHeader headerName="JMSCorrelationID"><camel:constant>_PLACEHOLDER_24_CHARS_ID_</camel:constant></camel:setHeader>

The destination queue in the URI is overridden by the CamelJmsDestinationName header above, hence the queue name in the URI is only a placeholder.

The URI option preserveMessageQos is the one that, as observed, allows sending a message with the ReplyTo data set (to get the MQ CoD Report) while preventing CAMEL from instantiating a Reply message listener, since the InOnly MEP is enforced.

    <camel:to uri="wmq:queue:PLACEHOLDER.Q.NAME?concurrentConsumers=1&amp;
                exchangePattern=InOnly&amp;preserveMessageQos=true&amp;
                includeSentJMSMessageID=true" />
  </camel:route>
</camel:camelContext>

The following must be adjusted to your context. It provides the queue factories for both a native JMS provider and Websphere MQ (via the native IBM WMQ JCA Resource Adapter). We use here JNDI lookups on administrative objects.

<camel:endpoint id="innerQueue" uri="jmsloc:queue:transitQueue">
</camel:endpoint>

<jee:jndi-lookup id="mqQCFBean" jndi-name="jms/MYQMGR_QCF"/>
<jee:jndi-lookup id="jmsraQCFBean" jndi-name="jms/jmsra_QCF"/>

<bean id="jmsloc" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="jmsraQCFBean" />
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="mqQCFBean" />
</bean>

</beans>

Alternatively, if you are using the MQ client jars instead of the Resource Adapter, declare the connection factory bean as follows (in place of the JNDI lookups above):

<bean id="mqCFBean" class="com.ibm.mq.jms.MQXAConnectionFactory">
    <property name="hostName" value="${mqHost}"/>
    <property name="port" value="${mqPort}"/>
    <property name="queueManager" value="${mqQueueManager}"/>
    <property name="channel" value="${mqChannel}"/>
    <property name="transportType" value="1"/> <!-- This parameter is fixed and compulsory to work with pure MQI java libraries -->
    <property name="appName" value="${connectionName}"/>
</bean>

<bean id="wmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="connectionFactory" ref="mqCFBean"/>
    <property name="transacted" value="true"/>
    <property name="acknowledgementModeName" value="AUTO_ACKNOWLEDGE"/>
</bean>

Comments and improvements welcome.