1
votes

I have the following setup: Tomcat with embedded ActiveMQ

I use Spring integrations's JmsMessageDrivenChannelAdapter to create adapter which consumes messages from ActiveMQ queue as follows:

 Jms.messageDrivenChannelAdapter(jmsConnectionFactory, TransactedMessageListenerContainer.class)
      .destination(destination)
      .errorChannel(errorChannel)
     .get();

where TransactedMessageListenerContainer is

public class TransactedMessageListenerContainer extends DefaultMessageListenerContainer {

     public TransactedMessageListenerContainer() {
         this.setSessionTransacted(true);
     } 
}

If exception occurs, ActiveMQ broker doesn't redeliver message accordingly.

When I use org.apache.activemq.broker.BrokerService for simple integration tests, JMS messages are redelivered, i.e. I can achieve retry mechanism

How can I achieve the same for the Tomcat with ActiveMQ?

I found here: http://activemq.apache.org/tomcat.html, that manually integrating ActiveMQ with Tomcat does allow for Topic, Queue, and ConnectionFactory injection but does not support transactional sending and delivery, but I am not sure whether there is some workaround or not

Thanks for help!

UPDATE: I also rethrow exception in error handler as follows:

@Bean
public IntegrationFlow errorHandlingFlow() {
    return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
             .handle(this::errorMessageHandler)
             .get();
}

public void errorMessageHandler(Message<?> message) {
    log.warn("handling error message");
    log.warn("headers: " + message.getHeaders().toString());
    log.warn("payload: " + message.getPayload().toString());
    MessagingException exception = (MessagingException) message.getPayload();
    log.warn("original payload: " + exception.getFailedMessage().getPayload());
    throw exception; // make JMS broker redeliver
}
1

1 Answers

0
votes

If you would like to have rallback and redelivery, your error flow must rethrow an exception and not swallow like it is with default errorChannel. You can find similar questions and answers for them here around.

UPDATE

Well, not sure where is your problem I have a test-case similar to yours:

    @Autowired
    private MessageChannel errorChannel;

    @Bean
    public IntegrationFlow jmsMessageDrivenFlow() {
        return IntegrationFlows
                .from(Jms.messageDrivenChannelAdapter(this.jmsConnectionFactory)
                        .configureListenerContainer(c -> c.sessionTransacted(true))
                        .errorChannel(this.errorChannel)
                        .destination("jmsMessageDriver"))
                .<String, String>transform(p -> {
                    throw new RuntimeException("intentional");
                })
                .get();
    }

    @Bean
    public IntegrationFlow errorHandlingFlow() {
        return IntegrationFlows.from(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
                .handle(m -> {
                    MessagingException exception = (MessagingException) m.getPayload();
                    Message<?> failedMessage = exception.getFailedMessage();
                    throw exception;
                })
                .get();
    }

and on the redelivery I see these headers in the failedMessage:

"jms_redelivered" -> "true"
"JMSXDeliveryCount" -> "2"

And yes, we definitely use embedded ActiveMQ in our tests. The JMS ConnectionFactory is supplied for Spring Boot ActiveMQAutoConfiguration, by the way.