I have a simple Java producer like the one below:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class Producer
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        KafkaProducer<String, byte[]> producer = createProducer();
        for (int i = 0; i < 3000; i++) {
            String msg = "Test Message-" + i;
            final ProducerRecord<String, byte[]> record =
                    new ProducerRecord<>(TOPIC, "key" + i, msg.getBytes());
            producer.send(record).get(); // block until the send is acknowledged
            System.out.println("Sent message " + msg);
        }
        producer.close();
    }

    private static KafkaProducer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        props.put("client.id", "AppFromJava");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        // "compression.type" is the key the Java KafkaProducer reads; the old
        // Scala-producer keys (metadata.broker.list, serializer.class, ...) are ignored by it.
        props.put("compression.type", "snappy");
        return new KafkaProducer<>(props);
    }
}

I am trying to read the data as below:

import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class Consumer
{
    private final static String TOPIC = "my-example-topi8";
    private final static String BOOTSTRAP_SERVERS = "localhost:8092";

    public static void main( String[] args ) throws Exception {
        KafkaConsumer<String, byte[]> consumer = createConsumer();
        start(consumer);
    }

    static void start(KafkaConsumer<String, byte[]> consumer) throws InterruptedException {
        final int giveUp = 10;   // empty polls to tolerate before giving up
        int noRecordsCount = 0;

        while (true) {
            final ConsumerRecords<String, byte[]> consumerRecords = consumer.poll(1000);
            if (consumerRecords.count() == 0) {
                noRecordsCount++;
                if (noRecordsCount > giveUp) break;
                else continue;
            }

            consumerRecords.forEach(record -> {
                // Process the record
                System.out.printf("%nConsumer Record:(%s, %s, %s)",
                        record.key(), new String(record.value()), record.topic());
            });

            consumer.commitSync();
            break;
        }
        consumer.close();
        System.out.println("DONE");
    }

    private static KafkaConsumer<String, byte[]> createConsumer() {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleConsumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1234");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        // Create the consumer using props.
        final KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        return consumer;
    }
}

But the consumer is not reading any messages from Kafka. If I add the below at the very start of start()

consumer.poll(0);
consumer.seekToBeginning(consumer.assignment());

Then the consumer starts reading from the topic. But then each time the consumer is restarted it reads the messages from the start of the topic again, which I don't want; one way around this is sketched below.
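For illustration only, a rough sketch (untested; it assumes java.util.Collections and org.apache.kafka.common.TopicPartition are imported) that rewinds only the partitions for which this consumer group has no committed offset yet, so that restarts resume from the last commit:

consumer.poll(0); // triggers partition assignment, as in the snippet above
for (TopicPartition tp : consumer.assignment()) {
    if (consumer.committed(tp) == null) { // nothing committed for this group yet
        consumer.seekToBeginning(Collections.singletonList(tp));
    }
}

If I instead add the below config while starting the Consumer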

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

then it reads messages from the topic, but if the consumer is restarted before processing all the messages, it does not read the unprocessed ones.

Can someone let me know what is going wrong and how I can fix this?

The Kafka broker and ZooKeeper are running with the default configuration.

If you set props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"), is the problem that the consumer reads a whole large batch of messages, only processes some of them before it stops, and restarting it skips the whole batch that was read before? If that's the case, the issue is probably that the offset after all read messages is getting auto-committed. You might want to disable auto-commit and commit only the offsets that were actually processed even if more messages were read, or decrease the maximum batch size that the consumer will read. - Reinstate Monica
In the code I have posted, in createConsumer() I am setting props.put("enable.auto.commit", "false"). The problem I am having: let's say there are 5000 messages, and the consumer received 1000 messages in a batch and committed that batch via commitSync. If the consumer restarts, I am not seeing it receive any messages from 1001 onwards. Let me know if what I am trying to ask is not clear. - tuk
Okay. My comment is moot then. - Reinstate Monica
I am facing this error: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.subscribe(Ljava/util/Collection;)V - ShrJoshi

1 Answer


Your call to commitSync() acknowledges all of the messages in the batch returned by the last poll(), not just each individual record as you process it, which is what I think you are trying to do.
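If you really do want to acknowledge records one at a time, a minimal sketch (untested; process() is a hypothetical stand-in for your record handling) commits an explicit offset map after each record:

import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

static void commitPerRecord(KafkaConsumer<String, byte[]> consumer) {
    for (ConsumerRecord<String, byte[]> record : consumer.poll(1000)) {
        process(record); // hypothetical: your actual record handling
        // Commit the offset of the *next* record to read, for this partition only.
        consumer.commitSync(Collections.singletonMap(
                new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1)));
    }
}

A synchronous commit per record is safe but slow; the per-partition variant quoted below is the usual compromise.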

From the documentation:

“The above example uses commitSync to mark all received records as committed. In some cases you may wish to have even finer control over which records have been committed by specifying an offset explicitly. In the example below we commit offset after we finish handling the records in each partition.

try {
    while(running) {
        ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            for (ConsumerRecord<String, String> record : partitionRecords) {
                System.out.println(record.offset() + ": " + record.value());
            }
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
        }
    }
} finally {
    consumer.close();
}

Note: The committed offset should always be the offset of the next message that your application will read. Thus, when calling commitSync(offsets) you should add one to the offset of the last message processed. ”
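Applied to your consumer, the loop in start() could look roughly like the following (an untested sketch that keeps your giveUp counter, noRecordsCount, and byte[] values; it assumes java.util.List, org.apache.kafka.clients.consumer.ConsumerRecord, org.apache.kafka.clients.consumer.OffsetAndMetadata, and org.apache.kafka.common.TopicPartition are imported):

while (true) {
    final ConsumerRecords<String, byte[]> records = consumer.poll(1000);
    if (records.count() == 0) {
        if (++noRecordsCount > giveUp) break;
        continue;
    }
    for (TopicPartition partition : records.partitions()) {
        List<ConsumerRecord<String, byte[]>> partitionRecords = records.records(partition);
        for (ConsumerRecord<String, byte[]> record : partitionRecords) {
            System.out.printf("%nConsumer Record:(%s, %s, %s)",
                    record.key(), new String(record.value()), record.topic());
        }
        // Commit only what this partition has actually finished processing.
        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(
                partition, new OffsetAndMetadata(lastOffset + 1)));
    }
}

That way a restart resumes from the last per-partition commit instead of skipping records that were fetched but never fully processed.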