1
votes

I'm using spring integration jms channel to consume messages from the queue and process it.

Here is my inbound-channel-config.xml

<jms:message-driven-channel-adapter id="jmsIn"
            destination="requestQueue"
            channel="routingChannel" 
            connection-factory="cachingConnectionFactory" 
            error-channel="errorChannel"
            concurrent-consumers="${jms_adapter_concurrent_consumers}" />

Here when i set concurrent-consumers to a value greater than 1, the messages that i consume gets corrupted while processing. I'm consuming XML and Json messages from the queue and while parsing the data, i could see that some of its contents are changed and set to some random value.

The above config works fine only when concurrent-consumers value is set to 1.

My question is, do i have to manually Synchronize (make thread safe) my code when i set concurrent-consumers to a value greater than 1?

2
minimal reproducible example is mandatory. Guessing how your code looks like does not helpefekctive

2 Answers

3
votes

Yes, your code must be thread safe. That's the case for any multi threaded code.

However, synchronizing the whole thing will effectively defeat concurrency. It's better to use stateless code (no fields), or use thread-safe variables (AtomicInteger and friends), or limit synchronization to small blocks.

If you synchronize the whole listener code, only one container thread can process at a time.

0
votes

Vishal and I work together.

I have to mention that the caching connection factory is being used, and I noticed that in this post you discouraged its use.

    <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
    <property name="targetConnectionFactory" ref="MQConnectionFactory" />
    <property name="sessionCacheSize" value="10"/>
</bean>


@Bean(name="MQConnectionFactory")
public ConnectionFactory connectionFactory() {

    if (factory == null) {
        factory = new MQConnectionFactory();
        try {
            factory.setHostName(env.getRequiredProperty(HOST));
            factory.setPort(Integer.parseInt(env.getRequiredProperty(PORT)));            
            factory.setQueueManager(env.getRequiredProperty(QUEUE_MANAGER));
            factory.setChannel(env.getRequiredProperty(CHANNEL));
            factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);  

        } catch (JMSException e) {
            throw new RuntimeException(e);
        }
    }
    return factory;
}

Could this be causing the issues? It seems so far that the issue happens in Spring Integration's default message convertors, as in some cases parts of the payload is "".

Cheers Kris