I have a simple Java producer like the one below:
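import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Renamed from "Producer" so it does not clash with the imported Kafka Producer interface.
public class KafkaExampleProducer {
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main(String[] args) throws Exception {
        Producer<String, byte[]> producer = createProducer();
        for (int i = 0; i < 3000; i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>(TOPIC, "key" + i, msg.getBytes());
            // Block until the broker acknowledges the record.
            producer.send(record).get();
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static Producer<String, byte[]> createProducer() {
        Properties props = new Properties();
        // Only settings the Java KafkaProducer understands. The old Scala-client keys
        // (metadata.broker.list, serializer.class, key.serializer.class, compression.codec)
        // are not recognized by it and are ignored.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ProducerConfig.CLIENT_ID_CONFIG, "AppFromJava");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // The Java client's equivalent of compression.codec is compression.type.
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");
        return new KafkaProducer<>(props);
    }
}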
I am trying to read the data like this:
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

// Renamed from "Consumer" so it does not clash with the imported Kafka Consumer interface.
public class KafkaExampleConsumer {
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main(String[] args) throws Exception {
        Consumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(Consumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;
        int noRecordsCount = 0;
        while (true) {
            // poll(Duration) needs kafka-clients 2.0+; older clients use poll(1000).
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(Duration.ofMillis(1000));
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }
            consumerRecords.forEach(record -> {
                // Process the record
                System.out.printf("\nConsumer Record:(%s, %s, %s)", record.key(), new String(record.value()), record.topic());
            });
            // Commits the position after the LAST record returned by poll(), for every assigned partition.
            consumer.commitSync();
            // Stop after the first non-empty batch.
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static Consumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        // Create the consumer using props.
        final Consumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}
But the consumer does not read any messages from Kafka. If I add the following at the very start of start():
consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());
then the consumer starts reading from the topic. But every time the consumer is restarted, it reads the messages from the start of the topic again, which I don't want. If instead I add the following config when creating the consumer:
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
then it reads messages from the topic, but if the consumer is restarted before it has processed all the messages, it does not read the unprocessed ones.
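The three behaviors above can be compared with a small, simplified model of how a consumer chooses its starting position on one partition. This is a sketch, not the kafka-clients implementation: `StartPositionModel`, `startPosition` and `ResetPolicy` are hypothetical names, and the model assumes any committed offset is still within the partition's log (if retention has deleted it, the reset policy applies instead). It encodes two documented behaviors: `auto.offset.reset` defaults to `latest` and is only consulted when the group has no committed offset, and an explicit `seekToBeginning()` overrides whatever was committed.

```java
import java.util.OptionalLong;

// Simplified, hypothetical model (not kafka-clients code) of where a consumer
// starts reading a partition after the partition is assigned to it.
final class StartPositionModel {

    // Stands in for the auto.offset.reset values "earliest" and "latest".
    enum ResetPolicy { EARLIEST, LATEST }

    static long startPosition(boolean seekToBeginningCalled,
                              OptionalLong committedOffset,
                              long logStartOffset,
                              long logEndOffset,
                              ResetPolicy resetPolicy) {
        if (seekToBeginningCalled) {
            // An explicit seek wins over anything the group committed.
            return logStartOffset;
        }
        if (committedOffset.isPresent()) {
            // A committed offset is used as-is; auto.offset.reset is not consulted.
            // (Assumes the offset is still in range.)
            return committedOffset.getAsLong();
        }
        // No committed offset for this group yet: fall back to auto.offset.reset.
        return resetPolicy == ResetPolicy.EARLIEST ? logStartOffset : logEndOffset;
    }

    public static void main(String[] args) {
        long logStart = 0;
        long logEnd = 3000; // assumption: the producer wrote 3000 records before the consumer started

        // New group, default policy: starts at the end, so the existing records are skipped.
        System.out.println(startPosition(false, OptionalLong.empty(), logStart, logEnd, ResetPolicy.LATEST));   // 3000
        // New group with "earliest": starts at offset 0.
        System.out.println(startPosition(false, OptionalLong.empty(), logStart, logEnd, ResetPolicy.EARLIEST)); // 0
        // Group already committed 1000: resumes at offset 1000 whatever the policy is.
        System.out.println(startPosition(false, OptionalLong.of(1000), logStart, logEnd, ResetPolicy.LATEST));  // 1000
        // seekToBeginning(): back to offset 0 even though 1000 was committed.
        System.out.println(startPosition(true, OptionalLong.of(1000), logStart, logEnd, ResetPolicy.LATEST));   // 0
    }
}
```

Under this model, the first case corresponds to the original symptom: a brand-new group using the default `latest` policy, started after the producer has finished, has nothing to read.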
Can someone tell me what is going wrong and how I can fix this?
The Kafka broker and ZooKeeper are running with the default configuration.
With props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), is the problem that the consumer reads a large batch of messages, processes only some of them before it stops, and then, when restarted, skips the whole batch it read before? If so, the issue is probably that the offset after all the messages read is being auto-committed. You might want to disable auto-commit and commit only the offsets of messages that were actually processed, even if more messages were read, or reduce the maximum batch size the consumer reads. - Reinstate Monica

In createConsumer() I am setting props.put("enable.auto.commit", "false");. The problem is this: say there are 5000 messages, and the consumer receives 1000 of them in one batch and commits that batch via commitSync(). If the consumer then restarts, I don't see it receiving any message from 1001 onwards. Let me know if I am not being clear about what I am asking. - tuk
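The first comment's suggestion, committing only what has actually been processed rather than everything `poll()` returned, relies on Kafka's convention that a committed offset is the offset of the next record to read, i.e. the last processed offset plus one. Below is a minimal sketch of that bookkeeping in plain Java so it runs without a broker. `ProcessedOffsetTracker` is a hypothetical helper, not part of kafka-clients, and partitions are plain `int` numbers here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of kafka-clients): remembers, per partition,
// the next offset to read after the last record that was fully processed.
final class ProcessedOffsetTracker {
    private final Map<Integer, Long> nextOffsetByPartition = new HashMap<>();

    // Call only after the record at (partition, offset) has been fully processed.
    void markProcessed(int partition, long offset) {
        // Committed offset = last processed offset + 1; never move backwards.
        nextOffsetByPartition.merge(partition, offset + 1, Long::max);
    }

    // Offsets to commit: only what was processed, not everything that was polled.
    Map<Integer, Long> offsetsToCommit() {
        return new HashMap<>(nextOffsetByPartition);
    }

    public static void main(String[] args) {
        ProcessedOffsetTracker tracker = new ProcessedOffsetTracker();
        // Assume one poll() returned offsets 0..999 on partition 0,
        // but the consumer stopped after processing offsets 0..299.
        for (long offset = 0; offset < 300; offset++) {
            tracker.markProcessed(0, offset);
        }
        System.out.println(tracker.offsetsToCommit()); // {0=300}
    }
}
```

With a real consumer, the same numbers would be wrapped as `new OffsetAndMetadata(record.offset() + 1)`, keyed by `new TopicPartition(record.topic(), record.partition())`, and passed to `consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)` instead of the no-argument `commitSync()`, which commits the position after the last record of the whole batch. Lowering `max.poll.records` (default 500) is the other option the comment mentions.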