0
votes

I have implemented Spring Batch + Spring Integration combination with DSL referring to following link

https://github.com/cppwfs/spring-batch/blob/1f7cada52aba95bcb23d06bc034b21fe1de0a7a5/spring-batch-docs/asciidoc/spring-batch-integration.adoc#launching-batch-jobs-through-messages

Here is the code.

@Bean
public IntegrationFlow integrationFlowUi(JobLaunchingGateway jobLaunchingGateway) {
    return IntegrationFlows.from(Files.inboundAdapter(new File(incomingDirUi)).
                    filter(new SimplePatternFileListFilter("*.csv")).
                    filter(new AcceptOnceFileListFilter<>()),
            c -> c.poller(Pollers.fixedRate(500).maxMessagesPerPoll(1))).
            channel("bridgeChannel").
            handle(fileMessageToJobRequest()).
            handle(jobLaunchingGateway).
            log(LoggingHandler.Level.WARN, "headers.id + ': ' + payload").
            get();
}

with Batch config as

@Bean
public Job fundingCardActivationJob() {
    return jobBuilderFactory.get("fundingCardActivationJob")
            .incrementer(new RunIdIncrementer())
            .flow(activationStep())
            .next(fileRenamingStep())
            .end()
            .build();
}

Job runs. But At the end it throws following exception.

2017-12-15 12:39:54.867  WARN 13723 --- [ask-scheduler-1] headers.id + ': ' + payload              : GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}]

2017-12-15 12:39:54.900 ERROR 13723 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.integrationFlowUi.channel#2'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}], failedMessage=GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:93) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:360) at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:271) at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:188) at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:210) at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:272) at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190) at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353) at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344) at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=JobExecution: id=572, version=2, startTime=Fri Dec 15 12:39:52 PST 2017, endTime=Fri Dec 15 12:39:54 PST 2017, lastUpdated=Fri Dec 15 12:39:54 PST 2017, status=COMPLETED, exitStatus=exitCode=COMPLETED;exitDescription=, job=[JobInstance: id=348, version=0, Job=[fundingCardActivationJob]], jobParameters=[{input.file.name=/Users/sudhir/ui/fundactive_2748_4444444444444444_2017-11-17_12-12-20.018.csv}], headers={id=4dcafdd0-aaed-ff62-bed9-781db41dfdf2, timestamp=1513370394863}] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:154) at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ... 52 more

I have no idea why the exception is being thrown when integration configuration seems complete.

1
Edit the question to add the full stack trace so we can see which channel it is.Gary Russell
Done. Added exception trail.Sudhirkd

1 Answers

0
votes

Something is closing (or stopping) the application context before the JobExecution result is sent to the .log().