I'm looking for an advisory, or any other mechanism in ActiveMQ, that notifies me when the MessageListener associated with a consumer finishes processing a message.

The MessageDelivered advisory seems to notify as soon as the broker gets the message. Also, the MessageConsumed advisory claims to notify when a consumer receives the message.

------------------------UPDATE------------------------------

Please find the code snippet below:

public class SampleListener implements MessageListener {

    private Session session;

    public SampleListener(Session session) {
        this.session = session; 
    }

    public void onMessage(Message message) {
        try {
            // do something
            // Committing the transacted session acknowledges the message
            session.commit();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

public class SampleConsumer {

    private boolean stopConnection = false;

    public static void main(String[] args) {
        new SampleConsumer().start();
    }

    public void start() {
            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection connection;
            try {
                connection = connectionFactory.createConnection();
                connection.start();
                Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Destination destination = session.createTopic("test");
                MessageConsumer messageConsumer = session.createConsumer(destination);
                messageConsumer.setMessageListener(new SampleListener(session));

                try {
                    synchronized (this) {
                        while (!stopConnection) {
                            wait();
                        }
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    session.close();
                    connection.close();
                }

            } catch (JMSException e) {
                e.printStackTrace();
            }
    }

    public void stop() {
        synchronized (this) {
            stopConnection = true;
            notify();
        }
    }
}


public class SampleProducer implements MessageListener {

    private boolean messageDelivered;

    @Test
    public void shouldTestSomething() throws JMSException, InterruptedException {
        Connection producerConnection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
        producerConnection.start();
        Session session = producerConnection.createSession(true, Session.SESSION_TRANSACTED);

        // Subscribe to the MessageConsumed advisory for the "test" topic
        Destination destination = session.createTopic("test");
        MessageConsumer advisoryConsumer = session.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(destination));
        advisoryConsumer.setMessageListener(this);

        Message message = session.createTextMessage("Hi");
        MessageProducer producer = session.createProducer(destination);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(message);
        session.commit();

        // Block until onMessage() reports that the advisory arrived
        synchronized (this) {
            while (!messageDelivered) {
                wait();
            }
        }

        session.close();
        producerConnection.close();

        // some assertions
    }

    public void onMessage(Message message) {
        // do something

        synchronized (this) {
            messageDelivered = true;
            notify();
        }
    }
}
What happens if you set the ack mode to transacted? Does the MessageConsumed advisory still happen early? According to the closing reason of issues.apache.org/jira/browse/AMQ-3361, it shouldn't. - Aksel Willgert
Aksel, the ack mode is already transacted. While the MessageDelivered advisory happened early, I couldn't get any notification from the MessageConsumed advisory. I set the PolicyEntry parameter to switch on MessageConsumed, but still no messages were enqueued for the MessageConsumed advisory. - Ramanathan B
I'm not following. Is the problem that you think MessageConsumed will happen too early, or that you can't manage to enable the advisory for it? - Aksel Willgert
The latter. I do see the MessageConsumed advisory listed in the Topics section of the admin console, but nothing happens on this topic, i.e. the listener configured to listen to this advisory never gets invoked. Whereas, when listening to the MessageDelivered advisory, the notification happens before the consumer finishes consuming the message. - Ramanathan B
Are you invoking session.commit() in onMessage()? Otherwise I'm out of guesses and would probably need to see some code to answer better. - Aksel Willgert

1 Answer

Some of the advisories are not enabled by default. See:

http://activemq.apache.org/advisory-message.html

Disabled advisories can be enabled by adding a policyEntry to activemq.xml (see http://activemq.apache.org/xml-configuration.html).

Add the following to activemq.xml:

    <destinationPolicy>
        <policyMap>
            <policyEntries>
                <policyEntry topic=">" advisoryForConsumed="true" />
                <policyEntry queue=">" advisoryForConsumed="true" />
                ..
            </policyEntries>
        </policyMap>
    </destinationPolicy>

Once the advisory is enabled and the consumer invokes session.commit(), the advisory is delivered.
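
On the test side, a listener that never fires (for example because the advisory is still disabled) leaves a bare wait() loop blocked forever. A minimal sketch of waiting with a timeout instead, using only java.util.concurrent. AdvisoryLatch and its method names are hypothetical helpers for illustration, not part of ActiveMQ or JMS:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: lets a test wait, with a timeout, for a listener callback. */
final class AdvisoryLatch {

    private final CountDownLatch latch = new CountDownLatch(1);

    /** Intended to be called from the advisory listener's onMessage(). */
    void advisoryReceived() {
        latch.countDown();
    }

    /** Returns true if the advisory was signalled within the timeout, false otherwise. */
    boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        AdvisoryLatch arrived = new AdvisoryLatch();
        // Stand-in for the JMS delivery thread invoking onMessage().
        new Thread(arrived::advisoryReceived).start();
        System.out.println("advisory seen: " + arrived.await(5, TimeUnit.SECONDS));

        AdvisoryLatch missing = new AdvisoryLatch();
        // Nothing signals this latch, so await() gives up once the timeout elapses.
        System.out.println("advisory seen: " + missing.await(200, TimeUnit.MILLISECONDS));
    }
}
```

In the test, onMessage() would call advisoryReceived(), and the test body would check that await(...) returned true before running its other assertions, so a missing advisory shows up as a failed test rather than a hang.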

If you are using an embedded broker, you can just place activemq.xml on the classpath and start the broker using:

BrokerService broker = BrokerFactory.createBroker("xbean:activemq.xml",true);

(I didn't find any way to enable disabled advisories without using activemq.xml.)