I've run into an annoying offset-committing case with my Kafka consumers. I use 'kafka-node' for my project. I created a topic and two consumers in one consumer group, running on two servers, with auto-commit set to false. For every message a consumer gets, it starts an async process that can take anywhere from 1 to 20 seconds; when the process is done, the consumer commits the offset.

My problem is the following scenario: Consumer 1 gets a message that takes 20 seconds to process. In the middle of that, it gets another message that takes 1 second to process. It finishes processing the second message, commits the offset, then crashes right away, so processing of the first message never completes. If I rerun the consumer, it does not read the first message again, because the commit for the second message already moved the offset past the first. How can I avoid this?

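Kafkaconsumer.on('message', async (message) => {
  // Placeholder for the async work, which takes between 1 and 20 seconds
  await somethingAsync(message);
  // Runs as soon as this message is done, even if an earlier one is still in flight
  Kafkaconsumer.commit(() => {});
});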

1 Answer

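You essentially want to throttle messages and handle them one at a time, which you can do with async.queue (from the async library). With a concurrency of one, messages are processed in the order they arrive, so a later message cannot finish and commit while an earlier one is still in flight.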

  1. Create an async.queue with a message processor and a concurrency of one (the message processor itself is wrapped in setImmediate so it does not block the event loop).
  2. Set queue.drain to resume the consumer.
  3. The handler for the consumer's message event pauses the consumer and pushes the message to the queue.
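
The three steps above can be sketched roughly as follows. To keep the example dependency-free, it swaps async.queue for a plain array and a worker loop that behaves like a queue with a concurrency of one. The helper name processSequentially and the processMessage callback are hypothetical, and committing once the queue drains (rather than after each message) is an assumption of this sketch, not something the steps above prescribe. It relies only on the consumer's on, pause, resume and commit methods.

```javascript
// Hypothetical helper, not part of kafka-node.
// `consumer` must expose on('message'), pause(), resume() and commit(cb).
// `processMessage` is your own async function (the 1 to 20 second job).
function processSequentially(consumer, processMessage) {
  const pending = [];   // messages received but not yet processed
  let running = false;  // true while the worker loop is active

  async function drain() {
    running = true;
    try {
      // Concurrency of one: each message finishes before the next one starts.
      while (pending.length > 0) {
        await processMessage(pending.shift());
      }
    } catch (err) {
      // Assumption: on failure, stay paused and commit nothing, so a restart
      // re-reads from the last committed offset.
      console.error('processing failed', err);
      return;
    }
    running = false;
    // Queue is empty: everything received so far is done, so commit and resume.
    consumer.commit((err) => {
      if (err) console.error('commit failed', err);
      consumer.resume();
    });
  }

  consumer.on('message', (message) => {
    consumer.pause();       // stop fetching; already-fetched messages may still arrive
    pending.push(message);
    if (!running) drain();
  });
}
```

With kafka-node you would pass in your existing consumer (created with auto-commit disabled) and your own handler, for example processSequentially(Kafkaconsumer, somethingAsync). Because nothing is committed until every received message has been processed, a crash in the middle of a slow message leaves that message uncommitted, and it is delivered again after a restart.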

The kafka-node README details this here.

An example implementation, similar to your problem, can be found here.