I am migrating Spring Batch partitioned jobs (XML configuration) from Spring Batch 2.2.7 / Spring 3.2 to Spring Batch 4.0.2 / Spring 5.0.12. The WAR file is deployed on WildFly 11 with ActiveMQ Artemis. The overall approach uses x clustered application servers and divides each partitioned job into y partitions, with each server running y/x listeners so that the load is evenly distributed across the cluster.

We utilize 1 queue for outgoing messages and 1 queue for incoming messages across all partitioned batch jobs. All jobs share a single JmsInboundGateway like:

<int-jms:inbound-gateway
    id="springbatch.master.inbound.gateway"
    connection-factory="springbatch.listener.jmsConnectionFactory"
    request-channel="springbatch.slave.jms.request"
    request-destination="springbatch.partition.jms.requestsQueue"
    concurrent-consumers="${springbatch.partition.concurrent.consumers}"
    max-concurrent-consumers="${springbatch.partition.concurrent.maxconsumers}"
    max-messages-per-task="${springbatch.partition.concurrent.maxmessagespertask}"/>

<int:service-activator
    input-channel="springbatch.slave.jms.request"
    output-channel="springbatch.slave.jms.response"
    ref="springbatch.stepExecutionRequestHandler"/>

Each Job has an outbound gateway defined like:

<int-jms:outbound-gateway 
    connection-factory="springbatch.jmsConnectionFactory" 
    request-channel="partitioned.jms.requests" 
    request-destination="partition.jms.requestsQueue" 
    reply-channel="partitioned.jms.reply" 
    reply-destination="partition.jms.repliesQueue"
    receive-timeout="${partitioned.timeout}"
    correlation-key="JMSCorrelationID" >
    <int-jms:reply-listener cache-level="0" />        
</int-jms:outbound-gateway>

<int:aggregator 
    input-channel="partitioned.jms.reply" 
    ref="partitioned.jms.handler"/>

Based on integration schema changes we removed the JMSCorrelationId and reply listener from the inbound gateway.

For the initial integration effort I only define the inbound gateway and Wildfly is throwing the following exception:

[org.springframework.jms.listener.DefaultMessageListenerContainer] (springbatch.master.inbound.gateway.container-2) Setup of JMS message listener invoker failed for destination 'ActiveMQQueue[partitionRequestQueue]' - trying to recover. Cause: Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6: javax.jms.IllegalStateException: Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6     
    at org.apache.activemq.artemis.ra.ActiveMQRASessionFactoryImpl.allocateConnection(ActiveMQRASessionFactoryImpl.java:817)    
    at org.apache.activemq.artemis.ra.ActiveMQRASessionFactoryImpl.createSession(ActiveMQRASessionFactoryImpl.java:531)     
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:208)  
    at org.springframework.jms.listener.DefaultMessageListenerContainer.access$1500(DefaultMessageListenerContainer.java:125)

Is there a different approach to defining the number of listeners that avoids this error?

Only allowed one session per connection. See the J2EE spec, e.g. J2EE1.4 Section 6.6

UPDATE to Questions Below

When Wildfly starts up I see this line

WFLYJCA0002: Bound JCA ConnectionFactory [java:/JmsXA]

I am using java:/JmsXA

This is a Spring Boot application, and the 5 JmsListeners it defines run without errors in the logs.

2019-01-16 06:30:54,667 DEBUG DefaultMessageListenerContainer] (ServerService Thread Pool -- 67) Resumed paused task: org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker@a877e28

When I add the definition of the int-jms:inbound-gateway, I start seeing the errors listed above.

Schema Question

The XML defining the inbound gateway has the following schema definition:

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd

The previous code (batch 2.2) had the following schema definition:

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-2.2.xsd

I just updated with

http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-5.0.xsd
http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms-5.0.xsd

and I can add JMSCorrelationID back in, so that issue is solved.
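For context, the schema locations above would normally sit in the root element of the context file. The following is only a sketch of how that declaration might look, assuming the conventional `int` and `int-jms` prefixes used in the snippets in this question; it uses the versionless XSD locations, which Spring resolves from the jars on the classpath rather than pinning a release.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Sketch only: prefixes match the snippets in the question; the
     versionless XSD locations are resolved from the jars on the classpath. -->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/integration
           http://www.springframework.org/schema/integration/spring-integration.xsd
           http://www.springframework.org/schema/integration/jms
           http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd">

    <!-- gateway, channel and service-activator definitions go here -->

</beans>
```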

However, I still get the ActiveMQ error on server startup when I include the int-jms:inbound-gateway.

What JNDI name are you using for the JMS connection factory? It appears you're pointing to a JCA-based connection factory which won't allow more than one session per connection. Could you try pointing to a non-JCA connection factory instead? - Justin Bertram

"Based on integration schema changes we removed the JMSCorrelationId and reply listener from the inbound gateway." You are using a single queue shared between all partitioned jobs and at the same time removing the correlation ID. How would request messages be correlated with reply messages in this case? Can you share more details about which "integration schema" you based this decision on? - Mahmoud Ben Hassine

Updated the question with this information. - Mike Rother
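To illustrate the suggestion in the first comment, here is a rough sketch of looking up a non-JCA connection factory through JNDI. The name `java:/ConnectionFactory` is an assumption: it is the in-VM connection factory bound by WildFly's stock full profile, and it may be named differently or absent on this server. Unlike `java:/JmsXA`, such a factory is not pooled and does not take part in container-managed XA transactions, so whether it is acceptable depends on the job's transaction requirements.

```xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xsi:schemaLocation="
           http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/jee
           http://www.springframework.org/schema/jee/spring-jee.xsd">

    <!-- Assumption: java:/ConnectionFactory is the non-JCA factory on this
         server; the bean id reuses the one referenced by the inbound gateway. -->
    <jee:jndi-lookup id="springbatch.listener.jmsConnectionFactory"
                     jndi-name="java:/ConnectionFactory"
                     expected-type="javax.jms.ConnectionFactory"/>

</beans>
```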

1 Answer

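To resolve the "Only allowed one session per connection" error (the J2EE spec restriction), I needed to add cache-level="0" to the inbound gateway. Level 0 corresponds to CACHE_NONE in DefaultMessageListenerContainer, so the container no longer keeps one shared connection from which every concurrent consumer opens its own session.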

<int-jms:inbound-gateway 
    id="springbatch.master.inbound.gateway" 
    connection-factory="springbatch.jmsConnectionFactory" 
    request-channel="springbatch.slave.jms.request" 
    request-destination="springbatch.partition.jms.requestsQueue" 
    reply-channel="springbatch.slave.jms.response" 
    concurrent-consumers="${springbatch.partition.concurrent.consumers}" 
    max-concurrent-consumers="${springbatch.partition.concurrent.maxconsumers}" 
    max-messages-per-task="${springbatch.partition.concurrent.maxmessagespertask}"
    reply-time-to-live="${springbatch.partition.reply.time.to.live}"  
    cache-level="0"
/>
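If the numeric cache level reads as too cryptic, one possible alternative is to declare the listener container explicitly and hand it to the gateway through its `container` attribute. This is a sketch, not something I have run against this setup: the bean id `springbatch.partition.listenerContainer` is made up for illustration, and it assumes `springbatch.partition.jms.requestsQueue` is a `javax.jms.Destination` bean as in the configuration above. The `int-jms` namespace declaration is the same as in the rest of the context file.

```xml
<!-- Hypothetical bean id; CACHE_NONE is the named equivalent of cache-level="0". -->
<bean id="springbatch.partition.listenerContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="springbatch.jmsConnectionFactory"/>
    <property name="destination" ref="springbatch.partition.jms.requestsQueue"/>
    <property name="cacheLevelName" value="CACHE_NONE"/>
    <property name="concurrentConsumers" value="${springbatch.partition.concurrent.consumers}"/>
    <property name="maxConcurrentConsumers" value="${springbatch.partition.concurrent.maxconsumers}"/>
    <property name="maxMessagesPerTask" value="${springbatch.partition.concurrent.maxmessagespertask}"/>
</bean>

<!-- With an external container, the connection, destination and concurrency
     settings live on the container bean instead of on the gateway. -->
<int-jms:inbound-gateway
    id="springbatch.master.inbound.gateway"
    container="springbatch.partition.listenerContainer"
    request-channel="springbatch.slave.jms.request"
    reply-channel="springbatch.slave.jms.response"
    reply-time-to-live="${springbatch.partition.reply.time.to.live}"/>
```

Either form should leave each consumer obtaining its own connection and session, which is what the JCA-managed factory requires.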