1
votes

I have a topic, A, with 12 partitions. I have 3 Kafka brokers in a cluster. There are 4 partitions per broker for topic A. I haven't created any replicas as I am not concerned with resiliency.

I have a simple Java consumer using the kafka-clients library. I have set the following properties:

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-serverA:9092,kafka-serverB:9092,kafka-serverC:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupID);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "100000");

There is additional code that polls for ConsumerRecords and prints them, which works fine. I have 12 messages in the topic and have verified through "kafka-run-class.sh kafka.admin.ConsumerGroupCommand" that there is one message in each partition. The message size is 100000 bytes, exactly equal to the max.partition.fetch.bytes limit.
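For reference, the consumption loop (not shown above) looks roughly like the following sketch. It continues from the `properties` object above; the topic name "A", the 30-second deadline, and the poll timeout are illustrative values, and it of course needs a running cluster:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// ...

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("A"));

int received = 0;
long deadline = System.currentTimeMillis() + 30_000;
while (received < 12 && System.currentTimeMillis() < deadline) {
    // A single poll() is not guaranteed to return records from every
    // partition (or any records at all, e.g. before the first rebalance
    // completes), so keep polling until the deadline.
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("partition=%d offset=%d value-length=%d%n",
                record.partition(), record.offset(), record.value().length());
        received++;
    }
}
consumer.close();
```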

When I poll, I expect to see all 12 messages come back in the response. However, the result is erratic: sometimes I see messages from only 4 partitions (suggesting that only one broker is responding to the consumer request), sometimes from 8, but never from all 12 partitions. Just for testing, I removed the max.partition.fetch.bytes property and observed the same behavior.

Am I missing anything? It seems the server1, server2, server3 entries in the bootstrap config are not picking up all 3 brokers when serving the request.

Any help is greatly appreciated. I am running the brokers and the consumer on separate, adequately sized machines.

1
Do you have only 12 messages in total (one in each partition), or do you have plenty of them in each partition? A consumer takes a bit of time to rebalance and get the different partitions assigned. Even the first poll() might return no data, as the consumer might still be in the process of subscribing (so no partitions are assigned yet). – Augusto
Yes, for testing I added only 12 messages. However, the same is true with over 100 messages. All I am doing is trying to limit the total message size returned from each partition. With 12 messages, one in each partition, max.partition.fetch.bytes matches the size of a single message and should have returned 1 message from each partition. The consumer rebalance completed on the first poll and covered all 12 partitions, so no problem there. Is this the right way to list the servers in the bootstrap config? The servers seem to be picked randomly during a consumer request. Very weird. – Nick
Did you see any error messages? Try enabling Kafka logs and paste what is going on. – JavaTechnical
@Nick I wanted to know how the topics were created. So topicA was created manually on each broker with a replication factor of 1 and 4 partitions. Please correct me if I am wrong. – Kashyap KN
@KashyapKN The topics were created manually using the kafka-topics script, passing all 3 brokers, with a replication factor of 1 and 12 partitions in total. – Nick

1 Answer

0
votes

Am I missing anything? It seems the server1, server2, server3 entries in the bootstrap config are not picking up all 3 brokers when serving the request.

In your Kafka bootstrap.servers property you have listed all your brokers, which is good. One of the brokers will be picked for fetching metadata, which is basically the information about how many partitions the topic has and which broker is the leader for each of those partitions.

No matter which server is picked, it should return information about all the others.

Check that all of your brokers know each other, i.e. that they belong to the same Kafka cluster and point to the same ZooKeeper instance(s).
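One way to verify this from Java is to ask each address for its cluster id and broker list via the AdminClient; each should report the same cluster id and all 3 nodes. This is a sketch, reusing the broker addresses from your question, and it needs the brokers to be reachable:

```java
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;
import org.apache.kafka.common.Node;

public class ClusterCheck {
    public static void main(String[] args) throws Exception {
        for (String server : new String[] {
                "kafka-serverA:9092", "kafka-serverB:9092", "kafka-serverC:9092"}) {
            Properties props = new Properties();
            props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, server);
            try (AdminClient admin = AdminClient.create(props)) {
                DescribeClusterResult cluster = admin.describeCluster();
                // Each broker should report the same cluster id and all 3 nodes;
                // if one reports fewer nodes, it is not part of the same cluster.
                System.out.println(server + " -> clusterId=" + cluster.clusterId().get());
                for (Node node : cluster.nodes().get()) {
                    System.out.println("  broker " + node.id()
                            + " at " + node.host() + ":" + node.port());
                }
            }
        }
    }
}
```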


All of the broker addresses that you mention must be accessible to your consumer, so ensure that you have set the advertised.listeners property appropriately.

For example, if advertised.listeners=PLAINTEXT://1.2.3.4:9092 then 1.2.3.4:9092 must be accessible by the consumer.


Also, by default Kafka offsets are committed automatically after some time, and if messages were already read by a consumer with a particular group.id, they are not consumed again because their offsets are committed. So you may also want to try changing the group.id property and re-check.
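For example, a quick way to rule out committed offsets is to force a fresh consumer group (the "debug-group-" prefix and UUID suffix here are just an illustration):

```java
import java.util.UUID;
import org.apache.kafka.clients.consumer.ConsumerConfig;

// ...

// A group id no consumer has used before has no committed offsets, so
// combined with auto.offset.reset=earliest the topic is re-read from
// the beginning on every run.
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
        "debug-group-" + UUID.randomUUID());
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
```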


Also, check whether you are running multiple consumers with the same group id; in that case, some partitions will be assigned to one consumer and some to the others.

You can troubleshoot this by running kafka-console-consumer with the --from-beginning flag against the topic and checking whether all the messages are consumed.


You may also want to check the default.api.timeout.ms parameter and try increasing its value, in case network congestion is causing the client to switch from one bootstrap server to another.