AFAIK, the concept of partitions and (consumer) groups in Kafka was introduced to implement parallelism. I am working with Kafka through Python. I have a topic with (say) 2 partitions. This means that if I start a consumer group with 2 consumers in it, they should be mapped (subscribed) to different partitions.

But using the kafka library in Python, I came across a weird issue. I started 2 consumers with the same group id and started a thread for each of them to consume messages.

However, every message in the Kafka stream is being consumed by both of them! This seems ridiculous to me, and even conceptually incorrect. Is there any way I can map the consumers to certain (distinct) partitions manually (if they are not mapped to different partitions automatically)?

Here is the code:

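from kafka import KafkaConsumer
import thread  # Python 2 low-level threading module

# Print every record the given consumer receives.
def con1(consumer):
    for msg in consumer:
        print msg

# Two consumers on the same topic, in the same consumer group.
consumer1 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])
consumer2 = KafkaConsumer('k-test', group_id='grp1', bootstrap_servers=['10.50.23.120:9092'])

# Run each consumer loop in its own thread.
thread.start_new_thread(con1, (consumer1,))
thread.start_new_thread(con1, (consumer2,))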

Here is the output for some messages that I produced using kafka-console-producer:

ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=47, timestamp=None, timestamp_type=None, key=None, value='polki')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=48, timestamp=None, timestamp_type=None, key=None, value='qwewrg')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')
ConsumerRecord(topic=u'k-test', partition=0, offset=49, timestamp=None, timestamp_type=None, key=None, value='shgjas')

whereas I expected each message to appear only once. BTW, this topic k-test has 2 partitions.

Which Kafka Python client are you using? There are multiple available: cwiki.apache.org/confluence/display/KAFKA/… I would recommend using the Confluent one: github.com/confluentinc/confluent-kafka-python - Matthias J. Sax

3 Answers

0
votes

I guess you are working with Kafka 0.8 or lower, which does not support this feature according to the documentation:

... Some features will only be enabled on newer brokers, however; for example, fully coordinated consumer groups -- i.e., dynamic partition assignment to multiple consumers in the same group -- requires use of 0.9+ kafka brokers ...
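As a rough way to check this, the sketch below compares a broker version tuple against (0, 9). Both helper names are made up for illustration. The second helper assumes kafka-python stores the broker version it detects in `consumer.config['api_version']`; that is worth confirming against the client version you have installed, and it needs a reachable broker, so it is defined but not called here.

```python
def supports_coordinated_groups(api_version):
    """Return True if a broker version tuple is 0.9 or newer."""
    return tuple(api_version) >= (0, 9)


def detected_broker_version(bootstrap_servers):
    """Hypothetical helper: ask kafka-python which broker version it detected.

    Assumption: the detected version ends up in consumer.config['api_version'].
    """
    from kafka import KafkaConsumer  # imported lazily; needs a reachable broker

    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers)
    try:
        return consumer.config['api_version']
    finally:
        consumer.close()


print(supports_coordinated_groups((0, 8, 2)))   # False
print(supports_coordinated_groups((0, 10, 1)))  # True
```

If the detected version is below (0, 9), both consumers would be expected to read every partition regardless of the shared group id.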

0
votes
from kafka import KafkaConsumer
from kafka import TopicPartition

TOPIC = "k-test"
PARTITION_0 = 0
PARTITION_1 = 1

# Do not pass a topic to the constructor: kafka-python does not allow
# mixing topic subscription with manual assign() on the same consumer.
consumer_0 = KafkaConsumer(
    group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
)
consumer_1 = KafkaConsumer(
    group_id='grp1', bootstrap_servers=['10.50.23.120:9092']
)
# TopicPartition format: topic, partition
topic_partition_0 = TopicPartition(TOPIC, PARTITION_0)
topic_partition_1 = TopicPartition(TOPIC, PARTITION_1)
# Pin each consumer to exactly one partition.
consumer_0.assign([topic_partition_0])
consumer_1.assign([topic_partition_1])

assign() might work for you, but once you use it, Kafka will not rebalance partitions automatically when a consumer stops working.
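To make that trade-off concrete, here is a toy model with no Kafka involved. The `rebalance` function is a hypothetical stand-in for what a group coordinator does; round-robin is used only for simplicity and is not necessarily the strategy your client uses.

```python
def rebalance(partitions, live_consumers):
    """Spread partitions over the live consumers, round-robin."""
    assignment = {c: [] for c in live_consumers}
    for i, p in enumerate(sorted(partitions)):
        assignment[live_consumers[i % len(live_consumers)]].append(p)
    return assignment


partitions = [0, 1]

# Manual assignment: fixed once, as with assign().
manual = {"consumer_0": [0], "consumer_1": [1]}

# Suppose consumer_1 stops working.
live = ["consumer_0"]

# Partitions still being read under each approach.
still_read_manual = sorted(p for c in live for p in manual[c])
still_read_group = sorted(
    p for owned in rebalance(partitions, live).values() for p in owned
)

print(still_read_manual)  # [0]
print(still_read_group)   # [0, 1]
```

With the fixed mapping, partition 1 is left unread until something reassigns it by hand; with group management, the surviving consumer picks it up.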

0
votes

Try running the bin/kafka-consumer-groups.sh command line tool to verify if the Python Kafka client you are using supports proper consumer group management. If both consumers are indeed in the same group then they should get messages from mutually exclusive partitions.
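The same check can be sketched from the client side. The helper below, `overlapping_partitions`, is a made-up name, and the `TopicPartition` namedtuple is a stand-in for the kafka-python class so the sketch runs without a broker. The "broken" assignment is only one guess that is consistent with the duplicated output in the question.

```python
from collections import Counter, namedtuple

# Stand-in for kafka.TopicPartition so the sketch runs without a broker.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])


def overlapping_partitions(assignments):
    """Return the partitions that appear in more than one consumer's assignment."""
    counts = Counter(tp for owned in assignments.values() for tp in owned)
    return sorted(tp for tp, n in counts.items() if n > 1)


# What a healthy group on a 2-partition topic should look like.
healthy = {
    "consumer1": {TopicPartition("k-test", 0)},
    "consumer2": {TopicPartition("k-test", 1)},
}
# Assumed assignment matching the symptom: both consumers own everything.
broken = {
    "consumer1": {TopicPartition("k-test", 0), TopicPartition("k-test", 1)},
    "consumer2": {TopicPartition("k-test", 0), TopicPartition("k-test", 1)},
}

print(overlapping_partitions(healthy))  # []
print(overlapping_partitions(broken))
```

With real consumers, the dictionary could be built from each consumer's `assignment()` call, which in kafka-python returns the set of partitions currently assigned to that consumer. An empty result means the partitions are mutually exclusive.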