I was working on a Spring batch application to execute two batch jobs using the java configuration. Recently i added a Spring scheduler to schedule one of the Job i have written. The listener gets invoked first time the job completes but does not get invoked after the next executions. Following is the code for my Job Configuration :
@Configuration
@EnableBatchProcessing
public class BatchConfiguration{
@Autowired
public JobBuilderFactory jobBuilderFactory;
@Autowired
public StepBuilderFactory stepBuilderFactory;
@Autowired
public MongoTemplate mongoTemplate;
@Autowired
UnitsRepository unitsRepos;
@Autowired
UserRepository userRepository;
@Autowired
ElectraService electraService;
/*@Autowired InfrastructureConfiguration infrastructureConfiguration;*/
// tag::readerwriterprocessor[]
@Bean
@StepScope
public MongoItemReader<UserBean> reader() {
MongoItemReader<UserBean> reader = new MongoItemReader<UserBean>();
reader.setTemplate(mongoTemplate);
reader.setCollection("user");
reader.setQuery("{ '_id': 'U3'}");
reader.setSort(new HashMap<String,Direction>(){{put("_id", Direction.ASC);}});
reader.setTargetType(UserBean.class);
return reader;
}
@Bean
public ExceedUsageProcessor processor() {
return new ExceedUsageProcessor(unitsRepos,electraService);
}
@Bean
public AnomalyProcessor anomalyProcessor() {
return new AnomalyProcessor(unitsRepos);
}
@Bean
@StepScope
public MongoItemWriter<DayByDayUsage> writer() {
MongoItemWriter<DayByDayUsage> writer = new MongoItemWriter<DayByDayUsage>();
writer.setTemplate(mongoTemplate);
writer.setCollection("usage");
return writer;
}
// end::readerwriterprocessor[]
// tag::listener[]
@Bean
@StepScope
public MongoItemWriter<AnomalyBean> anomalyWriter() {
MongoItemWriter<AnomalyBean> writer = new MongoItemWriter<AnomalyBean>();
writer.setTemplate(mongoTemplate);
writer.setCollection("anomaly");
return writer;
}
@Bean
public ExceedJobNotificationListener listener() {
return new ExceedJobNotificationListener(mongoTemplate);
}
@Bean
public AnomalyJobListener anomalyListener(){
return new AnomalyJobListener(mongoTemplate,userRepository);
}
// end::listener[]
// tag::jobstep[]
@Bean
public Job notifyUserJob() {
return jobBuilderFactory.get("notifyUserJob")
.incrementer(new RunIdIncrementer())
.listener(listener())
.flow(step1())
.end()
.build();
}
@Bean
public Job anomalyJob() {
return jobBuilderFactory.get("anomalyJob")
.incrementer(new RunIdIncrementer())
.listener(anomalyListener())
.flow(step2())
.end()
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.<UserBean, DayByDayUsage> chunk(50)
.reader(reader())
.processor(processor())
.writer(writer())
.taskExecutor(taskExecutor())
.throttleLimit(10)
.allowStartIfComplete(true)
.build();
}
// end::jobstep[]
@Bean
public Step step2() {
return stepBuilderFactory.get("step2")
.<UserBean, AnomalyBean> chunk(50)
.reader(reader())
.processor(anomalyProcessor())
.writer(anomalyWriter())
.taskExecutor(taskExecutor())
.throttleLimit(10)
.allowStartIfComplete(true)
.build();
}
@Bean
public TaskExecutor taskExecutor() {
// TODO Auto-generated method stub
ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
taskExecutor.setMaxPoolSize(10);
taskExecutor.afterPropertiesSet();
return taskExecutor;
}
@Bean
public DataSource dataSource() {
return new EmbeddedDatabaseBuilder()
.setType(EmbeddedDatabaseType.HSQL)
.addScript("classpath:/org/springframework/batch/core/schema-hsqldb.sql")
.build();
}
}
And following is the code for my Scheduler :
@Component
public class AnomalyScheduler {
private Job myImportJob;
private JobLauncher jobLauncher;
@Autowired
public AnomalyScheduler(JobLauncher jobLauncher, @Qualifier("anomalyJob") Job myImportJob){
this.myImportJob = myImportJob;
this.jobLauncher = jobLauncher;
}
@Scheduled(fixedDelay=60000)
public void runJob(){
try {
jobLauncher.run(myImportJob, new JobParameters());
} catch (JobExecutionAlreadyRunningException | JobRestartException
| JobInstanceAlreadyCompleteException
| JobParametersInvalidException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
And my listener is as follows :
public class AnomalyJobListener extends JobExecutionListenerSupport {
private PushNotification pushNotification = PushNotification
.getPushNotificationInstance();
@Autowired
public AnomalyJobListener(MongoTemplate mongoTemplate,
UserRepository userRepository) {
List<AnomalyBean> anomalies = new ArrayList<AnomalyBean>(0);
anomalies = mongoTemplate.findAll(AnomalyBean.class);
int numAnomalies = anomalies.size();
List<UserBean> admins = new ArrayList<UserBean>(0);
admins = userRepository.userByType("admin");
if (numAnomalies > 0) {
for (UserBean admin : admins) {
pushNotification.pushNotification(numAnomalies
+ " anomalies detected ! Keep an eye on that.",
admin.getDeveiceId());
}
}
}
}
Here is the console output :
2016-05-04 08:17:39.565 INFO 9348 --- [ main] com.electra.Application : Starting Application on all-PC with PID 9348 (F:\Electrck\ElectrackJobRepository\ElectrackJobs\bin started by all in F:\Electrck\ElectrackJobRepository\ElectrackJobs)
2016-05-04 08:17:39.571 INFO 9348 --- [ main] com.electra.Application : No active profile set, falling back to default profiles: default
2016-05-04 08:17:39.681 INFO 9348 --- [ main] s.c.a.AnnotationConfigApplicationContext : Refreshing org.springframework.context.annotation.AnnotationConfigApplicationContext@20b2475a: startup date [Wed May 04 08:17:39 IST 2016]; root of context hierarchy
2016-05-04 08:17:41.943 WARN 9348 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.stepScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-05-04 08:17:41.966 WARN 9348 --- [ main] o.s.c.a.ConfigurationClassEnhancer : @Bean method ScopeConfiguration.jobScope is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
2016-05-04 08:17:42.258 INFO 9348 --- [ main] o.s.j.d.e.EmbeddedDatabaseFactory : Starting embedded database: url='jdbc:hsqldb:mem:testdb', username='sa'
2016-05-04 08:17:42.646 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
2016-05-04 08:17:42.658 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 12 ms.
2016-05-04 08:18:01.793 INFO 9348 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService
2016-05-04 08:18:01.812 INFO 9348 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Initializing ExecutorService 'taskExecutor'
push status [ messageId=0:1462330092323002%9b3f4867f9fd7ecd ]
push status [ messageId=0:1462330095502779%9b3f4867f9fd7ecd ]
2016-05-04 08:18:16.883 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executing SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql]
2016-05-04 08:18:16.927 INFO 9348 --- [ main] o.s.jdbc.datasource.init.ScriptUtils : Executed SQL script from class path resource [org/springframework/batch/core/schema-hsqldb.sql] in 20 ms.
2016-05-04 08:18:17.392 INFO 9348 --- [ main] o.s.j.e.a.AnnotationMBeanExporter : Registering beans for JMX exposure on startup
2016-05-04 08:18:17.413 INFO 9348 --- [ main] o.s.c.support.DefaultLifecycleProcessor : Starting beans in phase 0
2016-05-04 08:18:17.737 INFO 9348 --- [ main] o.s.b.a.b.JobLauncherCommandLineRunner : Running default command line with: []
2016-05-04 08:18:17.766 INFO 9348 --- [ main] o.s.b.c.r.s.JobRepositoryFactoryBean : No database type set, using meta data indicating: HSQL
2016-05-04 08:18:18.032 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : No TaskExecutor has been set, defaulting to synchronous executor.
2016-05-04 08:18:18.147 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=notifyUserJob]] launched with the following parameters: [{run.id=1}]
2016-05-04 08:18:18.187 INFO 9348 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2016-05-04 08:18:22.044 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
2016-05-04 08:18:22.079 INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2016-05-04 08:18:28.990 INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl : date Mon May 02 08:18:12 IST 1
2016-05-04 08:18:28.991 INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl : push new unit
2016-05-04 08:18:32.581 INFO 9348 --- [ taskExecutor-1] com.electra.service.ElectraServiceImpl : date Wed May 04 08:18:32 IST 2016
2016-05-04 08:18:32.581 INFO 9348 --- [ taskExecutor-1] c.e.repository.UnitsRepositoryImpl : push new unit
2016-05-04 08:19:16.876 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=notifyUserJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2016-05-04 08:19:16.999 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{run.id=1}]
2016-05-04 08:19:17.053 INFO 9348 --- [ main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2016-05-04 08:19:17.491 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
2016-05-04 08:19:52.399 INFO 9348 --- [ main] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{run.id=1}] and the following status: [COMPLETED]
2016-05-04 08:19:52.401 INFO 9348 --- [ main] com.electra.Application : Started Application in 133.639 seconds (JVM running for 134.724)
2016-05-04 08:20:21.066 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] launched with the following parameters: [{}]
2016-05-04 08:20:21.288 INFO 9348 --- [pool-2-thread-1] o.s.batch.core.job.SimpleStepHandler : Executing step: [step2]
2016-05-04 08:20:31.103 INFO 9348 --- [pool-2-thread-1] o.s.b.c.l.support.SimpleJobLauncher : Job: [FlowJob: [name=anomalyJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]
Please tell me what i am doing wrong and why is the listener not getting executed for subsequent tries.
@EnableScheduling
to your configuration? – joshisteorg.springframework.dao.ConcurrencyFailureException: PreparedStatementCallback; SQL [INSERT into BATCH_JOB_INSTANCE(JOB_INSTANCE_ID, JOB_NAME, JOB_KEY, VERSION) values (?, ?, ?, ?)]; transaction rollback: serialization failure; nested exception is java.sql.SQLTransactionRollbackException: transaction rollback: serialization failure
now – Aryan Singh