I am new to Spring and am looking for a Spring Batch solution for a simple ETL process. To learn, I followed this tutorial: https://examples.javacodegeeks.com/enterprise-java/spring/batch/spring-batch-etl-job-example/
It works well, and it gave me an idea of the basic structure of an ETL job in Spring Batch.
Next, as a PoC, I want to fetch the input file from an FTP server: instead of using a hard-coded input file, the job should first download it over FTP.
From my searching, I understand that Spring Integration can be used for FTP/SFTP.
I followed this post: https://coreyreil.wordpress.com/2012/12/21/spring-batch-creating-an-ftp-tasklet-to-get-remote-files/ to create a tasklet that does the FTP/SFTP fetch.
In the existing BatchConfiguration.java, I added the following to make my FTP task the very first step:
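```java
// Imports needed in BatchConfiguration.java
import java.io.File;

import org.apache.commons.net.ftp.FTPFile;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

import com.springbatch.demo.tasklet.FtpGetRemoteFilesTasklet;

// Beans added to the BatchConfiguration class
@Bean
public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet() {
    FtpGetRemoteFilesTasklet ftpTasklet = new FtpGetRemoteFilesTasklet();
    ftpTasklet.setRetryIfNotFound(true);
    ftpTasklet.setDownloadFileAttempts(3);
    ftpTasklet.setRetryIntervalMilliseconds(10000);
    ftpTasklet.setFileNamePattern("README");
    //ftpTasklet.setFileNamePattern("TestFile");
    ftpTasklet.setRemoteDirectory("/");
    // Download into the JVM's temp directory
    ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
    // Wire in the session factory bean defined below
    ftpTasklet.setSessionFactory(myFtpSessionFactory());
    return ftpTasklet;
}

@Bean
public SessionFactory<FTPFile> myFtpSessionFactory() {
    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost("ftp.gnu.org");
    ftpSessionFactory.setClientMode(0); // 0 = FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
    ftpSessionFactory.setFileType(0);   // 0 = FTP.ASCII_FILE_TYPE
    ftpSessionFactory.setPort(21);
    ftpSessionFactory.setUsername("anonymous");
    ftpSessionFactory.setPassword("anonymous");
    return ftpSessionFactory;
}
```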
Then I changed my job builder to the following:
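```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.context.annotation.Bean;

// Configure job step
@Bean
public Job marketPricesETLJob() {
    return jobBuilderFactory.get("Market Prices ETL Job")
            .incrementer(new RunIdIncrementer())
            .listener(listener())
            //.flow(etlStep()).end().build();
            // FTP step failed: end() stops the job (with status COMPLETED) without running the ETL step
            .start(getFilesFromFTPServer()).on("FAILED").end()
            // FTP step completed: continue with the ETL step
            .from(getFilesFromFTPServer()).on("COMPLETED").to(etlStep())
            .end()
            .build();
}

@Bean
public Step getFilesFromFTPServer() {
    // Tasklet step that runs the FTP download tasklet
    return stepBuilderFactory.get("Get File from FTP Server")
            .tasklet(myFtpGetRemoteFilesTasklet())
            .build();
}

@Bean
public Step etlStep() {
    // Chunk-oriented step: MarketEvent items in, Trade items out, 10000 items per chunk
    return stepBuilderFactory.get("Extract -> Transform -> Aggregate -> Load")
            .<MarketEvent, Trade>chunk(10000)
            .reader(marketEventReader())
            .processor(marketEventProcessor())
            .writer(stockPriceAggregator())
            .build();
}
```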
As an example, I am using ftp.gnu.org and trying to fetch its README file.
But I am getting the following error:
2020-04-28 19:10:19.269 INFO 20380 --- [ main] c.s.demo.SpringBatchDemoApplication : Started SpringBatchDemoApplication in 16.846 seconds (JVM running for 18.903)
2020-04-28 19:10:19.274 INFO 20380 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2020-04-28 19:10:19.748 INFO 20380 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=Market Prices ETL Job]] launched with the following parameters: [{run.id=8}]
2020-04-28 19:10:19.884 INFO 20380 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [Get File from FTP Server]
2020-04-28 19:11:40.046 WARN 20380 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=49s792ms777µs228ns).
2020-04-28 19:12:13.603 ERROR 20380 --- [ main] o.s.batch.core.step.AbstractStep : Encountered an error executing step Get File from FTP Server in job Market Prices ETL Job
org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'null' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:355) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:333) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at com.springbatch.demo.tasklet.FtpGetRemoteFilesTasklet.execute(FtpGetRemoteFilesTasklet.java:112) ~[classes/:na]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at com.sun.proxy.$Proxy62.run(Unknown Source) ~[na:na]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
at com.springbatch.demo.SpringBatchDemoApplication.main(SpringBatchDemoApplication.java:10) ~[classes/:na]
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:446) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:348) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 43 common frames omitted
Caused by: java.lang.IllegalStateException: failed to create FTPClient
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
... 44 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: connect
at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:493) ~[na:na]
at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588) ~[na:na]
at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:339) ~[na:na]
at java.base/java.net.Socket.connect(Socket.java:603) ~[na:na]
at org.apache.commons.net.SocketClient._connect(SocketClient.java:243) ~[commons-net-3.6.jar:3.6]
at org.apache.commons.net.SocketClient.connect(SocketClient.java:202) ~[commons-net-3.6.jar:3.6]
at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:193) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
The main error is the innermost cause: java.lang.IllegalStateException: failed to create FTPClient (thrown from AbstractFtpSessionFactory.getSession), which is itself caused by java.net.ConnectException: Connection timed out: connect.
I can connect to the site from my machine and also download the file manually. How do I find out what the issue is, and how do I resolve it?
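One way to narrow this down: in the trace, the ConnectException comes from SocketClient.connect called by AbstractFtpSessionFactory.createClient, i.e. while opening the FTP control connection, before login. Opening the site in a browser only proves HTTP(S) access (possibly through a proxy); a firewall or proxy that blocks outbound port 21 is a common cause of this kind of timeout. The sketch below, a hypothetical FtpProbe helper that is not part of Spring Integration or Commons Net, checks outside Spring whether the JVM can reach host:port at all and prints the server's greeting (FTP servers normally greet with a line starting with 220).

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.nio.charset.StandardCharsets;

// Hypothetical diagnostic helper: checks raw TCP reachability of an FTP server.
public class FtpProbe {

    /** Returns the server's first reply line, or a description of the failure. */
    public static String probe(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            // The same step that times out in the stack trace: opening the control connection.
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            socket.setSoTimeout(timeoutMillis); // do not wait forever for the greeting
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            String greeting = in.readLine(); // an FTP server normally sends "220 ..."
            return greeting == null ? "CONNECTED, no greeting" : greeting;
        } catch (SocketTimeoutException e) {
            return "TIMEOUT: " + e.getMessage();
        } catch (IOException e) {
            return "ERROR: " + e;
        }
    }

    public static void main(String[] args) {
        String host = args.length > 0 ? args[0] : "ftp.gnu.org";
        int port = args.length > 1 ? Integer.parseInt(args[1]) : 21;
        System.out.println(probe(host, port, 10_000));
    }
}
```

If this also reports a timeout from the same machine, the problem is most likely network access to port 21 rather than the Spring configuration.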
Basically, I am looking for a good example of an ETL process that involves the following basic tasks:
1. Get the file from a remote host (FTP/SFTP) to the local host.
2. Read/parse the downloaded file.
3. Process the file (some business logic). Possibly read multiple such downloaded files and combine them (like working with multiple dataframes).
4. A writer to store the processed results in a DB (mostly) or another destination.
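As a rough mental model of how those four tasks map onto a Spring Batch job (a tasklet step for the download, then a chunk-oriented step with reader, processor and writer), here is a plain-Java sketch with no Spring dependencies. The "symbol,price" CSV layout, the hypothetical MiniEtl and Quote classes, and the in-memory map standing in for the database are all assumptions for illustration; step 1 is simulated by writing local files, since the real download is what the FTP tasklet is meant to do.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java ETL pipeline; every name here is illustrative.
public class MiniEtl {

    /** One parsed input row; assumes a "symbol,price" CSV layout. */
    public static final class Quote {
        final String symbol;
        final double price;
        Quote(String symbol, double price) { this.symbol = symbol; this.price = price; }
    }

    // 2. Read/parse every downloaded file and combine the rows into one list.
    public static List<Quote> extract(List<Path> files) throws IOException {
        List<Quote> quotes = new ArrayList<>();
        for (Path file : files) {
            for (String line : Files.readAllLines(file, StandardCharsets.UTF_8)) {
                if (line.trim().isEmpty()) continue; // skip blank lines
                String[] parts = line.split(",");
                quotes.add(new Quote(parts[0].trim(), Double.parseDouble(parts[1].trim())));
            }
        }
        return quotes;
    }

    // 3. Business logic: here, the maximum price per symbol across all files.
    public static Map<String, Double> transform(List<Quote> quotes) {
        Map<String, Double> maxBySymbol = new TreeMap<>();
        for (Quote q : quotes) {
            maxBySymbol.merge(q.symbol, q.price, Math::max);
        }
        return maxBySymbol;
    }

    // 4. Load: a map stands in for the database table.
    public static void load(Map<String, Double> results, Map<String, Double> table) {
        table.putAll(results);
    }

    public static void main(String[] args) throws IOException {
        // 1. Stand-in for the FTP download: create two local input files.
        Path dir = Files.createTempDirectory("etl");
        Path a = Files.write(dir.resolve("a.csv"), List.of("AAPL,10.5", "MSFT,20.0"));
        Path b = Files.write(dir.resolve("b.csv"), List.of("AAPL,12.0", "GOOG,5.25"));

        Map<String, Double> table = new TreeMap<>();
        load(transform(extract(List.of(a, b))), table);
        System.out.println(table);
    }
}
```

In Spring Batch terms, extract would correspond to a reader (for several files, something like a multi-resource reader), transform to a processor or aggregation step, and load to a writer such as a JDBC writer.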
To get there, I am going through working examples to understand how Spring Batch works.
---- Update based on the suggestion below ----
Yes. Based on https://docs.spring.io/spring-integration/api/org/springframework/integration/ftp/session/AbstractFtpSessionFactory.html#setClientMode-int- , I tried both 0 and 2.
I checked the ftpSessionFactory, and it looks correct based on what I set.
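On the 0 vs 2 question: in Apache Commons Net, which DefaultFtpSessionFactory builds on, client mode 0 is FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE and 2 is PASSIVE_LOCAL_DATA_CONNECTION_MODE. The mode governs how data connections (listings, file transfers) are opened; in the trace above, the failure happens earlier, in the SocketClient.connect call that opens the control connection, so as far as the trace shows, switching modes would not be expected to change this particular error. The hypothetical ClientModes lookup below just spells out those constant values; they are hard-coded rather than imported so the sketch compiles without commons-net.

```java
import java.util.Map;

// Hypothetical lookup mirroring Apache Commons Net FTPClient.*_DATA_CONNECTION_MODE values.
public class ClientModes {

    public static final Map<Integer, String> MODES = Map.of(
            0, "ACTIVE_LOCAL_DATA_CONNECTION_MODE: server connects back to the client for data",
            1, "ACTIVE_REMOTE_DATA_CONNECTION_MODE: server-to-server; this server connects to the other",
            2, "PASSIVE_LOCAL_DATA_CONNECTION_MODE: client opens the data connection to the server",
            3, "PASSIVE_REMOTE_DATA_CONNECTION_MODE: server-to-server; this server accepts the other's connection");

    public static String describe(int clientMode) {
        return MODES.getOrDefault(clientMode, "unknown client mode " + clientMode);
    }

    public static void main(String[] args) {
        // The two values tried in the update above.
        for (int mode : new int[] {0, 2}) {
            System.out.println(mode + " -> " + describe(mode));
        }
    }
}
```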
ftpSessionFactory.setClientMode(0); ? – Mahmoud Ben Hassine