
I have implemented my batch jobs using Spring Batch partitioning, with RabbitMQ as the middleware.

I studied the documentation and referred to these sample configurations:

https://github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/resources/META-INF/master.xml

https://github.com/sshcherbakov/spring-batch-talk/blob/master/src/main/resources/META-INF/slave.xml

I can run my job steps concurrently, but I am a bit worried about how it will work if I launch multiple instances of the same job at the same time with different parameters.

E.g. I am importing exchange data using the importExchange job, but what happens if I launch the importExchange job for different markets, e.g. the US market and the Europe market, at the same time?

The partitioner will partition the input exchange names into different partitioned step execution contexts, the MessageChannelPartitionHandler will send the stepExecutionRequests as messages through RabbitMQ queues to different servers, and the steps will execute concurrently on those servers.
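
For context, a minimal sketch (hypothetical class and key names, not the FlatFilePartitioner configured below) of what such a partitioner does: it returns one ExecutionContext per partition, and the step-scoped reader later picks these values up via #{stepExecutionContext['startAt']} and #{stepExecutionContext['itemsCount']}.

import java.util.HashMap;
import java.util.Map;

import org.springframework.batch.core.partition.support.Partitioner;
import org.springframework.batch.item.ExecutionContext;

// Minimal sketch of a Partitioner: one ExecutionContext per partition.
// The class name and the chunk size of 5 are made up for illustration.
public class ExchangeRangePartitioner implements Partitioner {

    @Override
    public Map<String, ExecutionContext> partition(int gridSize) {
        Map<String, ExecutionContext> contexts = new HashMap<String, ExecutionContext>();
        for (int i = 0; i < gridSize; i++) {
            ExecutionContext context = new ExecutionContext();
            // Exposed to the step-scoped reader as
            // #{stepExecutionContext['startAt']} / #{stepExecutionContext['itemsCount']}.
            context.putInt("startAt", i * 5);
            context.putInt("itemsCount", 5);
            contexts.put("partition" + i, context);
        }
        return contexts;
    }
}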

Now the confusion is: when responses are sent back on the reply queue (which is the same for all job instances), the listeners of all instances will listen on that same reply queue, i.e. the replies for both job1 and job2 arrive on the same reply queue.

How can we make sure the responses of job1 will be picked up by job1's outbound gateway and not by job2's, and vice versa? Does the outbound gateway only pick up responses to the requests it sent itself (by checking the correlation id) and ignore other responses?

As we are using direct channels and exchanges, each response will be delivered to only one listener, so can it happen that a response for job1 is picked up by job2's listener?

Or is there any router or filter which chooses replies selectively?

Do I need to worry about this, or does MessageChannelPartitionHandler take care of it? Or should I prefix the reply queue names with the job id?

Below is my configuration:

<task:executor id="taskExecutor" pool-size="20" />

<int:channel id="importExchangesOutboundChannel">
    <int:dispatcher task-executor="taskExecutor" />
</int:channel>

<int:channel id="importExchangesInboundStagingChannel" />

<amqp:outbound-gateway request-channel="importExchangesOutboundChannel"
    reply-channel="importExchangesInboundStagingChannel" amqp-template="importExchangesAmqpTemplate"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />


<beans:bean id="importExchangesMessagingTemplate"
    class="org.springframework.integration.core.MessagingTemplate"
    p:defaultChannel-ref="importExchangesOutboundChannel"
    p:receiveTimeout="150000" />


<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    scope="step" />


<beans:bean id="importExchangesPartitionHandler"
    class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler"
    p:stepName="importExchangesStep" p:gridSize="6"
    p:messagingOperations-ref="importExchangesMessagingTemplate" />

<int:aggregator ref="importExchangesPartitionHandler"
    send-partial-result-on-expiry="true" send-timeout="300000"
    input-channel="importExchangesInboundStagingChannel" />

<amqp:inbound-gateway concurrent-consumers="6"
    request-channel="importExchangesInboundChannel" receive-timeout="300000"
    reply-channel="importExchangesOutboundStagingChannel" queue-names="importExchangesQueue"
    connection-factory="rabbitConnectionFactory"
    mapped-request-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS"
    mapped-reply-headers="correlationId, sequenceNumber, sequenceSize, STANDARD_REQUEST_HEADERS" />

<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
    routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>

<int:channel id="importExchangesInboundChannel" />

<int:service-activator ref="stepExecutionRequestHandler"
    input-channel="importExchangesInboundChannel" output-channel="importExchangesOutboundStagingChannel" />

<int:channel id="importExchangesOutboundStagingChannel" />



<rabbit:queue name="${import.exchanges.queue}" />
<rabbit:queue name="${import.exchanges.reply.queue}" />

<rabbit:direct-exchange name="${import.exchanges.exchange}">
    <rabbit:bindings>
        <rabbit:binding queue="${import.exchanges.queue}"
            key="${import.exchanges.routing.key}" />
    </rabbit:bindings>
</rabbit:direct-exchange>


<beans:bean id="stepExecutionRequestHandler"
    class="org.springframework.batch.integration.partition.StepExecutionRequestHandler"
    p:jobExplorer-ref="jobExplorer" p:stepLocator-ref="stepLocator" />


<beans:bean id="stepLocator"
    class="org.springframework.batch.integration.partition.BeanFactoryStepLocator" />


<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />


<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
    p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />

<step id="importExchangesStep">
    <tasklet transaction-manager="transactionManager">
        <chunk reader="importExchangesFileItemReader" writer="importExchangesItemWriter"
            commit-interval="${import.exchanges.commit.interval}" />
    </tasklet>
</step>

<job id="importExchangesJob" restartable="true">

    <step id="importExchangesStep.master" next="importEclsStep.master">
        <partition partitioner="importExchangesPartitioner"
            handler="importExchangesPartitionHandler" />
    </step>

</job>

EDIT:

I tried removing the reply queue name from the amqpTemplate so that the default temporary reply queue is used, and tested this use case. Before even looking at the replies, the problem shows up on the slave side as well.

<rabbit:template id="importExchangesAmqpTemplate" connection-factory="rabbitConnectionFactory"
    routing-key="importExchangesQueue" reply-timeout="300000">
</rabbit:template>

I created two input files with dummy data, e.g.

My job ids are 2015-06-08 and 2015-06-09. I created exchanges.txt under folders named 2015-06-08 and 2015-06-09:

/home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt
/home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt

The data in the file /home/ubuntu/tmp/spring/batch/2015-06-08/exchanges.txt is

1
2
3
up to 30

and the data in /home/ubuntu/tmp/spring/batch/2015-06-09/exchanges.txt is

31
32
33
up to 60

I am using this item reader to read the items and pass them to the writer.

Reader:

<beans:bean id="importExchangesFileItemReader"
    class="org.springframework.batch.item.file.MultiThreadedFlatFileItemReader"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    p:lineMapper-ref="stLineMapper" p:startAt="#{stepExecutionContext['startAt']}"
    p:maxItemCount="#{stepExecutionContext['itemsCount']}" scope="step" />

Writer:

<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.ecls.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />

and inside the writer I am calling an external command which imports data for each exchange item:

@Override
public void write(List<? extends T> exchanges) throws Exception {

    // Point the command runner's output at this job's log file.
    commandRunner.setLogFilePath(this.logFilePath);

    for (T exchange : exchanges) {

        // Build and run the console import command for this exchange,
        // e.g. "<console> st:import exchange <name>".
        String command = commandRunner.getConsolePath() + " "
                + "st:import exchange" + " " + exchange.toString();

        commandRunner.run(command, this.replyTimeout);
    }

}

Inside the commandRunner:

public void run(String command, long replyTimeout)
        throws Exception {

    String[] commands = command.split("\\s+");

    ProcessBuilder pb = new ProcessBuilder(commands);
    File log = new File(this.logFilePath);
    // Merge stderr into stdout and append everything to the log file.
    pb.redirectErrorStream(true);
    pb.redirectOutput(Redirect.appendTo(log));
    Process p = pb.start();
    .......
}
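
For reference, a self-contained sketch of a runner along these lines; the original snippet is truncated after pb.start(), so everything from the timed waitFor onward is an assumption (and requires Java 8), not the actual implementation:

import java.io.File;
import java.lang.ProcessBuilder.Redirect;
import java.util.concurrent.TimeUnit;

// Hedged sketch of a command runner similar to the one above. Everything
// after pb.start() is assumed; the original code is elided at that point.
public class SimpleCommandRunner {

    private String logFilePath;

    public void setLogFilePath(String logFilePath) {
        this.logFilePath = logFilePath;
    }

    public void run(String command, long replyTimeout) throws Exception {
        String[] commands = command.split("\\s+");

        ProcessBuilder pb = new ProcessBuilder(commands);
        File log = new File(this.logFilePath);
        // Merge stderr into stdout and append both to the job's log file.
        pb.redirectErrorStream(true);
        pb.redirectOutput(Redirect.appendTo(log));

        Process p = pb.start();
        // Assumed completion handling: wait up to replyTimeout ms, then kill.
        if (!p.waitFor(replyTimeout, TimeUnit.MILLISECONDS)) {
            p.destroyForcibly();
            throw new IllegalStateException("Command timed out: " + command);
        }
    }
}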

If I launch only one instance of the job (either with batch id 2015-06-08 or 2015-06-09), everything works fine, but if I launch both simultaneously, the input data for the steps of the two job instances gets mixed. I mean, this is what I get in the log files:

tail -f /var/log/st/batch.log.2015-06-08

14 23 1 27 19 9 15 24 2 10 28 20 25 16 3 21 29 11 26 17 4 30 12 22 18 5 44 45 46

and in /var/log/st/batch.log.2015-06-09

52 13 47 31 37 6 53 57 48 32 38 54 7 49 58 33 39 55 8 59 50 34 40 56 60 51 35 42 41 36 43

So 44 45 46 go to batch.log.2015-06-08 when they should go to batch.log.2015-06-09, and 6 7 8 go to batch.log.2015-06-09 when they should go to batch.log.2015-06-08.

I am passing the log file path to the item writer because I need a separate log file for each job, so I append the batch_id to the file names:

<beans:bean id="importExchangesItemWriter"
    class="com.st.batch.foundation.ImportExchangesItemWriter"
    p:symfony-ref="symfony" p:replyTimeout="${import.exchanges.reply.timeout}"
    p:logFilePath="${batch.log.file.path}.#{jobParameters[batch_id]}"
    scope="step" />

Is this happening due to the outbound and inbound gateways? Are different instances of the Spring Integration channels, gateways, etc. created for different job instances, or are they like the RabbitMQ queues, which are the same for all job instances?

The inbound gateway has concurrent-consumers="8"; are these consumers shared across all job instances, or will a separate set of 8 consumers be created for each job instance?

Can this partitioner handle partitioning for multiple jobs?

<beans:bean id="importExchangesPartitioner"
    class="org.springframework.batch.core.partition.support.FlatFilePartitioner"
    p:resource="file:${spring.tmp.batch.dir}/#{jobParameters[batch_id]}/exchanges.txt"
    scope="step" />

An application to reproduce this is here:

https://github.com/vishalmelmatti/spring-batch-remote-partition-test/tree/master


1 Answer


It's all taken care of for you by the framework. When the partition handler sends out the step execution requests, it sets a correlation id in the header...

.setCorrelationId(stepExecutionRequest.getJobExecutionId() + ":" + stepExecutionRequest.getStepName())

This correlation id is used by the aggregator to aggregate all the responses for this job into a single message, which is released to the partition handler once all responses have been received...

Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);

This is achieved using the sequence size header.
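
To illustrate (a self-contained sketch with made-up values, assuming Spring Integration 4.x APIs, not code from the framework itself): replies sharing a correlation id are grouped, and the aggregator's default SequenceSizeReleaseStrategy releases the group once the number of messages reaches the sequenceSize header.

import java.util.Arrays;

import org.springframework.integration.aggregator.SequenceSizeReleaseStrategy;
import org.springframework.integration.store.SimpleMessageGroup;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.messaging.Message;

// Sketch: two replies for jobExecutionId 42 / step "importExchangesStep"
// with sequenceSize 2; the default release strategy fires once both arrive.
public class ReleaseDemo {

    public static void main(String[] args) {
        String correlationId = 42L + ":" + "importExchangesStep";

        Message<String> reply1 = MessageBuilder.withPayload("stepExecution1")
                .setCorrelationId(correlationId)
                .setSequenceNumber(1).setSequenceSize(2).build();
        Message<String> reply2 = MessageBuilder.withPayload("stepExecution2")
                .setCorrelationId(correlationId)
                .setSequenceNumber(2).setSequenceSize(2).build();

        SimpleMessageGroup group = new SimpleMessageGroup(
                Arrays.asList(reply1, reply2), correlationId);

        // Prints true: all messages in the sequence are present,
        // so the aggregated result can be released to the partition handler.
        System.out.println(new SequenceSizeReleaseStrategy().canRelease(group));
    }
}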