1
votes

I have a Kafka broker running in a multi VM environment (Private cloud - with our own Kubernetes cluster - 4 node cluster).

I have created a spring boot application that has a publisher that has to publish a message to the Kafka topic inside the Kafka broker. I have both the containers(Kafka broker & spring boot app) running in the same Kubernetes cluster.

I couldn't access the Kafka broker (running in the same k8s cluster) by providing the Kafka's service name: port id in the publisher's bootstrap.servers

Spring boot's publisher configuration:

 props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-svc:9092");- -> not working 

Cluster information:

kafka-broker yaml file:

    apiVersion: extensions/v1beta1
    kind: Deployment
    metadata:
      name: kafka-deploy
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: kafka
      template:
        metadata:
          labels:
            app: kafka
        spec:
          containers:
          - env:
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_ADVERTISED_PORT
              value: "9092"
            - name: KAFKA_ADVERTISED_HOST_NAME
              value: kafka-svc
            - name: KAFKA_CREATE_TOPICS
              value: "test:1:1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-svc:2181
            - name: KAFKA_BROKER_ID
              value: "1"
            name: kafka
            image: wurstmeister/kafka
            ports:
            - containerPort: 9092
    ------
       apiVersion: v1
       kind: Service
       metadata:
         name: kafka-svc
       spec:
         type: NodePort
         ports:
         - port: 9092
           targetPort: 9092
           name: http
           protocol: TCP
         selector:
           app: kafka
           kafka-broker-id: "1"

When I attempt to push a string message on to Kafka topic, I am getting the below exception:

"2020-08-18 13:28:04.525  INFO 1 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1597757284523
2020-08-18 13:29:04.539 ERROR 1 --- [nio-8080-exec-1] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='TEST' to topic test:

org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.

2020-08-18 13:29:04.544 ERROR 1 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.] with root cause
2

2 Answers

1
votes

It looks like you need to create test topic before you can publish anything.

You can see if the topic is available in your Kafka cluster. For example from the one of Kafka broker pods:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --zookeeper zookeeper-svc:2181 --topic test --describe

Or you can create it on your Spring Boot app:

package io.stockgeeks.kafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfiguration {
  
  @Bean
  public NewTopic topicExample() {
    return TopicBuilder.name("test")
      .partitions(6)
      .replicas(3)
      .build();
  }
}

✌️

0
votes

Thanks. I have created the topic test using the environment variable option while spinning up the Kafka-broker pod itself. Hence we already have the topic test in the kafka broker system. I suspect the problem with the kafka-service configuration which we use in kubernetes to connect to kafka broker.

The issue is with the kafka service which we use ( from our spring boot app's publisher) to connect to kafka broker's topic

I have got the below exception when I try to connect to kafka broker using kafka service

bash-4.4# kafka-topics.sh --bootstrap-server kafka-svc:9092 --topic test --describe Error while executing topic command : org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1597833365003) timed out at 1597833365004 after 1 attempt(s) [2020-08-19 10:36:05,027] ERROR java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1597833365003) timed out at 1597833365004 after 1 attempt(s) at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45) at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32) at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89) at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260) at kafka.admin.TopicCommand$AdminClientTopicService.getTopics(TopicCommand.scala:333) at kafka.admin.TopicCommand$AdminClientTopicService.describeTopic(TopicCommand.scala:288) at kafka.admin.TopicCommand$.main(TopicCommand.scala:68) at kafka.admin.TopicCommand.main(TopicCommand.scala) Caused by: org.apache.kafka.common.errors.TimeoutException: Call(callName=listTopics, deadlineMs=1597833365003) timed out at 1597833365004 after 1 attempt(s) Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. (kafka.admin.TopicCommand$) bash-4.4#

But, when I attempt to connect through zookeeper service, I could get the expected result back as shown below

***bash-4.4# kafka-topics.sh --zookeeper zookeeper-svc:2181 --topic test --describe
Topic: test   PartitionCount: 1       ReplicationFactor: 1    Configs:
        Topic: test   Partition: 0    Leader: 1       Replicas: 1     Isr: 1
bash-4.4#***