1
votes

A customer uses this pattern:

  • Apache Camel and CXF JMS receivers
  • These internally use Spring MDPs (message-driven POJOs) to implement their message receivers
  • They are deployed on IBM WebSphere Application Server 7
  • The queue manager is IBM WebSphere MQ 6
  • The Spring MDPs are bound to the queue manager using JNDI Queue Connection Factories -- with support for connection pooling and session pooling

Here is an example of such a message receiver; this one uses Camel:

<bean id="ibmmq" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="jmsConfig"/>
</bean>

<!-- JNDI reference to the queue manager -->
<jee:jndi-lookup id="myTargetConnectionFactory" jndi-name="${mq.queueconnectionfactory}"/>

<bean id="jmsDestResolver" class="org.springframework.jms.support.destination.JndiDestinationResolver"/>

<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="myTargetConnectionFactory"/>
    <property name="username" value="SOME_USER"/>
    <property name="password" value=""/>
</bean>

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">

    <property name="connectionFactory" ref="${mq.connectionfactorybean}" />        
    <property name="destinationResolver" ref="jmsDestResolver" />        
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="1" />

    <!--
        NOTE: If we try to use a cache without a transactionManager we get "Connection closed" errors
    -->
    <property name="cacheLevelName" value="CACHE_NONE" />
</bean>

Problem: the WebSphere MQ administrators are reporting a large number of MQGET requests against the queue manager. The current hypothesis is that those receivers are constantly polling the channel for new messages.

They do not seem to have this problem with MDBs (message-driven beans). Is the MDP async implementation really a polling mechanism? If so, is there a way to limit the trips to the queue manager, perhaps by increasing the polling interval? Any insights would be appreciated.

Your JMS config seems fine. Nothing in there will prevent the listener from working as a listener. Can you post your Camel routes/config as well? – Petter Nordlander
What in the routes would I look for regarding this problem? Re: "Nothing in there will prevent the listener from working as a listener" -- would you know where I can find documentation about the internals of an MDP "listener"? I am concerned because of a (possibly misguided) answer in the thread stackoverflow.com/questions/7390286/…, which suggests that an MDP listener over a QCF polls -- and is perhaps not a real async listener compared to an MDB. I'd expect a polling app to constantly issue MQGETs. – Frederic Fortier
Some research later -- read my answer. Not 100% sure, but it's somewhere to start looking at least. – Petter Nordlander
Thanks! I appreciate your taking the time to research this. I will dig into it further based on what you provided. – Frederic Fortier

2 Answers

2
votes

I am not sure about CXF, but for Camel listeners:

It seems the default JMS consumer type in the JmsConfiguration is "Default". That means it will use Spring's DefaultMessageListenerContainer under the hood.

From its Javadoc:

Message listener container variant that uses plain JMS client API, specifically a loop of MessageConsumer.receive() calls

Those receive() calls would map to MQGET calls against the queue manager.
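If you stay with the default container, one knob worth looking at is the receiveTimeout option on JmsConfiguration (in milliseconds): it controls how long each of those receive() calls blocks before looping again, so raising it should at least mean fewer GETs from an idle consumer. A sketch only, added to your existing jmsConfig bean:

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <!-- connectionFactory, destinationResolver, etc. as in your current config -->
    <!-- let each receive() block up to 30 seconds instead of the ~1 second default -->
    <property name="receiveTimeout" value="30000" />
</bean>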

There is also the option to specify a Simple type of consumer, which I guess is what you want.

Message listener container that uses the plain JMS client API's MessageConsumer.setMessageListener()

I am not sure here, but the Spring docs indicate that the simple message listener container does not support XA transactions. That might be something to consider, since you are running inside an application server.
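If you want to try the simple container, it should just be the consumerType property on the same bean (again only a sketch, I have not tried it against WebSphere MQ; keep the XA caveat above in mind):

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <!-- other properties as in your current config -->
    <!-- "Simple" maps to Spring's SimpleMessageListenerContainer, which uses setMessageListener() rather than a receive() loop -->
    <property name="consumerType" value="Simple" />
</bean>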

0
votes

We had a similar problem with our mainframe queue manager. From the IBM documentation:

Note that the application process ID is used as the default user identity to be passed to the queue manager. If the application is running in client transport mode then this process ID must exist with the relevant authorizations on the server machine. If a different identity is required, then the application should use the createConnection(username, password) method.

Or in other words, IBM uses the JVM process ID to log into MQ unless we send in the appropriate credentials. We were using Spring, so each time our DefaultMessageListenerContainer polled the queue it had to send the credentials with it. I hooked up one of these babies and bam, worked like a charm:

import javax.jms.Connection;
import javax.jms.JMSException;

import org.springframework.jms.connection.CachingConnectionFactory;

public class CustomConnectionFactory extends CachingConnectionFactory {

    private String username;
    private String password;

    public void setUsername(String username) {
        this.username = username;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    /**
     * This is the secret sauce. Each time we make a connection, we send
     * the username/password instead of letting the process ID be used.
     */
    @Override
    protected Connection doCreateConnection() throws JMSException {
        return getTargetConnectionFactory().createConnection(this.username, this.password);
    }
}

Our Mainframers were happier. And we later switched to a distributed MQ and all was much better!

Here is our final setup:

<!-- This hooks us up to the jndi -->
<jee:jndi-lookup id="telematicsJNDIConnectionFactory" jndi-name="${mq.jndi}" cache="true" lookup-on-startup="true" />
<!-- The outer wrapper must be TransactionAware, the inner custom one will cache the connection -->
<bean id="telematicsConnectionFactory" class="org.springframework.jms.connection.TransactionAwareConnectionFactoryProxy">
    <property name="targetConnectionFactory">
        <bean class="cat.dds.tmatic.utils.CustomConnectionFactory">
            <property name="targetConnectionFactory">
                <ref bean="telematicsJNDIConnectionFactory" />
            </property>
            <property name="username">
                <value>${mq.user}</value>
            </property>
            <property name="password">
                <value>${mq.pass}</value>
            </property>
            <property name="sessionCacheSize">
                <value>10</value>
            </property>
        </bean>
    </property>
    <property name="synchedLocalTransactionAllowed" value="true" />
</bean>
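The listener containers then just reference telematicsConnectionFactory like any other connection factory. Roughly like this (the destination and listener bean names below are only placeholders, not our real ones):

<bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="telematicsConnectionFactory" />
    <!-- placeholders: substitute your actual queue name and listener bean -->
    <property name="destinationName" value="${mq.queue}" />
    <property name="messageListener" ref="myMessageListener" />
    <property name="sessionTransacted" value="true" />
</bean>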