1
votes

I want to do this:

  1. ActiveMQ sends a message to the client.

  2. The client sends acknowledgement to ActiveMQ after processing the message.

  3. If the client is down, or did not acknowledge the message, this message will remain in the queue. All other messages will not be delivered.

  4. The server sends the messages one at a time and the client processes the messages one at a time. Unless the first message has been acknowledged, the next message won't be delivered.

Is there a way to do this?

3
Actually this is how activeMQ works (in general), have you test it? - Evan P
In our case, it is not working this way. We got all the messages at once, not one by one. I suspect the server configuration may not be correct. Will test more and let you know. - ericyoung

3 Answers

0
votes

Regarding drawbacks in failure, this is how activeMQ works.

If the "failure/bad acknowledge" happens on the business logic layer then you should use transactions

Warning Not a valid code (pradigm)

// at your Listener
public void onMessage(Message message)
{
    if (isValid(message))
    {
        connection.ack();
        commit();
        return;
    }
    rollback();
}

Regarding the message throttling and persistence the following configuration of destination policy will assist you.

    <destinationPolicy>
        <policyMap>
          <policyEntries>
            <policyEntry topic=">" producerFlowControl="true" memoryLimit="10mb">
              <pendingSubscriberPolicy>
                <vmCursor />
              </pendingSubscriberPolicy>
            </policyEntry>
            <policyEntry queue=">" producerFlowControl="true" memoryLimit="10mb">
              <pendingQueuePolicy>
                <vmQueueCursor/>
              </pendingQueuePolicy>
            </policyEntry>
          </policyEntries>
        </policyMap>
    </destinationPolicy>

IMHO I would also use MySQL for this kind of persistence

    <persistenceAdapter>
        <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
    </persistenceAdapter>

Also I advise you to use Camel to configure your throttler as you wish.

0
votes

Sounds like what you are looking for is a combination of a low or zero prefetch limit combined with an Ack mode of Client Ack or Individual Ack. A prefetch of zero would make you client essentially a pull based consumer, and setting the ack to client would then cause the broker to redeliver the message if your client failed to ack the message it had been dispatched should it disconnect.

0
votes

try the following...

  • set queue prefetch=0
  • use TRANSACTED acknowledgement
  • single thread the consumer (concurrentConsumers=1/maxConcurrentConsumers=1, etc)
  • set DispatchAsync = false