3
votes

How to change the redelivery policy for the embedded ActiveMQ when using with Spring Boot? I tried specifying FixedBackOff on the DefaultJmsListenerContainerFactory but it didn't help. Below is code I am using to initialize the jms factory bean. I have a message consumer processing incoming messages on a queue. During processing because of unavailable resource, I throw a checked exception. I am hoping to have the message redelivered for processing after a fixed interval.

Spring Boot : 1.5.7.Release

Java : 1.7

@Bean
public JmsListenerContainerFactory<?> publishFactory(ConnectionFactory connectionFactory,
                                                DefaultJmsListenerContainerFactoryConfigurer configurer) {
    DefaultJmsListenerContainerFactory factory = 
        new DefaultJmsListenerContainerFactory();

    factory.setBackOff(new FixedBackOff(5000, 5));

    // This provides all boot's default to this factory, including the message converter
    configurer.configure(factory, connectionFactory);

    // You could still override some of Boot's default if necessary.
    factory.setErrorHandler(new ErrorHandler() {

        @Override
        public void handleError(Throwable t) {
            LOG.error("Error occured in JMS transaction.", t);
        }

    }); 
    return factory;
}

Consumer Code:

@JmsListener(destination = "PublishQueue", containerFactory = "publishFactory")
@Transactional
public void receiveMessage(PublishData publishData) {
    LOG.info("Processing incoming message on publish queue with transaction id: " + publishData.getTransactionId());

    PublishUser user = new PublishUser();
    user.setPriority(1);
    user.setUserId(publishData.getUserId());

    LOG.trace("Trying to enroll in the publish lock queue for user: " + user);
    PublishLockQueue lockQueue = publishLockQueueService.createLock(user);
    if (lockQueue == null)
        throw new RuntimeException("Unable to create lock for publish");
    LOG.trace("Publish lock queue obtained with lock queue id: " + lockQueue.getId());

    try {
        publishLockQueueService.acquireLock(lockQueue.getId());
        LOG.trace("Acquired publish lock.");
    }
    catch (PublishLockQueueServiceException pex) {
        throw new RuntimeException(pex);
    }

    try {
        publishService.publish(publishData, lockQueue.getId());
        LOG.trace("Completed publish of changes.");

        sendPublishSuccessNotification(publishData);
    }
    finally {
        LOG.trace("Trying to release lock to publish.");
        publishLockQueueService.releaseLock(lockQueue.getId());
    }

    LOG.info("Publish has been completed for transaction id: " + publishData.getTransactionId());
}
1
Its the consumer, you need to use transacted acknowledge mode to let the consumer rollback on exception and let ActiveMQ be able to re-deliver the message to the same consumer or another consumer if you have multiple consumers running. You can however configure redelivery options on the ActiveMQ such as backoff etc. The error handler above is just a noop listener which cannot do very much other than logging.Claus Ibsen
can you share your code ?Makoton
@Makoton I have included the consumer code as well.user320587
@ClausIbsen The issue I am having is not related to redelivery of message but the interval. It's currently redelivering the message instantly once the exception occurs. What I would like is to have the message redelivered after a certain intervaluser320587

1 Answers

0
votes

@claus answerd: i tested it to work:

Its the consumer, you need to use transacted acknowledge mode to let the consumer rollback on exception and let ActiveMQ be able to re-deliver the message to the same consumer or another consumer if you have multiple consumers running. You can however configure redelivery options on the ActiveMQ such as backoff etc. The error handler above is just a noop listener which cannot do very much other than logging