
I'm trying to batch records using the Kafka Streams Processor API. Batching is based on size and time: if the batch reaches 10 records, or the last batch was sent more than 10 seconds ago (whichever comes first), I call an external API to send the batch and then commit using the ProcessorContext.

I use a punctuator to periodically check whether the batch should be sent to the external system and cleared.

Question: can Kafka Streams invoke the processor's process() method while the punctuator is executing? Since the code calls context.commit() in the punctuator, can that commit records that process() has not handled yet?

Is it possible for the punctuator and process() to run at the same time on different threads? If so, my code could commit records that have not been processed yet.

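import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.kafka.streams.processor.AbstractProcessor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.PunctuationType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestProcessor extends AbstractProcessor<String, String> {

    private static final Logger LOG = LoggerFactory.getLogger(TestProcessor.class);

    private ProcessorContext context;
    private List<String> batchList = new LinkedList<>();
    private AtomicLong lastProcessedTime = new AtomicLong(System.currentTimeMillis());

    @Override
    public void init(ProcessorContext context) {
        super.init(context);
        LOG.info("Calling init method {}", context.taskId());
        this.context = context;
        // Time trigger: every 10 s (wall clock), send a non-empty batch that was last sent 10 s or more ago.
        // ">=" so a punctuation firing exactly 10 s after the last send is not skipped.
        context.schedule(Duration.ofSeconds(10), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
            if (!batchList.isEmpty() && System.currentTimeMillis() - lastProcessedTime.get() >= 10000) {
                //call external API
                batchList.clear();
                lastProcessedTime.set(System.currentTimeMillis());
            }
            context.commit(); // only requests a commit; Streams performs it later on the same thread
        });
    }

    @Override
    public void process(String key, String value) {
        batchList.add(value);
        LOG.info("Context details {} {} storeSize {}", context.taskId(), context.partition(), batchList.size());
        // Size trigger: send as soon as 10 records are buffered.
        if (batchList.size() == 10) {
            //call external API to send the batch
            batchList.clear();
            lastProcessedTime.set(System.currentTimeMillis());
        }
        context.commit(); // only requests a commit
    }

    @Override
    public void close() {
        // Send whatever is still buffered when the task is closed.
        if (!batchList.isEmpty()) {
            //call external API to send the left over records
            batchList.clear();
        }
    }
}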
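Because process() and the punctuator in the code above share the same send-and-reset logic, it may help to keep the size-or-time rule in one place. The following is a minimal, framework-free sketch under stated assumptions: SizeOrTimeBatcher, the injected LongSupplier clock, and the Consumer sink (a stand-in for the external API call) are hypothetical names, not Kafka Streams API. Like the original, it assumes all calls come from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;

/**
 * Hypothetical helper: buffers values and sends them when either maxSize values
 * are buffered or maxAgeMs has passed since the last send, whichever comes first.
 * Not thread-safe: it assumes process() and the punctuator run on one thread.
 */
final class SizeOrTimeBatcher {
    private final int maxSize;
    private final long maxAgeMs;
    private final LongSupplier clock;          // injected so the time rule can be tested
    private final Consumer<List<String>> sink; // stand-in for the external API call
    private final List<String> buffer = new ArrayList<>();
    private long lastSendMs;

    SizeOrTimeBatcher(int maxSize, long maxAgeMs, LongSupplier clock, Consumer<List<String>> sink) {
        this.maxSize = maxSize;
        this.maxAgeMs = maxAgeMs;
        this.clock = clock;
        this.sink = sink;
        this.lastSendMs = clock.getAsLong();
    }

    /** Call from process(): buffer the value and send if the size limit is reached. */
    void add(String value) {
        buffer.add(value);
        if (buffer.size() >= maxSize) {
            flush();
        }
    }

    /** Call from the punctuator: send a non-empty buffer once it is old enough. */
    void flushIfDue() {
        if (!buffer.isEmpty() && clock.getAsLong() - lastSendMs >= maxAgeMs) {
            flush();
        }
    }

    /** Send everything buffered (also usable from close()) and restart the timer. */
    void flush() {
        if (!buffer.isEmpty()) {
            sink.accept(new ArrayList<>(buffer)); // hand over a copy, then clear
            buffer.clear();
            lastSendMs = clock.getAsLong();
        }
    }

    int pending() {
        return buffer.size();
    }
}
```

In TestProcessor, process() would call add(value), the punctuator flushIfDue(), and close() flush(), with System::currentTimeMillis as the clock. The injectable clock exists only so the time rule can be checked without waiting ten seconds.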

1 Answer


Can Kafka Streams invoke the processor's process() method while the punctuator is executing?

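No. For a given task, Kafka Streams calls process() and the punctuator from the same stream thread, one after the other, so the two never run concurrently on the same processor instance. A punctuator is not a separate thread; it is a callback that the stream thread invokes between calls to process().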

Is it possible for the punctuator and process() to run at the same time on different threads?

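No, for the same reason: both run on one thread. It also follows that context.commit() cannot commit a record that process() has not handled yet. commit() only requests a commit, and the offsets Streams commits cover only the records that have already gone through process().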
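To make the threading and commit behavior concrete, here is a deliberately simplified model of one stream thread driving one task. It is not Kafka's StreamThread code: the class, its loop, and requestCommit() (standing in for ProcessorContext.commit()) are hypothetical. It illustrates the two properties described above: both callbacks run on the calling thread, one after another, and a commit request is honored only after the callback returns, using the offset just after the last record passed to process().

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of one stream thread driving one task.
 * Not Kafka's implementation; it only illustrates sequential callbacks and
 * commit-as-a-request.
 */
final class SingleThreadTaskModel {
    final List<String> log = new ArrayList<>();             // "<callback>@<thread name>"
    final List<Long> committedOffsets = new ArrayList<>();
    private boolean commitRequested;
    private long nextOffsetToCommit; // offset just after the last record handed to process()

    // Plays the role of ProcessorContext.commit(): sets a flag, commits nothing yet.
    void requestCommit() {
        commitRequested = true;
    }

    void process(long offset, String value) {
        log.add("process(" + offset + ")@" + Thread.currentThread().getName());
        requestCommit();
    }

    void punctuate() {
        log.add("punctuate@" + Thread.currentThread().getName());
        requestCommit();
    }

    /** One loop on one thread: process a record, maybe punctuate, then honor a commit request. */
    void run(List<String> records, int punctuateEveryN) {
        for (int i = 0; i < records.size(); i++) {
            process(i, records.get(i));
            nextOffsetToCommit = i + 1;
            if ((i + 1) % punctuateEveryN == 0) {
                punctuate(); // never overlaps process(): same thread, strictly sequential
            }
            maybeCommit();
        }
    }

    private void maybeCommit() {
        if (commitRequested) {
            committedOffsets.add(nextOffsetToCommit); // never beyond what process() has seen
            commitRequested = false;
        }
    }
}
```

Real Kafka Streams processes records in batches and commits on commit.interval.ms or when a commit has been requested, but the invariant the model shows is the same: the committed offset never runs ahead of the records process() has handled.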

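Keep in mind that each stream task (typically one per input partition) gets its own instance of TestProcessor. Instead of the fields batchList and lastProcessedTime, I recommend a Kafka Streams state store such as a KeyValueStore so that the stream is fault tolerant. With the in-memory list, process() has already requested commits for the buffered records, so if the instance crashes before the batch is sent, those records are lost; a changelog-backed store is restored after a restart.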
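A minimal sketch of the state-store idea, assuming a plain NavigableMap as an in-memory stand-in for a changelog-backed KeyValueStore<Long, String>; StoreBackedBatcher and its methods are hypothetical, not Kafka Streams API. The pending batch lives in the store rather than in a field, so a new processor instance created over the restored store picks up where the crashed one left off. Only the size rule is shown; the time rule from the question would be added the same way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: the pending batch is kept in a key-value store, not a field.
 * The NavigableMap stands in for a changelog-backed KeyValueStore<Long, String>.
 */
final class StoreBackedBatcher {
    private final NavigableMap<Long, String> store; // survives a restart, unlike a field
    private final int maxSize;
    private final Consumer<List<String>> sink;       // stand-in for the external API call
    private long nextKey;

    StoreBackedBatcher(NavigableMap<Long, String> store, int maxSize, Consumer<List<String>> sink) {
        this.store = store;
        this.maxSize = maxSize;
        this.sink = sink;
        // Continue numbering after any entries restored from a previous instance.
        this.nextKey = store.isEmpty() ? 0L : store.lastKey() + 1;
    }

    /** Call from process(): persist the value, then send if the size limit is reached. */
    void add(String value) {
        store.put(nextKey++, value);
        if (store.size() >= maxSize) {
            flush();
        }
    }

    /** Send all buffered values in insertion order, then remove them from the store. */
    void flush() {
        if (store.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(store.values()));
        store.clear();
    }
}
```

In an actual processor, the store would be registered with the topology and fetched in init() with context.getStateStore(...); since KeyValueStore has no clear(), the flush would iterate with all() and delete() each sent key. Delivery is then at-least-once: if the instance crashes after calling the external API but before the deletions are committed, the batch is sent again, so the external API should tolerate duplicates.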