I went through the Spring Integration guide and the examples here and got a working sample for a Spring Integration SFTP program. I already have a working Spring Batch program that reads a bunch of files and loads them into the database.
I am now trying to integrate the Spring Batch and Spring Integration programs by following the Spring docs, and I created the configuration below.
<bean id="sftpSessionFactory" class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="${host}"/>
<property name="port" value="${port}"/>
<property name="user" value="${user}"/>
<property name="password" value="${password}"/>
</bean>
<int:channel id="inboundFileChannel"><int:queue/></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:/chofac/data/mex/registry/outbox"
remote-directory="/chofac/SFTP/MEX/outbox"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.txt">
<int:poller max-messages-per-poll="-1" fixed-rate="1000" />
</int-sftp:inbound-channel-adapter>
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
<!-- <property name="fileParameterName" value="input.file.name"/> -->
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
<int:poller fixed-rate="1000"/>
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
<batch:job id="responseFileReaderJob">
<batch:step id="dailyReaderJob">
<batch:tasklet>
<batch:chunk reader="dailyRRReader" writer="dailyRRDBWriter" processor="itemProcessor" commit-interval="10"/>
</batch:tasklet>
</batch:step>
</batch:job>
I am running this program using the test case below:
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml"})
public class AAASftpInboundMsgJobTriggerTest {
@Resource(name="inboundFileChannel")
PollableChannel localFileChannel;
@Test
public void runDemo(){
System.out.println("Received first file message: " + localFileChannel.receive());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
I get this error:
2014-07-01 10:51:48,987 [main] INFO org.hibernate.impl.SessionFactoryImpl - closing
2014-07-01 10:51:48,988 [main] DEBUG org.springframework.beans.factory.support.DisposableBeanAdapter - Invoking destroy method 'close' on bean with name 'dataSource'
2014-07-01 10:51:48,988 [main] ERROR org.springframework.test.context.TestContextManager - Caught exception while allowing TestExecutionListener [org.springframework.test.context.support.DependencyInjectionTestExecutionListener@2faadcc6] to prepare test instance [com.chofac.mint.integration.AAASftpInboundMsgJobTriggerTest@5d3ad33d]
java.lang.IllegalStateException: Failed to load ApplicationContext
    at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContext(CacheAwareContextLoaderDelegate.java:99)
    at org.springframework.test.context.DefaultTestContext.getApplicationContext(DefaultTestContext.java:101)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.injectDependencies(DependencyInjectionTestExecutionListener.java:109)
    at org.springframework.test.context.support.DependencyInjectionTestExecutionListener.prepareTestInstance(DependencyInjectionTestExecutionListener.java:75)
    at org.springframework.test.context.TestContextManager.prepareTestInstance(TestContextManager.java:326)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.createTest(SpringJUnit4ClassRunner.java:212)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner$1.runReflectiveCall(SpringJUnit4ClassRunner.java:289)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.methodBlock(SpringJUnit4ClassRunner.java:291)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:232)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:89)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:175)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1553)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:475)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:304)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:300)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:195)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:681)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:121)
    at org.springframework.test.context.support.AbstractGenericContextLoader.loadContext(AbstractGenericContextLoader.java:60)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.delegateLoading(AbstractDelegatingSmartContextLoader.java:100)
    at org.springframework.test.context.support.AbstractDelegatingSmartContextLoader.loadContext(AbstractDelegatingSmartContextLoader.java:250)
    at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContextInternal(CacheAwareContextLoaderDelegate.java:64)
    at org.springframework.test.context.CacheAwareContextLoaderDelegate.loadContext(CacheAwareContextLoaderDelegate.java:91)
    ... 25 more
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:112)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:238)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549)
    ... 40 more
2014-07-01 10:51:48,992 [main] DEBUG org.springframework.test.context.support.DirtiesContextTestExecutionListener - After test class: context [DefaultTestContext@ac44b88 testClass = SftpInboundMsgJobTriggerTest, testInstance = [null], testMethod = [null], testException = [null], mergedContextConfiguration = [MergedContextConfiguration@4102799c testClass = SftpInboundMsgJobTriggerTest, locations = '{classpath:META-INF/spring/applicationContext.xml, classpath:META-INF/spring/inbound-ResponseReaderJobIntegration.xml.xml}', classes = '{}', contextInitializerClasses = '[]', activeProfiles = '{}', contextLoader = 'org.springframework.test.context.support.DelegatingSmartContextLoader', parent = [null]]], dirtiesContext [false].
I tried most of the top answers for this error from Google, as well as the suggestions Stack Overflow showed as I typed this question. All of them led to other errors, and I seem to be getting diverted from the main issue.
The most common suggestion was to add a global poller, but this resulted in the error below:
<int:poller default="true" fixed-delay="50"/>
Caused by: java.lang.IllegalArgumentException: A poller should not be specified for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#1', since 'outboundJobRequestChannel' is a SubscribableChannel (not pollable).
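For reference, here is a minimal sketch of the rule these two messages appear to describe: an endpoint that consumes from a queue-backed (pollable) channel needs a poller, while an endpoint that consumes from a plain `<int:channel/>` (a subscribable channel) must not declare one. The bean and channel names are the ones from the configuration above; the `fixed-delay` value is an arbitrary placeholder, and this arrangement has not been verified end to end.

```xml
<!-- inboundFileChannel declares <int:queue/>, so it is pollable:
     the transformer that reads from it gets its own poller
     (fixed-delay="1000" is a placeholder value) -->
<int:transformer input-channel="inboundFileChannel"
                 output-channel="outboundJobRequestChannel">
    <bean class="com.chofac.mint.integration.FileMessageToJobRequest">
        <property name="job" ref="responseFileReaderJob"/>
    </bean>
    <int:poller fixed-delay="1000"/>
</int:transformer>

<!-- outboundJobRequestChannel is a plain channel (subscribable):
     the gateway that reads from it declares no poller -->
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
                                 reply-channel="jobLaunchReplyChannel"
                                 job-launcher="jobLauncher"/>
```

If this reading is right, a global default poller would only fill in for endpoints like the transformer; it would not excuse an explicit poller on an endpoint that reads from a subscribable channel.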
(I am new to all of this: Spring, Spring Batch, and Spring Integration.) Any help would be greatly appreciated. Thanks in advance.
Update 1
I removed the poller from the job-launching gateway, as below:
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
and removed the global poller:
<int:poller default="true" fixed-delay="50"/>
I get the exception below:
Exception in thread "main" org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0': Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1553)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:539)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:475)
    at org.springframework.beans.factory.support.AbstractBeanFactory$1.getObject(AbstractBeanFactory.java:304)
    at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:228)
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:300)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:195)
    at org.springframework.beans.factory.support.DefaultListableBeanFactory.preInstantiateSingletons(DefaultListableBeanFactory.java:681)
    at org.springframework.context.support.AbstractApplicationContext.finishBeanFactoryInitialization(AbstractApplicationContext.java:760)
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:482)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:139)
    at org.springframework.context.support.ClassPathXmlApplicationContext.<init>(ClassPathXmlApplicationContext.java:93)
    at com.chofac.mint.integration.DownloadFileRunBatch.main(DownloadFileRunBatch.java:15)
Caused by: java.lang.IllegalArgumentException: No poller has been defined for endpoint 'org.springframework.integration.config.ConsumerEndpointFactoryBean#0', and no default poller is available within the context.
    at org.springframework.util.Assert.notNull(Assert.java:112)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.initializeEndpoint(ConsumerEndpointFactoryBean.java:238)
    at org.springframework.integration.config.ConsumerEndpointFactoryBean.afterPropertiesSet(ConsumerEndpointFactoryBean.java:187)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1612)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1549)
    ... 12 more
However, if I leave the global poller in place, the SFTP transfer happens and the job gets triggered:
<int:poller default="true" fixed-delay="50"/>
Update 2
I get the exception below if I remove the <int:queue/> from inboundFileChannel:
Exception in thread "main" org.springframework.beans.factory.BeanNotOfRequiredTypeException: Bean named 'inboundFileChannel' must be of type [org.springframework.messaging.PollableChannel], but was actually of type [org.springframework.integration.channel.DirectChannel]
    at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:376)
    at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:200)
    at org.springframework.context.support.AbstractApplicationContext.getBean(AbstractApplicationContext.java:979)
    at com.chofac.mint.batchintegration.SftpInboundMsgJobTriggerMain.main(SftpInboundMsgJobTriggerMain.java:16)
Here's my configuration again:
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern="*.*">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
<!-- <int:poller max-messages-per-poll="-1" cron="${MEX.CRON.PATTERN}"/> -->
<!-- 0 15 10 ? * MON-FRI -->
</int-sftp:inbound-channel-adapter>
<int:channel id="inboundFileChannel"></int:channel>
<int:channel id="outboundJobRequestChannel"/>
<int:channel id="jobLaunchReplyChannel"/>
<!-- <int:poller default="true" cron="${MEX.CRON.PATTERN}"/> -->
<int:transformer input-channel="inboundFileChannel"
output-channel="outboundJobRequestChannel">
<bean class="com.chofac.mint.integration.FileMessageToJobRequest">
<property name="job" ref="responseFileReaderJob"/>
</bean>
</int:transformer>
<batch-int:job-launching-gateway request-channel="outboundJobRequestChannel"
reply-channel="jobLaunchReplyChannel" job-launcher="jobLauncher">
</batch-int:job-launching-gateway>
<int:logging-channel-adapter channel="jobLaunchReplyChannel"/>
I am triggering this from a main program, as below:
ApplicationContext context = new ClassPathXmlApplicationContext("classpath:META-INF/spring/applicationContext.xml","classpath:META-INF/spring/batchintegration/inboundSFTPJob.xml");
PollableChannel pollableFileChannel = context.getBean("inboundFileChannel", PollableChannel.class);
System.out.println("Received first file message: " + pollableFileChannel.receive());
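One thing that may matter here: the `receive()` call in the main program and the transformer both consume from `inboundFileChannel`. On a queue channel each message goes to a single consumer, so the two would compete for the same file message. Below is a minimal sketch of one way to inspect messages without competing with the flow, assuming a wire-tap onto a separate queue channel. `inspectionChannel` is a hypothetical name that does not exist in the configuration above.

```xml
<!-- inboundFileChannel stays a plain (direct) channel, so the
     transformer needs no poller; the wire-tap copies each message -->
<int:channel id="inboundFileChannel">
    <int:interceptors>
        <int:wire-tap channel="inspectionChannel"/>
    </int:interceptors>
</int:channel>

<!-- hypothetical queue-backed channel used only for inspection -->
<int:channel id="inspectionChannel">
    <int:queue/>
</int:channel>
```

With this in place, the main program would look up `inspectionChannel` as the `PollableChannel` instead of `inboundFileChannel`.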
Update 3
Sample Configuration, JUnit, and Spring example on the web.
Update 4
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="inboundFileChannel"
session-factory="sftpSessionFactory"
local-directory="file:${inbound.local.directory}"
remote-directory="${inbound.remote.directory}"
auto-create-local-directory="true"
delete-remote-files="false"
filename-pattern ="*.*"
local-filter="acceptAllFileListFilter">
<int:poller max-messages-per-poll="-1" fixed-rate="45000" />
</int-sftp:inbound-channel-adapter>
<bean id="acceptAllFileListFilter" class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
The program runs into a continuous loop with exceptions. It looks like polling happens very frequently (every second), even though I have specified 45 seconds for the poller. Full configuration and logs here.
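A possible explanation, stated as an assumption rather than a confirmed diagnosis: `max-messages-per-poll="-1"` lets a single poll keep emitting messages for as long as the source has something to return, and an accept-all local filter never rejects files it has already seen, so one poll may never finish. Here is a sketch of a variation to try, which caps each poll at one message. The value `1` is a guess, and everything else is copied from the configuration above.

```xml
<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
        channel="inboundFileChannel"
        session-factory="sftpSessionFactory"
        local-directory="file:${inbound.local.directory}"
        remote-directory="${inbound.remote.directory}"
        auto-create-local-directory="true"
        delete-remote-files="false"
        filename-pattern="*.*"
        local-filter="acceptAllFileListFilter">
    <!-- at most one message per 45-second poll (assumed value) -->
    <int:poller max-messages-per-poll="1" fixed-rate="45000"/>
</int-sftp:inbound-channel-adapter>

<bean id="acceptAllFileListFilter"
      class="org.springframework.integration.file.filters.AcceptAllFileListFilter"/>
```

If the loop slows to one message every 45 seconds with this change, that would suggest the filter and the unlimited poll size are interacting, rather than the poller interval being ignored.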