I am trying to batch records using the Kafka Streams Processor API. Batching is based on size and time: if the batch size reaches 10, or the last batch was processed more than 10 seconds ago (whichever comes first), the processor calls an external API to send the batch and then commits using ProcessorContext.
I am using punctuate to periodically check whether the batch can be sent to the external system and cleared.
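To make the intended rule concrete, here is a minimal sketch of the size-or-time flush condition on its own, without any Kafka Streams dependency. The class `SizeOrTimeBatcher`, its methods `add`, `tick`, and `pending`, and the `sender` callback are hypothetical names introduced only for illustration; `sender` stands in for the external API call, and the current time is passed in explicitly so the behaviour can be checked without waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Kafka Streams: buffers values and flushes
// when either the size limit or the maximum age is reached, whichever is first.
class SizeOrTimeBatcher {
    private final int maxSize;
    private final long maxAgeMs;
    private final Consumer<List<String>> sender; // stands in for the external API call
    private final List<String> batch = new ArrayList<>();
    private long lastFlushMs;

    SizeOrTimeBatcher(int maxSize, long maxAgeMs, long nowMs, Consumer<List<String>> sender) {
        this.maxSize = maxSize;
        this.maxAgeMs = maxAgeMs;
        this.lastFlushMs = nowMs;
        this.sender = sender;
    }

    // Plays the role of process(): buffer one value, flush if the batch is full.
    // Returns true only when a flush happened.
    boolean add(String value, long nowMs) {
        batch.add(value);
        if (batch.size() >= maxSize) {
            flush(nowMs);
            return true;
        }
        return false;
    }

    // Plays the role of the punctuator: flush a non-empty batch that is too old.
    // Returns true only when a flush happened.
    boolean tick(long nowMs) {
        if (!batch.isEmpty() && nowMs - lastFlushMs > maxAgeMs) {
            flush(nowMs);
            return true;
        }
        return false;
    }

    // Number of buffered values that have not been sent yet.
    int pending() {
        return batch.size();
    }

    private void flush(long nowMs) {
        sender.accept(new ArrayList<>(batch)); // hand over a copy, then reset
        batch.clear();
        lastFlushMs = nowMs;
    }
}
```

In this sketch, `add` and `tick` return true only when a batch was actually handed to `sender`, so a caller can tell the moments when nothing is buffered from the moments when unsent records are still pending.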
Question: Can the Streams API invoke the processor's process method while punctuate is executing? Since the code calls commit inside punctuate, can context.commit() commit records that the process method has not yet processed?
Is it possible for the punctuate thread and the process method to execute at the same time in different threads? If so, the code below could commit records that have not been processed yet.
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestProcessor extends AbstractProcessor<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);

    private ProcessorContext context;
    private List<String> batchList = new LinkedList<>();
    private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());

    @Override
    public void init(ProcessorContext context) {
        LOG.info("Calling init method " + context.taskId());
        this.context = context;
        // Every 10 seconds of wall-clock time, flush a non-empty batch that is older than 10 seconds
        context.schedule(10000, PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
            if (batchList.size() > 0 && System.currentTimeMillis() - lastProcessedTime.get() > 10000) {
                // call external API
                batchList.clear();
                lastProcessedTime.set(System.currentTimeMillis());
            }
            context.commit();
        });
    }

    @Override
    public void process(String key, String value) {
        batchList.add(value);
        LOG.info("Context details " + context.taskId() + " " + context.partition() + " " +
                "storeSize " + batchList.size());
        if (batchList.size() == 10) {
            // call external API to send the batch
            batchList.clear();
            lastProcessedTime.set(System.currentTimeMillis());
        }
        context.commit();
    }

    @Override
    public void close() {
        if (batchList.size() > 0) {
            // call external API to send the left over records
            batchList.clear();
        }
    }
}