I am building an application using ActiveMQ with a producer and a consumer. In the consumer I use a MessageListener to receive messages from the producer asynchronously, through its onMessage(Message message) method. Before consuming the messages, I want to perform a condition check and consume them only if the check passes. I do not want to switch to synchronous consumption, because that would go against my design.
    void initialize() throws JMSException {
        this.connection = this.connectionFactory.createConnection();
        this.connection.start();
        final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        final Destination destination = session.createQueue("testQ");
        this.consumer = session.createConsumer(destination);
        this.consumer.setMessageListener(this);
    }
    // Check a condition here (for example, detect an internet connection) before consuming messages
    public void onMessage(final Message message) {
        Preconditions.checkNotNull(message);
        if (!(message instanceof TextMessage)) {
            _LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
            throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not be handled");
        }
        try {
            final String messageType = message.getStringProperty("messageType");
            Preconditions.checkNotNull(messageType);
            _LOG.info("The MessageType is {}", messageType);
            final String msg = ((TextMessage) message).getText();
            Preconditions.checkNotNull(msg);
            _LOG.debug(msg);
            process(messageType, msg);
        } catch (final JMSException e) {
            _LOG.error("We could not read the message", e);
        }
    }
Any code example would be great.
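
For illustration, here is a minimal sketch of one possible way to gate asynchronous delivery on a condition. It relies on the fact that in JMS, Connection.start() begins delivery of incoming messages and Connection.stop() temporarily suspends it, so the listener itself can stay asynchronous. Everything named here is hypothetical and not part of the code above: Delivery is a stand-in for those two Connection methods (so the sketch runs without a broker), and DeliveryGate, refresh() and the online flag are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BooleanSupplier;

public class ConditionalDelivery {

    /** Hypothetical stand-in for javax.jms.Connection#start() and #stop(). */
    public interface Delivery {
        void start();
        void stop();
    }

    /** Starts or stops delivery so that it matches the current condition. */
    public static final class DeliveryGate {
        private final Delivery delivery;
        private final BooleanSupplier condition;
        private boolean running; // delivery is assumed to be stopped initially

        public DeliveryGate(Delivery delivery, BooleanSupplier condition) {
            this.delivery = delivery;
            this.condition = condition;
        }

        /** Re-evaluates the condition; returns true if delivery is running afterwards. */
        public synchronized boolean refresh() {
            boolean ok = condition.getAsBoolean();
            if (ok && !running) {
                delivery.start();   // condition now holds: let onMessage receive messages
                running = true;
            } else if (!ok && running) {
                delivery.stop();    // condition lost: pause delivery, messages stay queued
                running = false;
            }
            return running;
        }
    }

    public static void main(String[] args) {
        List<String> calls = new ArrayList<>();
        boolean[] online = {false}; // assumed condition, e.g. "internet is reachable"

        Delivery fake = new Delivery() {
            @Override public void start() { calls.add("start"); }
            @Override public void stop() { calls.add("stop"); }
        };
        DeliveryGate gate = new DeliveryGate(fake, () -> online[0]);

        gate.refresh();      // condition false: nothing is started
        online[0] = true;
        gate.refresh();      // condition true: delivery starts
        gate.refresh();      // unchanged: no second start
        online[0] = false;
        gate.refresh();      // condition false again: delivery stops

        System.out.println(calls); // prints [start, stop]
    }
}
```

To try this with the consumer above, one would presumably drop the connection.start() call from initialize(), wrap the connection in a Delivery adapter that handles JMSException, and call refresh() periodically from a separate thread (for example a ScheduledExecutorService). Note that stop() should not be called from inside onMessage itself, since a listener is not allowed to stop its own connection.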