
I have developed a Spring Batch job that reads from a Kafka topic using the KafkaItemReader class. I want to commit the offsets only when the messages read in a chunk have been processed and written successfully to an output .dat file.

@Bean
public Job kafkaEventReformatjob(
        @Qualifier("MaintStep") Step MainStep,
        @Qualifier("moveFileToFolder") Step moveFileToFolder,
        @Qualifier("compressFile") Step compressFile,
        JobExecutionListener listener)
{
    return jobBuilderFactory.get("kafkaEventReformatJob")
            .listener(listener)
            .incrementer(new RunIdIncrementer())
            .flow(mainStep)
            .next(moveFileToFolder)
            .next(compressFile)
            .end()
            .build();
}

@Bean
Step MainStep(
        ItemProcessor<IncomingRecord, List<Record>> flatFileItemProcessor,
        ItemWriter<List<Record>> flatFileWriter)
{
    return stepBuilderFactory.get("mainStep")
            .<IncomingRecord, List<Record>>chunk(5000)
            .reader(kafkaItemReader())
            .processor(flatFileItemProcessor)
            .writer(writer())
            .listener(basicStepListener)
            .build();
}
// The reader reads all the messages from the Kafka topic and returns them as IncomingRecord objects.
@Bean
KafkaItemReader<String, IncomingRecord> kafkaItemReader() {
    Properties props = new Properties();
    props.putAll(this.properties.buildConsumerProperties());
    List<Integer> partitions = new ArrayList<>();
    partitions.add(0);
    partitions.add(1);
    return new KafkaItemReaderBuilder<String, IncomingRecord>()
            .partitions(partitions)
            .consumerProperties(props)
            .name("records")
            .saveState(true)
            .topic(topic)
            .pollTimeout(Duration.ofSeconds(40L))
            .build();
}

@Bean
public ItemWriter<List<Record>> writer() {
    ListUnpackingItemWriter<Record> listUnpackingItemWriter = new ListUnpackingItemWriter<>();
    listUnpackingItemWriter.setDelegate(flatWriter());
    return listUnpackingItemWriter;
}

public ItemWriter<Record> flatWriter() {
    FlatFileItemWriter<Record> fileWriter = new FlatFileItemWriter<>();
    String tempFileName = "abc";
    LOGGER.info("Output File name " + tempFileName + " is in working directory ");
    String workingDir = service.getWorkingDir().toAbsolutePath().toString();
    Path outputFile = Paths.get(workingDir, tempFileName);
    fileWriter.setName("fileWriter");
    fileWriter.setResource(new FileSystemResource(outputFile.toString()));
    fileWriter.setLineAggregator(lineAggregator());
    fileWriter.setForceSync(true);
    fileWriter.setFooterCallback(customFooterCallback());
    // note: the writer is not closed here; its open/update/close lifecycle is managed at step level
    LOGGER.info("Successfully created the file writer");
    return fileWriter;

}

@StepScope
@Bean
public TransformProcessor processor() {
    return new TransformProcessor();
}

==============================================================================

Writer Class

 @BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
}

@AfterStep
public void afterStep(StepExecution stepExecution) {
    this.stepExecution.setWriteCount(count);
}

@Override
public void write(final List<? extends List<Record>> lists) throws Exception {

    List<Record> consolidatedList = new ArrayList<>();
    for (List<Record> list : lists) {
        if (null != list && !list.isEmpty()) {
            consolidatedList.addAll(list);
        }
    }

    delegate.write(consolidatedList);
    count += consolidatedList.size(); // to count Trailer record count
}

===============================================================

Item Processor

@Override
public List<Record> process(IncomingRecord record) {

    List<Record> recordList = new ArrayList<>();

    if (null != record.getEventName() /* and a few other conditions */) {
        // setting values of the Record class by extracting them from the IncomingRecord
        recordList.add(/* the valid record matching the conditions */);
    } else {
        return null;
    }
    return recordList;
}
Is the file system where you write your output .dat file transactional? – Mahmoud Ben Hassine
Can you explain your question a bit? How transactional? – Sonia
Oh, sorry for not being clear. There are transactional file systems and non-transactional ones. A transactional file system allows you to do atomic write operations. For example, if you flush a buffer to a file and there is no more space on disk, you can roll back the entire buffer. With a non-transactional file system, you would have a partially flushed buffer and the file could be corrupted. Do you see? More details here: en.wikipedia.org/wiki/File_system#Transactional_file_systems. So, is your file system transactional? – Mahmoud Ben Hassine
Please don't add code in comments, it is not readable. You can always edit the question and add the code there. I was talking about the file system, not the job repository. Anyway, I will add an answer to try to help. – Mahmoud Ben Hassine

1 Answer


Synchronizing a read operation and a write operation across two transactional resources (a queue and a database, for instance) is possible by using a JTA transaction manager that coordinates both resource transaction managers (2PC protocol).
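
For illustration only, a minimal sketch of what that coordination would look like on a chunk-oriented step, assuming both resources were XA-capable and enlisted with the same JtaTransactionManager (the bean and method names are hypothetical); as the next paragraph explains, this does not apply when one of the resources is not transactional:

@Bean
public Step jtaCoordinatedStep(JtaTransactionManager jtaTransactionManager) {
    return stepBuilderFactory.get("jtaCoordinatedStep")
            .<IncomingRecord, List<Record>>chunk(5000)
            .reader(kafkaItemReader())
            .processor(processor())
            .writer(writer())
            // a single JTA transaction manager drives the chunk transaction,
            // so both resources commit or roll back together (2PC)
            .transactionManager(jtaTransactionManager)
            .build();
}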

However, this approach is not possible if one of the resources is not transactional (like the majority of file systems). So unless you use a transactional file system and a JTA transaction manager that coordinates a Kafka transaction manager and a file system transaction manager, you need another approach, such as the Compensating Transaction pattern. In your case, the "undo" operation (compensating action) would be rewinding the offset to where it was before the failed chunk.
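
A minimal sketch of that compensating action, assuming a custom reader that shares its KafkaConsumer with a ChunkListener (the built-in KafkaItemReader keeps its consumer private, so this is illustrative rather than a drop-in): offsets are recorded before each chunk, committed only after the chunk has been written successfully, and the consumer is rewound if the chunk fails.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.springframework.batch.core.ChunkListener;
import org.springframework.batch.core.scope.context.ChunkContext;

public class OffsetRewindChunkListener implements ChunkListener {

    // assumption: the same consumer instance is used by the reader (single-threaded step)
    private final KafkaConsumer<String, ?> consumer;
    private final Map<TopicPartition, Long> offsetsBeforeChunk = new HashMap<>();

    public OffsetRewindChunkListener(KafkaConsumer<String, ?> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void beforeChunk(ChunkContext context) {
        // remember where each assigned partition was before the chunk started
        offsetsBeforeChunk.clear();
        for (TopicPartition partition : consumer.assignment()) {
            offsetsBeforeChunk.put(partition, consumer.position(partition));
        }
    }

    @Override
    public void afterChunk(ChunkContext context) {
        // the chunk was processed and written successfully: commit the offsets now
        consumer.commitSync();
    }

    @Override
    public void afterChunkError(ChunkContext context) {
        // compensating action: rewind to the offsets recorded before the failed chunk
        offsetsBeforeChunk.forEach(consumer::seek);
    }
}

Note also that with saveState(true), the KafkaItemReader stores its offsets in the step execution context, which is only updated at successful chunk boundaries, so on a job restart the reader should resume from the last chunk that was written successfully.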