
I have a task to create an application with Kafka in Kubernetes. But when I connect a consumer to Kafka, I get this error:

WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test] Connection to node -1 (kafka-service/10.99.233.131:9092) could not be established. Broker may not be available.

WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test] Bootstrap broker kafka-service:9092 (id: -1 rack: null) disconnected

Here is my YAML file for Kubernetes:

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    name: kafka
spec:
  ports:
  - port: 9092
    targetPort: 9092
    protocol: TCP
  selector:
    name: kafka
---
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service
  labels:
    name: zookeeper
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    name: zookeeper
  type: LoadBalancer
---
apiVersion: v1
kind: Pod
metadata:
  name: zookeeper
  labels:
    name: zookeeper 
spec:
  containers:
    - name: zookeeper
      image: zookeeper:3.7.0
---
apiVersion: v1
kind: Pod
metadata:
  name: kafka
  labels:
    name: kafka 
spec:
  containers:
    - name: kafka
      image: wurstmeister/kafka:2.13-2.6.0
      imagePullPolicy: "IfNotPresent"
      env:
        - name: KAFKA_ADVERTISED_PORT
          value: "666"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: localhost
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://:666
        - name: KAFKA_LISTENERS
          value: INSIDE://:666
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
      ports:
        - containerPort: 9092

Simple Java code:

@Service
public class Consumer {
    @KafkaListener(topics = "new-topic",groupId = "test")
    public void consumeMessage(String message){
        System.out.println("************************");
        System.out.println(message);
        System.out.println("************************");
    }
}

How can I create a single Kafka broker, without scaling, just for tests?

Start here: strimzi.io - OneCricketeer
In any case, you seem to misunderstand the env vars you've set. KAFKA_ADVERTISED_LISTENERS takes precedence over KAFKA_ADVERTISED_PORT and KAFKA_ADVERTISED_HOST_NAME. And since you've set the listener port to 666, nothing in the container listens on 9092, so a client cannot connect through that port. - OneCricketeer

1 Answer


This post was very helpful for me: https://www.confluent.io/blog/kafka-listeners-explained/

I changed the YAML file to this:

apiVersion: v1
kind: Pod
metadata:
  name: kafka-service
  labels:
    name: kafka-service 
spec:
  containers:
    - name: kafka-service
      image: wurstmeister/kafka:2.13-2.6.0
      imagePullPolicy: "IfNotPresent"
      env:
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-service:2181
        - name: KAFKA_BROKER_ID
          value: "1"

        - name: KAFKA_LISTENERS
          value: IN://:9092,OUT://:9093
        - name: KAFKA_ADVERTISED_LISTENERS
          value: IN://localhost:9092,OUT://kafka-service:9093
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: IN:PLAINTEXT,OUT:PLAINTEXT
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: IN

This means that port 9093 is open inside the Kubernetes network (advertised as kafka-service:9093) and port 9092 is open from the host machine (advertised as localhost:9092). After that, everything works correctly.
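
To make the listener mapping easier to see, here is a small sketch in plain Java. It is not Kafka's own parsing code, and the class name AdvertisedListeners and the method parse are made up for illustration. It only splits a KAFKA_ADVERTISED_LISTENERS value into listener name and advertised address. As I understand the linked post, the broker returns the advertised address of the listener a client connected on, so a consumer running in another pod would presumably use kafka-service:9093 as its bootstrap server.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a hypothetical helper, not part of Kafka or Spring.
public class AdvertisedListeners {

    // Splits a value such as "IN://localhost:9092,OUT://kafka-service:9093"
    // into a map of listener name -> advertised "host:port".
    static Map<String, String> parse(String advertisedListeners) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String entry : advertisedListeners.split(",")) {
            // Each entry is expected to look like NAME://host:port
            String[] parts = entry.trim().split("://", 2);
            if (parts.length != 2) {
                throw new IllegalArgumentException(
                        "Expected NAME://host:port but got: " + entry);
            }
            result.put(parts[0], parts[1]);
        }
        return result;
    }

    public static void main(String[] args) {
        // The value from the YAML in this answer.
        Map<String, String> listeners =
                parse("IN://localhost:9092,OUT://kafka-service:9093");
        listeners.forEach((name, address) ->
                System.out.println(name + " -> clients are told to connect to " + address));
    }
}
```

Running the same helper on the value from the question (INSIDE://:666) gives an advertised address with an empty host and port 666, which fits the comment above: nothing was reachable on 9092.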