How can we gracefully shut down the daemon thread [ActiveMQ Session: ID:PC-63704-1472105244157-1:1:1]?

We have a Spring Boot based web app that runs on Tomcat 8. It uses Spring JMS 4.2.4.RELEASE to send messages to and receive messages from ActiveMQ 5.13.0.

The ActiveMQ-related libraries are listed below (all versions are 5.13.0):

<properties>
    <activemq.version>5.13.0</activemq.version>
</properties>

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
</dependency>
<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-jms-pool</artifactId>
</dependency>

The Spring Boot ActiveMQ settings are simply as follows:

spring.activemq.mqtt.broker-url=tcp://127.0.0.1:1883
spring.activemq.broker-url=tcp://127.0.0.1:61616
spring.activemq.in-memory=false
spring.activemq.pooled=true

To send messages, we simply autowire the JmsTemplate and send the message out:

@Autowired
private JmsTemplate jmsTemplate;

MessageCreator messageCreator = new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
        // return Message
    }
};

jmsTemplate.send(new ActiveMQTopic(destinationName), messageCreator);

To receive (listen for) messages, we use Spring's DefaultMessageListenerContainer (DMLC):

@Bean(name = { "afterCreateQueue" })
public Queue afterCreateQueue() {
    return new ActiveMQQueue(properties.getId() + "_After_Create_Queue");
}

@Bean
public DefaultMessageListenerContainer afterCreateJmsListenerContainer(ConnectionFactory connectionFactory)
        throws KeyManagementException, NoSuchAlgorithmException, KeyStoreException {
    final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
    defaultMessageListenerContainer.setConnectionFactory(connectionFactory);
    defaultMessageListenerContainer.setDestination(afterCreateQueue());

    final MessageListenerAdapter listener = new MessageListenerAdapter(afterCreateMessageDelegate());
    listener.setDefaultListenerMethod("handleMessage");

    defaultMessageListenerContainer.setMessageListener(listener);
    return defaultMessageListenerContainer;
}

When we shut down Tomcat, the following warnings are logged:

22-Aug-2016 17:23:32.870 WARNING [localhost-startStop-2] org.apache.catalina.loader.WebappClassLoaderBase.clearReferencesThreads The web application [demo] appears to have started a thread named [ActiveMQ Session: ID:PC-63704-1472105244157-1:1:1] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:502)
org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:119)
org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)
22-Aug-2016 17:23:32.872 WARNING [localhost-startStop-2] org.apache.catalina.loader.WebappClassLoaderBase.clearReferencesThreads The web application [demo] appears to have started a thread named [ActiveMQ InactivityMonitor Worker] but has failed to stop it. This is very likely to create a memory leak. Stack trace of thread:
sun.misc.Unsafe.park(Native Method)
java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
java.util.concurrent.SynchronousQueue$TransferStack.awaitFulfill(SynchronousQueue.java:460)
java.util.concurrent.SynchronousQueue$TransferStack.transfer(SynchronousQueue.java:362)
java.util.concurrent.SynchronousQueue.poll(SynchronousQueue.java:941)
java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1066)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1127)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)

After I stop the application, the Tomcat thread list still shows the daemon thread [ActiveMQ Session: ID:PC-63704-1472105244157-1:1:1] hanging there:

org.apache.catalina.startup.Bootstrap at localhost:63675    
Thread [main] (Running) 
Daemon Thread [NioBlockingSelector.BlockPoller-1] (Running) 
Daemon Thread [NioBlockingSelector.BlockPoller-2] (Running) 
Daemon Thread [NioBlockingSelector.BlockPoller-3] (Running) 
Daemon Thread [AsyncLogger-1] (Running) 
Thread [ActiveMQ Session: ID:PC-63704-1472105244157-1:1:1] (Running)    
Daemon Thread [ContainerBackgroundProcessor[StandardEngine[Catalina]]] (Running)    
Daemon Thread [http-nio-1010-ClientPoller-0] (Running)  
Daemon Thread [http-nio-1010-ClientPoller-1] (Running)  
Daemon Thread [http-nio-1010-Acceptor-0] (Running)  
Daemon Thread [http-nio-1443-ClientPoller-0] (Running)  
Daemon Thread [http-nio-1443-ClientPoller-1] (Running)  
Daemon Thread [http-nio-1443-Acceptor-0] (Running)  
Daemon Thread [ajp-nio-8019-ClientPoller-0] (Running)   
Daemon Thread [ajp-nio-8019-ClientPoller-1] (Running)   
Daemon Thread [ajp-nio-8019-Acceptor-0] (Running)   
Daemon Thread [http-nio-1010-exec-1] (Running)  

The stack of this [ActiveMQ Session: ID:PC-63704-1472105244157-1:1:1] thread is as follows:

Thread [ActiveMQ Session: ID:MoboLink-63704-1472105244157-1:1:1] (Suspended)    
waiting for: Object  (id=96)    
Object.wait(long) line: not available [native method]   
Object.wait() line: 502 
DedicatedTaskRunner.runTask() line: 119 
DedicatedTaskRunner$1.run() line: 42

We already shut down the DefaultMessageListenerContainer and the PooledConnectionFactory when a ContextClosedEvent is published, as shown below, but the [ActiveMQ Session: ID:PC-63704-1472105244157-1:1:1] thread is still left hanging.

@Autowired
private DefaultMessageListenerContainer afterCreateJmsListenerContainer;
@Autowired
private PooledConnectionFactory pooledJmsConnectionFactory;

@EventListener
private void onContextClosed(ContextClosedEvent event)
{
    try
    {
        if (afterCreateJmsListenerContainer != null) {
            afterCreateJmsListenerContainer.shutdown();
        }

        if (pooledJmsConnectionFactory != null) {
            pooledJmsConnectionFactory.clear();
        }
    } catch (Exception e)
    {
        logger.error("Exception", e);
    }
}

Are there any other settings we have missed?

1 Answer

Check the receiveTimeout setting of the DefaultMessageListenerContainer. If it is -1, the listener thread blocks until it receives a message; otherwise it wakes up after every timeout period and checks whether the consumer has been stopped. The default value is 1000 ms (see the DefaultMessageListenerContainer source code).
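To illustrate why a finite receive timeout matters for shutdown, here is a minimal plain-Java sketch. It is not Spring's implementation: the Listener class, the stopsWithin helper, and the BlockingQueue standing in for a JMS consumer are all hypothetical. It only shows the general pattern of a timed receive that lets the loop re-check a stop flag.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedReceiveLoop {

    // Hypothetical stand-in for a listener thread (not Spring's code).
    static final class Listener implements Runnable {
        private final BlockingQueue<String> queue;
        private final long receiveTimeoutMs;
        private volatile boolean running = true;

        Listener(BlockingQueue<String> queue, long receiveTimeoutMs) {
            this.queue = queue;
            this.receiveTimeoutMs = receiveTimeoutMs;
        }

        void stop() {
            running = false;
        }

        @Override
        public void run() {
            while (running) {
                try {
                    // Timed receive: returns null on timeout, so the loop
                    // gets a chance to re-check the stop flag.
                    String msg = queue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        System.out.println("received " + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }

    // Starts a listener, waits until it is blocked in the timed receive,
    // asks it to stop, and reports whether its thread exited within joinMs.
    static boolean stopsWithin(long receiveTimeoutMs, long joinMs) {
        Listener listener = new Listener(new LinkedBlockingQueue<>(), receiveTimeoutMs);
        Thread t = new Thread(listener, "demo-listener");
        t.setDaemon(true); // keeps the demo JVM from hanging on a stuck listener
        t.start();
        try {
            while (t.getState() != Thread.State.TIMED_WAITING) {
                Thread.sleep(1);
            }
            listener.stop();
            t.join(joinMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !t.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("100 ms timeout, stopped in time: " + stopsWithin(100, 2000));
        System.out.println("60 s timeout, stopped in time: " + stopsWithin(60_000, 500));
    }
}
```

With the short timeout the thread notices the stop request at its next wake-up; with the long one it stays blocked well past the join deadline. In a real DefaultMessageListenerContainer, the corresponding knob is setReceiveTimeout(long).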

With a finite timeout, a listener that receives no messages within that period, after the specified number of retries, will also be stopped. Other settings can be used to control the number of active listeners.
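The idle-stop behaviour described above can be sketched in plain Java as follows. This is an illustration under stated assumptions, not Spring's code: consumeUntilIdle, its idleLimit parameter, and the BlockingQueue used in place of a JMS consumer are hypothetical names chosen for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class IdleLimitConsumer {

    // Polls the queue until idleLimit consecutive receives time out,
    // then gives up. Returns the number of messages handled.
    static int consumeUntilIdle(BlockingQueue<String> queue,
                                long receiveTimeoutMs,
                                int idleLimit) {
        int handled = 0;
        int idleCount = 0;
        while (idleCount < idleLimit) {
            try {
                String msg = queue.poll(receiveTimeoutMs, TimeUnit.MILLISECONDS);
                if (msg == null) {
                    idleCount++;      // timed out without a message
                } else {
                    idleCount = 0;    // any message resets the idle counter
                    handled++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        int handled = consumeUntilIdle(queue, 50, 3);
        System.out.println("handled " + handled + " messages before going idle");
    }
}
```

In DefaultMessageListenerContainer itself, the related setters are setIdleTaskExecutionLimit, setConcurrentConsumers, and setMaxConcurrentConsumers; how they interact in a given setup is best confirmed against the Javadoc for the Spring version in use.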