Digging into Spring Batch, I'd like to know as to How can we share data between the different steps of a Job?
Can we use JobRepository for this? If yes, how can we do that?
Is there any other way of doing/achieving the same?
the job repository is used indirectly for passing data between steps (Jean-Philippe is right that the best way to do that is to put data into the StepExecutionContext
and then use the verbosely named ExecutionContextPromotionListener
to promote the step execution context keys to the JobExecutionContext
.
It's helpful to note that there is a listener for promoting JobParameter
keys to a StepExecutionContext
as well (the even more verbosely named JobParameterExecutionContextCopyListener
); you will find that you use these a lot if your job steps aren't completely independent of one another.
Otherwise you're left passing data between steps using even more elaborate schemes, like JMS queues or (heaven forbid) hard-coded file locations.
As to the size of data that is passed in the context, I would also suggest that you keep it small (but I haven't any specifics on the
From a step, you can put data into the StepExecutionContext
.
Then, with a listener, you can promote data from StepExecutionContext
to JobExecutionContext
.
This JobExecutionContext
is available in all the following steps.
Becareful : data must be short.
These contexts are saved in the JobRepository
by serialization and the length is limited (2500 chars if I remember well).
So these contexts are good to share strings or simple values, but not for sharing collections or huge amounts of data.
Sharing huge amounts of data is not the philosophy of Spring Batch. Spring Batch is a set of distinct actions, not a huge Business processing unit.
I would say you have 3 options:
StepContext
and promote it to JobContext
and you have access to it from each step, you must as noted obey limit in size @JobScope
bean and add data to that bean, @Autowire
it where needed and use it (drawback is that it is in-memory structure and if job fails data is lost, migh cause problems with restartability)ids
in JobContext
and access when needed and delete that temporary table when job finishes successfully.Here is what I did to save an object which is accessible through out the steps.
@Component("myJobListener")
public class MyJobListener implements JobExecutionListener {
public void beforeJob(JobExecution jobExecution) {
String myValue = someService.getValue();
jobExecution.getExecutionContext().putString("MY_VALUE", myValue);
}
}
<listeners>
<listener ref="myJobListener"/>
</listeners>
@BeforeStep
public void initializeValues(StepExecution stepExecution) {
String value = stepExecution.getJobExecution().getExecutionContext().getString("MY_VALUE");
}
You can store data in the simple object. Like:
AnyObject yourObject = new AnyObject();
public Job build(Step step1, Step step2) {
return jobBuilderFactory.get("jobName")
.incrementer(new RunIdIncrementer())
.start(step1)
.next(step2)
.build();
}
public Step step1() {
return stepBuilderFactory.get("step1Name")
.<Some, Any> chunk(someInteger1)
.reader(itemReader1())
.processor(itemProcessor1())
.writer(itemWriter1(yourObject))
.build();
}
public Step step2() {
return stepBuilderFactory.get("step2Name")
.<Some, Any> chunk(someInteger2)
.reader(itemReader2())
.processor(itemProcessor2(yourObject))
.writer(itemWriter2())
.build();
}
Just add data to object in the writer or any other method and get it in any stage of next step
Another very simply approach, leaving here for future reference:
class MyTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
getExecutionContext.put("foo", "bar");
}
}
and
class MyOtherTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) {
getExecutionContext.get("foo");
}
}
getExecutionContext
here is:
ExecutionContext getExecutionContext(ChunkContext chunkContext) {
return chunkContext.getStepContext()
.getStepExecution()
.getJobExecution()
.getExecutionContext();
}
Put it in a super class, in an interface as a default
method, or simply paste in your Tasklet
s.
Use ExecutionContextPromotionListener
:
public class YourItemWriter implements ItemWriter<Object> {
private StepExecution stepExecution;
public void write(List<? extends Object> items) throws Exception {
// Some Business Logic
// put your data into stepexecution context
ExecutionContext stepContext = this.stepExecution.getExecutionContext();
stepContext.put("someKey", someObject);
}
@BeforeStep
public void saveStepExecution(Final StepExecution stepExecution) {
this.stepExecution = stepExecution;
}
}
Now you need to add promotionListener to your job
@Bean
public Step step1() {
return stepBuilder
.get("step1")<Company,Company> chunk(10)
.reader(reader()).processor(processor()).writer(writer())
.listener(promotionListener()).build();
}
@Bean
public ExecutionContextPromotionListener promotionListener() {
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys(new String[] {"someKey"});
listener.setStrict(true);
return listener;
}
Now, in step2 get your data from job ExecutionContext
public class RetrievingItemWriter implements ItemWriter<Object> {
private Object someObject;
public void write(List<? extends Object> items) throws Exception {
// ...
}
@BeforeStep
public void retrieveInterstepData(StepExecution stepExecution) {
JobExecution jobExecution = stepExecution.getJobExecution();
ExecutionContext jobContext = jobExecution.getExecutionContext();
this.someObject = jobContext.get("someKey");
}
}
If you are working with tasklets, then use the following to get or put ExecutionContext
List<YourObject> yourObjects = (List<YourObject>) chunkContent.getStepContext().getJobExecutionContext().get("someKey");
I was given a task to invoke the batch job one by one.Each job depends on another. First job result needs to execute the consequent job program. I was searching how to pass the data after job execution. I found that this ExecutionContextPromotionListener comes in handy.
1) I have added a bean for "ExecutionContextPromotionListener" like below
@Bean
public ExecutionContextPromotionListener promotionListener()
{
ExecutionContextPromotionListener listener = new ExecutionContextPromotionListener();
listener.setKeys( new String[] { "entityRef" } );
return listener;
}
2) Then I attached one of the listener to my Steps
Step step = builder.faultTolerant()
.skipPolicy( policy )
.listener( writer )
.listener( promotionListener() )
.listener( skiplistener )
.stream( skiplistener )
.build();
3) I have added stepExecution as a reference in my Writer step implementation and populated in the Beforestep
@BeforeStep
public void saveStepExecution( StepExecution stepExecution )
{
this.stepExecution = stepExecution;
}
4) in the end of my writer step, i populated the values in the stepexecution as the keys like below
lStepContext.put( "entityRef", lMap );
5) After the job execution, I retrieved the values from the
lExecution.getExecutionContext()
and populated as job response.
6) from the job response object, I will get the values and populate the required values in the rest of the jobs.
The above code is for promoting the data from the steps to ExecutionContext using ExecutionContextPromotionListener. It can done for in any steps.
Spring Batch creates metadata tables for itself (like batch_job_execution
, batch_job_execution_context
, batch_step_instance
, etc).
And I have tested (using postgres DB) that you can have at least 51,428 chars worth of data in one column (batch_job_execution_context.serialized_content
). It could be more, it is just how much I tested.
When you are using Tasklets for your step (like class MyTasklet implements Tasklet
) and override the RepeatStatus
method in there, you have immediate access to ChunkContext
.
class MyTasklet implements Tasklet {
@Override
public RepeatStatus execute(@NonNull StepContribution contribution,
@NonNull ChunkContext chunkContext) {
List<MyObject> myObjects = getObjectsFromSomewhereAndUseThemInNextStep();
chunkContext.getStepContext().getStepExecution()
.getJobExecution()
.getExecutionContext()
.put("mydatakey", myObjects);
}
}
And now you have another step with a different Tasklet where you can access those objects
class MyOtherTasklet implements Tasklet {
@Override
public RepeatStatus execute(@NonNull StepContribution contribution,
@NonNull ChunkContext chunkContext) {
List<MyObject> myObjects = (List<MyObject>)
chunkContext.getStepContext().getStepExecution()
.getJobExecution()
.getExecutionContext()
.get("mydatakey");
}
}
Or if you dont have a Tasklet and have like a Reader/Writer/Processor, then
class MyReader implements ItemReader<MyObject> {
@Value("#{jobExecutionContext['mydatakey']}")
List<MyObject> myObjects;
// And now myObjects are available in here
@Override
public MyObject read() throws Exception {
}
}
Simple solution using Tasklets
. No need to access the execution context. I used a map as the data element to move around. (Kotlin code.)
class MyTasklet : Tasklet {
lateinit var myMap: MutableMap<String, String>
override fun execute(contribution: StepContribution, chunkContext: ChunkContext): RepeatStatus? {
myMap.put("key", "some value")
return RepeatStatus.FINISHED
}
}
@Configuration
@EnableBatchProcessing
class BatchConfiguration {
@Autowired
lateinit var jobBuilderFactory: JobBuilderFactory
@Autowired
lateinit var stepBuilderFactory: StepBuilderFactory
var myMap: MutableMap<String, String> = mutableMapOf()
@Bean
fun jobSincAdUsuario(): Job {
return jobBuilderFactory
.get("my-SO-job")
.incrementer(RunIdIncrementer())
.start(stepMyStep())
.next(stepMyOtherStep())
.build()
}
@Bean
fun stepMyStep() = stepBuilderFactory.get("MyTaskletStep")
.tasklet(myTaskletAsBean())
.build()
@Bean
fun myTaskletAsBean(): MyTasklet {
val tasklet = MyTasklet()
tasklet.myMap = myMap // collection gets visible in the tasklet
return tasklet
}
}
Then in MyOtherStep
you can replicate the same idiom seen in MyStep
. This other Tasklet will see the data created in MyStep
.
Important:
@Bean fun
so that they can use @Autowired
(full explanation).InitializingBean
withoverride fun afterPropertiesSet() { Assert.notNull(myMap, "myMap must be set before calling the tasklet") }