
I need a spring-batch ItemReader to consume Kafka messages whose results to be processed and written further ahead.

Here's an item reader I have implemented:

public abstract class KafkaItemReader<T> implements ItemReader<List<T>> {
  public abstract KafkaConsumer<String, T> getKafkaConsumer();

  public abstract String getTopic();

  public abstract long getPollingTime();

  public List<T> read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    Iterator<ConsumerRecord<String, T>> iterator = getKafkaConsumer()
    List<T> records = new ArrayList<>();
    while (iterator.hasNext()) {
    return records;

These are the following beans for spring batch job and step:

  public ItemWriter<List<DbEntity>> databaseWriter(DataSource dataSource) {
    //some item writer that needs to be implmented
    return null;

  public Step kafkaToDatabaseStep(KafkaItemReader kafkaItemReader, //implementation of KafkaItemReader
                                  StepBuilderFactory stepBuilderFactory,
                                  DataSource dataSource) {

    return stepBuilderFactory
        .<List<KafkaRecord>, List<DbEntity>>chunk(100)
        .processor(itemProcessor()) //List<KafkaRecord> to List<DbEntity> converter

  public Job kafkaToDatabaseJob(
      @Qualifier("kafkaToDatabaseStep") Step step) {
    return jobBuilderFactory.get("kafkaToDatabaseJob")
        .incrementer(new RunIdIncrementer())

Here I do not know:

  1. How to commit the offset of read messages in the writer as I want to commit only after complete processing of the record.
  2. How to Use JdbcBatchItemWriter as the ItemWriter in my scenario.

1 Answers


The upcoming Spring Batch v4.2 GA will provide support for reading/writing data to Apache Kafka topics. You can already try this out with the 4.2.0.M2 release.

You can also take a look at the Spring Tips installment about it by Josh Long.