var configs = new Dictionary<string, string>
{
    {"bootstrap.servers", MY_SERVER},
    {"security.protocol", "SASL_PLAINTEXT"},
    {"sasl.mechanism", "SCRAM-SHA-256"},
    {"sasl.username", "MY_USERNAME"},
    {"sasl.password", "MY_PWD"},
    {"group.id", "sample_group"} // added
};
var consumerConfig = new ConsumerConfig(configs);    

using (var schemaRegistry = new CachedSchemaRegistryClient(schemaRegistryConfig))
using (var consumer = new ConsumerBuilder<string, MyModel>(consumerConfig)
           .SetKeyDeserializer(new AvroDeserializer<string>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
           .SetValueDeserializer(new AvroDeserializer<MyModel>(schemaRegistry, avroSerializerConfig).AsSyncOverAsync())
           .Build())
{
    consumer.Subscribe(TOPIC_NAME);

    while (true)
    {
        var result = consumer.Consume(); // stuck here
        Console.WriteLine(result);
    }
}

As noted in the code, consumer.Consume() never returns. It does not throw any error either, not even during consumer.Subscribe(). What could be the possible reasons? (I am new to Kafka consumers.)

  1. Maybe there are no messages in the topic, so there is nothing to receive?
  2. The code asked for the missing 'group.id', so I added {"group.id", "sample_group"} to the config and wrapped it with ConsumerConfig. Is an arbitrary name ("sample_group") allowed for group.id, or should it be something retrieved from the topic information?
  3. Anything else?
Comments:

Similar to the first question: is there an active producer running? By default, you only get new data. – OneCricketeer

@mike So if there are no messages in the topic, will consumer.Consume() never return? – Adrian

1 Answer


Your code looks fine, and the fact that no errors or exceptions are showing up is also a good sign.

"1. Maybe there is no message in Topic, so nothing to receive?"

If there are no messages in the Kafka topic, your observation matches the expected behavior. In the while(true) loop you are continuously trying to fetch data from the topic, and if nothing can be fetched, the consumer simply tries again in the next iteration. Consumers on Kafka topics are meant to read a topic sequentially while running continuously. It is totally fine that sometimes the consumer has consumed all messages and stays idle for a while until new messages arrive in the topic. During this waiting time the consumer will not stop or crash.

Keep in mind that messages in a Kafka topic have a default retention period of 7 days. After that time, the messages will be deleted.

"2. The code asked for missing 'group.id', so I added {"group.id", "sample_group"} in config and wrap with ConsumerConfig. Is random name ("sample_group") allowed for group.id or should it be something retrieved from Topic information?"

Yes, the name "sample_group" is allowed as a consumer group name. There are no reserved consumer group names, so this name will not cause any trouble. The group id does not come from the topic; it only identifies your group of consumers to the brokers.
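As a side note, if you prefer the strongly typed configuration over the string dictionary, GroupId and the SASL settings can be set directly on ConsumerConfig. A sketch with the same placeholder values as in your question:

```csharp
// Equivalent strongly typed configuration (placeholder values from the question).
var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "MY_SERVER",
    GroupId = "sample_group", // any string works; it just names the consumer group
    SecurityProtocol = SecurityProtocol.SaslPlaintext,
    SaslMechanism = SaslMechanism.ScramSha256,
    SaslUsername = "MY_USERNAME",
    SaslPassword = "MY_PWD"
};
```

The typed properties map one-to-one to the string keys you already use ("bootstrap.servers", "group.id", and so on).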

"3. anything else?"

By default, a Kafka consumer reads messages starting from the "latest" offset. That means if you run a consumer group for the very first time, it will not read the topic from the beginning, but only messages produced after it joined. Check the consumer configuration in the .NET Kafka API documentation for auto.offset.reset (the AutoOffsetReset property on ConsumerConfig). You might set this configuration to "earliest" if you want to read all messages from the beginning. Please note that as soon as you have run your application with a given consumer group once and offsets have been committed, auto.offset.reset will no longer have any impact, because the group's position is now stored in Kafka.
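Applied to your existing dictionary-based config, this could look like the following sketch (reusing the configs dictionary from your question):

```csharp
// Start from the beginning of the topic on the *first* run of this group.
// Has no effect once the group has already committed offsets.
var consumerConfig = new ConsumerConfig(configs)
{
    AutoOffsetReset = AutoOffsetReset.Earliest
};
```

Alternatively, adding {"auto.offset.reset", "earliest"} to the dictionary itself has the same effect.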

What you can usually do to verify that the consumer actually receives messages is to start your consumer before you start producing messages to the topic. Then, (almost) independently of your configuration, you should see data flowing through your application.
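For a quick end-to-end test you can produce a message yourself while the consumer is running. A minimal sketch (broker placeholder and topic name assumed from your question; note this sends a plain string value, so for your Avro-typed MyModel consumer you would instead need a matching AvroSerializer on the producer side):

```csharp
using Confluent.Kafka;

// Minimal test producer against the same broker as the consumer.
var producerConfig = new ProducerConfig { BootstrapServers = "MY_SERVER" };

using var producer = new ProducerBuilder<string, string>(producerConfig).Build();

var delivery = await producer.ProduceAsync(
    "TOPIC_NAME",
    new Message<string, string> { Key = "test-key", Value = "hello" });

// The delivery report tells you exactly where the message landed.
Console.WriteLine($"Delivered to {delivery.TopicPartitionOffset}");
```

If the consumer still prints nothing after a successful delivery report, the problem is on the consumer side (offsets, group, or deserialization), not the topic being empty.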