
I am new to Spring and am looking for a Spring Batch solution for a simple ETL process. To learn, I followed this example: https://examples.javacodegeeks.com/enterprise-java/spring/batch/spring-batch-etl-job-example/

It works well, and it gave me an idea of the basic structure of an ETL job in Spring Batch.

Next, as a PoC, I wanted to fetch the input file from an FTP server. Basically, instead of using a hard-coded input file, I want to add a step that downloads it over FTP.

From my searching, I understand that Spring Integration can be used for FTP/SFTP.

I followed this post: https://coreyreil.wordpress.com/2012/12/21/spring-batch-creating-an-ftp-tasklet-to-get-remote-files/ to create a tasklet that fetches files over FTP/SFTP.

In the existing BatchConfiguration.java, I added the following to make my FTP tasklet the very first step:

@Bean
public FtpGetRemoteFilesTasklet myFtpGetRemoteFilesTasklet()
{
    FtpGetRemoteFilesTasklet  ftpTasklet = new FtpGetRemoteFilesTasklet();
    ftpTasklet.setRetryIfNotFound(true);
    ftpTasklet.setDownloadFileAttempts(3);
    ftpTasklet.setRetryIntervalMilliseconds(10000);
    ftpTasklet.setFileNamePattern("README");
    //ftpTasklet.setFileNamePattern("TestFile");
    ftpTasklet.setRemoteDirectory("/");
    ftpTasklet.setLocalDirectory(new File(System.getProperty("java.io.tmpdir")));
    ftpTasklet.setSessionFactory(myFtpSessionFactory);

    return ftpTasklet;
}

@Bean
public SessionFactory myFtpSessionFactory()
{
    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost("ftp.gnu.org");
    ftpSessionFactory.setClientMode(0); // 0 = FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
    ftpSessionFactory.setFileType(0);   // 0 = FTP.ASCII_FILE_TYPE
    ftpSessionFactory.setPort(21);
    ftpSessionFactory.setUsername("anonymous");
    ftpSessionFactory.setPassword("anonymous");

    return ftpSessionFactory;
}

Then I changed my job builder to the following:

// Configure job step
@Bean
public Job marketPricesETLJob() {
    return jobBuilderFactory.get("Market Prices ETL Job")
            .incrementer(new RunIdIncrementer())
            .listener(listener())
            //.flow(etlStep()).end().build();
            .start(getFilesFromFTPServer()).on("FAILED").end()
            .from(getFilesFromFTPServer()).on("COMPLETED").to(etlStep())
            .end()
            .build();
}

@Bean
public Step getFilesFromFTPServer() {
    return stepBuilderFactory.get("Get File from FTP Server")
            .tasklet(myFtpGetRemoteFilesTasklet())
            .build();
}

@Bean
public Step etlStep() {
    return stepBuilderFactory.get("Extract -> Transform -> Aggregate -> Load").<MarketEvent, Trade> chunk(10000)
            .reader(marketEventReader())
            .processor(marketEventProcessor())
            .writer(stockPriceAggregator())
            .build();
}

As an example, I am using ftp.gnu.org to download its README file.

But I get the following error:

2020-04-28 19:10:19.269  INFO 20380 --- [           main] c.s.demo.SpringBatchDemoApplication      : Started SpringBatchDemoApplication in 16.846 seconds (JVM running for 18.903)
2020-04-28 19:10:19.274  INFO 20380 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
2020-04-28 19:10:19.748  INFO 20380 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=Market Prices ETL Job]] launched with the following parameters: [{run.id=8}]
2020-04-28 19:10:19.884  INFO 20380 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [Get File from FTP Server]
2020-04-28 19:11:40.046  WARN 20380 --- [l-1 housekeeper] com.zaxxer.hikari.pool.HikariPool        : HikariPool-1 - Thread starvation or clock leap detected (housekeeper delta=49s792ms777µs228ns).
2020-04-28 19:12:13.603 ERROR 20380 --- [           main] o.s.batch.core.step.AbstractStep         : Encountered an error executing step Get File from FTP Server in job Market Prices ETL Job

org.springframework.messaging.MessagingException: Problem occurred while synchronizing 'null' to local directory; nested exception is org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:355) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:333) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at com.springbatch.demo.tasklet.FtpGetRemoteFilesTasklet.execute(FtpGetRemoteFilesTasklet.java:112) ~[classes/:na]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:407) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:331) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:140) ~[spring-tx-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:273) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:82) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:375) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:145) ~[spring-batch-infrastructure-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:258) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:208) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.JobFlowExecutor.executeStep(JobFlowExecutor.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.support.state.StepState.handle(StepState.java:68) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.resume(SimpleFlow.java:169) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.support.SimpleFlow.start(SimpleFlow.java:144) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.flow.FlowJob.doExecute(FlowJob.java:137) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:319) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:147) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:140) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:na]
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:na]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:na]
    at java.base/java.lang.reflect.Method.invoke(Method.java:567) ~[na:na]
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:344) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:198) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:127) ~[spring-batch-core-4.2.1.RELEASE.jar:4.2.1.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:212) ~[spring-aop-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at com.sun.proxy.$Proxy62.run(Unknown Source) ~[na:na]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.execute(JobLauncherCommandLineRunner.java:192) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.executeLocalJobs(JobLauncherCommandLineRunner.java:166) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.launchJobFromProperties(JobLauncherCommandLineRunner.java:153) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.autoconfigure.batch.JobLauncherCommandLineRunner.run(JobLauncherCommandLineRunner.java:148) ~[spring-boot-autoconfigure-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunner(SpringApplication.java:784) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.callRunners(SpringApplication.java:768) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:322) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.2.6.RELEASE.jar:2.2.6.RELEASE]
    at com.springbatch.demo.SpringBatchDemoApplication.main(SpringBatchDemoApplication.java:10) ~[classes/:na]
Caused by: org.springframework.messaging.MessagingException: Failed to execute on session; nested exception is java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:446) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.synchronizer.AbstractInboundFileSynchronizer.synchronizeToLocalDirectory(AbstractInboundFileSynchronizer.java:348) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    ... 43 common frames omitted
Caused by: java.lang.IllegalStateException: failed to create FTPClient
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:170) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.getSession(AbstractFtpSessionFactory.java:41) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    at org.springframework.integration.file.remote.RemoteFileTemplate.execute(RemoteFileTemplate.java:432) ~[spring-integration-file-5.2.5.RELEASE.jar:5.2.5.RELEASE]
    ... 44 common frames omitted
Caused by: java.net.ConnectException: Connection timed out: connect
    at java.base/sun.nio.ch.Net.connect0(Native Method) ~[na:na]
    at java.base/sun.nio.ch.Net.connect(Net.java:493) ~[na:na]
    at java.base/sun.nio.ch.Net.connect(Net.java:482) ~[na:na]
    at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:588) ~[na:na]
    at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:339) ~[na:na]
    at java.base/java.net.Socket.connect(Socket.java:603) ~[na:na]
    at org.apache.commons.net.SocketClient._connect(SocketClient.java:243) ~[commons-net-3.6.jar:3.6]
    at org.apache.commons.net.SocketClient.connect(SocketClient.java:202) ~[commons-net-3.6.jar:3.6]
    at org.springframework.integration.ftp.session.AbstractFtpSessionFactory.createClient(AbstractFtpSessionFactory.java:193) ~[spring-integration-ftp-5.2.5.RELEASE.jar:5.2.5.RELEASE]

The main error is java.lang.IllegalStateException: failed to create FTPClient, thrown from AbstractFtpSessionFactory.getSession and caused by java.net.ConnectException: Connection timed out: connect.

I can connect to the site from my machine and download the file manually. How can I check what the issue is, and how do I resolve it?
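If the site was checked in a browser, that may have gone over HTTP(S), whereas the stack trace shows the JVM failing to open a TCP connection to port 21 (SocketClient.connect, called from AbstractFtpSessionFactory.createClient). One way to test that connection in isolation is a minimal JDK-only sketch like the one below. The class PortCheck and its check method are hypothetical names, and the 10-second timeout is an arbitrary choice. If it also fails for ftp.gnu.org:21, the cause is more likely the network path to port 21 (for example a firewall or proxy) than Spring Batch or Spring Integration, though that is an assumption to verify.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

// Hypothetical diagnostic: can this JVM open a plain TCP connection to host:port?
class PortCheck {

    // Returns "connected" on success, otherwise the exception type and message,
    // e.g. UnknownHostException, SocketTimeoutException or ConnectException.
    static String check(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // An explicit timeout avoids waiting for the operating system's own
            // (often much longer) TCP connect timeout.
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return "connected";
        } catch (IOException e) {
            return e.getClass().getSimpleName() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Same host and port as the DefaultFtpSessionFactory in the question.
        System.out.println(check("ftp.gnu.org", 21, 10_000));
    }
}
```

A SocketTimeoutException here would match the "Connection timed out" in the trace, while an UnknownHostException or a ConnectException with "Connection refused" would point to a different problem.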

Basically, I am looking for a good example of an ETL process that involves the following basic tasks:

  1. Get the file from a remote host (FTP/SFTP) to the local host.
  2. Read/parse the downloaded file.
  3. Process the file (some business logic). Ideally, read several such downloaded files and combine them (like working with multiple dataframes).
  4. Write the processed results to a DB (mostly) or another target.

To that end, I am going through some working examples to understand how Spring Batch works.

---- Update based on the suggestion below ----

Yes. Based on https://docs.spring.io/spring-integration/api/org/springframework/integration/ftp/session/AbstractFtpSessionFactory.html#setClientMode-int-, I tried both 0 and 2.

I checked the ftpSessionFactory, and its values look correct based on what I set.
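For reference, the integers passed to setClientMode and setFileType correspond to constants in Apache Commons Net (FTPClient and FTP), which DefaultFtpSessionFactory uses underneath. The client mode only governs how data connections (listings and transfers) are opened; the stack trace fails earlier, inside SocketClient.connect while opening the control connection, so switching between 0 and 2 would not be expected to change this particular error. The sketch below just names those values; FtpSettingNames is a hypothetical helper, and the numbers are hard-coded to mirror Commons Net 3.x so it runs without Commons Net on the classpath.

```java
import java.util.Map;

// Hypothetical helper naming the integer settings used with DefaultFtpSessionFactory.
// Values mirror FTPClient.*_DATA_CONNECTION_MODE and FTP.*_FILE_TYPE in Commons Net 3.x.
class FtpSettingNames {

    static final Map<Integer, String> CLIENT_MODES = Map.of(
            0, "ACTIVE_LOCAL_DATA_CONNECTION_MODE",
            1, "ACTIVE_REMOTE_DATA_CONNECTION_MODE",
            2, "PASSIVE_LOCAL_DATA_CONNECTION_MODE",
            3, "PASSIVE_REMOTE_DATA_CONNECTION_MODE");

    static final Map<Integer, String> FILE_TYPES = Map.of(
            0, "ASCII_FILE_TYPE",
            1, "EBCDIC_FILE_TYPE",
            2, "BINARY_FILE_TYPE",
            3, "LOCAL_FILE_TYPE");

    public static void main(String[] args) {
        System.out.println("setClientMode(0) -> " + CLIENT_MODES.get(0));
        System.out.println("setClientMode(2) -> " + CLIENT_MODES.get(2));
        System.out.println("setFileType(0)   -> " + FILE_TYPES.get(0));
    }
}
```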


Make sure your FTP session factory is correctly configured. Have you tried another client mode in ftpSessionFactory.setClientMode(0);? - Mahmoud Ben Hassine
Thanks, I updated my question with what I tried and checked. - Mihir

1 Answer


Knowing the following pieces of information may help to pinpoint the issue.

  1. The code of the isSftp() method (mentioned in your link)
  2. The declaration of the field myFtpSessionFactory from your listing

Have you tried narrowing down the problem with any of the following steps? Admittedly, these are shots in the dark.

  1. Provoke it to throw java.net.UnknownHostException by setting the following host name.

    ftpSessionFactory.setHost("invalid");
    

    If it doesn't, I'd suspect that the wrong SessionFactory instance is being injected.

  2. Similarly, provoke a java.net.ConnectException: Connection refused by setting the following host name and port (assuming nothing listens on port 42 on your machine).

    ftpSessionFactory.setHost("localhost");
    ftpSessionFactory.setPort(42);
    
  3. What happens if we set a different timeout?

    ftpSessionFactory.setConnectTimeout(60000);
    
  4. Set a breakpoint at AbstractFtpSessionFactory.java:193 and:

    a. Inspect the value of client.connectTimeout. It should be, say, at least 60000 ms, or 0 (which Java treats as an infinite timeout).

    b. Inspect the values of host and port

  5. Run a simplified equivalent piece of code. Does it still reproduce the error?

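    import java.io.File;
    import org.springframework.integration.ftp.filters.FtpSimplePatternFileListFilter;
    import org.springframework.integration.ftp.inbound.FtpInboundFileSynchronizer;
    import org.springframework.integration.ftp.session.DefaultFtpSessionFactory;

    DefaultFtpSessionFactory ftpSessionFactory = new DefaultFtpSessionFactory();
    ftpSessionFactory.setHost("ftp.gnu.org");
    ftpSessionFactory.setClientMode(0); // FTPClient.ACTIVE_LOCAL_DATA_CONNECTION_MODE
    ftpSessionFactory.setFileType(0);   // FTP.ASCII_FILE_TYPE
    ftpSessionFactory.setPort(21);
    ftpSessionFactory.setUsername("anonymous");
    ftpSessionFactory.setPassword("anonymous");

    FtpInboundFileSynchronizer synchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory);
    // Same remote directory, file pattern and local directory as the tasklet in the question
    synchronizer.setRemoteDirectory("/");
    synchronizer.setFilter(new FtpSimplePatternFileListFilter("README"));
    File localDirectory = new File(System.getProperty("java.io.tmpdir"));
    synchronizer.synchronizeToLocalDirectory(localDirectory);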
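Going one level below Spring Integration and Commons Net, a minimal plain-JDK sketch can open the FTP control connection itself and read the server's greeting (per RFC 959, a server that is ready answers with a 220 reply). The class name FtpGreeting and the 10-second timeout are illustrative choices, not part of any library. If this times out as well, the problem is reaching port 21 from the JVM, independent of the Spring configuration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: connect to an FTP server and read its first reply line.
class FtpGreeting {

    // Returns the first line the server sends (normally starting with "220").
    static String readGreeting(String host, int port, int timeoutMs) throws IOException {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            socket.setSoTimeout(timeoutMs); // also bound the wait for the greeting
            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
            return in.readLine();
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(readGreeting("ftp.gnu.org", 21, 10_000));
        } catch (IOException e) {
            System.out.println("No greeting: " + e);
        }
    }
}
```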