I want to delete a specific queue in ActiveMQ while its consumer is down. I don't want the queued messages to be delivered once the consumer becomes available again. Any suggestions? Thanks in advance.
This is my publisher class:
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnectionFactory;

public class MessageHandler implements MessageListener {

    private static String url = "tcp://localhost:61616";

    private Session session;
    private MessageProducer producer;
    private MessageConsumer consumer;
    private Connection connection;

    // Responses received so far, keyed by JMS correlation ID.
    private Map<String, String> messageStatus = new HashMap<String, String>();

    public void setup(String systemCode, String funCode, boolean synchronous) {
        try {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            if (synchronous) {
                connection = connectionFactory.createConnection();
                connection.start();
                // transacted=false together with the SESSION_TRANSACTED acknowledge mode;
                // this is the combination the problem described below occurs with.
                session = connection.createSession(false, Session.SESSION_TRANSACTED);
            } else {
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            }
            // Requests go out on "<systemCode>-<funCode>-request" ...
            Destination requestQueue = session.createQueue(systemCode + "-" + funCode + "-request");
            producer = session.createProducer(requestQueue);
            // ... and responses come back on "<systemCode>-<funCode>-response".
            Destination responseQueue = session.createQueue(systemCode + "-" + funCode + "-response");
            consumer = session.createConsumer(responseQueue);
            consumer.setMessageListener(this);
        } catch (JMSException e) {
            throw new RuntimeException("Failed to initialize MessageHandler", e);
        }
    }

    public String sendMessage(String parameter) {
        String response = null;
        try {
            TextMessage message = session.createTextMessage(parameter);
            String messageId = UUID.randomUUID().toString();
            message.setJMSCorrelationID(messageId);
            producer.send(message);

            // Poll for up to 10 seconds until a response with this correlation ID arrives.
            boolean carryon = true;
            long start = System.currentTimeMillis();
            long end = start + 10 * 1000;
            while (System.currentTimeMillis() < end && carryon) {
                if (checkStatus(messageId)) {
                    carryon = false;
                }
            }
            response = getMessage(messageId);
            stop();
        } catch (JMSException e) {
            try {
                stop();
            } catch (JMSException e1) {
                throw new RuntimeException("Failed to send Message", e);
            }
            throw new RuntimeException("Failed to send Message", e);
        }
        return response;
    }

    // Returns and removes the response for the given correlation ID, or null if none arrived.
    private String getMessage(String correlationId) {
        synchronized (this) {
            if (messageStatus.containsKey(correlationId)) {
                String status = messageStatus.get(correlationId);
                messageStatus.remove(correlationId);
                return status;
            } else {
                return null;
            }
        }
    }

    private boolean checkStatus(String messageId) {
        return messageStatus.containsKey(messageId);
    }

    // Called by the JMS provider for each message on the response queue.
    public void onMessage(Message message) {
        synchronized (this) {
            try {
                if (message instanceof TextMessage) {
                    String originalMessageId = message.getJMSCorrelationID();
                    String responseText = ((TextMessage) message).getText();
                    messageStatus.put(originalMessageId, responseText);
                }
            } catch (JMSException e) {
                throw new RuntimeException("Failed to receive Message", e);
            }
        }
    }

    public void stop() throws JMSException {
        session.close();
        connection.close();
    }

    public static void main(String[] args) throws Exception {
        System.out.println("Caller Client.....");
        MessageHandler handler = new MessageHandler();
        handler.setup("P001", "FUC0001", true);
        String response = handler.sendMessage("xxxxxx");
        System.out.println(response);
    }
}
When I use Session.SESSION_TRANSACTED, I can't subscribe from my listener class and there is no message in the queue. My goal: when there is no consumer, I want the queue to be deleted; when a consumer is available, it should be able to subscribe.
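
To make the goal concrete, here is a minimal sketch in plain Java, with no broker involved, of the behavior I'm after: a message that nobody picks up within a time limit should be gone by the time a consumer comes back. `ExpiringQueue`, its `send`/`receive` methods, and the injected clock are hypothetical names made up for illustration; they are not part of ActiveMQ or JMS. In JMS terms I assume the closest equivalent is setting a time-to-live on the producer via `MessageProducer.setTimeToLive`, rather than literally deleting the queue, but I'm not sure that is the right approach.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

/** Hypothetical in-memory model of a queue whose messages expire; not ActiveMQ API. */
final class ExpiringQueue {

    /** A message body together with the absolute time at which it expires. */
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final long timeToLiveMillis;
    // The clock is injected so the behavior can be checked without sleeping.
    private final LongSupplier clock;

    ExpiringQueue(long timeToLiveMillis, LongSupplier clock) {
        this.timeToLiveMillis = timeToLiveMillis;
        this.clock = clock;
    }

    /** Producer side: store the message with an expiry of now + time-to-live. */
    synchronized void send(String body) {
        entries.addLast(new Entry(body, clock.getAsLong() + timeToLiveMillis));
    }

    /** Consumer side: drop expired messages, then return the oldest live one, or null. */
    synchronized String receive() {
        long now = clock.getAsLong();
        // All messages share one time-to-live, so the oldest entry always expires first.
        while (!entries.isEmpty() && entries.peekFirst().expiresAtMillis <= now) {
            entries.removeFirst();
        }
        Entry next = entries.pollFirst();
        return next == null ? null : next.body;
    }
}
```

With a 10-second time-to-live (the same window `sendMessage` waits for a response), a request sent while the consumer is down and only looked at 15 seconds later would never be delivered, while a request sent when the consumer is up would be received normally.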