I am a new student studying Kafka and I've run into some fundamental issues with understanding multiple consumers that articles, documentation, etc. have not been too helpful with so far.
One thing I have tried to do is write my own high level Kafka producer and consumer and run them simultaneously, publishing 100 simple messages to a topic and having my consumer retrieve them. I have managed to do this successfully, but when I try to introduce a second consumer to consume from the same topic that messages were just published to, it receives no messages.
It was my understanding that for each topic, you could have consumers from separate consumer groups and each of these consumer groups would get a full copy of the messages produced to some topic. Is this correct? If not, what would be the proper way for me to set up multiple consumers? This is the consumer class that I have written so far:
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class AlternateConsumer extends Thread {
        private final KafkaConsumer<Integer, String> consumer;
        private final String topic;
        private final Boolean isAsync = false;

        public AlternateConsumer(String topic, String consumerGroup) {
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "localhost:9092");
            // Consumers that share a group.id split the topic's partitions between them
            properties.put("group.id", consumerGroup);
            properties.put("partition.assignment.strategy", "roundrobin");
            properties.put("enable.auto.commit", "true");
            properties.put("auto.commit.interval.ms", "1000");
            properties.put("session.timeout.ms", "30000");
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            consumer = new KafkaConsumer<Integer, String>(properties);
            consumer.subscribe(topic);
            this.topic = topic;
        }

        @Override
        public void run() {
            while (true) {
                // A timeout of 0 returns immediately, so this loop polls continuously
                ConsumerRecords<Integer, String> records = consumer.poll(0);
                for (ConsumerRecord<Integer, String> record : records) {
                    System.out.println("We received message: " + record.value() + " from topic: " + record.topic());
                }
            }
        }
    }
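For the two-group setup described above, one setting that may matter is where a brand-new group starts reading. The following is only a sketch of the configuration, not a confirmed fix: it assumes the new consumer's `auto.offset.reset` setting, whose default of `latest` makes a group with no committed offsets start at the end of the log and so skip messages published earlier. `GroupConfigSketch`, `groupA` and `groupB` are hypothetical names.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings that differ only in group.id.
final class GroupConfigSketch {

    static Properties forGroup(String groupId) {
        Properties p = new Properties();
        p.put("bootstrap.servers", "localhost:9092");
        // Each distinct group.id tracks its own offsets for the topic.
        p.put("group.id", groupId);
        // Assumption: with "earliest", a group that has no committed offsets
        // starts from the oldest retained message instead of the log end.
        p.put("auto.offset.reset", "earliest");
        p.put("enable.auto.commit", "true");
        p.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        p.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return p;
    }

    public static void main(String[] args) {
        Properties first = forGroup("groupA");
        Properties second = forGroup("groupB");
        System.out.println(first.getProperty("group.id") + " / " + second.getProperty("group.id"));
    }
}
```

Each `Properties` object would be passed to its own `KafkaConsumer`, so the two consumers land in separate groups and read the topic independently.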
Furthermore, I noticed that originally I was testing the above consumption for a topic 'test' with only a single partition. When I added another consumer to an existing consumer group, say 'testGroup', this triggered a Kafka rebalance which increased the latency of my consumption by a significant amount, on the order of seconds. I thought that this was an issue with rebalancing since I only had a single partition, but when I created a new topic 'multiplepartitions' with, say, 6 partitions, similar issues arose where adding more consumers to the same consumer group caused latency issues. I have looked around and people are telling me I should be using a multi-threaded consumer -- can anyone shed light on that?
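To make the single-partition versus six-partition case concrete, here is a simplified model of how partitions might be dealt out inside one consumer group. It is not Kafka's actual assignor, only a round-robin sketch under the assumption that each partition is owned by exactly one consumer in the group; `AssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of partition ownership within ONE consumer group.
final class AssignmentSketch {

    // Deals partitions 0..partitionCount-1 to consumers in turn.
    // Returns, for each consumer index, the list of partitions it owns.
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("need at least one consumer");
        }
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            owned.get(p % consumerCount).add(p);
        }
        return owned;
    }

    public static void main(String[] args) {
        // One partition, two consumers: the second consumer owns nothing.
        System.out.println(assign(1, 2)); // prints [[0], []]
        // Six partitions, four consumers: the load is split unevenly.
        System.out.println(assign(6, 4)); // prints [[0, 4], [1, 5], [2], [3]]
    }
}
```

Under this model, a second consumer added to 'testGroup' on a single-partition topic would sit idle, and every change in group membership forces the whole deal to be redone.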
0.8.1. – chrsblck
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic); ... Then assign each thread a partition: executor.submit(new ConsumerTest(stream, threadNumber)). – chrsblck
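The thread-per-stream pattern from the comment above can be sketched without a broker. This is only an illustration of the threading shape, under loud assumptions: a `BlockingQueue<String>` stands in for each `KafkaStream`, and `ThreadPerStreamSketch`, `consumeAll` and the `END` marker are hypothetical names that are not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class ThreadPerStreamSketch {

    // Hypothetical end-of-stream marker so the worker threads can stop.
    static final String END = "__END__";

    // Starts one worker per stream; each worker drains only its own stream.
    static List<String> consumeAll(List<BlockingQueue<String>> streams) {
        List<String> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService executor = Executors.newFixedThreadPool(Math.max(1, streams.size()));
        int threadNumber = 0;
        for (BlockingQueue<String> stream : streams) {
            final int id = threadNumber++;
            executor.submit(() -> {
                try {
                    for (String msg = stream.take(); !END.equals(msg); msg = stream.take()) {
                        seen.add("thread " + id + ": " + msg);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        BlockingQueue<String> s0 = new LinkedBlockingQueue<>();
        BlockingQueue<String> s1 = new LinkedBlockingQueue<>();
        s0.add("a"); s0.add("b"); s0.add(END);
        s1.add("c"); s1.add(END);
        List<BlockingQueue<String>> streams = new ArrayList<>();
        streams.add(s0);
        streams.add(s1);
        // Order across threads is not deterministic, so none is shown here.
        System.out.println(consumeAll(streams));
    }
}
```

Messages from one stream keep their order within their thread, while the interleaving between threads can differ from run to run.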