1
votes

I was working on a Spring Batch application that executes two batch jobs using Java configuration. Recently I added a Spring scheduler to schedule one of the jobs I have written. The listener gets invoked the first time the job completes, but it is not invoked on subsequent executions. Following is the code for my job configuration:

/**
 * Spring Batch Java configuration declaring two jobs, {@code notifyUserJob}
 * and {@code anomalyJob}, together with their MongoDB readers/writers,
 * processors, listeners, the task executor used by both steps and the
 * embedded HSQL database that holds the Spring Batch schema.
 */
@Configuration
@EnableBatchProcessing
public class BatchConfiguration{

    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public MongoTemplate mongoTemplate;

    @Autowired
    UnitsRepository unitsRepos;

    @Autowired
    UserRepository userRepository;

    @Autowired
    ElectraService electraService;

    // tag::readerwriterprocessor[]
    /**
     * Step-scoped reader over the {@code user} collection, shared by both steps.
     *
     * @return a reader yielding {@link UserBean}s matching the fixed query
     *         {@code { '_id': 'U3'}}, sorted by {@code _id} ascending
     */
    @Bean
    @StepScope
    public MongoItemReader<UserBean> reader() {
        // Plain map instead of double-brace initialisation: the latter creates an
        // anonymous HashMap subclass that keeps a reference to this configuration.
        HashMap<String, Direction> sort = new HashMap<String, Direction>();
        sort.put("_id", Direction.ASC);

        MongoItemReader<UserBean> reader = new MongoItemReader<UserBean>();
        reader.setTemplate(mongoTemplate);
        reader.setCollection("user");
        reader.setQuery("{ '_id': 'U3'}");
        reader.setSort(sort);
        reader.setTargetType(UserBean.class);
        return reader;
    }

    /** @return the processor used by {@code step1} (UserBean to DayByDayUsage). */
    @Bean
    public ExceedUsageProcessor processor() {
        return new ExceedUsageProcessor(unitsRepos,electraService);
    }

    /** @return the processor used by {@code step2} (UserBean to AnomalyBean). */
    @Bean
    public AnomalyProcessor anomalyProcessor() {
        return new AnomalyProcessor(unitsRepos);
    }

    /** @return a step-scoped writer storing {@link DayByDayUsage} items in the {@code usage} collection. */
    @Bean
    @StepScope
    public MongoItemWriter<DayByDayUsage> writer() {
        MongoItemWriter<DayByDayUsage> writer = new MongoItemWriter<DayByDayUsage>();
        writer.setTemplate(mongoTemplate);
        writer.setCollection("usage");
        return writer;
    }
    // end::readerwriterprocessor[]

    // tag::listener[]

    /** @return a step-scoped writer storing {@link AnomalyBean} items in the {@code anomaly} collection. */
    @Bean
    @StepScope
    public MongoItemWriter<AnomalyBean> anomalyWriter() {
        MongoItemWriter<AnomalyBean> writer = new MongoItemWriter<AnomalyBean>();
        writer.setTemplate(mongoTemplate);
        writer.setCollection("anomaly");
        return writer;
    }

    /** @return the job listener attached to {@code notifyUserJob}. */
    @Bean
    public ExceedJobNotificationListener listener() {
        return new ExceedJobNotificationListener(mongoTemplate);
    }

    /** @return the job listener attached to {@code anomalyJob}. */
    @Bean
    public AnomalyJobListener anomalyListener(){
        return new AnomalyJobListener(mongoTemplate,userRepository);
    }
    // end::listener[]

    // tag::jobstep[]
    /** @return the {@code notifyUserJob}: a single-step flow running {@link #step1()}. */
    @Bean
    public Job notifyUserJob() {
        return jobBuilderFactory.get("notifyUserJob")
                .incrementer(new RunIdIncrementer())
                .listener(listener())
                .flow(step1())
                .end()
                .build();
    }

    /** @return the {@code anomalyJob}: a single-step flow running {@link #step2()}. */
    @Bean
    public Job anomalyJob() {
        return jobBuilderFactory.get("anomalyJob")
                .incrementer(new RunIdIncrementer())
                .listener(anomalyListener())
                .flow(step2())
                .end()
                .build();
    }

    /**
     * @return {@code step1}: reads users, converts them with {@link #processor()}
     *         and writes usage documents in chunks of 50 on the shared task executor
     */
    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<UserBean, DayByDayUsage> chunk(50)
                .reader(reader())
                .processor(processor())
                .writer(writer())
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                // lets the step run again even if a previous execution completed
                .allowStartIfComplete(true)
                .build();
    }
    // end::jobstep[]

    /**
     * @return {@code step2}: reads users, converts them with {@link #anomalyProcessor()}
     *         and writes anomaly documents in chunks of 50 on the shared task executor
     */
    @Bean
    public Step step2() {
        return stepBuilderFactory.get("step2")
                .<UserBean, AnomalyBean> chunk(50)
                .reader(reader())
                .processor(anomalyProcessor())
                .writer(anomalyWriter())
                .taskExecutor(taskExecutor())
                .throttleLimit(10)
                // lets the step run again even if a previous execution completed
                .allowStartIfComplete(true)
                .build();
    }

    /**
     * Thread pool used by both steps.
     *
     * @return a {@link ThreadPoolTaskExecutor} with a maximum pool size of 10
     */
    @Bean
    public TaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setMaxPoolSize(10);
        // No explicit afterPropertiesSet(): the container invokes it on the bean
        // returned from a @Bean method, so calling it here as well would initialise
        // the executor twice and orphan the first thread pool.
        // NOTE(review): only the max pool size is set; with the executor's default
        // core size and queue capacity extra threads may never be started - confirm
        // whether setCorePoolSize(...) was intended.
        return taskExecutor;
    }

    /**
     * @return an embedded HSQL database initialised with the Spring Batch
     *         HSQLDB schema script from the classpath
     */
    @Bean
    public DataSource dataSource() {
        return new EmbeddedDatabaseBuilder()
                .setType(EmbeddedDatabaseType.HSQL)
                .addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
                .build();
    }
}

And following is the code for my Scheduler :

/**
 * Launches the job qualified as {@code anomalyJob} on a fixed delay of
 * 60&nbsp;000 ms between the end of one run and the start of the next.
 */
@Component
public class AnomalyScheduler {

    // Fully qualified so the class does not depend on an import that may be missing.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(AnomalyScheduler.class.getName());

    private final Job myImportJob;
    private final JobLauncher jobLauncher;

    /**
     * @param jobLauncher launcher used to start the job
     * @param myImportJob the job bean qualified as {@code anomalyJob}
     */
    @Autowired
    public AnomalyScheduler(JobLauncher jobLauncher, @Qualifier("anomalyJob") Job myImportJob) {
        this.myImportJob = myImportJob;
        this.jobLauncher = jobLauncher;
    }

    /**
     * Starts the job with an empty parameter set. Launch failures are logged
     * with the job name and do not propagate, so the schedule keeps running.
     */
    @Scheduled(fixedDelay=60000)
    public void runJob() {
        try {
            // NOTE(review): every launch uses the same (empty) parameters, so the
            // RunIdIncrementer configured on the job is not applied here - confirm
            // whether a unique parameter per run is wanted.
            jobLauncher.run(myImportJob, new JobParameters());
        } catch (JobExecutionAlreadyRunningException | JobRestartException
                | JobInstanceAlreadyCompleteException
                | JobParametersInvalidException e) {
            LOGGER.log(java.util.logging.Level.SEVERE,
                    "Scheduled launch of job '" + myImportJob.getName() + "' failed", e);
        }
    }
}

And my listener is as follows :

    public class AnomalyJobListener extends JobExecutionListenerSupport {
    private PushNotification pushNotification = PushNotification
            .getPushNotificationInstance();

    @Autowired
    public AnomalyJobListener(MongoTemplate mongoTemplate,
            UserRepository userRepository) {
        List<AnomalyBean> anomalies = new ArrayList<AnomalyBean>(0);
        anomalies = mongoTemplate.findAll(AnomalyBean.class);
        int numAnomalies = anomalies.size();
        List<UserBean> admins = new ArrayList<UserBean>(0);
        admins = userRepository.userByType("admin");
        if (numAnomalies > 0) {
            for (UserBean admin : admins) {
                pushNotification.pushNotification(numAnomalies
                        + " anomalies detected ! Keep an eye on that.",
                        admin.getDeveiceId());
            }

        }
    }
}

Here is the console output :

2016-05-04 08:17:39.565  INFO 9348 --- [           main] com.electra.Application                  : Starting Application on all-PC with PID 9348 (F:\Electrck\ElectrackJobRepository\ElectrackJobs\bin started by all in F:\Electrck\ElectrackJobRepository\ElectrackJobs)
    2016-05-04 08:17:39.571  INFO 9348 --- [           main] com.electra.Application                  : No active profile set, falling back to default profiles: default
    2016-05-04 08:17:39.681  INFO 9348 --- [           main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@20b2475a: startup date [Wed May 04 08:17:39 IST 2016]; root of context hierarchy
    2016-05-04 08:17:41.943  WARN 9348 --- [           main] o.s.c.a.ConfigurationClassEnhancer       : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
    2016-05-04 08:17:41.966  WARN 9348 --- [           main] o.s.c.a.ConfigurationClassEnhancer       : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
    2016-05-04 08:17:42.258  INFO 9348 --- [           main] o.s.j.d.e.EmbeddedDatabaseFactory        : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
    2016-05-04 08:17:42.646  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
    2016-05-04 08:17:42.658  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 12 ms.
    2016-05-04 08:18:01.793  INFO 9348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService 
    2016-05-04 08:18:01.812  INFO 9348 --- [           main] o.s.s.concurrent.ThreadPoolTaskExecutor  : Initializing ExecutorService  'taskExecutor'
    push status [ messageId=0:1462330092323002%9b3f4867f9fd7ecd ]
    push status [ messageId=0:1462330095502779%9b3f4867f9fd7ecd ]
    2016-05-04 08:18:16.883  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
    2016-05-04 08:18:16.927  INFO 9348 --- [           main] o.s.jdbc.datasource.init.ScriptUtils     : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 20 ms.
    2016-05-04 08:18:17.392  INFO 9348 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
    2016-05-04 08:18:17.413  INFO 9348 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 0
    2016-05-04 08:18:17.737  INFO 9348 --- [           main] o.s.b.a.b.JobLauncherCommandLineRunner   : Running default command line with: []
    2016-05-04 08:18:17.766  INFO 9348 --- [           main] o.s.b.c.r.s.JobRepositoryFactoryBean     : No database type set, using meta data indicating: HSQL
    2016-05-04 08:18:18.032  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : No TaskExecutor has been set, defaulting to synchronous executor.
    2016-05-04 08:18:18.147  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=notifyUserJob]] launched with the following parameters: [{run.id=1}]
    2016-05-04 08:18:18.187  INFO 9348 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step1]
    2016-05-04 08:18:22.044  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
    2016-05-04 08:18:22.079  INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:18:28.990  INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl   : date Mon May 02 08:18:12 IST 1
    2016-05-04 08:18:28.991  INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl       : push new unit
    2016-05-04 08:18:32.581  INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl   : date Wed May 04 08:18:32 IST 2016
    2016-05-04 08:18:32.581  INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl       : push new unit
    2016-05-04 08:19:16.876  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=notifyUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
    2016-05-04 08:19:16.999  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{run.id=1}]
    2016-05-04 08:19:17.053  INFO 9348 --- [           main] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:19:17.491  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
    2016-05-04 08:19:52.399  INFO 9348 --- [           main] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
    2016-05-04 08:19:52.401  INFO 9348 --- [           main] com.electra.Application                  : Started Application in 133.639 seconds (JVM running for 134.724)
    2016-05-04 08:20:21.066  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
    2016-05-04 08:20:21.288  INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler     : Executing step: [step2]
    2016-05-04 08:20:31.103  INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher      : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

Please tell me what I am doing wrong and why the listener is not executed on subsequent runs.

1
Did you add @EnableScheduling to your configuration? – joshiste
I added this annotation on top of my BatchConfiguration class, but it didn't make a difference. My listener still gets invoked only the first time. – Aryan Singh
So the job runs? You only have code in your listener's constructor, but not in beforeJob(), afterJob() or any other method... So I guess the listener is called but does nothing... – joshiste
The job runs, and I added my code to the afterJob() method of the listener, but now I am getting the exception org.springframework.dao.ConcurrencyFailureException: PreparedStatementCallback; SQL [INSERT into BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION) values (?, ?, ?, ?)]; transaction rollback: serialization failure; nested exception is java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure – Aryan Singh

1 Answers

0
votes

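You only have code in your listener's constructor, but not in beforeJob(), afterJob() or any other method, so I guess the listener is called but does nothing. The constructor runs only once, when the Spring bean is created, which is why the notification is sent only the first time. You need to override the appropriate listener methods (for example afterJob()) and move the code there.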