I'm using
Spring Batch
- Step 1
- Step 2 Master (Partitioner)
- Step 3
Spring Integration (JMS) for communication between the master and the slaves
The issue we are seeing is that the first slave handles all of the JMS messages instead of the messages being distributed evenly across the slaves.
See the configuration below.
Master
<!-- Master-side batch beans: pre/post-processing tasklets, the partitioner and the partition handler. -->

<!-- Step-scoped tasklet; maxNumberOfSlaves is late-bound from the 'max-slave-count' job parameter. -->
<bean id="PreProcess"
      class="com.job.tasklet.PreProcessTasklet"
      scope="step">
  <constructor-arg index="0" value="${run.slave}"/>
  <property name="maxNumberOfSlaves" value="#{jobParameters['max-slave-count']}"/>
</bean>

<!-- Prototype-scoped tasklet constructed with the chpsJobDataSource bean. -->
<bean id="PostProcess"
      class="com.job.tasklet.PostProcessTasklet"
      scope="prototype">
  <constructor-arg index="0" ref="chpsJobDataSource"/>
</bean>

<!-- Step-scoped partitioner.
     NOTE(review): only constructor argument index 3 is supplied in this snippet;
     confirm that arguments 0 to 2 are provided in the real configuration. -->
<bean id="partitioner"
      class="com.job.partition.DatabasePartitioner"
      scope="step">
  <constructor-arg index="3" value="${max.row.count}"/>
</bean>

<!-- Partition handler: targets the processAutoHoldSlaveStep step, sends through a
     MessagingTemplate whose default channel is groupRuleRequestsChannel, and is
     configured with aggregatedGroupRuleReplyChannel as its reply channel. -->
<bean id="partitionHandler"
      class="com.job.handler.StepExecutionAggregatorHandler">
  <property name="stepName" value="processAutoHoldSlaveStep"/>
  <property name="gridSize" value="${grid.size}"/>
  <property name="replyChannel" ref="aggregatedGroupRuleReplyChannel"/>
  <property name="messagingOperations">
    <bean class="org.springframework.integration.core.MessagingTemplate">
      <property name="defaultChannel" ref="groupRuleRequestsChannel"/>
    </bean>
  </property>
</bean>
<!-- Outbound (master to slave): messages placed on groupRuleRequestsChannel are
     sent to JMS through jmsTemplateToSlave. -->
<int:channel id="groupRuleRequestsChannel"/>

<int-jms:outbound-channel-adapter
    channel="groupRuleRequestsChannel"
    jms-template="jmsTemplateToSlave"/>

<bean id="jmsTemplateToSlave"
      class="org.springframework.jms.core.JmsTemplate">
  <property name="connectionFactory" ref="connectionFactory"/>
  <property name="receiveTimeout" value="5000"/>
  <property name="defaultDestinationName" value="defaultRequest"/>
</bean>

<!-- Inherits connectionFactory and receiveTimeout from jmsTemplateToSlave.
     NOTE(review): defaultDestinationName is "defaultRequest" here as well, i.e. the
     same destination the requests are sent to, and the reply poller below reads
     through this template. Confirm this is intended and not a copy/paste slip. -->
<bean id="jmsTemplateFromSlave"
      class="org.springframework.jms.core.JmsTemplate"
      parent="jmsTemplateToSlave">
  <property name="defaultDestinationName" value="defaultRequest"/>
</bean>

<!-- Inbound (slave to master): replies are polled from JMS onto groupRuleReplyChannel.
     The channel has no queue sub-element (it was commented out in the original). -->
<int:channel id="groupRuleReplyChannel">
  <!-- <int:queue/> -->
</int:channel>

<int-jms:inbound-channel-adapter
    channel="groupRuleReplyChannel"
    jms-template="jmsTemplateFromSlave">
  <int:poller id="defaultPoller"
              default="true"
              max-messages-per-poll="1"
              fixed-rate="3000"/>
</int-jms:inbound-channel-adapter>

<!-- Queue-backed channel that receives the aggregator output. -->
<int:channel id="aggregatedGroupRuleReplyChannel">
  <int:queue/>
</int:channel>

<!-- Aggregation of replies is delegated to the partitionHandler bean. -->
<int:aggregator
    ref="partitionHandler"
    input-channel="groupRuleReplyChannel"
    output-channel="aggregatedGroupRuleReplyChannel"
    send-timeout="3600000"/>
Slave
<!-- Slave-side wiring: receive partition requests from JMS, run them through
     stepExecutionRequestHandler, and send the results back over JMS. -->
<int:channel id="requestsChannel"/>

<bean id="connectionFactory"
      class="org.apache.activemq.spring.ActiveMQConnectionFactory">
  <property name="brokerURL" value="${spring.activemq.broker-url}"/>
  <!-- NOTE(review): trusting all packages permits deserialization of arbitrary classes
       carried in ObjectMessages; consider an explicit trustedPackages list instead. -->
  <property name="trustAllPackages" value="true"/>
  <!-- Limit the queue prefetch to one message per consumer. With ActiveMQ's default
       (large) queue prefetch, the first slave that connects can be handed the whole
       batch of partition requests before the other slaves receive any, so all the
       work ends up on a single slave. -->
  <property name="prefetchPolicy">
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
      <property name="queuePrefetch" value="1"/>
    </bean>
  </property>
</bean>

<!-- Request destination name is taken from the first program argument. -->
<int-jms:message-driven-channel-adapter
    id="jmsIn"
    connection-factory="connectionFactory"
    destination-name="#{args[0]}"
    channel="requestsChannel"
    max-messages-per-task="1"/>

<!-- Each request is handled by stepExecutionRequestHandler; its result goes to replyChannel. -->
<int:service-activator
    ref="stepExecutionRequestHandler"
    input-channel="requestsChannel"
    output-channel="replyChannel"/>

<int:channel id="replyChannel"/>

<!-- Reply destination name is taken from the second program argument. -->
<int-jms:outbound-channel-adapter
    connection-factory="connectionFactory"
    destination-name="#{args[1]}"
    channel="replyChannel"/>
Please advise if you have experienced this issue.
Let me know if you need more information.
Note: I have already searched extensively here and on Google, but have not found a solution yet.