0
votes

I'm trying to find a solution to a problem to guarantee that a message is only over processed completely by a single consumer.

Lots of messages on a queue and a number of consumers read messages and process them writing out to a database. My messages are transacted so that if a consumer dies then the message goes back onto the queue for another consumer to process.

We have to have an active/passive configuration for activemq and this is causing the issue. If I stop the active activemq then the consumer reconnects to the other activemq as I am using the failover transport. This is fine but during the reconnect, the message is put back on the queueand the consumer is not made aware of this reconnection and continues to process. This leads to the situation where 2 consumers process the same message.

I would have liked to use a distributed transaction manager and this may happen in the future but for now I need a different solution.

If I don't use failover transport then I can hook into a JMSException listener and abort the consumer. Unfortunately this does not work when using failover transport.

I would like either to use failover transport for the initial connect (discover which of the activemqs are running) and then force failover not to reconnect... or use a different transport that allows a list of server to try but doesn't reconnect... or find away to listen to the reconnect.

Note that this happens sometimes with just one server using failover (reconnect).

I could do my over initial connect logic (hunting for the active server) but was going to check if there is another option

1
It just occurred to me that I could use discovery to get a valid connection on startupPaul

1 Answers

0
votes

You can listen to transport events on the ActiveMQConnection by using a listener:

    connection = (ActiveMQConnection)factory.createConnection();
    connection.addTransportListener(new TransportListener() {
    public void onCommand(Object command) {
        // Do something 
    }

    public void onException(IOException error) {
        // Do something 
    }

    public void transportInterupted() {
        // Do something 
    }

    public void transportResumed() {
        // Do something 
    }
});
connection.start();

Note that in this example the listener is set on the Connection directly; however, you can set an instance on the ActiveMQConnectionFactory which will be assigned to each Connection instance that it creates.