2
votes

I want to delete a specific queue in ActiveMQ while its consumer is down. I don't want the queued messages to be delivered once the consumer becomes available again. Please give me some suggestions. Thanks in advance.

This is my publisher class:

public class MessageHandler implements MessageListener {
    private static String url = "tcp://localhost:61616";
    private Session session;

    private MessageProducer producer;
    private MessageConsumer consumer;
    private Connection connection;
    private Map<String, String> messageStatus = new HashMap<String, String>();
    public void setup(String systemCode, String funCode, boolean synchronous) {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            if(synchronous) {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.SESSION_TRANSACTED);
            } else {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            }
            Destination requestQueue = session.createQueue(systemCode + "-" + funCode + "-request");
            producer = session.createProducer(requestQueue);
            Destination responseQueue = session.createQueue(systemCode + "-" + funCode + "-response");
            consumer = session.createConsumer(responseQueue);
            consumer.setMessageListener(this);
        } catch(JMSException e) {
            throw new RuntimeException("Failed to initialize MessageHandler", e);
        }
    }

    public String sendMessage(String parameter) {
        String response = null;
        try {
            TextMessage message = session.createTextMessage(parameter);
            String messageId = UUID.randomUUID().toString();
            message.setJMSCorrelationID(messageId);
            producer.send(message);
            boolean carryon = true; 
            long start = System.currentTimeMillis();
            long end = start + 10 * 1000;
            while (System.currentTimeMillis() < end && carryon) {
                if(checkStatus(messageId)) {
                    carryon = false;
                }
            }
            response = getMessage(messageId);
            stop();
        } catch(JMSException e) {
            try {
                stop();
            } catch (JMSException e1) {
                throw new RuntimeException("Failed to send Message", e);
            }
            throw new RuntimeException("Failed to send Message", e);
        } 
        return response;
    }

    private String getMessage(String correlationId) {
        synchronized (this) {
            if (messageStatus.containsKey(correlationId)) {
                String status = messageStatus.get(correlationId);
                messageStatus.remove(correlationId);
                return status;
            } else {
                return null;
            }
        }
    }

    private boolean checkStatus(String messageId) {
        return messageStatus.containsKey(messageId);
    }

    public void onMessage(Message message) {
        synchronized (this) {
            try {
                if (message instanceof TextMessage) {
                    String originalMessageId = message.getJMSCorrelationID();
                    String responseText = ((TextMessage) message).getText();
                    messageStatus.put(originalMessageId, responseText);
                }
            } catch (JMSException e) {
                throw new RuntimeException("Failed to receipt Message", e);
            }
        }
    }

    public void stop() throws JMSException {
        session.close();
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Caller Client.....");
        MessageHandler handler = new MessageHandler();
        handler.setup("P001", "FUC0001", true);
        String response = handler.sendMessage("xxxxxx");
        System.out.println(response);
    }   
}

When I use Session.SESSION_TRANSACTED, my listener class does not receive anything and there is no message in the queue. My goal is this: when there is no consumer, I want to delete the queue; when there is a consumer, it should be able to subscribe.

2
Please explain in more detail what you are trying to accomplish and what you've tried so far; it's not clear what you want to do. - Tim Bish
@TimBish I've updated my question. Thanks. - Kyaw Bo

2 Answers

0
votes

My requirement

For a synchronous process:

The client sends a message to the server, but the MessageListener is not active (it is down). I want to remove this specific message from the queue.

How can I delete a specific message from a queue by its message ID?

I had the same problem, so here is a reusable function. You just need to pass the message ID and the queue name. It works for me.

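import javax.management.MBeanServerConnection;
import javax.management.MBeanServerInvocationHandler;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;

// messageId is the JMSMessageID assigned by the provider
// (message.getJMSMessageID() after send), not the correlation ID.
private void deleteMessage(String messageId, String queueName) {
    // try-with-resources closes the JMX connection when the method exits
    try (JMXConnector jmxc = JMXConnectorFactory.connect(
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"))) {
        MBeanServerConnection conn = jmxc.getMBeanServerConnection();
        ObjectName name = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
        BrokerViewMBean proxy = MBeanServerInvocationHandler.newProxyInstance(conn, name, BrokerViewMBean.class, true);
        // Look for the queue with the requested name among the broker's queues
        for (ObjectName queue : proxy.getQueues()) {
            QueueViewMBean queueBean = MBeanServerInvocationHandler.newProxyInstance(conn, queue, QueueViewMBean.class, true);
            if (queueBean.getName().equals(queueName)) {
                // removeMessage returns true only if the message was found and deleted
                boolean removed = queueBean.removeMessage(messageId);
                System.out.println((removed ? "Deleted : " : "Not found : ") + messageId);
                return;
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}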

I use activemq-all-5.8.0.jar.

0
votes

First off, the session creation for transacted sessions is wrong: the transacted flag (the first argument to createSession) needs to be set to true, but yours is false.
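
As a rough sketch of what a correctly transacted send might look like: the class name TransactedSend and its send helper are made up for illustration and are not part of the question's code. The point it shows is that a transacted session is created with true as the first argument, and that a sent message only becomes available to consumers once commit() is called.

```java
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

// Hypothetical helper class, not taken from the question.
final class TransactedSend {

    // Sends one text message to the named queue inside a JMS transaction.
    static void send(ConnectionFactory factory, String queueName, String text)
            throws JMSException {
        Connection connection = factory.createConnection();
        try {
            connection.start();
            // First argument true = transacted session.
            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            MessageProducer producer = session.createProducer(session.createQueue(queueName));
            try {
                TextMessage message = session.createTextMessage(text);
                producer.send(message);
                // Without commit() the message is never released to consumers.
                session.commit();
            } catch (JMSException e) {
                // Discard the pending send if anything went wrong.
                session.rollback();
                throw e;
            }
        } finally {
            // Closing the connection also closes its sessions and producers.
            connection.close();
        }
    }
}
```

With the question's setup this would be called as TransactedSend.send(new ActiveMQConnectionFactory("tcp://localhost:61616"), "P001-FUC0001-request", "xxxxxx").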

It sounds like you want to delete inactive destinations, which you can do via an ActiveMQ destination policy option called gcInactiveDestinations. With that option enabled, a destination that sits empty for some time with no consumers registered is deleted by the broker. You can also use advisory messages from the broker to learn about consumers coming and going.
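
A minimal sketch of that policy, assuming an embedded broker configured from Java: the class name GcInactiveQueuesBroker and the 30 second / 10 second values are illustrative assumptions, not recommendations. Note that the timeout setter is spelled "Timout" in the ActiveMQ API.

```java
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;

// Hypothetical example class; the timing values are assumptions.
final class GcInactiveQueuesBroker {

    // Applies the purge settings to a broker that has not been started yet.
    static BrokerService configure(BrokerService broker) {
        PolicyEntry policy = new PolicyEntry();
        // Allow the broker to delete destinations that are empty and unused.
        policy.setGcInactiveDestinations(true);
        // How long (ms) a destination must stay inactive before it is eligible.
        policy.setInactiveTimoutBeforeGC(30000);

        PolicyMap policyMap = new PolicyMap();
        // The default entry applies to every destination without a more specific entry.
        policyMap.setDefaultEntry(policy);
        broker.setDestinationPolicy(policyMap);

        // How often (ms) the broker checks for inactive destinations; 0 disables the check.
        broker.setSchedulePeriodForDestinationPurge(10000);
        return broker;
    }

    public static void main(String[] args) throws Exception {
        BrokerService broker = configure(new BrokerService());
        broker.setPersistent(false);
        broker.addConnector("tcp://localhost:61616");
        broker.start();
        broker.waitUntilStopped();
    }
}
```

For a standalone broker the same settings go into activemq.xml: schedulePeriodForDestinationPurge on the broker element, and gcInactiveDestinations and inactiveTimoutBeforeGC on a policyEntry.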

It's still not really clear what you are trying to accomplish, so this is the best information I can give you until you clarify the problem you are trying to solve.