5
votes

I have a class that acquires an ActiveMQ connection using ActiveMQConnectionFactory.createConnection(). It then creates and owns the session, the destination and the consumer (queue) listening on that destination.

I call receive() or receive(millis) on the consumer to get my messages off the queue. In certain scenarios I have to kill (interrupt) the thread in which the receive method is being called. I try to close the session and the connection right after that. Unfortunately I am constantly getting an exception when calling close and the associated "ActiveMQ Transport" threads (and related connections to the broker) are staying alive. The exception I get is

org.myorg.awsiface.communication.MessagingException: Failed to close JMS connection
at
org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:253)
at
org.myorg.aws.communication.protocol.ContextFinishedProtocol.cleanUp(ContextFinishedProtocol.java:94)
at org.myorg.myservice.job.Job.run(Job.java:206)
Caused by: javax.jms.JMSException: java.io.InterruptedIOExceptio)
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:62)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1227)
at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1219)
at org.apache.activemq.ActiveMQSession.asyncSendPacket(ActiveMQSession.java:1799)
at org.apache.activemq.ActiveMQMessageConsumer.doClose(ActiveMQMessageConsumer.java:636)
at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:627)
at org.myorg.aws.communication.transport.JMSMessageTransport.cleanUp(JMSMessageTransport.java:232)
... 2 more
Caused by: java.io.InterruptedIOException
at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:102)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
at org.apache.activemq.ActiveMQConnection.doAsyncSendPacket(ActiveMQConnection.java:1225)
... 7 more

I've tried the following connection URLs failover://tcp://broker.ip.address:61616 tcp://broker.ip.address:61616 failover://tcp://broker.ip.address:61616?trace=true&closeAsync=false

Also I've tried using the MessageConsumer::receiveNoWait() call and just executing my own Thread.sleep, but I always end up with the exception above whenever I call the following to close the connection

try {
    // I added the following two lines to see if the taskRunner was handling the threads - still no luck
    TaskRunnerFactory taskRunner = ((ActiveMQConnection)connection).getSessionTaskRunner();
    taskRunner.shutdown();

    if (producer != null) {
        producer.close();
    }
    if (consumer != null) {
        consumer.close();
    }
    session.close();
    if (connection instanceof ActiveMQConnection) {
        ActiveMQConnection amqConnection = (ActiveMQConnection) connection;
        amqConnection.stop();
    }           
    connection.stop();
    connection.close();
}
catch (ConnectionClosedException e) {
    // NOOP - this is ok
}
catch (Exception e) {
    throw new MessagingException("Failed to close JMS connection", e, log);
}
2

2 Answers

2
votes

I'm not sure why the close is not working for you. But you can accomplish what you want by using an asynchronous message listener.

Connection connection = connectionFactory.createConnection();
connection.start();

Session session = 
    connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination queue = session.createQueue("queueName");

Consumer consumer = session.createConsumer( queue );

consumer.setMessageListener( new MessageListener()
{
    public void onMessage( Message message )
    { 
         // whatever was after the receive() method call
    }
} );

Then there's no need to be interrupted. When you are done, close things out:

consumer.close();
session.close();
queue.close();
connection.close();
1
votes

One solution could be to set daemon flag on TCP ActiveMQ transport thread, e.g. tcp://URI:PORT?daemon=true

Check out ActiveMQ documentation: http://activemq.apache.org/tcp-transport-reference.html