
I'm using sarama (https://github.com/Shopify/sarama/) with Kafka 0.8.0 for my consumers. This is what my code looks like:

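consumerLoop:
for {
    select {
    case event := <-consumer.Events():
        if event.Err != nil {
            // Stop consuming on the first error.
            break consumerLoop
        }
        // Take a token from c.sem; this blocks while every slot is in use.
        // (Assumes c.sem starts full and processJob puts a token back when it finishes.)
        <-c.sem
        go c.processJob(event.Value)
    }
}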

I'm using a buffered channel (c.sem) to control how many processJob goroutines can run at a time. That's how I control the concurrency and speed of my consumers.

The problem with this approach is that if I need to change the concurrency, I have to shut the consumer down and restart it, because the channel buffer size is a command-line flag. I log the offsets that are processed, so after a restart I have to dig through my logs to figure out which offsets were processed and where the consumer should resume from. I want a more hands-free way to manage these offsets.

I've set autocommit.enabled to true in consumer.properties, but I don't see any change in ZooKeeper. I think that's because the current Kafka protocol doesn't support the offset API: https://issues.apache.org/jira/browse/KAFKA-993

I could store the offset in ZooKeeper manually after each job finishes, but I don't know how that would work with multiple processJob goroutines running concurrently. This is where Kafka is supposed to store the offsets: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

Is this supposed to hold a single value? If so, I can't run processJob asynchronously: jobs may finish at different times and out of order, so the goroutines would overwrite each other's values. Is the consumer supposed to run in a single thread and process one event at a time? Is the right approach to start more consumers to speed things up, instead of using goroutines?

> I log offsets that are processed and I have to look in my logs to figure out which offset(s) were processed and where I want the consumer to resume from.
So how do you manually select which offset to start from? - Kluyg
Sarama's consumer config has an OffsetMethodManual value, which lets you pass in the starting offset. - psyb0rg
So much code... It's the equivalent of this: for event := range consumer.Events() { if event.Err != nil { break }; <-c.sem; go c.processJob(event.Value) }. But why do all that complicated stuff with channels for a semaphore? Just start that many goroutines reading from the channel and you'll get much simpler concurrency. If you need more, start more goroutines. - Dustin
@psyb0rg I mean, you have a log file with written offsets. Because of parallelism, the last written offset (or the biggest offset in the log) may be bigger than one that is still in progress. So if you just select the biggest one from the log, you can lose some messages. So how do you choose the offset to start from in your log file? - Kluyg
@Kluyg, you're right. I don't start with the biggest offset; I go back a few offsets so there's a good chance I get all of them. I can live with a little reprocessing and a few dropped events. This is also a problem I want to solve with proper offset tracking. - psyb0rg

1 Answer


I suspect the simplest answer is to not use a channel for your semaphore. Use an integer counter protected by a lock instead; then you can adjust the maximum number of concurrent goroutines without restarting the consumer.

If you really want to keep using a channel for this, you could use a ResizableChannel from my channel package: https://godoc.org/github.com/eapache/channels#ResizableChannel