I am new to Kafka and I am trying to prototype a simple producer-consumer message queue (the traditional queue model) using the Apache Kafka 0.9.0 Java clients.
From the producer process, I push 100 random messages to a topic configured with 3 partitions. This part works fine.
I created 3 consumer threads with the same group id, all subscribed to the same topic, with auto commit enabled. Since all 3 consumer threads share the group and the topic, I assume that each consumer will be assigned one partition to consume and will commit offsets for that partition.
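To make that expectation concrete, here is a plain-Java sketch of the split I have in mind. It does not use Kafka's actual assignor: `ExpectedAssignment` and `assign` are names made up for this illustration, and the round-robin rule is only my assumption about the outcome, not the client's real algorithm.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: NOT Kafka's assignor, just the result I expect
// for 3 partitions and 3 consumers in the same group.
class ExpectedAssignment {

    // Spread partitions 0..partitionCount-1 over the consumers round-robin.
    // Assumes the consumer list is not empty.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("myThread-1", "myThread-2", "myThread-3");
        // prints {myThread-1=[0], myThread-2=[1], myThread-3=[2]}
        System.out.println(assign(consumers, 3));
    }
}
```

In other words, I expect every thread to own exactly one partition, so no record should be delivered to more than one thread.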
But I am facing a weird problem: all my messages are duplicated. Each of my threads receives the records several times over on the consumer side. Since each consumer thread polls the topic in an infinite loop, I have to kill the process to stop it.
I even tried with a single thread, and I still get each record multiple times, and the duplicates keep coming.
Could anyone please help me identify what mistake I am making here?
I am posting my consumer code for reference.
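    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;

    import com.google.common.util.concurrent.ThreadFactoryBuilder; // from Guava

    public class ConsumerDemo {
        public static void main(String[] args) {
            // Three worker threads, one consumer instance per thread
            ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("Consumer-%d").build();
            ExecutorService executor = Executors.newFixedThreadPool(3, threadFactory);
            executor.submit(new ConsumerThread("topic1", "myThread-1"));
            executor.submit(new ConsumerThread("topic1", "myThread-2"));
            executor.submit(new ConsumerThread("topic1", "myThread-3"));
            // executor shutdown logic is skipped
        }
    }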
Consumer Thread:
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class ConsumerThread implements Runnable {
        private static final String KAFKA_BROKER = "<<IP:port>>";
        private final KafkaConsumer<String, String> consumer;

        public ConsumerThread(String topic, String name) {
            Properties props = new Properties();
            props.put("bootstrap.servers", ConsumerThread.KAFKA_BROKER);
            props.put("group.id", "DemoConsumer"); // same group for all three threads
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "6000");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            this.consumer = new KafkaConsumer<>(props);
            this.consumer.subscribe(Collections.singletonList(topic));
        }

        @Override
        public void run() {
            try {
                boolean isRunning = true;
                while (isRunning) {
                    ConsumerRecords<String, String> records = consumer.poll(10L);
                    System.out.println("Partition Assignment to this Consumer: " + consumer.assignment());
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("Received message from thread : " + Thread.currentThread().getName()
                                + "(" + record.key() + ", " + record.value() + ") at offset " + record.offset());
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Close the consumer even if the loop exits with an exception
                consumer.close();
            }
        }
    }
Also, very importantly, I am aiming for exactly-once semantics. I know I am a long way from that. Any help is really appreciated.
Observation: the debug sysout prints all 3 partitions. Does this mean that the partitions are not being assigned one per consumer?
Partition Assignment to this Consumer: [topic1-1, topic1-0, topic1-2]
Kafka experts, apart from the problem above, I am also looking for input on the following:
- Please help me understand what is wrong in the above code.
- In general, how can exactly-once semantics be implemented? An example would help, if possible.
- How should failure scenarios, such as a consumer going down, be handled without losing messages?
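To show what I mean by the last two points, my current understanding (which may well be wrong) is that delivery would stay at-least-once and the processing side would have to ignore redelivered records. Below is a plain-Java sketch of that idea. It uses no Kafka classes; `IdempotentSink` and `apply` are hypothetical names, and the in-memory map stands in for whatever store would hold the offsets in a real system.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in, no Kafka classes: the sink remembers the last offset
// it applied per partition and ignores anything at or below that offset.
class IdempotentSink {
    private final Map<Integer, Long> lastAppliedOffset = new HashMap<>();
    private final List<String> applied = new ArrayList<>();

    // Returns true if the record was applied, false if it was a redelivery.
    // Assumption: in a real system the value and the offset would be
    // written together in one atomic step.
    synchronized boolean apply(int partition, long offset, String value) {
        Long last = lastAppliedOffset.get(partition);
        if (last != null && offset <= last) {
            return false; // already processed, skip the duplicate
        }
        applied.add(value);
        lastAppliedOffset.put(partition, offset);
        return true;
    }

    // Snapshot of everything applied so far, in arrival order.
    synchronized List<String> applied() {
        return new ArrayList<>(applied);
    }

    public static void main(String[] args) {
        IdempotentSink sink = new IdempotentSink();
        sink.apply(0, 0L, "a");
        sink.apply(0, 1L, "b");
        sink.apply(0, 1L, "b"); // redelivered, e.g. after a restart before the commit
        sink.apply(1, 0L, "c");
        System.out.println(sink.applied()); // prints [a, b, c]
    }
}
```

Is this the right direction, or does the 0.9.0 consumer offer something better for this?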
Thanks in advance.