0
votes

I built a Spring JMS solution that was built against JBoss 7.1.1 and an external HornetQ implementation 2.2.1.4. This connected and worked successfully.

However I am now using EAP6, and am attempting to connect to the internal HornetQ that is packaged within EAP6.

I have a few classes that manage the connection and create queues. However, none of this fits what seems to be required to connect to the packaged HornetQ, even though it works fine when connecting to an external HornetQ.

I have raised this with Red Hat, and they are not sure how to resolve it, as it also requires Spring coding.

The problem is that I believe I need to create a QueueConnection, as in: QueueConnection qcon = queueConnectionFactory.createQueueConnection("user","password");

But the way we have implemented it in Spring is to use Spring's JmsTemplate, which has no concept of adding a queue connection, so it doesn't work.

Below is the jms-services.xml file containing the required Spring beans:

<?xml version="1.0" encoding="UTF-8"?>
<!--
    Spring context for the JMS services: enables annotation processing and
    component scanning, declares a JTA transaction manager, and wires the
    HornetQ connection-factory wrapper into the queue manager bean.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xmlns:task="http://www.springframework.org/schema/task" xmlns:tx="http://www.springframework.org/schema/tx"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
                        http://www.springframework.org/schema/beans/spring-beans.xsd
                        http://www.springframework.org/schema/context 
                        http://www.springframework.org/schema/context/spring-context.xsd
                        http://www.springframework.org/schema/task 
                        http://www.springframework.org/schema/task/spring-task.xsd
                        http://www.springframework.org/schema/tx
                        http://www.springframework.org/schema/tx/spring-tx-3.0.xsd">

    <context:annotation-config />

    <context:component-scan base-package="com.myproject.test" />

    <task:annotation-driven />

    <!-- JTA transaction manager resolved by name; UserTransaction auto-detection is switched off. -->
    <bean id="testTransactionManager"
        class="org.springframework.transaction.jta.JtaTransactionManager">
        <property name="transactionManagerName" value="java:/TransactionManager"></property>
        <property name="autodetectUserTransaction" value="false"></property>
    </bean>

    <tx:annotation-driven transaction-manager="testTransactionManager" />

    <!--
        Targets the four-argument QueueConnectionFactoryImpl constructor.
        With useCluster=true that constructor passes host/port to a
        DiscoveryGroupConfiguration instead of building a Netty connector;
        useJta=true selects the XA connection-factory type.
    -->
    <bean id="queueConnectionFactory" class="com.myproject.test.impl.QueueConnectionFactoryImpl">
        <constructor-arg type="String" name="host" value="231.7.7.7" />
        <constructor-arg type="int" name="port" value="9876" />
        <constructor-arg type="boolean" name="useJta" value="true" />
        <constructor-arg type="boolean" name="useCluster" value="true" />
    </bean>

    <!-- Builds a JmsTemplate on the factory above and sets TestQueue as its default destination. -->
    <bean id="testQueueManager" class="com.myproject.test.impl.QueueManagerImpl">
        <constructor-arg ref="queueConnectionFactory" />
        <constructor-arg name="queue" value="TestQueue" />
    </bean>

</beans>

This is my QueueConnectionFactoryImpl class:

package com.myproject.test.impl;

import java.util.HashMap;
import java.util.Map;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.JMSFactoryType;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.remoting.impl.netty.TransportConstants;
import org.jboss.logging.Logger;

import com.myproject.test.QueueConnectionFactory;

/**
 * Creates and holds the JMS {@link ConnectionFactory} used to talk to HornetQ.
 *
 * <p>Depending on the constructor used, the factory is built either from a
 * single Netty connector (host/port, no HA) or from a discovery group
 * (host/port treated as the discovery group address, HA enabled). When
 * {@code useJta} is set the XA factory type is requested instead of the
 * plain one.
 */
public class QueueConnectionFactoryImpl implements QueueConnectionFactory {

    private final Logger logger = Logger.getLogger(this.getClass());

    private String host;
    private int port;
    private ConnectionFactory connectionFactory;
    private boolean useJta = false;

    /**
     * Connects to a single server without HA.
     *
     * @param host   server host name or address
     * @param port   Netty acceptor port
     * @param useJta {@code true} to request an XA connection factory
     */
    public QueueConnectionFactoryImpl(String host, int port, boolean useJta) {
        this.useJta = useJta;
        createConnection(host, port);
    }

    /**
     * Connects to a single server without HA, using a non-XA factory.
     *
     * @param host server host name or address
     * @param port Netty acceptor port
     */
    public QueueConnectionFactoryImpl(String host, int port) {
        createConnection(host, port);
    }

    /**
     * Connects either through a discovery group (HA) or to a single server.
     *
     * @param host       discovery group address when {@code useCluster} is
     *                   set, otherwise the server host
     * @param port       discovery group port when {@code useCluster} is set,
     *                   otherwise the Netty acceptor port
     * @param useJta     {@code true} to request an XA connection factory
     * @param useCluster {@code true} to build the factory from a discovery group
     */
    public QueueConnectionFactoryImpl(String host, int port, boolean useJta, boolean useCluster) {
        this.useJta = useJta;
        if (useCluster) {
            createClusterConnection(host, port);
        } else {
            createConnection(host, port);
        }
    }

    /**
     * Wraps an already-built connection factory.
     *
     * <p>Previously the argument was only logged and then discarded, leaving
     * {@link #getConnectionFactory()} returning {@code null}. It is now kept
     * when it actually is a {@link ConnectionFactory}; any other object is
     * still accepted and ignored, as before.
     *
     * @param connectionFactory the factory to expose, typically obtained elsewhere
     */
    public QueueConnectionFactoryImpl(Object connectionFactory) {
        logger.debug("Object is: " + connectionFactory);
        if (connectionFactory instanceof ConnectionFactory) {
            this.connectionFactory = (ConnectionFactory) connectionFactory;
        }
    }

    /** Builds a non-HA factory from a single Netty connector pointing at host/port. */
    private void createConnection(String host, int port) {
        this.host = host;
        this.port = port;

        Map<String, Object> connectionParams = new HashMap<String, Object>();
        connectionParams.put(TransportConstants.PORT_PROP_NAME, port);
        connectionParams.put(TransportConstants.HOST_PROP_NAME, host);

        TransportConfiguration transportConfiguration =
                new TransportConfiguration(NettyConnectorFactory.class.getName(), connectionParams);

        connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(
                factoryType(), transportConfiguration);
    }

    /** Builds an HA factory from a discovery group identified by host/port. */
    private void createClusterConnection(String host, int port) {
        this.host = host;
        this.port = port;

        // NOTE(review): DiscoveryGroupConfiguration is not among this file's
        // visible imports - confirm it is imported before compiling.
        connectionFactory = (ConnectionFactory) HornetQJMSClient.createConnectionFactoryWithHA(
                new DiscoveryGroupConfiguration(host, port), factoryType());
    }

    /** Chooses the XA factory type when JTA is enabled, the plain type otherwise. */
    private JMSFactoryType factoryType() {
        return useJta ? JMSFactoryType.XA_CF : JMSFactoryType.CF;
    }

    public String getHost() {
        return host;
    }

    /** Updates the stored host only; the connection factory is not rebuilt. */
    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    /** Updates the stored port only; the connection factory is not rebuilt. */
    public void setPort(int port) {
        this.port = port;
    }

    public ConnectionFactory getConnectionFactory() {
        return connectionFactory;
    }

    public void setConnectionFactory(ConnectionFactory connectionFactory) {
        this.connectionFactory = connectionFactory;
    }

    public boolean isUseJta() {
        return useJta;
    }

    /** Updates the stored flag only; the connection factory is not rebuilt. */
    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }

}

This is my QueueManagerImpl code

package com.myproject.test.impl;

import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Queue;

import org.apache.log4j.Logger;
import org.hornetq.api.jms.HornetQJMSClient;
import org.springframework.jms.core.JmsTemplate;

import com.myproject.test.QueueManager;

/**
 * Owns a Spring {@link JmsTemplate} configured against the connection factory
 * supplied by a {@code QueueConnectionFactory}, optionally bound to a default
 * HornetQ queue.
 *
 * <p>Messages are sent with explicit QoS and persistent delivery; the session
 * is marked transacted when the supplied factory reports JTA usage.
 */
// NOTE(review): QueueConnectionFactory is not among this file's visible
// imports - confirm it is imported (the sibling class imports it from
// com.myproject.test).
public class QueueManagerImpl implements QueueManager {

    private static final Logger log = Logger.getLogger(QueueManagerImpl.class);

    private String queue;
    private ConnectionFactory connectionFactory;
    private JmsTemplate template;
    private Queue jmsQueue;
    private boolean useJta = false;

    /**
     * Creates the template without a default destination.
     *
     * @param queueConnectionFactory source of the JMS connection factory and JTA flag
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory) {

        template = new JmsTemplate();
        connectionFactory = queueConnectionFactory.getConnectionFactory();
        try {
            this.setUseJta(queueConnectionFactory.isUseJta());
            template.setConnectionFactory(connectionFactory);
            // Explicit QoS must be enabled for the delivery mode below to take effect.
            template.setExplicitQosEnabled(true);
            template.setDeliveryMode(DeliveryMode.PERSISTENT);
            if (queueConnectionFactory.isUseJta()) {
                template.setSessionTransacted(true);
            }
        } catch (Exception ex) {
            logError(ex.toString());
        }
    }

    /**
     * Creates the template and binds it to the named queue.
     *
     * @param queueConnectionFactory source of the JMS connection factory and JTA flag
     * @param queue                  name of the queue to use as default destination
     */
    public QueueManagerImpl(QueueConnectionFactory queueConnectionFactory, String queue) {

        this(queueConnectionFactory);
        setQueue(queue);
    }

    public String getQueue() {

        return queue;
    }

    /**
     * Resolves the named queue and makes it the template's default destination.
     * The stored name is only updated when both steps succeed; failures are
     * logged, not rethrown.
     *
     * @param queue queue name
     */
    public void setQueue(String queue) {

        try {
            jmsQueue = HornetQJMSClient.createQueue(queue);
            template.setDefaultDestination(jmsQueue);
            this.queue = queue;
        } catch (Exception ex) {
            logError(ex.toString());
        }
    }

    public JmsTemplate getTemplate() {
        return template;
    }

    /**
     * Logs a connection failure at error level.
     *
     * <p>The outer format string previously had no placeholder, so the
     * details were silently dropped and only {@code "error..."} was logged.
     *
     * @param error description of the failure
     */
    public void logError(String error) {
        String details = String.format("Unable to connect to queue, details: %s ", error);
        String errorMessage = String.format("error... %s", details);
        log.error(errorMessage);
    }

    @Override
    public boolean isUseJta() {
        return useJta;
    }

    @Override
    public void setUseJta(boolean useJta) {
        this.useJta = useJta;
    }
}

The main point is that the code above requires a ConnectionFactory object to be passed to the JmsTemplate in QueueManagerImpl: template.setConnectionFactory(connectionFactory);

I have tried a couple of ways to get this to work:

1) Adding the following to the jms-services.xml file:

<!--
    Attempt 1: wrap the factory in Spring's UserCredentialsConnectionFactoryAdapter
    and supply the credentials as properties. The ref below points at the
    QueueConnectionFactoryImpl bean rather than at a javax.jms.ConnectionFactory,
    which is the type mismatch reported by the exception quoted after this snippet.
-->
<bean id="myConnectionFactory" class="org.springframework.jms.connection.UserCredentialsConnectionFactoryAdapter">
    <property name="targetConnectionFactory" ref="queueConnectionFactory"/>
    <property name="username" value="myuser"/>
    <property name="password" value="myuser123"/>
</bean>     

This creates the following exception:

java.lang.IllegalStateException: Cannot convert value of type [com.myproject.test.impl.QueueConnectionFactoryImpl] to required type [javax.jms.ConnectionFactory] for property 'targetConnectionFactory': no matching editors or conversion strategy found

2) Changing the connection in the QueueConnectionFactoryImpl to :

// Attempt 2: build the HornetQ factory, then try to authenticate up front.
org.hornetq.jms.client.HornetQConnectionFactory HQConnectionFactory = (HornetQConnectionFactory) HornetQJMSClient.createConnectionFactoryWithoutHA(jmsFType, transportConfiguration);

try {
    // createConnection(user, password) is called on the factory and its result is
    // cast to ConnectionFactory; the ClassCastException quoted after this snippet
    // shows the result is a HornetQConnection, so this cast cannot succeed.
    connectionFactory = (ConnectionFactory) HQConnectionFactory.createConnection("myuser","myuser123");
} catch (JMSException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
}

This doesn't work either. I get the following exception:

java.lang.ClassCastException: org.hornetq.jms.client.HornetQConnection cannot be cast to javax.jms.ConnectionFactory


In short, can anyone please help with a way of getting the above code to connect to HornetQ with a username and password, so that I can still use a JmsTemplate?

1

1 Answers

1
votes

Just use your own connection factory. It worked for me:

The spring bean (5445 is the hornetq acceptor port):

<!--
    The two constructor-arg names match the parameter names of the
    CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    constructor; username/password are applied through its setters and used by
    its overridden createConnection().
-->
<bean name="jmsConnectionFactory" class="messaging.jms.CustomHornetQJMSConnectionFactory">
    <constructor-arg name="ha" value="false" /> <!-- set true if you want support failover -->
    <constructor-arg name="commaSepratedServerUrls" value="127.0.0.1:5445" />
    <property name="username" value="admin" />
    <property name="password" value="admin" />
</bean>

Connection factory implementation (used HornetQJMSConnectionFactory from hornetq-jms-client and TransportConfiguration from hornetq-core-client):

/**
 * HornetQ JMS connection factory that always authenticates with a configured
 * username and password.
 *
 * <p>Because {@link #createConnection()} is overridden to delegate to the
 * credentialed variant, callers that only know the no-argument method still
 * get an authenticated connection.
 */
public class CustomHornetQJMSConnectionFactory extends org.hornetq.jms.client.HornetQJMSConnectionFactory
{
    private static final long serialVersionUID = 1L;

    private String username;
    private String password;

    /**
     * @param ha                      whether the underlying factory is created with HA support
     * @param commaSepratedServerUrls comma-separated list of {@code host:port} entries
     * @throws IllegalArgumentException if the list is empty or an entry is malformed
     */
    public CustomHornetQJMSConnectionFactory(boolean ha, String commaSepratedServerUrls)
    {
        super(ha, converToTransportConfigurations(commaSepratedServerUrls));
    }

    /**
     * Turns a comma-separated list of {@code host:port} entries into one Netty
     * transport configuration per entry.
     *
     * <p>Whitespace around entries and around host/port is ignored, so
     * {@code "a:5445, b:5445"} is accepted. Malformed input is rejected with
     * a descriptive exception instead of an index-out-of-bounds failure.
     *
     * @param commaSepratedServerUrls comma-separated list of {@code host:port} entries
     * @return one transport configuration per entry, in input order
     * @throws IllegalArgumentException if the list is null/blank, an entry lacks
     *                                  a host or port, or a port is not an integer
     */
    public static TransportConfiguration[] converToTransportConfigurations(String commaSepratedServerUrls)
    {
        if (commaSepratedServerUrls == null || commaSepratedServerUrls.trim().isEmpty())
        {
            throw new IllegalArgumentException("No server URLs supplied; expected host:port[,host:port...]");
        }

        String[] serverUrls = commaSepratedServerUrls.split(",");
        TransportConfiguration[] transportconfigurations = new TransportConfiguration[serverUrls.length];
        for (int i = 0; i < serverUrls.length; i++)
        {
            String serverUrl = serverUrls[i].trim();
            String[] urlParts = serverUrl.split(":");
            if (urlParts.length < 2 || urlParts[0].trim().isEmpty() || urlParts[1].trim().isEmpty())
            {
                throw new IllegalArgumentException("Invalid server URL '" + serverUrl + "'; expected host:port");
            }

            int port;
            try
            {
                port = Integer.parseInt(urlParts[1].trim());
            }
            catch (NumberFormatException e)
            {
                throw new IllegalArgumentException("Invalid port in server URL '" + serverUrl + "'", e);
            }

            HashMap<String, Object> map = new HashMap<String, Object>();
            map.put(TransportConstants.HOST_PROP_NAME, urlParts[0].trim());
            map.put(TransportConstants.PORT_PROP_NAME, port);
            transportconfigurations[i] = new TransportConfiguration(NettyConnectorFactory.class.getName(), map);
        }
        return transportconfigurations;
    }

    /**
     * Creates a connection using the configured username and password.
     */
    @Override
    public Connection createConnection() throws JMSException
    {
        return super.createConnection(username, password);
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

Now, if you give this connection factory to jmsTemplate, you can use user/pass for sending/consuming messages