I have a task to create an application with kafka in kubernetes. But when i connect consumer to kafka i got error:
WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test] Connection to node -1 (kafka-service/10.99.233.131:9092) could not be established. Broker may not be available.
WARN 1 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-test-1, groupId=test] Bootstrap broker kafka-service:9092 (id: -1 rack: null) disconnected
Here is my yaml file for kubernetes:
apiVersion: v1
kind: Service
metadata:
name: kafka-service
labels:
name: kafka
spec:
ports:
- port: 9092
targetPort: 9092
protocol: TCP
selector:
name: kafka
---
apiVersion: v1
kind: Service
metadata:
name: zookeeper-service
labels:
name: zookeeper
spec:
ports:
- name: client
port: 2181
protocol: TCP
- name: follower
port: 2888
protocol: TCP
- name: leader
port: 3888
protocol: TCP
selector:
name: zookeeper
type: LoadBalancer
---
apiVersion: v1
kind: Pod
metadata:
name: zookeeper
labels:
name: zookeeper
spec:
containers:
- name: zookeeper
image: zookeeper:3.7.0
---
apiVersion: v1
kind: Pod
metadata:
name: kafka
labels:
name: kafka
spec:
containers:
- name: kafka
image: wurstmeister/kafka:2.13-2.6.0
imagePullPolicy: "IfNotPresent"
env:
- name: KAFKA_ADVERTISED_PORT
value: "666"
- name: KAFKA_ADVERTISED_HOST_NAME
value: localhost
- name: KAFKA_ZOOKEEPER_CONNECT
value: zookeeper-service:2181
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
value: INSIDE:PLAINTEXT
- name: KAFKA_ADVERTISED_LISTENERS
value: INSIDE://:666
- name: KAFKA_LISTENERS
value: INSIDE://:666
- name: KAFKA_INTER_BROKER_LISTENER_NAME
value: INSIDE
ports:
- containerPort: 9092
Simple java code:
@Service
public class Consumer {
@KafkaListener(topics = "new-topic",groupId = "test")
public void consumeMessage(String message){
System.out.println("************************");
System.out.println(message);
System.out.println("************************");
}
}
How to create kafka broker without scaling just for tests?
KAFKA_ADVERTISED_LISTENERS
takes precedence overKAFKA_ADVERTISED_PORT
and the hostname. And since you've set the listener port to 666, then there's no container port to 9092 that you'll be able to use in a client – OneCricketeer