As part of my system, I need to run a Kafka and ZooKeeper cluster on top of Kubernetes. I'm deploying both with StatefulSets, and I use a headless Service so that the Kafka brokers can talk to the ZooKeeper servers.
It seems that the clusters are running (checked by typing kubectl get pods):
NAME                      READY   STATUS    RESTARTS   AGE
kafka-statefulset-0       1/1     Running   14         5d10h
kafka-statefulset-1       1/1     Running   14         5d10h
kafka-statefulset-2       1/1     Running   16         5d10h
kafka-statefulset-3       1/1     Running   14         5d10h
kafka-statefulset-4       1/1     Running   14         5d10h
zookeeper-statefulset-0   1/1     Running   5          5d10h
zookeeper-statefulset-1   1/1     Running   5          5d10h
zookeeper-statefulset-2   1/1     Running   5          5d10h
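One thing I notice in this output is the high RESTARTS count on the Kafka pods. A sketch of how I could look into that (the pod name here is just the one with the most restarts):

# The previous container's log usually shows why it last crashed or was killed.
kubectl logs --previous kafka-statefulset-2

# "Last State" in the describe output shows the last exit code and reason (e.g. OOMKilled).
kubectl describe pod kafka-statefulset-2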
My problem is that I don't really understand how to check whether they can communicate properly.
What I have already tried - I ran:
kafkacat -L -b kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093
and got -
% ERROR: Failed to acquire metadata: Local: Broker transport failure
I tried -
kafkacat -b 172.17.0.10:9093 -t second_topic -P
and got -
% ERROR: Local: Host resolution failure: kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093/0: Failed to resolve 'kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093/1: Failed to resolve 'kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093/3: Failed to resolve 'kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093/4: Failed to resolve 'kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093/2: Failed to resolve 'kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093': Name or service not known
but when I ran -
kafkacat -L -b 172.17.0.10:9093
I get -
Metadata for all topics (from broker -1: 172.17.0.10:9093/bootstrap):
5 brokers:
broker 2 at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093
broker 4 at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093
broker 1 at kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093
broker 3 at kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093
broker 0 at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093
4 topics:
topic "second_topic" with 1 partitions:
partition 0, leader 4, replicas: 4, isrs: 4
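As far as I can tell, kafkacat bootstraps over the address I give it (here the pod IP), but then reconnects using the advertised hostnames returned in the metadata, which would explain why -L over the IP works while producing fails. So I wanted to check what cluster DNS actually serves. A sketch of that check, using a throwaway busybox pod:

# Resolve one broker's per-pod DNS name from inside the cluster;
# from outside the cluster this name does not exist at all.
kubectl run -i --tty dns-test --image=busybox:1.28 --restart=Never --rm -- \
  nslookup kafka-statefulset-0.kafka-headless.default.svc.cluster.local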
As I understand it now, I have not configured the services correctly for connecting from outside the cluster, but I can connect to them from within it. Yet even from within the cluster I keep getting errors I can't understand.
For example (installing kafkacat inside the pod's container and trying to talk to another Kafka broker):
> kubectl exec -it kafka-statefulset-0 bash
> apt-get install kafkacat
> kafkacat -b kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093 -t TOPIC -P
> kafkacat -L -b kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093
I got the right metadata, but with an error at the end:
Metadata for all topics (from broker -1: kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093/bootstrap):
5 brokers:
broker 2 at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093
broker 4 at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093
broker 1 at kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093
broker 3 at kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093
broker 0 at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093
5 topics:
topic "TOPIC" with 1 partitions:
partition 0, leader 2, replicas: 2, isrs: 2
topic "second_topic" with 1 partitions:
partition 0, leader 4, replicas: 4, isrs: 4
topic "first_topic" with 1 partitions:
partition 0, leader 2, replicas: 2, isrs: 2
topic "nir_topic" with 1 partitions:
partition 0, leader 0, replicas: 0, isrs: 0
topic "first" with 1 partitions:
partition 0, leader 3, replicas: 3, isrs: 3
%3|1581685918.022|FAIL|rdkafka#producer-0| kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093/0: Failed to connect to broker at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.022|ERROR|rdkafka#producer-0| kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093/0: Failed to connect to broker at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.022|FAIL|rdkafka#producer-0| kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093/2: Failed to connect to broker at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.022|ERROR|rdkafka#producer-0| kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093/2: Failed to connect to broker at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.023|FAIL|rdkafka#producer-0| kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093/4: Failed to connect to broker at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.023|ERROR|rdkafka#producer-0| kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093/4: Failed to connect to broker at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:: Interrupted system call
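To rule out my hand-installed kafkacat as the culprit, I could run the same metadata query from a clean throwaway pod inside the cluster (a sketch; the edenhill/kafkacat image name and tag are an assumption on my part - any in-cluster pod with kafkacat would do):

# kafkacat is the image's entrypoint, so everything after "--" is passed to it as arguments.
# Inside the cluster the advertised broker names should resolve, so both the bootstrap
# connection and the follow-up per-broker connections should work.
kubectl run -i kcat-test --image=edenhill/kafkacat:1.5.0 --restart=Never --rm -- \
  -L -b kafka-headless.default.svc.cluster.local:9093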
What could be the problem? How can I check the communication between them and verify that both of them - Kafka and ZooKeeper - function correctly?
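On the ZooKeeper side, the checks I know of are the four-letter-word commands on the client port and the broker registrations that Kafka writes under /brokers/ids. A sketch (it assumes nc is available in the image, and the zkCli.sh path is guessed from the conf_dir in the manifest below):

# "ruok" should answer "imok" when the server is up.
kubectl exec zookeeper-statefulset-0 -- sh -c 'echo ruok | nc localhost 2181'

# "stat" reports the Mode (leader/follower), which confirms the ensemble formed.
for i in 0 1 2; do
  kubectl exec zookeeper-statefulset-$i -- sh -c 'echo stat | nc localhost 2181 | grep Mode'
done

# Kafka registers every live broker under /brokers/ids, so five ids listed here
# would prove the broker -> ZooKeeper connection works.
kubectl exec zookeeper-statefulset-0 -- /opt/zookeeper/bin/zkCli.sh ls /brokers/ids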
Some more information about the system (I can copy all of the YAML configuration if needed, but it is pretty long):
1. There is a StatefulSet for Kafka and a StatefulSet for ZooKeeper.
2. Kafka's brokers can talk to ZooKeeper through port 2181. My configuration is:
   --override zookeeper.connect=zookeeper-statefulset-0.zookeeper-headless.default.svc.cluster.local:2181 ...
3. I also built a headless service for talking to each Kafka broker through port 9093.
4. ZooKeeper ports:
   a. 2181 - client port
   b. 2888 - server port
   c. 3888 - port for leader election
5. The services which run:
NAME                 TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
kafka-headless       ClusterIP   None            <none>        9093/TCP            5d10h
kubernetes           ClusterIP   10.96.0.1       <none>        443/TCP             5d11h
zookeeper-cs         ClusterIP   10.106.99.170   <none>        2181/TCP            5d11h
zookeeper-headless   ClusterIP   None            <none>        2888/TCP,3888/TCP   5d11h
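A quick sanity check I could add here: a headless Service only resolves to the pods if its selector actually matches them, which shows up in its Endpoints:

# Each Service should list one <podIP>:<port> endpoint per ready pod;
# an empty ENDPOINTS column would mean the selector doesn't match the pod labels.
kubectl get endpoints kafka-headless zookeeper-headless zookeeper-cs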
UPDATE - Adding the Kafka and ZooKeeper configuration files.
Kafka -
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-statefulset
spec:
selector:
matchLabels:
app: kafka-app
serviceName: kafka-headless
replicas: 5
podManagementPolicy: Parallel
template:
metadata:
labels:
app: kafka-app
kafkaMinReplicas: need-supervision
spec:
affinity:
podAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
podAffinityTerm:
labelSelector:
matchExpressions:
- key: "app"
operator: In
values:
- zookeeper-app
topologyKey: "kubernetes.io/hostname"
terminationGracePeriodSeconds: 300
containers:
- name: k8skafka
imagePullPolicy: Always
image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1
resources:
requests:
memory: "1Gi"
cpu: 500m
ports:
- containerPort: 9093
name: server
command:
- sh
- -c
- "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
--override listeners=PLAINTEXT://:9093 \
--override zookeeper.connect=zookeeper-statefulset-0.zookeeper-headless.default.svc.cluster.local:2181,zookeeper-statefulset-1.zookeeper-headless.default.svc.cluster.local:2181,zookeeper-statefulset-2.zookeeper-headless.default.svc.cluster.local:2181 \
--override log.dir=/var/lib/kafka \
--override auto.create.topics.enable=true \
--override auto.leader.rebalance.enable=true \
--override background.threads=10 \
--override compression.type=producer \
--override delete.topic.enable=false \
--override leader.imbalance.check.interval.seconds=300 \
--override leader.imbalance.per.broker.percentage=10 \
--override log.flush.interval.messages=9223372036854775807 \
--override log.flush.offset.checkpoint.interval.ms=60000 \
--override log.flush.scheduler.interval.ms=9223372036854775807 \
--override log.retention.bytes=-1 \
--override log.retention.hours=168 \
--override log.roll.hours=168 \
--override log.roll.jitter.hours=0 \
--override log.segment.bytes=1073741824 \
--override log.segment.delete.delay.ms=60000 \
--override message.max.bytes=1000012 \
--override min.insync.replicas=1 \
--override num.io.threads=8 \
--override num.network.threads=3 \
--override num.recovery.threads.per.data.dir=1 \
--override num.replica.fetchers=1 \
--override offset.metadata.max.bytes=4096 \
--override offsets.commit.required.acks=-1 \
--override offsets.commit.timeout.ms=5000 \
--override offsets.load.buffer.size=5242880 \
--override offsets.retention.check.interval.ms=600000 \
--override offsets.retention.minutes=1440 \
--override offsets.topic.compression.codec=0 \
--override offsets.topic.num.partitions=50 \
--override offsets.topic.replication.factor=3 \
--override offsets.topic.segment.bytes=104857600 \
--override queued.max.requests=500 \
--override quota.consumer.default=9223372036854775807 \
--override quota.producer.default=9223372036854775807 \
--override replica.fetch.min.bytes=1 \
--override replica.fetch.wait.max.ms=500 \
--override replica.high.watermark.checkpoint.interval.ms=5000 \
--override replica.lag.time.max.ms=10000 \
--override replica.socket.receive.buffer.bytes=65536 \
--override replica.socket.timeout.ms=30000 \
--override request.timeout.ms=30000 \
--override socket.receive.buffer.bytes=102400 \
--override socket.request.max.bytes=104857600 \
--override socket.send.buffer.bytes=102400 \
--override unclean.leader.election.enable=true \
--override zookeeper.session.timeout.ms=30000 \
--override zookeeper.set.acl=false \
--override broker.id.generation.enable=true \
--override connections.max.idle.ms=600000 \
--override controlled.shutdown.enable=true \
--override controlled.shutdown.max.retries=3 \
--override controlled.shutdown.retry.backoff.ms=5000 \
--override controller.socket.timeout.ms=30000 \
--override default.replication.factor=1 \
--override fetch.purgatory.purge.interval.requests=1000 \
--override group.max.session.timeout.ms=300000 \
--override group.min.session.timeout.ms=6000 \
--override inter.broker.protocol.version=0.10.2-IV0 \
--override log.cleaner.backoff.ms=15000 \
--override log.cleaner.dedupe.buffer.size=134217728 \
--override log.cleaner.delete.retention.ms=86400000 \
--override log.cleaner.enable=true \
--override log.cleaner.io.buffer.load.factor=0.9 \
--override log.cleaner.io.buffer.size=524288 \
--override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
--override log.cleaner.min.cleanable.ratio=0.5 \
--override log.cleaner.min.compaction.lag.ms=0 \
--override log.cleaner.threads=1 \
--override log.cleanup.policy=delete \
--override log.index.interval.bytes=4096 \
--override log.index.size.max.bytes=10485760 \
--override log.message.timestamp.difference.max.ms=9223372036854775807 \
--override log.message.timestamp.type=CreateTime \
--override log.preallocate=false \
--override log.retention.check.interval.ms=300000 \
--override max.connections.per.ip=2147483647 \
--override num.partitions=1 \
--override producer.purgatory.purge.interval.requests=1000 \
--override replica.fetch.backoff.ms=1000 \
--override replica.fetch.max.bytes=1048576 \
--override replica.fetch.response.max.bytes=10485760 \
--override reserved.broker.max.id=1000 "
env:
- name: KAFKA_HEAP_OPTS
value: "-Xmx512M -Xms512M"
- name: KAFKA_OPTS
value: "-Dlogging.level=INFO"
volumeMounts:
- name: datadir
mountPath: /var/lib/kafka
readinessProbe:
exec:
command:
- sh
- -c
- "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
securityContext:
fsGroup: 1000
volumeClaimTemplates:
- metadata:
name: datadir
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 1Gi
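Given this manifest, the readiness probe itself suggests a per-broker check I could run by hand (same command and path as the probe above):

# If a broker answers an ApiVersions request on 9093, its listener is up and reachable.
for i in 0 1 2 3 4; do
  kubectl exec kafka-statefulset-$i -- \
    /opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093 \
    > /dev/null && echo "broker $i OK"
done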
ZooKeeper -
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: zookeeper-statefulset
spec:
selector:
matchLabels:
app: zookeeper-app
serviceName: zookeeper-headless
replicas: 5
updateStrategy:
type: RollingUpdate
podManagementPolicy: OrderedReady
template:
metadata:
labels:
app: zookeeper-app
spec:
containers:
- name: kubernetes-zookeeper
imagePullPolicy: Always
image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10"
resources:
requests:
memory: "1Gi"
cpu: "0.5"
ports:
# Exposed by zookeeper-cs to clients
- containerPort: 2181
name: client
# Exposed by zookeeper-headless to the other replicas of the set
- containerPort: 2888
name: server
- containerPort: 3888
name: leader-election
command:
- sh
- -c
- "start-zookeeper \
--servers=3 \
--data_dir=/var/lib/zookeeper/data \
--data_log_dir=/var/lib/zookeeper/data/log \
--conf_dir=/opt/zookeeper/conf \
--client_port=2181 \
--election_port=3888 \
--server_port=2888 \
--tick_time=2000 \
--init_limit=10 \
--sync_limit=5 \
--heap=512M \
--max_client_cnxns=60 \
--snap_retain_count=3 \
--purge_interval=12 \
--max_session_timeout=40000 \
--min_session_timeout=4000 \
--log_level=INFO"
readinessProbe:
exec:
command: ["sh", "-c", "zookeeper-ready 2181"]
initialDelaySeconds: 60
timeoutSeconds: 10
livenessProbe:
exec:
command:
- sh
- -c
- "zookeeper-ready 2181"
initialDelaySeconds: 60
timeoutSeconds: 10
volumeMounts:
- name: zookeeper-volume
mountPath: /var/lib/zookeeper
securityContext:
fsGroup: 1000
volumeClaimTemplates:
- metadata:
name: zookeeper-volume
spec:
accessModes: [ "ReadWriteOnce" ]
resources:
requests:
storage: 1Gi
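Finally, for an end-to-end check of Kafka itself, a produce/consume round trip with the kafkacat I installed in the broker pod would look something like this (a sketch; the topic name is arbitrary, and auto.create.topics.enable=true is set above, so it gets created on first use):

# Produce a single message (kafkacat -P reads messages from stdin until EOF) ...
kubectl exec -i kafka-statefulset-0 -- sh -c 'echo hello | kafkacat -b localhost:9093 -t smoke-test -P'

# ... then read it back; -o beginning starts at the first offset and -e exits at end of partition.
kubectl exec kafka-statefulset-0 -- kafkacat -b localhost:9093 -t smoke-test -C -o beginning -e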
Comments:
- PjoterS: Did you run kafkacat inside the cluster? Could you share your manifests? Also, what is the output of kubectl run -i --tty --image busybox:1.28 dns-test --restart=Never --rm?
- PjoterS: Then type nslookup headless.default.svc.cluster.local - what is the output? By manifests I mean the YAMLs, your config.
- nirkov: I ran nslookup headless.default.svc.cluster.local and got: Server: 10.96.0.10; Address 1: 10.96.0.10 kube-dns.kube-system.svc.cluster.local; nslookup: can't resolve 'headless.default.svc.cluster.local'