
I am seeing the same problem as described in this forum thread:

http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread

I have multiple jobs running, each with a step that uses the same parent partition handler, 'parentPartitionHandler', defined below:

<bean id="parentPartitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="gridSize" value="3"/>
    <property name="replyChannel" ref="outboundReplies"/>
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="outboundRequests"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

All of my jobs are configured like the one below, with the second step, 'studentPartitionAndProcessStep', being the partition step:

<job id="studentLoadJob" xmlns="http://www.springframework.org/schema/batch"
     job-repository="jobRepository" restartable="true" parent="abstractJob">
    <step id="studentLoadStep" parent="parentLoadStep" next="studentPartitionAndProcessStep"/>
    <step id="studentPartitionAndProcessStep" next="studentCleanupStep">
        <partition partitioner="filePartitioner" handler="studentPartitionHandler"/>
    </step>
    <step id="studentCleanupStep" parent="parentCleanupStep"/>
</job>

<bean id="studentPartitionHandler"
      parent="parentPartitionHandler">
    <property name="stepName" value="studentPartitionStep"/>
</bean>

I used the same master configuration as here: https://github.com/mminella/Spring-Batch-Talk-2.0/blob/master/src/main/resources/META-INF/remotePartition.xml

<int:channel id="outboundReplies">
    <int:queue/>
</int:channel>

<int:channel id="inboundStaging">
</int:channel>

<int:aggregator ref="parentPartitionHandler" send-partial-result-on-expiry="true" send-timeout="60000000"
                input-channel="inboundStaging" output-channel="outboundReplies"
                expire-groups-upon-completion="true"/>

The problem is that the aggregator seems to collect the messages of a group correctly, but inside MessageChannelPartitionHandler the statement below receives the first available message without taking the message's header information into account:

Message<Collection<StepExecution>> message = (Message<Collection<StepExecution>>) messagingGateway.receive(replyChannel);

So the PartitionHandler handles the result of jobExecutionA instead of jobExecutionB and therefore completes the wrong job.

It seems that MessageChannelPartitionHandler consumes the reply from the QueueChannel (outboundReplies in my configuration) by taking the first available message, without considering the correlationId. It works sometimes and fails at other times; when I debugged it, I found the same thing happening as in this post:

http://forum.spring.io/forum/spring-projects/integration/724241-s-integration-s-batch-remote-step-execution-problem?view=thread
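
To make the suspected behaviour concrete, here is a minimal plain-Java sketch of it. It uses no Spring classes: the Reply type, the receiveFirst method, and the second job (57:teacherPartitionStep) are made up for illustration, and the real handler blocks with a timeout instead of polling. It only shows that a receive on a shared queue hands out whatever is at the head, regardless of which job is asking.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class SharedReplyQueueDemo {

    // Hypothetical stand-in for an aggregated reply; only the correlation id matters here.
    static final class Reply {
        final String correlationId;

        Reply(String correlationId) {
            this.correlationId = correlationId;
        }
    }

    // Takes whatever is at the head of the queue, without inspecting any header.
    static Reply receiveFirst(BlockingQueue<Reply> replyChannel) {
        return replyChannel.poll();
    }

    public static void main(String[] args) {
        // One queue shared by every job, like 'outboundReplies' in my configuration.
        BlockingQueue<Reply> outboundReplies = new LinkedBlockingQueue<>();

        // Job 57 (hypothetical) finishes its partitions first, so its reply is queued first.
        outboundReplies.add(new Reply("57:teacherPartitionStep"));
        outboundReplies.add(new Reply("56:studentPartitionStep"));

        // The handler waiting for job 56 polls first and gets job 57's reply.
        Reply seenByJob56 = receiveFirst(outboundReplies);
        System.out.println("handler for job 56 received: " + seenByJob56.correlationId);
        // prints "handler for job 56 received: 57:teacherPartitionStep"
    }
}
```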

Is there something I am doing wrong here?
I can provide more configuration if you need.

EDIT:
With gateways everything works well. I am also trying to make this work with adapters, adding a header-enricher so that the replyChannel object is added to the header. I added a header-enricher element, but I might be using it wrong, because the aggregating message handler throws "no outputChannel or replyChannel header available".

All partition requests are sent on the outboundRequests channel and on to the Rabbit queue. The slaves consume the requests from the inboundRequests channel, where a service activator processes them and sends the results back to the reply queue on the outboundStaging channel. On the master side, the partition response messages are read by the aggregator from the inboundStaging channel. Can you point out where exactly I need to use the header-enricher when using adapters?

<int:channel id="outboundRequests">
    <int:dispatcher task-executor="taskExecutor" failover="true"/>
</int:channel>

<int:channel id="inboundStaging"/>
<int:channel id="inboundRequests"/>
<int:channel id="outboundStaging"/>
<int:channel id="setHeaderPartionHandlerReplyChannel"/>


<int-amqp:outbound-channel-adapter
        id="filePartitionRequestOutboundGateway"
        channel="outboundRequests"
        amqp-template="rabbitTemplate"
        exchange-name="${rabbitmq.classflow.exchange}"
        routing-key="${rabbitmq.classflow.batch.partition.routingkey}"
        mapped-request-headers="*"
        />


<int-amqp:inbound-channel-adapter
        id="filePartitionRequestInboundGateway"
        concurrent-consumers="${rabbitmq.classflow.batch.partition.consumers}"
        channel="inboundRequests"
        receive-timeout="60000000"
        queue-names="${rabbitmq.classflow.batch.partition.queuename.request}"
        connection-factory="rabbitConnectionFactory"
        mapped-request-headers="*"
        />

<int:header-enricher input-channel="setHeaderPartionHandlerReplyChannel" output-channel="outboundRequests">
    <int:header-channels-to-string/>
</int:header-enricher>

<int:service-activator ref="stepExecutionRequestHandler" input-channel="inboundRequests"
                       output-channel="outboundStaging"/>


<int-amqp:outbound-channel-adapter
        id="filePartitionRepyOutboundGateway"
        channel="outboundStaging"
        amqp-template="rabbitTemplate"
        exchange-name="${rabbitmq.classflow.exchange}"
        routing-key="${rabbitmq.classflow.batch.partition.queuename.reply.routingkey}"
        mapped-request-headers="*"
 />

<int-amqp:inbound-channel-adapter
        id="filePartitionRepyInboundGateway"
        channel="inboundStaging"
        queue-names="${rabbitmq.classflow.batch.partition.queuename.reply}"
        connection-factory="rabbitConnectionFactory"
        concurrent-consumers="${rabbitmq.classflow.batch.tenant.job.consumers}"
        mapped-request-headers="*"
   />

<int:aggregator ref="parentPartitionHandler"
                send-partial-result-on-expiry="true"
                send-timeout="60000000"
                input-channel="inboundStaging"/>

<bean id="parentPartitionHandler"
      class="org.springframework.batch.integration.partition.MessageChannelPartitionHandler">
    <property name="gridSize" value="3"/>
    <property name="messagingOperations">
        <bean class="org.springframework.integration.core.MessagingTemplate">
            <property name="defaultChannel" ref="setHeaderPartionHandlerReplyChannel"/>
            <property name="receiveTimeout" value="60000000"/>
        </bean>
    </property>
    <property name="stepName" value="parentPartitionStep"/>
</bean>

LOGGING:

13:46:09.566 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.i.p.MessageChannelPartitionHandler - Sending request: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (MessageChannelPartitionHandler.java:222)
13:46:09.566 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:334)
13:46:09.567 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:334)
13:46:09.567 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageHandler.java:72)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (LoggingHandler.java:160)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.t.MessageTransformingHandler - org.springframework.integration.transformer.MessageTransformingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageHandler.java:72)
13:46:09.568 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext' (AbstractBeanFactory.java:247)
13:46:09.570 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry' (AbstractBeanFactory.java:247)
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.c.DefaultHeaderChannelRegistry - Registered org.springframework.integration.channel.QueueChannel@f89f170 as 771064f6-d5ae-4f50-b827-00c225a36c86:1 (DefaultHeaderChannelRegistry.java:167)
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationEvaluationContext' (AbstractBeanFactory.java:247)
13:46:09.572 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'integrationHeaderChannelRegistry' (AbstractBeanFactory.java:247)
13:46:09.573 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.t.MessageTransformingHandler - handler 'org.springframework.integration.transformer.MessageTransformingHandler#0' sending reply Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:238)
13:46:09.573 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - preSend on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:334)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:334)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (LoggingHandler.java:160)
13:46:09.574 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349)
13:46:09.575 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.ExecutorChannel - postSend (sent=true) on channel 'outboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageChannel.java:349)
13:46:09.575 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractMessageHandler.java:72)
13:46:09.577 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'setHeaderPartionHandlerReplyChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={replyChannel=org.springframework.integration.channel.QueueChannel@f89f170, correlationId=56:studentPartitionStep, sequenceSize=1, sequenceNumber=0, id=b39d71de-b469-667f-2d77-e59db6e0e25a, timestamp=1425494769566}] (AbstractMessageChannel.java:349)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.578 [taskExecutor-3] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Executing callback on RabbitMQ Channel: Cached Rabbit Channel: AMQChannel(amqp://[email protected]:5672/activfounlocal,13) (RabbitTemplate.java:1043)
13:46:09.579 [taskExecutor-3] DEBUG o.s.amqp.rabbit.core.RabbitTemplate - Publishing message on exchange [classflow.topic], routingKey = [partition.request.] (RabbitTemplate.java:1071)
13:46:09.579 [taskExecutor-3] DEBUG o.s.i.a.o.AmqpOutboundEndpoint - handler 'org.springframework.integration.amqp.outbound.AmqpOutboundEndpoint#0' produced no reply for request Message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={correlationId=56:studentPartitionStep, replyChannel=771064f6-d5ae-4f50-b827-00c225a36c86:1, sequenceSize=1, sequenceNumber=0, id=77608d25-cff4-4f3d-9f60-829f05f7bd6f, timestamp=1425494769573}] (AbstractReplyProducingMessageHandler.java:184)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedRoutingKey] WILL be mapped, matched pattern= (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryMode] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_receivedExchange] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.580 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[contentType] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_redelivered] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[amqp_deliveryTag] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[correlationId] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceSize] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.a.s.DefaultAmqpHeaderMapper - headerName=[sequenceNumber] WILL be mapped, matched pattern=* (AbstractHeaderMapper.java:240)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'inboundRequests', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request., amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - preSend on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request., amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:334)
13:46:09.581 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - org.springframework.integration.handler.LoggingHandler#0 received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request., amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.handler.LoggingHandler - [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request., amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (LoggingHandler.java:160)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.channel.DirectChannel - postSend (sent=true) on channel 'loggerChannel', message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request., amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageChannel.java:349)
13:46:09.582 [SimpleAsyncTaskExecutor-1] DEBUG o.s.i.h.ServiceActivatingHandler - ServiceActivator for [org.springframework.integration.handler.MethodInvokingMessageProcessor@d15c892] received message: [Payload StepExecutionRequest content=StepExecutionRequest: [jobExecutionId=56, stepExecutionId=269, stepName=studentPartitionStep]][Headers={amqp_receivedRoutingKey=partition.request., amqp_deliveryMode=PERSISTENT, correlationId=56:studentPartitionStep, sequenceSize=1, amqp_receivedExchange=classflow.topic, contentType=application/x-java-serialized-object, amqp_redelivered=false, amqp_deliveryTag=1, sequenceNumber=0, id=e51f8ed9-03df-0f90-b9c2-c3542bda76c6, timestamp=1425494769581}] (AbstractMessageHandler.java:72)
13:46:09.599 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'studentPartitionStep' (AbstractBeanFactory.java:247)
13:46:09.658 [SimpleAsyncTaskExecutor-1] DEBUG o.s.b.f.s.DefaultListableBeanFactory - Returning cached instance of singleton bean 'org.springframework.transaction.config.internalTransactionAdvisor' (AbstractBeanFactory.java:247)
13:46:09.659 [SimpleAsyncTaskExecutor-1] DEBUG o.s.batch.core.step.AbstractStep - Executing: id=269 (AbstractStep.java:183)

Thanks
Mallikarjun


1 Answer


This is fixed in Spring Batch 3.0.3 as long as you don't inject a specific replyChannel into the partition handler.

spring-batch-integration is now part of the main Spring Batch project.
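
As a rough illustration of why a reply channel that is not shared avoids the mix-up, here is a plain-Java sketch. It is not the Spring Batch code: the Request type, the reply method, and the second job (57:teacherPartitionStep) are invented for the example, and it simply assumes that each request carries its own private reply queue, much like a replyChannel header would.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class PrivateReplyQueueDemo {

    // Hypothetical request that carries its own reply queue, much like a replyChannel header.
    static final class Request {
        final String correlationId;
        final BlockingQueue<String> replyChannel = new LinkedBlockingQueue<>();

        Request(String correlationId) {
            this.correlationId = correlationId;
        }
    }

    // Worker side: the result goes to the queue that the request brought along.
    static void reply(Request request) {
        request.replyChannel.add("result for " + request.correlationId);
    }

    public static void main(String[] args) {
        Request jobA = new Request("56:studentPartitionStep");
        Request jobB = new Request("57:teacherPartitionStep"); // hypothetical second job

        // Job B completes first, but its result cannot land in job A's queue.
        reply(jobB);
        reply(jobA);

        System.out.println(jobA.replyChannel.poll()); // prints "result for 56:studentPartitionStep"
        System.out.println(jobB.replyChannel.poll()); // prints "result for 57:teacherPartitionStep"
    }
}
```

With a single injected queue, both results would sit in the same place and the order of arrival would decide who gets which.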

EDIT

In response to your last comment (about using adapters instead of gateways).

Yes; the gateway takes care of keeping the replyChannel header intact. There's some extra config needed when using adapters:

  • Add a header enricher with a <header-channels-to-string/> element (see the Header Channel Registry documentation); this converts the live header channel object to a key for the channel registry.
  • Configure the adapters to map the replyChannel header (e.g. mapped-request-headers="*").

This is not needed for the gateway because it holds a reference to the outbound message when the reply is received.
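
To show the idea behind the two bullets above, here is a simplified plain-Java model of a channel registry. It is only a sketch: the class and the toName/fromName methods are hypothetical names for illustration, not the Spring Integration API. The live channel object cannot cross the broker, so it is swapped for a string key before sending, and the key is resolved back to the object when the reply arrives on the master.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class ChannelRegistryDemo {

    // Maps a string key to the live in-memory reply queue it stands for.
    private final Map<String, BlockingQueue<String>> channels = new ConcurrentHashMap<>();

    // Registers the live channel and returns a key that can travel as a plain string header.
    String toName(BlockingQueue<String> channel) {
        String name = UUID.randomUUID().toString();
        channels.put(name, channel);
        return name;
    }

    // Resolves a key back to the live channel; returns null for an unknown key.
    BlockingQueue<String> fromName(String name) {
        return channels.get(name);
    }

    public static void main(String[] args) {
        ChannelRegistryDemo registry = new ChannelRegistryDemo();
        BlockingQueue<String> liveReplyChannel = new LinkedBlockingQueue<>();

        // Before the outbound adapter: swap the channel object for its key.
        String replyChannelHeader = registry.toName(liveReplyChannel);

        // ... the key travels to the slave and back as an ordinary string header ...

        // On the master, when the reply arrives: resolve the key and deliver the result.
        registry.fromName(replyChannelHeader).add("aggregated step executions");
        System.out.println(liveReplyChannel.poll()); // prints "aggregated step executions"
    }
}
```

The key only helps if it survives the round trip, which is why the adapters on both sides have to map the replyChannel header.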