1
votes

I have Apache Camel with ActiveMQ doing batch processing with a batch size of 1000. My understanding is that Camel will retrieve 1000 messages from the FROM queue, process them, and then push them to the TO queue once processing is done. My question is: what happens if the system processing these messages crashes before pushing to the TO queue? Do the messages remain in ActiveMQ, or are they lost forever?

In scenarios without Apache Camel, we can configure the listener with different acknowledgement modes such as CLIENT_ACKNOWLEDGE.

Is there anything like this in Apache Camel? Below is my JMS config class:

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.component.jms.JmsComponent;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.jms.connection.JmsTransactionManager;

@Configuration
public class JMSConfig {

    private final Environment env;

    public JMSConfig(Environment env) {
        this.env = env;
    }

    @Bean
    public ConnectionFactory connectionFactory() {
        return new ActiveMQConnectionFactory();
    }

    @Bean
    public JmsTransactionManager jmsTransactionManager(final ConnectionFactory connectionFactory) {
        JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
        jmsTransactionManager.setConnectionFactory(connectionFactory);
        return jmsTransactionManager;
    }

    @Bean
    public JmsComponent createJmsComponent(final ConnectionFactory connectionFactory,
                                           final JmsTransactionManager jmsTransactionManager) {
        // Creates a transacted JMS component backed by the transaction manager
        return JmsComponent.jmsComponentTransacted(connectionFactory, jmsTransactionManager);
    }
}
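For reference, a minimal sketch of how the Camel JMS component exposes acknowledgement modes analogous to a plain JMS listener (class name is a placeholder; this assumes camel-jms and activemq-client on the classpath):

```java
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.component.jms.JmsComponent;

public class ClientAckSketch {

    // Component-wide CLIENT_ACKNOWLEDGE via the provided factory method
    public JmsComponent clientAckComponent() {
        ConnectionFactory cf = new ActiveMQConnectionFactory();
        return JmsComponent.jmsComponentClientAcknowledge(cf);
    }
}
```

The same can be set per endpoint with the `acknowledgementModeName` option, e.g. `activemq:queue:FROM?acknowledgementModeName=CLIENT_ACKNOWLEDGE`.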
Camel's JMS component has the acknowledgementMode property. Read the docs, please. – Strelok
@Strelok Let me check. Thanks. – CrazyCoder
@Strelok I tried using JmsComponent.jmsComponentClientAcknowledge, but messages are still lost if the system crashes before handing the exchange off to the TO queue. – CrazyCoder

2 Answers

0
votes

I don't really get what you mean by "batch processing of size 1000", but it sounds like you consume 1000 messages, commit them to the broker, and then continue to process them. That is the source of the problem.

If you want to avoid message loss, you must consume transacted from the broker and not commit messages before they are "safe".

Most simple example:

from("activemq:queue:myInput")
    .to("activemq:queue:myOutput");

If the JMS component is correctly configured to consume transacted, you can't lose messages in this route even if you kill your Camel application in the middle of processing.

That is because the broker keeps the messages until they are committed, and Camel does not commit a message before it has been handed over to myOutput. Therefore this is totally safe.

But when you consume a batch of 1000 messages, I assume they are committed right after consumption and therefore deleted on the broker. They are then no longer persisted anywhere and exist only in the memory of the Camel application. If it goes down, they are lost.

Notice that you don't need a Spring TxManager to consume transacted. Check out the Camel docs about transacted JMS, especially the point about the transacted DMLC (Camel JMS uses Spring's MessageListenerContainer). This reduces your configuration, and you also don't need to flag your Camel route as transacted.
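A sketch of that reduced configuration, assuming camel-jms and activemq-client on the classpath (broker URL and class name are placeholders):

```java
import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.component.jms.JmsComponent;

public class TransactedDmlcSketch {

    // Transacted consumption backed by the listener container alone:
    // no Spring transaction manager, no transacted() in the route.
    public JmsComponent transactedComponent() {
        ConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61616");
        JmsComponent jms = new JmsComponent();
        jms.setConnectionFactory(cf);
        jms.setTransacted(true);                    // consume in a local JMS transaction
        jms.setLazyCreateTransactionManager(false); // rely on the transacted DMLC only
        return jms;
    }
}
```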

By the way, if your route contains stateful components like the Aggregator, things get complicated, because these components introduce new transaction boundaries.

0
votes

Good morning,

I have been meaning to reply to your question for a while now, apologies for the delay.

I will be discussing Dups-OK message transfers with local JMS transactions. This means that the JMS send and receive sessions will use local JMS transactions. These two independent local transactions will be synchronized by a JTA transaction manager. With JTA-synchronized transactions, the commit for the 'put' must be acknowledged before the 'get' is acknowledged.

But first, some thoughts on using ActiveMQ's message prefetch in this scenario. By default, out of the box, A-MQ will 'prefetch' 1000 messages when a receive() call is made on a session. This presumes that the client code is going to leave the message listener up and consume all 1000 messages. If the client code reads just 1 message, for example, and then closes the session, 999 messages have to be returned to the source queue. Or, if something bad is happening, transactions may be failing and thrashing the prefetch.

I can go on with this topic at length, but just know that the Spring JMS code does not play well with transactional Camel routes and message prefetch, and I will be disabling prefetch in my example. Note that message prefetching is a performance optimization, so getting a route working without prefetch is a good thing. You may then find out that the message prefetch can increase performance, but has very undesirable side effects.

You did not mention your platform; I will be discussing a Blueprint XML file, which is very similar to a standard Camel/Spring XML file:

Blueprint Camel Route
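The linked file is not reproduced here; a hedged reconstruction, built only from the fragments quoted further down, might look like this (bean ids and queue names are placeholders):

```xml
<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0">

    <bean id="amqConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
        <!-- prefetch disabled, as discussed above -->
        <property name="brokerURL" value="tcp://10.0.0.170:61616?jms.prefetchPolicy.all=0"/>
    </bean>

    <bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
        <property name="connectionFactory" ref="amqConnectionFactory"/>
        <property name="transacted" value="true"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/blueprint">
        <route id="transactedRoute">
            <from uri="activemq:queue:FROM"/>
            <transacted id="trans"/>
            <to uri="activemq:queue:TO"/>
        </route>
    </camelContext>
</blueprint>
```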

As I am working on an OSGi platform, I have Aries installed by default as the Platform Transaction Manager, and Aries is a JTA transaction manager. If you are on Spring Boot, for example, you may need to include a starter like:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-jta-narayana</artifactId>
    </dependency>

in order to have a JTA PlatformTransactionManager (Arjuna, in this case). Do not instantiate a JmsTransactionManager and assign it as the PlatformTransactionManager.

Looking at the code, note:

<property name="brokerURL" value="tcp://10.0.0.170:61616?jms.prefetchPolicy.all=0"/>

This is disabling the message prefetching.

Note that I do not explicitly reference any transaction managers. Setting:

<property name="transacted" value="true"/>

will create a JmsTransactionManager for you.

Then the line:

 <transacted id="trans"/>

will start a JTA transaction context. The two local JMS transactions will be synchronized, and you may get duplicate messages, but you will not lose messages.

Add some logging:

<logger name="org.apache.camel" level="DEBUG" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>    

<logger name="org.springframework.jms.connection.JmsTransactionManager" level="DEBUG" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

<logger name="org.apache.activemq.TransactionContext" level="DEBUG" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

Or maybe:

<logger name="com.arjuna" level="TRACE" additivity="false">
    <appender-ref ref="STDOUT" />
</logger>

If you are using Arjuna.

You should see the two commits back-to-back. This gives a very small window where a failure can result in duplicate message delivery.

I would highly recommend starting with my example, keeping it as close to verbatim as possible. Even small deviations can have very unexpected results.