2
votes

I am sending JSON data to a Kafka topic. I can see the same JSON data in the Kafka console consumer when I run the command below.

.\bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mytopic --from-beginning

Case 1: When I try to read the records with a Kafka consumer written in Java, no records appear in the Java console. I have tried the examples given at https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html

Case 2: If I send a message from the console producer (command window), it appears in the console consumer (command window), and I can also see the same record in the Java console. This scenario works.

If I submit the data to the topic through a Java program, the same JSON data appears in the console consumer (command window) but not in the Java console. So Case 1 does not work and Case 2 works. Is there any configuration I need to change?

3
So you start consuming from Java, then send new data to the topic, and no messages appear (Case 1)? – tgrez
Are you running Case 2 using only one JVM? – Lyubomir Papazov
How did you write your producer? Please share your code. – JR ibkr
Start both the console consumer and your custom consumer, then run your Java producer. Also, make sure that your producer actually sends messages to Kafka. Do you see the messages on both consumers? – JR ibkr

3 Answers

1
votes

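Make sure you are not sending empty lines at the end, and that the consumer is not reading only the latest records. Set auto.offset.reset to 'earliest' (the default is 'latest'), for example with properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

Then call consumer.seekToBeginning(consumer.assignment()); to force reading from the start. Note that consumer.assignment() is empty until partitions have been assigned, which with subscribe() happens only after a poll() call.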
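To illustrate why the reset policy matters here, the following is a minimal sketch of how a consumer's starting position is chosen. It is a simplified model written with the standard library only, not Kafka's actual code; the class name StartOffsetSketch and the method resolveStartOffset are hypothetical names introduced for this illustration.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    /**
     * Simplified model (an assumption for illustration): a committed offset
     * always wins; auto.offset.reset is consulted only when the group has no
     * committed offset for the partition.
     */
    public static long resolveStartOffset(OptionalLong committed, String resetPolicy,
                                          long beginningOffset, long endOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (resetPolicy) {
            case "earliest":
                return beginningOffset;
            case "latest":
                return endOffset;
            default:
                // Models the "none" policy, where the consumer raises an error.
                throw new IllegalStateException(
                        "no committed offset and reset policy is " + resetPolicy);
        }
    }

    public static void main(String[] args) {
        // A partition that already holds 5 records: offsets 0..4, end offset 5.
        // New group, default policy: starts at the end, so old records are skipped.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "latest", 0, 5));   // prints 5
        // New group, "earliest": starts at the first record.
        System.out.println(resolveStartOffset(OptionalLong.empty(), "earliest", 0, 5)); // prints 0
        // Existing group with a committed offset: the policy is ignored.
        System.out.println(resolveStartOffset(OptionalLong.of(5), "earliest", 0, 5));   // prints 5
    }
}
```

The third case is why changing auto.offset.reset alone may not help a group that has already committed offsets: either use a new group.id, seek explicitly, or reset the group's offsets.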

0
votes

You need to set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "earliest" in order to read from the beginning when the consumer group has no committed offset yet.

kafkaConsumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest"); 

Also make sure that you are not running several consumer processes with the same consumer group ID. Each partition is read by only one consumer in a group, so the data from a partition may go to one process while another sees nothing.
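The effect of sharing a group ID can be sketched as follows. This is a simplified round-robin model written with the standard library only, not Kafka's actual partition assignor; the class GroupAssignmentSketch, the method assign, and the consumer names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupAssignmentSketch {

    /**
     * Simplified model (an assumption for illustration): partitions are dealt
     * out round-robin, and each partition has exactly one owner in the group.
     */
    public static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (String consumer : consumers) {
            result.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            result.get(owner).add(partition);
        }
        return result;
    }

    public static void main(String[] args) {
        // Two consumers with the same group ID on a topic with one partition:
        // the second consumer owns nothing and therefore receives no records.
        System.out.println(assign(Arrays.asList("consumer-A", "consumer-B"), 1));
        // prints {consumer-A=[0], consumer-B=[]}
    }
}
```

If the Java consumer must see every record regardless of other consumers, giving it its own group.id avoids this sharing.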

0
votes

Assuming the producer and consumer code is correct:

  • Stop all consumers.
  • Reset the offsets of your consumer group:

$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group group_name --topic topic_name --reset-offsets --to-earliest --execute

  • Now start your consumer

This should fix your issue.

Here are some Kafka consumer properties:

    bootstrap.servers: 'localhost:9092'
    group.id: 'group_id'
    auto.offset.reset: 'earliest'
    key.deserializer: 'org.apache.kafka.common.serialization.*' // Replace * with the deserializer class, e.g. StringDeserializer
    value.deserializer: 'org.apache.kafka.common.serialization.*' // Replace * in the same way
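The properties above could be built in Java roughly as follows. This is a minimal sketch that uses plain string keys so it compiles without the Kafka client library; it assumes the JSON payload is read as plain strings, hence StringDeserializer. The class name ConsumerPropsSketch and the method build are hypothetical.

```java
import java.util.Properties;
import java.util.TreeSet;

public class ConsumerPropsSketch {

    /** Builds consumer properties; host, port, and deserializers are assumptions. */
    public static Properties build(String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", groupId);
        // Applies only when the group has no committed offset yet.
        props.setProperty("auto.offset.reset", "earliest");
        // Assumes keys and values are text (for example JSON strings).
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("group_id");
        // Print in sorted key order, since Properties has no defined iteration order.
        for (String key : new TreeSet<>(props.stringPropertyNames())) {
            System.out.println(key + " = " + props.getProperty(key));
        }
        // With kafka-clients on the classpath, these properties can be passed to
        // new KafkaConsumer<String, String>(props).
    }
}
```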

Thanks