I'm using sarama (https://github.com/Shopify/sarama/) with Kafka 0.8.0 for my consumers. This is what my code looks like:
consumerLoop:
	for {
		select {
		case event := <-consumer.Events():
			if event.Err != nil {
				// Stop consuming on the first error.
				break consumerLoop
			}
			// Wait for a free slot before starting another job.
			<-c.sem
			go c.processJob(event.Value)
		}
	}
I'm using a buffered channel (c.sem) to limit how many processJob goroutines can run at a time. That's how I control the concurrency, and therefore the speed, of my consumers.
The problem with this approach is that if I need to change the concurrency, I have to shut the consumer down and restart it (the channel buffer size is a command-line flag). I log the offsets that have been processed, so after a restart I have to dig through my logs to work out which offsets were processed and where the consumer should resume from. I want a more hands-free way to manage these offsets.
I've set autocommit.enabled to true in consumer.properties, but I don't see anything change in ZooKeeper. I think that's because the current Kafka protocol doesn't support the offset API: https://issues.apache.org/jira/browse/KAFKA-993
I could manually store the offset in ZooKeeper after I finish processing a job, but I don't know how that would work with multiple processJob goroutines running asynchronously. This is where Kafka is supposed to store the offsets: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper
Is this supposed to hold a single value? If so, that would mean I can't run processJob asynchronously, since the jobs may finish at different times and overwrite each other's values. Is the consumer supposed to run in a single thread and process a single event at a time? Is the right approach to start more consumers to speed things up, instead of going the goroutine route?
for event := range consumer.Events() { if event.Err != nil { break }; <-c.sem; go c.processJob(event.Value) } -- but why would you do all that complicated stuff with channels for a semaphore? Just start that many goroutines reading from the channel and you'll get far simpler concurrency. If you need more, just start more goroutines. - Dustin