1
votes

I have a Spring batch job that reads records from DB, processes it, and writes to another database. I want to measure the total time / average time taken in the Spring batch processor and Writer. As per the documentation - https://docs.spring.io/spring-batch/docs/current/reference/html/monitoring-and-metrics.html the metrics are readily available using spring.batch prefix. How do I log to the console the time taken in the Processor and writer. When i print the Metrics results from metrics registry at end of job listener, I see the job and step status as Completed but the Timer stats metrics are showing as 0. is there anything that needs to be done to enable Timer? I don't intend to push the metrics to prometheus or atlas or any other registries. So only the micrometer core dependency is added in pom.xml. Please advise how to log the time taken metrics for different components. Any examples would be really helpful.

1

1 Answers

2
votes

I see the job and step status as Completed but the Timer stats metrics are showing as 0.

The reason for that is that by default, the global registry in micrometer is an empty composite. You need to add at least one registry to retain metrics. See Why does MicroMeter Timer returns zero?

I don't intend to push the metrics to prometheus or atlas or any other registries.

If you want to consume metrics without pushing them to a metrics backend, you can use a listener and get access from the global registry directly. Here is a quick example for item processing timing as you requested:

import java.util.Arrays;
import java.util.List;

import io.micrometer.core.instrument.Measurement;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import org.springframework.batch.core.ItemProcessListener;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.support.ListItemReader;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@EnableBatchProcessing
public class MyJob {

    @Bean
    public ItemReader<Integer> itemReader() {
        return new ListItemReader<>(Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9));
    }

    @Bean
    public ItemProcessor<Integer, Integer> itemProcessor() {
        return item -> {
            System.out.println("processing item " + item);
            Thread.sleep(2000);
            return item + 1;
        };
    }

    @Bean
    public ItemWriter<Integer> itemWriter() {
        return items -> {
            for (Integer item : items) {
                System.out.println("writing item = " + item);
            }
        };
    }

    @Bean
    public Job job(JobBuilderFactory jobs, StepBuilderFactory steps) {
        return jobs.get("job")
                .start(steps.get("step")
                        .<Integer, Integer>chunk(5)
                        .reader(itemReader())
                        .processor(itemProcessor())
                        .writer(itemWriter())
                        .listener(new MonitoringItemProcessListener())
                        .build())
                .build();
    }
    
    static class MonitoringItemProcessListener implements ItemProcessListener<Integer, Integer> {

        @Override
        public void beforeProcess(Integer item) {
            
        }

        @Override
        public void afterProcess(Integer item, Integer result) {
            List<Meter> meters = Metrics.globalRegistry.getMeters();
            for (Meter meter : meters) {
                if (meter.getId().getName().equals("spring.batch.item.process")) {
                    System.out.println("meter description = " + meter.getId().getDescription());
                    Iterable<Measurement> measurements = meter.measure();
                    for (Measurement measurement : measurements) {
                        System.out.println("measurement: statistic = " + measurement.getStatistic() + " | value = " + measurement.getValue());
                    }
                }
            }
        }

        @Override
        public void onProcessError(Integer item, Exception e) {

        }
    }

    public static void main(String[] args) throws Exception {
        Metrics.addRegistry(new SimpleMeterRegistry());
        ApplicationContext context = new AnnotationConfigApplicationContext(MyJob.class);
        JobLauncher jobLauncher = context.getBean(JobLauncher.class);
        Job job = context.getBean(Job.class);
        jobLauncher.run(job, new JobParameters());
    }

}

This sample prints something like:

processing item 1
meter description = Item processing duration
measurement: statistic = COUNT | value = 1.0
measurement: statistic = TOTAL_TIME | value = 2.00080715
measurement: statistic = MAX | value = 2.00080715
processing item 2
meter description = Item processing duration
measurement: statistic = COUNT | value = 2.0
measurement: statistic = TOTAL_TIME | value = 4.003516877
measurement: statistic = MAX | value = 2.002709727
processing item 3
meter description = Item processing duration
measurement: statistic = COUNT | value = 3.0
measurement: statistic = TOTAL_TIME | value = 6.005287923
measurement: statistic = MAX | value = 2.002709727