2
votes

I have confluent kafka installed running on docker. In the topic i have 10 partitions. The problem is that I cannot consume messages from that topic, but I can Produce messages in the topic. I am trying to consume from the topic using C# confluent.kafka driver 1.5.1 (latest) with librd.kafka 1.5.0 (latest).

The docker-compose file i start kafka with is the following

version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    networks:
      - bridge_network
    ports:
      - "3001:3001"    
    environment:
      ZOOKEEPER_CLIENT_PORT: 3001
      ZOOKEEPER_TICK_TIME: 3000

  broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://broker:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
 
  kafka_manager:
    image: sheepkiller/kafka-manager
    hostname: kafka_manager
    depends_on:
      - zookeeper
    ports:
      - '9000:9000'
    networks:
      - bridge_network
    environment:
      ZK_HOSTS: 'zookeeper:3001'
networks:
  bridge_network:
    driver: bridge
    driver_opts:
      com.docker.network.enable_ipv6: "false"

My consumer configuration in C# is the following:

            var consumer = new ConsumerBuilder<string, string>(new Dictionary<string, string>
            {
                { "bootstrap.servers", "PLAINTEXT://localhost:3002" },
                { "group.id", "some-test-group" },
                { "auto.offset.reset", "latest"},
                { "compression.codec", "gzip" },
                { "enable.auto.commit", "false" }
            }).Build();

            consumer.Subscribe("some-test-topic");

            while (true)
            {
                var cr = consumer.Consume(30_000);
                if (cr == null || cr.Message.Key == null || cr.Message.Value == null)
                {
                    System.Console.WriteLine("that's it");
                    break;
                }

                System.Console.WriteLine(cr.Message.Key + ": " + cr.Message.Value);
            }

I'm sure there are messages in the topic's partitions as I can examine the topic using kafka tool 2.0 enter image description here

the configuration i used for kafka tool is enter image description hereenter image description here

I'm pretty sure I have missed something in the config file but after 2 days of documentaton reading and slamming my head in the wall i still cannot find the issue. So can anyone help?

2
Could you please attach your kafka consumer logs ?Rishabh Sharma

2 Answers

1
votes

You need to set auto.offset.reset to "earliest" or produce messages to the topic while your consumer is running.

1
votes

The issue is with the brokers and topic replication factor. I used your docker-compose file to deploy kafka, I connected to see the logs and there were messages:

ERROR [KafkaApi-1] Number of alive brokers '1' does not meet the required replication factor '3' for the offsets topic (configured via 'offsets.topic.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet. (kafka.server.KafkaApis)

To solve this problem I had to add `KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1' for broker configuration. So my broker service config looks like this:

broker:
    image: confluentinc/cp-kafka
    hostname: broker
    depends_on:
      - zookeeper
    ports:
      - "3002:3002"
    networks:
      - bridge_network
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:3001'
      KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:3002'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

After restarting the broker I was able to produce/consume messages.