0
votes

I'm using

  1. Spring Batch

    • Step 1
    • Step 2 Master (Partitioner)
    • Step 3
  2. Spring Integration (JMS) to communicate Master and Slave

The issue we are seeing is, the first slave handles all JMS messages instead of even distribution between slaves.

See configuration as below

  1. Master

    <bean id="PreProcess" class="com.job.tasklet.PreProcessTasklet" scope="step">
        <constructor-arg index="0" value="${run.slave}"/>
        <property name="maxNumberOfSlaves" value="#{jobParameters['max-slave-count']}"/>
    </bean>
    
    <bean id="PostProcess" class="com.job.tasklet.PostProcessTasklet" scope="prototype">
        <constructor-arg index="0" ref="chpsJobDataSource"/>
    </bean>
    
    
    <bean id="partitioner" class="com.job.partition.DatabasePartitioner" scope="step">
        <constructor-arg index="3" value="${max.row.count}"/>
    </bean>
    
    <bean id="partitionHandler" class="com.job.handler.StepExecutionAggregatorHandler">
        <property name="stepName" value="processAutoHoldSlaveStep"/>
        <property name="gridSize" value="${grid.size}"/>
        <property name="replyChannel" ref="aggregatedGroupRuleReplyChannel"/>
        <property name="messagingOperations">
            <bean class="org.springframework.integration.core.MessagingTemplate">
                <property name="defaultChannel" ref="groupRuleRequestsChannel"/>
            </bean>
        </property>
    </bean>
    

    <!-- Request Start -->
    <int:channel id="groupRuleRequestsChannel" />
    <int-jms:outbound-channel-adapter channel="groupRuleRequestsChannel" jms-template="jmsTemplateToSlave"/>
    
    <bean id="jmsTemplateToSlave" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory"/>
        <property name="receiveTimeout" value="5000"/>
        <property name="defaultDestinationName" value="defaultRequest"/>
    </bean>
    
    <bean id="jmsTemplateFromSlave" class="org.springframework.jms.core.JmsTemplate" parent="jmsTemplateToSlave">
        <property name="defaultDestinationName" value="defaultRequest"/>
    </bean>
    
    
    <!-- Response Test Start -->
    <int:channel id="groupRuleReplyChannel">
        <!-- <int:queue/> -->
    </int:channel>
    
    <int-jms:inbound-channel-adapter channel="groupRuleReplyChannel" jms-template="jmsTemplateFromSlave">
        <int:poller id="defaultPoller" default="true" max-messages-per-poll="1" fixed-rate="3000"  />
    </int-jms:inbound-channel-adapter>
    
    <!-- define aggregatedReplyChannel -->
    <int:channel id="aggregatedGroupRuleReplyChannel">
        <int:queue/>
    </int:channel>
    
    <int:aggregator ref="partitionHandler"
                    input-channel="groupRuleReplyChannel"
                    output-channel="aggregatedGroupRuleReplyChannel"
                    send-timeout="3600000"/>
    
  2. Slave

    <int:channel id="requestsChannel" />
    
    <bean id="connectionFactory" class="org.apache.activemq.spring.ActiveMQConnectionFactory">
        <property name="brokerURL" value="${spring.activemq.broker-url}" />
        <property name="trustAllPackages" value="true" />
    </bean>
    
     <int-jms:message-driven-channel-adapter id="jmsIn" destination-name="#{args[0]}" channel="requestsChannel" connection-factory="connectionFactory" max-messages-per-task="1"/>
    
    <int:service-activator input-channel="requestsChannel" output-channel="replyChannel" ref="stepExecutionRequestHandler" />
    
    <int:channel id="replyChannel" />
    
    <int-jms:outbound-channel-adapter connection-factory="connectionFactory" destination-name="#{args[1]}" channel="replyChannel" />
    

Please advice if you have experience the issue.

Let me know if you need more information.

Note: I already search a lot at here and google but no luck for solution yet.

1

1 Answers

1
votes

ActiveMQ uses a prefetch of 1000 by default see here.

In other words, the first (up to) 1000 partitions will go to the first consumer etc.

You can reduce the prefetch; 1 is probably fine for this application.