4
votes

I am learning Kafka by following the Apache Kafka documentation. I started it with the default configuration:

bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties &

I ran kafka-console-producer.sh and kafka-console-consumer.sh to produce and consume messages, and that worked. I then wrote Java code using the producer API to produce messages, which also works; I verified this with kafka-console-consumer.sh. The code is the same as in the Apache Kafka guide:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer",    "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
  producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();

Although the producer code works, the consumer code doesn't. There is no exception; it just blocks at consumer.poll(100). The code is from the Apache Kafka documentation:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

By the way, the kafka-console-consumer.sh example from the Apache Kafka documentation successfully consumes the messages that the producer sent to the topic "test":

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

But if I connect to the Kafka broker directly instead of to ZooKeeper, it also doesn't work: there is no error and no exception, it just blocks.

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

The Kafka version and the API version are both 0.11.0.0.

Why can't they consume messages?

2
Hello, I am stuck at the same point. Did you manage to make it work somehow? - Sarabjeet Singh

2 Answers

1
votes

Using the --zookeeper parameter means using the old consumer, and it works because you are specifying a ZooKeeper server (localhost:2181).

When you want to specify a Kafka broker (and so use the new consumer), you have to use the --bootstrap-server option. You are still using --zookeeper, but passing it a Kafka broker address (localhost:9092).

So for the console consumer, your configuration needs to be --bootstrap-server localhost:9092 instead of --zookeeper localhost:9092.

Regarding your code, are you sure that the poll method is blocked? If there are no records, it should return after 100 ms (the timeout you specified) rather than block.

Also, from your code I see that the producer is sending to "my-topic", the consumer subscribes to "foo" and "bar", and the console consumer reads from "test". These are all different topics!

0
votes

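props.put("auto.offset.reset", "earliest"); Add this property; it may work. (The new consumer accepts "earliest", "latest" or "none" here; "smallest" is the old consumer's name for the same setting.)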
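
For reference, here is a minimal sketch of where that property would sit among the consumer settings from the question. It uses "earliest", which is the value name the new (bootstrap.servers-based) consumer accepts. The class name ConsumerPropsSketch and the build helper are made up for illustration, and the sketch only builds the Properties object; it does not connect to a broker.

```java
import java.util.Properties;

public class ConsumerPropsSketch {
    // Hypothetical helper: the consumer settings from the question plus auto.offset.reset.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        // Only takes effect when the group has no valid committed offset for a partition.
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "test");
        System.out.println(props.getProperty("auto.offset.reset"));
    }
}
```

The resulting Properties would be passed to new KafkaConsumer<>(props) as in the question. Because the setting is ignored once the group has committed offsets, trying it with a fresh group.id may be needed to see messages that were produced earlier.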