0
votes

I'm trying to configure a RemoteChunking task using Spring Boot, Spring Batch and Spring Integrations.

I have configured an activeMQ server and I start configuring Spring Batch following the official docs https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-chunking.

My master config:

import com.arrobaautowired.payment.Payment;
import com.arrobaautowired.record.Record;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;

@Configuration
@Slf4j
@EnableBatchProcessing
public class MasterBatchConfiguration {

    private final static String MASTER_JOB_TEST = "JOB_MASTER";
    private final static String MATER_JOB_STEP = "STEP-1";
    private final static int CHUNK_SIZE = 50;

    private JobBuilderFactory jobBuilderFactory;
    private StepBuilderFactory stepBuilderFactory;
    private MultiResourceItemReader<Record> filesReader;
    private StepListener stepListener;


    @Autowired
    public MasterBatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, MultiResourceItemReader<Record> filesReader, StepListener stepListener) {
        this.jobBuilderFactory = jobBuilderFactory;
        this.stepBuilderFactory = stepBuilderFactory;
        this.filesReader = filesReader;
        this.stepListener = stepListener;
    }

@Bean
public Job processRecordsJob(JobCompletionNotificationListener listener, Step step1) {
    return jobBuilderFactory
            .get(MASTER_JOB_TEST)
            .listener(listener)
            .flow(step1)
            .end()
            .build();
}

@Bean
public TaskletStep step1() {
    return stepBuilderFactory.get(MATER_JOB_STEP)
            .<Record, Payment>chunk(CHUNK_SIZE)
            .reader(filesReader)
            .writer(itemWriter())
            .listener(stepListener)
            .build();
}

@Bean
public ActiveMQConnectionFactory connectionFactory() {
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
    factory.setBrokerURL("tcp://localhost:61616");
    factory.setTrustAllPackages(Boolean.TRUE);
    return factory;
}

/*
 * Configure outbound flow (requests going to workers)
 */
@Bean
public DirectChannel requests() {
    return new DirectChannel();
}

@Bean
public IntegrationFlow jmsOutboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from("requests")
            .handle(Jms
                    .outboundAdapter(connectionFactory)
                    .destination("requests"))
            .get();
}


/*
 * Configure inbound flow (replies coming from workers)
 */
@Bean
public QueueChannel replies() {
    return new QueueChannel();
}

@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory) {
    return IntegrationFlows
            .from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
            .channel(replies())
            .get();
}

/*
 * Configure the ChunkMessageChannelItemWriter.
 * Se trata de un ItemWriter especial, {@link ChunkMessageChannelItemWriter}, que se encarga de enviar la información al pooleer (Middleware externo) y recogerla.
 */
@Bean
@StepScope
public ChunkMessageChannelItemWriter<Payment> itemWriter() {

    ChunkMessageChannelItemWriter<Payment> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
    chunkMessageChannelItemWriter.setReplyChannel(replies());

    return chunkMessageChannelItemWriter;
}

@Bean
public MessagingTemplate messagingTemplate(){

    MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requests());
    messagingTemplate.setReceiveTimeout(2000);
    return messagingTemplate;
}
}

My slave Configuration:

import com.arrobaautowired.processor.PaymentWriter;
import com.arrobaautowired.processor.ComplexRecordProcessor;
import com.arrobaautowired.processor.SimpleRecordProcessor;
import com.arrobaautowired.record.Record;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;


@Configuration
@IntegrationComponentScan
@EnableIntegration
public class WorkerBatchConfiguration {

    @Bean
    public ActiveMQConnectionFactory connectionFactory() {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
        factory.setBrokerURL("tcp://localhost:61616");
        factory.setTrustAllPackages(Boolean.TRUE);
        return factory;
    }

    @Bean
    public DirectChannel requests() {
        return new DirectChannel();
    }

    @Bean
    public DirectChannel replies() {
        return new DirectChannel();
    }

    @Bean
    public IntegrationFlow jmsIn() {
        return IntegrationFlows
                .from(Jms
                        .messageDrivenChannelAdapter(connectionFactory())
                        .configureListenerContainer(c -> c.subscriptionDurable(false))
                        .destination("requests"))
                .channel(requests())
                .get();
    }

    @Bean
    public IntegrationFlow outgoingReplies() {
        return IntegrationFlows
                .from("replies")
                .handle(Jms
                        .outboundGateway(connectionFactory())
                        .requestDestination("replies"))
                .get();
    }

    @Bean
    @ServiceActivator(inputChannel = "requests", outputChannel = "replies", sendTimeout = "10000")
    public ChunkProcessorChunkHandler<Record> chunkProcessorChunkHandler() {
        ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler();
        chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(recordProcessor(), paymentWriter()));
        return chunkProcessorChunkHandler;
    }

    @Bean
    public SimpleRecordProcessor recordProcessor() {
        return new SimpleRecordProcessor();
    }

    @Bean
    public PaymentWriter paymentWriter() {
        return new PaymentWriter();
    }

}

It seems work OK, but when the slave finishes a chunk, it sends an response to the master which finishes the job and the slave shows a "timeout error":

2018-09-17 13:15:21.509 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              : 

FINALIZADO CHUNK  ============================


2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-4] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Culver Gapper, bic=ES08 9240 0446 6617 7749 9525, amount=75189, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Burgess Feldbau, bic=ES62 6361 1904 4990 0753 3877, amount=null, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-0] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Brenda Waddell, bic=ES23 4535 5585 5095 5691 1491, amount=28353, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-6] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Brittaney Bliben, bic=ES88 3076 6115 5504 4561 1796, amount=86995, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-0] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Hortensia Willshee, bic=ES62 7020 0819 9813 3352 2742, amount=null, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-4] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Roselin Maccrie, bic=ES34 5876 6541 1999 9568 8714, amount=29865, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Jonathan Parlet, bic=ES74 5605 5066 6941 1376 6204, amount=null, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Ilise Semiras, bic=ES59 4689 9344 4052 2235 5296, amount=10698, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-5] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Audrey Lempenny, bic=ES45 2456 6470 0023 3823 3629, amount=56543, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [onPool-worker-2] c.a.processor.PaymentWriter              :        PAYMENT: Payment(fullName=Hayyim Fetter, bic=ES76 5134 4202 2267 7072 2547, amount=14662, currency=€)
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter              : 

==============================================


2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] o.s.integration.channel.DirectChannel    : preSend on channel 'replies', message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=0, stepContribution=[StepContribution: read=0, written=10, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={jms_redelivered=true, JMSXDeliveryCount=2, jms_destination=queue://requests, id=16120698-22b4-c615-502b-7c6d050d82c6, priority=4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks.net-59228-1537182877872-1:2:1:1:1, timestamp=1537182921510}]
2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] o.s.integration.jms.JmsOutboundGateway   : outgoingReplies.org.springframework.integration.jms.JmsOutboundGateway#0 received message: GenericMessage [payload=ChunkResponse: jobId=1, sequence=0, stepContribution=[StepContribution: read=0, written=10, filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], successful=true, headers={jms_redelivered=true, JMSXDeliveryCount=2, jms_destination=queue://requests, id=16120698-22b4-c615-502b-7c6d050d82c6, priority=4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks.net-59228-1537182877872-1:2:1:1:1, timestamp=1537182921510}]
2018-09-17 13:15:21.514 DEBUG 75729 --- [erContainer#0-1] o.s.integration.jms.JmsOutboundGateway   : ReplyTo: temp-queue://ID:mbp-de-jose.neoris.cxnetworks.net-58907-1537181494749-3:24:1
2018-09-17 13:15:26.536  WARN 75729 --- [erContainer#0-1] o.s.j.l.DefaultMessageListenerContainer  : Execution of JMS message listener failed, and no ErrorHandler has been set.

org.springframework.integration.MessageTimeoutException: failed to receive JMS response within timeout of: 5000ms
    at org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:762) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:142) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:415) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:511) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:341) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

The code is available in (https://github.com/jamataran/spring-batch-scale)[https://github.com/jamataran/spring-batch-scale]

1
I see nothing wrong with your config. Does this happen after each chunk request or in the end after all chunks have been processed?Mahmoud Ben Hassine
@MahmoudBenHassine thanks for your interest, It happens each chunk.Jose A. Matarán
See my answer on the matter.Artem Bilan
The remote chunking sample in documentation of v4.0 is actually not correct. On the worker side, an outboundAdapter should be used instead of an outboundGateway. This was fixed in the docs of version 4.1, See docs.spring.io/spring-batch/4.1.x/reference/html/…. So the answer by Artem is correct.Mahmoud Ben Hassine

1 Answers

2
votes

I think your problem is on the worker side here:

@Bean
public IntegrationFlow outgoingReplies() {
    return IntegrationFlows
            .from("replies")
            .handle(Jms
                    .outboundGateway(connectionFactory())
                    .requestDestination("replies"))
            .get();
}

You just send replies and you don't except there anything from the master.

It has to be the one-way

Jms
  .outboundAdapter(connectionFactory())
  .destination("replies")`