0
votes

I am using Spring Batch remote partitioning for a batch process, and I launch jobs using Spring Batch Admin.

I have set the inbound gateway consumer concurrency to 10, but the maximum number of partitions running in parallel is 8.

I want to increase the consumer concurrency to 15 later on.

Below is my configuration:

<task:executor id="taskExecutor" pool-size="50" />

<rabbit:template id="computeAmqpTemplate"
    connection-factory="rabbitConnectionFactory" routing-key="computeQueue"
    reply-timeout="${compute.partition.timeout}">
</rabbit:template>

<int:channel id="computeOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="computeInboundStagingChannel" />

<amqp:outbound-gateway request-channel="computeOutboundChannel"
    reply-channel="computeInboundStagingChannel" amqp-template="computeAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="computeMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="computeOutboundChannel"
    p:receiveTimeout="${compute.partition.timeout}" />


<beans:bean id="computePartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="computeStep" p:gridSize="${compute.grid.size}"
    p:messagingOperations-ref="computeMessagingTemplate" />

<int:aggregator ref="computePartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="${compute.step.timeout}"
    input-channel="computeInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    request-channel="computeInboundChannel" 
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<int:channel id="computeInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="computeInboundChannel" output-channel="computeOutboundStagingChannel" />

<int:channel id="computeOutboundStagingChannel" />

<beans:bean id="computePartitioner"
    class="org.springframework.batch.core.partition.support.MultiResourcePartitioner"
    p:resources="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/shares_rics/shares_rics_*.txt"
    scope="step" />



<beans:bean id="computeFileItemReader"
    class="org.springframework.batch.item.file.FlatFileItemReader"
    p:resource="#{stepExecutionContext[fileName]}" p:lineMapper-ref="stLineMapper"
    scope="step" />

<beans:bean id="computeItemWriter"
    class="com.st.batch.foundation.writers.ComputeItemWriter"
    p:symfony-ref="symfonyStepScoped" p:timeout="${compute.item.timeout}"
    p:batchId="#{jobParameters[batch_id]}" scope="step" />


<step id="computeStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="computeFileItemReader" writer="computeItemWriter"
            commit-interval="${compute.commit.interval}" />
    </tasklet>
</step>

<flow id="computeFlow">
    <step id="computeStep.master">
        <partition partitioner="computePartitioner"
            handler="computePartitionHandler" />
    </step>
</flow>

<job id="computeJob" restartable="true">
    <flow id="computeJob.computeFlow" parent="computeFlow" />
</job>



compute.grid.size = 112
compute.consumer.concurrency = 10

Input files are split into 112 equal parts (= compute.grid.size = total number of partitions).

Number of servers = 4.

There are 2 problems:

i) Even though I have set the concurrency to 10, the maximum number of threads running is 8.

ii) Some servers are slower because other processes run on them, and some are faster, so I want to make sure step executions are distributed fairly, i.e. if the faster servers are done with their executions, the remaining executions in the queue should go to them. They should not be distributed in round-robin fashion.

I know RabbitMQ has a prefetch count setting and an ack mode to distribute work fairly. In Spring Integration, the prefetch count is 1 by default and the ack mode is AUTO by default. But some servers still keep running more partitions even though other servers have been done for a long time. Ideally no server should be sitting idle.

Update:

One more thing I have now observed is that some steps which run in parallel using split (not distributed using remote partitioning) also run at most 8 in parallel. It looks like a thread pool limit issue, but as you can see, taskExecutor has pool-size set to 50.

Is there anything in spring-batch/spring-batch-admin which limits the number of concurrently running steps?

2nd Update:

Also, if there are 8 or more threads running in parallel processing items, Spring Batch Admin doesn't load. It just hangs. If I reduce the concurrency, Spring Batch Admin loads. I even tested it with concurrency set to 4 on one server and 8 on another: Spring Batch Admin doesn't load if I use the URL of the server where 8 threads are running, but it works on the server where 4 threads are running.

Spring batch admin manager has below jobLauncher configuration,

<bean id="jobLauncher" class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
    <property name="jobRepository" ref="jobRepository" />
    <property name="taskExecutor" ref="jobLauncherTaskExecutor" />
</bean>

<task:executor id="jobLauncherTaskExecutor" pool-size="6" rejection-policy="ABORT" />

The pool size is 6 there. Does it have anything to do with the above problem?

Or is there anything in Tomcat 7 which restricts the number of running threads to 8?

2
Hi Vishal, I have the same issue. Did you get this resolved? If so, may I know what your solution was? (The Guest)

2 Answers

1
votes

Are you using a database for JobRepository?

During execution, the batch framework persists step executions, and the number of connections available to the JobRepository database can limit parallel step executions.

A concurrency of 8 makes me think you might be using BasicDataSource, whose connection pool is capped at 8 active connections by default. If so, switch to something like DriverManagerDataSource and see whether the limit goes away.
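
If you would rather keep a pooled data source, raising the pool limit is another thing to try. Below is a minimal sketch, assuming Commons DBCP 1.x (where the property is called maxActive and defaults to 8). The bean id and the batch.jdbc.* placeholders are assumptions for illustration, not taken from the question, and the value 30 is arbitrary.

```xml
<!-- Sketch only: assumes Commons DBCP 1.x; bean id and ${batch.jdbc.*}
     placeholders are hypothetical and must match your own setup. -->
<bean id="dataSource" class="org.apache.commons.dbcp.BasicDataSource"
    destroy-method="close">
    <property name="driverClassName" value="${batch.jdbc.driver}" />
    <property name="url" value="${batch.jdbc.url}" />
    <property name="username" value="${batch.jdbc.user}" />
    <property name="password" value="${batch.jdbc.password}" />
    <!-- maxActive defaults to 8 in DBCP 1.x; the value here is illustrative
         and should exceed the number of steps expected to run in parallel. -->
    <property name="maxActive" value="30" />
</bean>
```

If the pool is the bottleneck, this might also explain why Spring Batch Admin hangs once 8 threads are busy: the UI needs a connection of its own to query the JobRepository.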

0
votes

Confused - you said "I have set the concurrency to 10" but then show compute.consumer.concurrency = 8. So it is working as configured. It is impossible to have only 8 consumer threads if the property is set to 10.

From Rabbit's perspective, all consumers are equal - if there are 10 consumers on a slow box and 10 consumers on a fast box, and you only have 10 partitions, it is possible that all 10 partitions will end up on the slow box.

RabbitMQ does not distribute work across servers, it distributes the work across consumers only.

You might get better distribution by reducing the concurrency. You should also set the concurrency lower on the slower boxes.
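
As a rough sketch of that suggestion, the gateway from the question can stay the same on every server while each server supplies its own compute.consumer.concurrency value (for example 4 on a slow box and 10 on a fast one; those numbers are only illustrative). The prefetch-count and acknowledge-mode attributes are spelled out here simply to make the settings mentioned in the question explicit.

```xml
<!-- Sketch only: same gateway as in the question. The concurrency comes from
     a per-server properties file, so slower servers can run fewer consumers.
     prefetch-count="1" means each consumer holds at most one unacknowledged
     message, so remaining partitions stay in the queue for whichever
     consumer becomes free first. -->
<amqp:inbound-gateway concurrent-consumers="${compute.consumer.concurrency}"
    prefetch-count="1" acknowledge-mode="AUTO"
    request-channel="computeInboundChannel"
    reply-channel="computeOutboundStagingChannel" queue-names="computeQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />
```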