I'm trying to get a transacted Camel route (JMS -> HTTP4) working, but the message is not moved to ActiveMQ.DLQ when an exception is thrown, and I can't see why.
The example below simulates what happens when the server hosting the REST service is down and the message cannot be delivered.
I get the expected exception:
2018-01-18 12:30:50:962-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.s.s.TransactionErrorHandler - Transaction rollback (0x30a1c779) redelivered(false) for (MessageId: ID:MGR-MacBook-Pro.local-51837-1516262355358-4:2:1:1:16 on ExchangeId: ID-MGR-MacBook-Pro-local-1516275047663-0-1) caught: java.net.ConnectException: Cannot connect to CORE REST
2018-01-18 12:30:50:965-[Camel (LRM-Relay) thread #5 - JmsConsumer[myIncomingQueue]] WARN o.a.c.c.j.EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - java.net.ConnectException: Cannot connect to CORE REST]
org.apache.camel.RuntimeCamelException: java.net.ConnectException: Cannot connect to CORE REST …
But the message is consumed and removed from the queue. My assumption was that using a transacted route with Camel and ActiveMQ would roll the message back and move it to ActiveMQ.DLQ.
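To make that expectation concrete, here is a minimal plain-Java sketch of the rollback, redelivery and dead-letter behaviour I assumed. It is only a toy model: `RedeliverySketch`, `Outcome` and `deliver` are hypothetical names, not Camel or ActiveMQ classes, and the value 6 is ActiveMQ's default `maximumRedeliveries`.

```java
import java.util.function.Consumer;

// Toy model of broker-side redelivery; hypothetical names, not a Camel or ActiveMQ API.
final class RedeliverySketch {

    /** Result of pushing one message through the toy broker. */
    static final class Outcome {
        final int attempts;         // deliveries made, including the first one
        final boolean deadLettered; // true if the message ended up in the DLQ

        Outcome(int attempts, boolean deadLettered) {
            this.attempts = attempts;
            this.deadLettered = deadLettered;
        }
    }

    /**
     * Delivers one message until the listener completes (commit) or the
     * redelivery budget is spent. The listener receives the JMSRedelivered
     * flag and signals a rollback by throwing.
     */
    static Outcome deliver(int maximumRedeliveries, Consumer<Boolean> listener) {
        int attempts = 0;
        while (true) {
            boolean redelivered = attempts > 0;
            attempts++;
            try {
                listener.accept(redelivered);
                return new Outcome(attempts, false); // commit: message leaves the queue
            } catch (RuntimeException rollback) {
                if (attempts > maximumRedeliveries) {
                    return new Outcome(attempts, true); // budget spent: move to the DLQ
                }
                // otherwise the message is rolled back and delivered again
            }
        }
    }

    public static void main(String[] args) {
        // A listener that always fails, like a REST server that stays down.
        Outcome o = deliver(6, redelivered -> {
            throw new IllegalStateException("Cannot connect to CORE REST");
        });
        System.out.println(o.attempts + " attempts, deadLettered=" + o.deadLettered);
        // prints "7 attempts, deadLettered=true"
    }
}
```

In this model a listener that fails on every attempt is tried once plus six redeliveries and then dead-lettered, which is the outcome I expected to see on the real broker.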
I have read chapter 9 of Camel in Action (1st ed.) and searched online, but haven't found a solution to my problem.
I know I can define my own TransactionErrorHandler and store failed messages in a queue of my choice, but I was under the impression that moving them to the DLQ was the default behaviour when using transacted routes.
I'm using a standalone ActiveMQ 5.15.2, a vanilla install with the default configuration.
Camel 2.20.1
Java 8u144 on macOS 10.13.2
My config:
@Configuration
public class Config {

    /**
     * The Camel context.
     */
    final CamelContext camelContext;

    /**
     * The metric registry.
     */
    final MetricRegistry metricRegistry;

    /**
     * The broker URL.
     */
    @Value("${jms.broker.url}")
    private String brokerURL;

    /**
     * Instantiates a new Config.
     *
     * @param camelContext   the Camel context
     * @param metricRegistry the metric registry
     */
    @Autowired
    public Config(final CamelContext camelContext, final MetricRegistry metricRegistry) {
        this.camelContext = camelContext;
        this.metricRegistry = metricRegistry;
    }

    /**
     * Plain ActiveMQ connection factory pointing at the configured broker.
     *
     * @return the connection factory
     */
    @Bean
    public ActiveMQConnectionFactory activeMQConnectionFactory() {
        final ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory();
        activeMQConnectionFactory.setBrokerURL(brokerURL);
        return activeMQConnectionFactory;
    }

    /**
     * Pooled connection factory wrapping the ActiveMQ connection factory.
     *
     * @return the pooled connection factory
     */
    @Bean
    @Primary
    public PooledConnectionFactory pooledConnectionFactory() {
        final PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory();
        pooledConnectionFactory.setMaxConnections(8);
        pooledConnectionFactory.setMaximumActiveSessionPerConnection(500);
        pooledConnectionFactory.setConnectionFactory(activeMQConnectionFactory());
        return pooledConnectionFactory;
    }

    /**
     * Transacted JMS configuration.
     *
     * @return the JMS configuration
     */
    @Bean
    public JmsConfiguration jmsConfiguration() {
        final JmsConfiguration jmsConfiguration = new JmsConfiguration();
        jmsConfiguration.setConnectionFactory(pooledConnectionFactory());
        jmsConfiguration.setTransacted(true);
        jmsConfiguration.setTransactionManager(transactionManager());
        jmsConfiguration.setConcurrentConsumers(10);
        return jmsConfiguration;
    }

    /**
     * JMS transaction manager.
     *
     * @return the JMS transaction manager
     */
    @Bean
    public JmsTransactionManager transactionManager() {
        final JmsTransactionManager transactionManager = new JmsTransactionManager();
        transactionManager.setConnectionFactory(pooledConnectionFactory());
        return transactionManager;
    }

    /**
     * ActiveMQ component used by the "activemq:" endpoints.
     *
     * @return the ActiveMQ component
     */
    @Bean
    public ActiveMQComponent activeMQComponent(JmsConfiguration jmsConfiguration,
                                               PooledConnectionFactory pooledConnectionFactory,
                                               JmsTransactionManager transactionManager) {
        final ActiveMQComponent activeMQComponent = new ActiveMQComponent();
        activeMQComponent.setConfiguration(jmsConfiguration);
        activeMQComponent.setTransacted(true);
        activeMQComponent.setUsePooledConnection(true);
        activeMQComponent.setConnectionFactory(pooledConnectionFactory);
        activeMQComponent.setTransactionManager(transactionManager);
        return activeMQComponent;
    }
}
My Route:
@Component
public class SendToCore extends SpringRouteBuilder {

    @Override
    public void configure() throws Exception {
        Logger.getLogger(SendToCore.class).info("Sending to CORE");

        // No retries if first fails due to connection error
        interceptSendToEndpoint("http4:*")
            .choice()
                .when(header("JMSRedelivered").isEqualTo("false"))
                    .throwException(new ConnectException("Cannot connect to CORE REST"))
            .end();

        from("activemq:queue:myIncomingQueue")
            .transacted()
            .setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
            .to("http4:localhost/myRESTservice")
            .log("${header.CamelHttpResponseCode}")
            .end();
    }
}
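For reference, this is how I read the interceptor's condition, written out as a plain-Java sketch. `InterceptSketch`, `intercept` and `attempt` are hypothetical names used only for illustration, and the sketch assumes the `when(...)` predicate matches exactly when `JMSRedelivered` is false, as the `redelivered(false)` log line suggests. It models only the predicate, not the real HTTP call that follows.

```java
import java.net.ConnectException;

// Plain-Java model of the interceptor's predicate; hypothetical names, not Camel code.
final class InterceptSketch {

    /** Mirrors the when(...) branch: throw only when the message is not a redelivery. */
    static void intercept(boolean jmsRedelivered) throws ConnectException {
        if (!jmsRedelivered) {
            throw new ConnectException("Cannot connect to CORE REST");
        }
    }

    /** Describes what one delivery attempt does in this model. */
    static String attempt(boolean jmsRedelivered) {
        try {
            intercept(jmsRedelivered);
            return "sent to http4"; // interceptor did not throw, exchange continues
        } catch (ConnectException e) {
            return "rollback";      // simulated connection error
        }
    }

    public static void main(String[] args) {
        System.out.println("first delivery: " + attempt(false)); // prints "first delivery: rollback"
        System.out.println("redelivery:     " + attempt(true));  // prints "redelivery:     sent to http4"
    }
}
```

Under that reading, only the first delivery hits the simulated error; a redelivered message skips the `throwException` branch and goes on to the real `http4` endpoint.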
You might find redundant declarations in some of the beans; that is me trying to resolve the issue.
I've added a link to a GitHub repo of mine with a small test project that illustrates this:
https://github.com/hakuseki/transacted