0
votes

EDITS BASED ON SUGGESTION: For brevity, I will remove older code and long part and re-phrase the issue.

I am trying to build the app (Spring boot + Spring Batch) taking the date and config information from command line. Based on suggestions, I can use the application properties? The main aim is to use the same job (task of the job) to download different files form different host/time etc. So, properties file can give the information to use for download and compiled jar should read the info and do its tasks.

Main Entry point.

@SpringBootApplication
public class CoreApplication implements ApplicationRunner {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job processJob;

    @Value("${rundate}")
    private String run_date;

    private static final Logger logger = LoggerFactory.getLogger(CoreApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(CoreApplication.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {

        JobParameters jobParameters = new JobParametersBuilder()
                .addLong("JobID", System.currentTimeMillis())
                .addString("RunDate", run_date)
                .toJobParameters();

        try {
            jobLauncher.run(processJob, jobParameters);
        } catch (Exception e) {
            logger.error("Exception while running a batch job {}", e.getMessage());
        }

    }

}

I rearranged the code, to use the values of server, user, etc from application.properties file. Please let me know if it is wrong way to inject the properties.

application.properties file:

spring.datasource.url=jdbc:postgresql://dbhost:1000/db
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
local.directory="/my/local/path/"
file.name="file_name_20200601.csv"
remote.directory="/remote/ftp/location"
remote.host="remotehost"
remote.port=22
remote.user="remoteuser"
private.key.location="/key/file/location"

My Batch Configuration:

@Configuration
@EnableBatchProcessing
@EnableIntegration
@EnableAutoConfiguration
public class BatchConfiguration {
    private Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job ftpJob() {
        return jobBuilderFactory.get("FTP Job")
                .incrementer(new RunIdIncrementer())
                .start(getFilesFromFTPServer())
                .build();
    }

    @Bean
    public Step getFilesFromFTPServer() {
        return stepBuilderFactory.get("Get file from server")
                .tasklet(new RemoteFileInboundTasklet())
                .build();

    }
}

My Tasklet:

public class RemoteFileInboundTasklet implements Tasklet {

private Logger logger = LoggerFactory.getLogger(RemoteFileInboundTasklet.class);

@Value("${file.name}")
private String fileNamePattern;

private String clientName;
private boolean deleteLocalFiles = true;
private boolean retryIfNotFound = false;

@Value("${local.directory}")
private String local_directory_value;

private File localDirectory;
private int downloadFileAttempts = 12;
private long retryIntervalMilliseconds = 300000;

@Value("${remote.directory}")
private String remoteDirectory;

@Value("${remote.host}")
private String remoteHost;

@Value("${remote.user}")
private String remoteUser;

@Value("${remote.port}")
private int remotePort;

@Value("${private.key.location}")
private String private_key_file;

public SessionFactory<ChannelSftp.LsEntry> clientSessionFactory() {
    DefaultSftpSessionFactory ftpSessionFactory = new DefaultSftpSessionFactory();
    ftpSessionFactory.setHost(remoteHost);
    ftpSessionFactory.setPort(remotePort);
    ftpSessionFactory.setUser(remoteUser);
    ftpSessionFactory.setPrivateKey(new FileSystemResource(private_key_file));
    ftpSessionFactory.setAllowUnknownKeys(true);
    return ftpSessionFactory;
}

private SessionFactory sessionFactory = clientSessionFactory();

public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
    SftpInboundFileSynchronizer sftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(sessionFactory);
    sftpInboundFileSynchronizer.setDeleteRemoteFiles(false);
    sftpInboundFileSynchronizer.setRemoteDirectory(remoteDirectory);
    return sftpInboundFileSynchronizer;
}

private SftpInboundFileSynchronizer ftpInboundFileSynchronizer = sftpInboundFileSynchronizer();

private SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource;

public boolean isDeleteLocalFiles() {
    return deleteLocalFiles;
}

public void setDeleteLocalFiles(boolean deleteLocalFiles) {
    this.deleteLocalFiles = deleteLocalFiles;
}

public SftpInboundFileSynchronizer getFtpInboundFileSynchronizer() {
    return ftpInboundFileSynchronizer;
}

public void setFtpInboundFileSynchronizer(SftpInboundFileSynchronizer ftpInboundFileSynchronizer) {
    this.ftpInboundFileSynchronizer = ftpInboundFileSynchronizer;
}

public SessionFactory getSessionFactory() {
    return sessionFactory;
}

public void setSessionFactory(SessionFactory sessionFactory) {
    this.sessionFactory = sessionFactory;
}

public SftpInboundFileSynchronizingMessageSource getSftpInboundFileSynchronizingMessageSource() {
    return sftpInboundFileSynchronizingMessageSource;
}

public void setSftpInboundFileSynchronizingMessageSource(SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
    this.sftpInboundFileSynchronizingMessageSource = sftpInboundFileSynchronizingMessageSource;
}

public String getRemoteDirectory() {
    return remoteDirectory;
}

public void setRemoteDirectory(String remoteDirectory) {
    this.remoteDirectory = remoteDirectory;
}

private SFTPGateway sftpGateway;


@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler clientMessageHandler() {
    SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(clientSessionFactory(), "mget", "payload");
    sftpOutboundGateway.setAutoCreateLocalDirectory(true);
    sftpOutboundGateway.setLocalDirectory(new File(local_directory_value));
    sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
    sftpOutboundGateway.setFilter(new AcceptOnceFileListFilter<>());
    return sftpOutboundGateway;
}

private void deleteLocalFiles()
{
    if (deleteLocalFiles)
    {
        localDirectory = new File(local_directory_value);
        SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
        List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
        if (CollectionUtils.isNotEmpty(matchingFiles))
        {
            for (File file : matchingFiles)
            {
                FileUtils.deleteQuietly(file);
            }
        }
    }
}

@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

    deleteLocalFiles();
    ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
    if (retryIfNotFound) {

        SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
        int attemptCount = 1;
        while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts) {

            logger.info("File(s) matching " + fileNamePattern + " not found on remote site. Attempt " + attemptCount + " out of " + downloadFileAttempts);
            Thread.sleep(retryIntervalMilliseconds);
            ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
            attemptCount++;
        }

        if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0) {
            throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
        }
    }
    return RepeatStatus.FINISHED;
}

public String getFileNamePattern() {
    return fileNamePattern;
}

public void setFileNamePattern(String fileNamePattern) {
    this.fileNamePattern = fileNamePattern;
}

public String getClientName() {
    return clientName;
}

public void setClientName(String clientName) {
    this.clientName = clientName;
}

public boolean isRetryIfNotFound() {
    return retryIfNotFound;
}

public void setRetryIfNotFound(boolean retryIfNotFound) {
    this.retryIfNotFound = retryIfNotFound;
}

public File getLocalDirectory() {
    return localDirectory;
}

public void setLocalDirectory(File localDirectory) {
    this.localDirectory = localDirectory;
}

public int getDownloadFileAttempts() {
    return downloadFileAttempts;
}

public void setDownloadFileAttempts(int downloadFileAttempts) {
    this.downloadFileAttempts = downloadFileAttempts;
}

public long getRetryIntervalMilliseconds() {
    return retryIntervalMilliseconds;
}

public void setRetryIntervalMilliseconds(long retryIntervalMilliseconds) {
    this.retryIntervalMilliseconds = retryIntervalMilliseconds;
}

}

My understanding (please correct here if wrong) that the application.properties file properties can be injected in the tasklet (as above). Then I try to build the package.

mvn clean package

I get the following error:

Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [org.springframework.batch.core.Step]: Factory method 'getFilesFromFTPServer' threw exception; nested exception is java.lang.IllegalArgumentException: Path must not be null
    at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:651) ~[spring-beans-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    ... 122 common frames omitted
Caused by: java.lang.IllegalArgumentException: Path must not be null
    at org.springframework.util.Assert.notNull(Assert.java:198) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at org.springframework.core.io.FileSystemResource.<init>(FileSystemResource.java:80) ~[spring-core-5.2.6.RELEASE.jar:5.2.6.RELEASE]
    at com.my.batch.core.tasklet.RemoteFileInboundTasklet.clientSessionFactory(RemoteFileInboundTasklet.java:78) ~[classes/:na]
    at com.my.batch.core.tasklet.RemoteFileInboundTasklet.<init>(RemoteFileInboundTasklet.java:83) ~[classes/:na]
    at com.my.batch.core.BatchConfiguration.getFilesFromFTPServer(BatchConfiguration.java:71) ~[classes/:na]
    at com.my.batch.core.BatchConfiguration$$EnhancerBySpringCGLIB$$17d8a6d9.CGLIB$getFilesFromFTPServer$1(<generated>) ~[classes/:na]

The line in the code is:

ftpSessionFactory.setPrivateKey(new FileSystemResource(private_key_file));

called via BatchConfiguration.java -> getFilesFromFTPServer.

This means my values from applcation.properties is not passed? What changes I need to do?

And, while compiling or building the jar, why is it checking the value of variable?

NEW EDITS:

I tried to declare my tasklet as a bean in Configuration and build the package again. However, it is giving the same error.

My application.properties file after change:

spring.datasource.url=jdbc:postgresql://dbhost:1000/db
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
local.directory=/my/local/path/
file.name=file_name_20200601.csv
remote.directory=/remote/ftp/location
remote.host=remotehost
remote.port=22
remote.user=remoteuser
private.key.location=/key/file/location

No change in tasklet.

Changed Configuration:

@Configuration
@EnableBatchProcessing
@EnableIntegration
@EnableAutoConfiguration
public class BatchConfiguration {
    private Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Bean
    public RemoteFileInboundTasklet remoteFileInboundTasklet() {
        return new RemoteFileInboundTasklet();
    }

    @Bean
    public Job ftpJob() {
        return jobBuilderFactory.get("FTP Job")
                .incrementer(new RunIdIncrementer())
                .start(getFilesFromFTPServer())
                .build();
    }

    @Bean
    public Step getFilesFromFTPServer() {
        return stepBuilderFactory.get("Get file from server")
                .tasklet(remoteFileInboundTasklet())
                .build();

    }
}

When I tried to build the package(mvn clean package), I still get the same error.

Path must not be null.

It is not able to read the properties. Any idea what is wrong?

EDITS BASED ON DIFFERENT APPROACH:

I tried to further see how to use configuration and found the following approach to use the @ConfigurationProperties annotation (How to access a value defined in the application.properties file in Spring Boot)

I created a new ftp config class:

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

@ConfigurationProperties(prefix = "ftp")
@Configuration("coreFtpProperties")
public class CoreFtp {
    private String host;
    private String port;
    private String user;
    private String passwordKey;
    private String localDirectory;
    private String remoteDirectory;
    private String fileName;

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public String getPort() {
        return port;
    }

    public void setPort(String port) {
        this.port = port;
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getPasswordKey() {
        return passwordKey;
    }

    public void setPasswordKey(String passwordKey) {
        this.passwordKey = passwordKey;
    }

    public String getLocalDirectory() {
        return localDirectory;
    }

    public void setLocalDirectory(String localDirectory) {
        this.localDirectory = localDirectory;
    }

    public String getRemoteDirectory() {
        return remoteDirectory;
    }

    public void setRemoteDirectory(String remoteDirectory) {
        this.remoteDirectory = remoteDirectory;
    }

    public String getFileName() {
        return fileName;
    }

    public void setFileName(String fileName) {
        this.fileName = fileName;
    }
}

Minor change to application.properties file:

spring.datasource.url=jdbc:postgresql://dbhost:1000/db
spring.datasource.username=username
spring.datasource.password=password
spring.datasource.platform=postgresql
spring.batch.job.enabled=false
ftp.local_directory=/my/local/path/
ftp.file_name=file_name_20200601.csv
ftp.remote_directory=/remote/ftp/location
ftp.host=remotehost
ftp.port=22
ftp.user=remoteuser
ftp.password_key=/key/file/location

In my batch configuration I made this changes:

@Configuration
@EnableBatchProcessing
@EnableIntegration
public class BatchConfiguration {
    private Logger logger = LoggerFactory.getLogger(BatchConfiguration.class);

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    private CoreFtp coreFtpProperties;


    @Bean
    public RemoteFileInboundTasklet remoteFileInboundTasklet() {
        RemoteFileInboundTasklet ftpTasklet = new RemoteFileInboundTasklet();
        ftpTasklet.setRetryIfNotFound(true);
        ftpTasklet.setDownloadFileAttempts(3);
        ftpTasklet.setRetryIntervalMilliseconds(10000);
        ftpTasklet.setFileNamePattern(coreFtpProperties.getFileName());
        ftpTasklet.setRemoteDirectory(coreFtpProperties.getRemoteDirectory());
        ftpTasklet.setLocalDirectory(new File(coreFtpProperties.getLocalDirectory()));
        ftpTasklet.setSessionFactory(clientSessionFactory());
        ftpTasklet.setFtpInboundFileSynchronizer(sftpInboundFileSynchronizer());
        ftpTasklet.setSftpInboundFileSynchronizingMessageSource(new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()));

        return ftpTasklet;
    }

    @Bean
    public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
        SftpInboundFileSynchronizer sftpInboundFileSynchronizer = new SftpInboundFileSynchronizer(clientSessionFactory());
        sftpInboundFileSynchronizer.setDeleteRemoteFiles(false);
        sftpInboundFileSynchronizer.setRemoteDirectory(coreFtpProperties.getRemoteDirectory());
        return sftpInboundFileSynchronizer;
    }

    @Bean(name = "clientSessionFactory")
    public SessionFactory<LsEntry> clientSessionFactory() {
        DefaultSftpSessionFactory ftpSessionFactory = new DefaultSftpSessionFactory();
        ftpSessionFactory.setHost(coreFtpProperties.getHost());
        ftpSessionFactory.setPort(Integer.parseInt(coreFtpProperties.getPort()));
        ftpSessionFactory.setUser(coreFtpProperties.getUser());
        ftpSessionFactory.setPrivateKey(new FileSystemResource(coreFtpProperties.getPasswordKey()));
        ftpSessionFactory.setPassword("");
        ftpSessionFactory.setAllowUnknownKeys(true);
        return ftpSessionFactory;
    }

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel")
    public MessageHandler clientMessageHandler() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(clientSessionFactory(), "mget", "payload");
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);
        sftpOutboundGateway.setLocalDirectory(new File(coreFtpProperties.getLocalDirectory()));
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
        sftpOutboundGateway.setFilter(new AcceptOnceFileListFilter<>());
        return sftpOutboundGateway;
    }


    @Bean
    public Job ftpJob() {
        return jobBuilderFactory.get("FTP Job")
                .incrementer(new RunIdIncrementer())
                .start(getFilesFromFTPServer())
                .build();
    }

    @Bean
    public Step getFilesFromFTPServer() {
        return stepBuilderFactory.get("Get file from server")
                .tasklet(remoteFileInboundTasklet())
                .build();

    }



}

So, accordingly my Tasklet is changed as:

public class RemoteFileInboundTasklet implements Tasklet {

    private Logger logger = LoggerFactory.getLogger(RemoteFileInboundTasklet.class);

    private String fileNamePattern;

    private String clientName;
    private boolean deleteLocalFiles = true;
    private boolean retryIfNotFound = false;

    private File localDirectory;

    private int downloadFileAttempts = 12;
    private long retryIntervalMilliseconds = 300000;

    private String remoteDirectory;

    private SessionFactory sessionFactory;
    private SftpInboundFileSynchronizer ftpInboundFileSynchronizer;

    private SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource;

    public boolean isDeleteLocalFiles() {
        return deleteLocalFiles;
    }

    public void setDeleteLocalFiles(boolean deleteLocalFiles) {
        this.deleteLocalFiles = deleteLocalFiles;
    }

    public SftpInboundFileSynchronizer getFtpInboundFileSynchronizer() {
        return ftpInboundFileSynchronizer;
    }

    public void setFtpInboundFileSynchronizer(SftpInboundFileSynchronizer ftpInboundFileSynchronizer) {
        this.ftpInboundFileSynchronizer = ftpInboundFileSynchronizer;
    }

    public SessionFactory getSessionFactory() {
        return sessionFactory;
    }

    public void setSessionFactory(SessionFactory sessionFactory) {
        this.sessionFactory = sessionFactory;
    }

    public SftpInboundFileSynchronizingMessageSource getSftpInboundFileSynchronizingMessageSource() {
        return sftpInboundFileSynchronizingMessageSource;
    }

    public void setSftpInboundFileSynchronizingMessageSource(SftpInboundFileSynchronizingMessageSource sftpInboundFileSynchronizingMessageSource) {
        this.sftpInboundFileSynchronizingMessageSource = sftpInboundFileSynchronizingMessageSource;
    }



    public String getRemoteDirectory() {
        return remoteDirectory;
    }

    public void setRemoteDirectory(String remoteDirectory) {
        this.remoteDirectory = remoteDirectory;
    }

    private SFTPGateway sftpGateway;


    private void deleteLocalFiles()
    {
        if (deleteLocalFiles)
        {
            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            List<File> matchingFiles = filter.filterFiles(localDirectory.listFiles());
            if (CollectionUtils.isNotEmpty(matchingFiles))
            {
                for (File file : matchingFiles)
                {
                    FileUtils.deleteQuietly(file);
                }
            }
        }
    }

    @Override
    public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {

        deleteLocalFiles();

        ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);

        if (retryIfNotFound) {

            SimplePatternFileListFilter filter = new SimplePatternFileListFilter(fileNamePattern);
            int attemptCount = 1;
            while (filter.filterFiles(localDirectory.listFiles()).size() == 0 && attemptCount <= downloadFileAttempts) {

                logger.info("File(s) matching " + fileNamePattern + " not found on remote site. Attempt " + attemptCount + " out of " + downloadFileAttempts);
                Thread.sleep(retryIntervalMilliseconds);
                ftpInboundFileSynchronizer.synchronizeToLocalDirectory(localDirectory);
                attemptCount++;
            }

            if (attemptCount >= downloadFileAttempts && filter.filterFiles(localDirectory.listFiles()).size() == 0) {
                throw new FileNotFoundException("Could not find remote file(s) matching " + fileNamePattern + " after " + downloadFileAttempts + " attempts.");
            }
        }
        return RepeatStatus.FINISHED;
    }
}

Based on above changes, I am able to compile the code and create the necessary Jar, and run the code using the jar.

2

2 Answers

0
votes

You are declaring a bean jobExecutionListener() in which you create new FileSystemResource(config_file_path);. The config_file_path is injected from job parameters @Value("#{jobParameters['ConfigFilePath']}") which are not available at configuration time but only when a job/step is run. This is called late binding.

So in your case, when Spring tries to create the bean jobExecutionListener(), it tries to inject config_file_path but it is null at that time (at this point Spring is only creating beans to configure the application context) and the job is not run yet hence the method beforeJob is not executed yet. This is the reason you have a NullPointerException. Adding @JobScope on the jobExecutionListener() bean should fix the issue but I do not recommend that. The reason is that you are trying to configure some properties in the wrong way and in the wrong place, so I would fix that design instead of working around the issue by adding an annotation.

Job parameters are used for business parameters and not technical details. In your case, runDate is a good choice for a job parameter but not ConfigFilePath. Moreover, since you use Spring, why do you inject the file path then do properties = PropertiesLoaderUtils.loadProperties(resource); and Integer.parseInt(properties.getProperty("remote.port"));? Spring will do that for you if tell it to inject properties where needed.

I would remove this config_file_path job parameter as well as the job listener and inject the properties in the remoteFileInboundTasklet directly, that is, as close as possible to where these properties are needed.

Edit: Add code example

Can you help to understand where can I declare the tasklet as a bean?

In your step getFilesFromFTPServer , you are creating the tasklet manually, so dependency injection is not performed. You need to declare the tasklet as a Spring bean for this to work, something like:

@Bean
public Tasklet myTasklet() {
   return new RemoteFileInboundTasklet()
}

@Bean
public Step getFilesFromFTPServer() {
    return stepBuilderFactory.get("Get file from server")
            .tasklet(myTasklet())
            .build();

}
0
votes

You need to change getFilesFromFTPServer bean to JobScope and read all the job runtime parameters from there.

@Bean
@JobScope
public Step getFilesFromFTPServer() {