Check my Kafka and Zookeeper functionality and connections

2/14/2020

As part of my system, I need to run a Kafka and Zookeeper cluster on top of Kubernetes. I'm deploying them with StatefulSets and using a headless service so that the Kafka brokers can talk to the Zookeeper servers.

It seems that the clusters are running (checked with kubectl get pods) -

NAME                      READY   STATUS    RESTARTS   AGE
kafka-statefulset-0       1/1     Running   14         5d10h
kafka-statefulset-1       1/1     Running   14         5d10h
kafka-statefulset-2       1/1     Running   16         5d10h
kafka-statefulset-3       1/1     Running   14         5d10h
kafka-statefulset-4       1/1     Running   14         5d10h
zookeeper-statefulset-0   1/1     Running   5          5d10h
zookeeper-statefulset-1   1/1     Running   5          5d10h
zookeeper-statefulset-2   1/1     Running   5          5d10h

My problem is that I do not really understand how I can check whether they communicate properly.

Here is what I have already tried. I ran -

 kafkacat -L -b kafka-statefulset-0.kafka headless.default.svc.cluster.local:9093

and got -

% ERROR: Failed to acquire metadata: Local: Broker transport failure

I tried -

kafkacat -b 172.17.0.10:9093 -t second_topic -P

and got -

% ERROR: Local: Host resolution failure: kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093/0: Failed to resolve 'kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093/1: Failed to resolve 'kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093/3: Failed to resolve 'kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093/4: Failed to resolve 'kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093': Name or service not known
% ERROR: Local: Host resolution failure: kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093/2: Failed to resolve 'kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093': Name or service not known

but when I ran -

kafkacat -L -b 172.17.0.10:9093

I get -

Metadata for all topics (from broker -1: 172.17.0.10:9093/bootstrap):
 5 brokers:
  broker 2 at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093
  broker 4 at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093
  broker 1 at kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093
  broker 3 at kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093
  broker 0 at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093
 4 topics:
  topic "second_topic" with 1 partitions:
    partition 0, leader 4, replicas: 4, isrs: 4

As I understand it, I have not configured the services correctly for connecting to the brokers from outside the cluster, but I can reach them from within it. Even so, the in-cluster connections keep producing errors I can't understand.

For example, after installing kafkacat inside the pod's container and trying to talk to another Kafka broker -

 > kubectl exec -it  kafka-statefulset-0 bash 
 > apt-get install kafkacat
 > kafkacat -b kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093 -t TOPIC -P
 > kafkacat -L -b kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093

I got the right metadata, but with errors at the end -

Metadata for all topics (from broker -1: kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093/bootstrap):
 5 brokers:
  broker 2 at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093
  broker 4 at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093
  broker 1 at kafka-statefulset-1.kafka-headless.default.svc.cluster.local:9093
  broker 3 at kafka-statefulset-3.kafka-headless.default.svc.cluster.local:9093
  broker 0 at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093
 5 topics:
  topic "TOPIC" with 1 partitions:
    partition 0, leader 2, replicas: 2, isrs: 2
  topic "second_topic" with 1 partitions:
    partition 0, leader 4, replicas: 4, isrs: 4
  topic "first_topic" with 1 partitions:
    partition 0, leader 2, replicas: 2, isrs: 2
  topic "nir_topic" with 1 partitions:
    partition 0, leader 0, replicas: 0, isrs: 0
  topic "first" with 1 partitions:
    partition 0, leader 3, replicas: 3, isrs: 3
%3|1581685918.022|FAIL|rdkafka#producer-0| kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093/0: Failed to connect to broker at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.022|ERROR|rdkafka#producer-0| kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093/0: Failed to connect to broker at kafka-statefulset-0.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.022|FAIL|rdkafka#producer-0| kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093/2: Failed to connect to broker at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.022|ERROR|rdkafka#producer-0| kafka-statefulset-2.kafka-headless.default.svc.cluster.local:9093/2: Failed to connect to broker at kafka-statefulset-2.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.023|FAIL|rdkafka#producer-0| kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093/4: Failed to connect to broker at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:: Interrupted system call
%3|1581685918.023|ERROR|rdkafka#producer-0| kafka-statefulset-4.kafka-headless.default.svc.cluster.local:9093/4: Failed to connect to broker at kafka-statefulset-4.kafka-headless.default.svc.cluster.local:: Interrupted system call

What could be the problem? How can I check the communication between them and verify that both of them - Kafka and Zookeeper - are functioning correctly?

Some more information about the system (I can copy all the YAML configuration if needed, but it is pretty long) -

  1. There is a StatefulSet for Kafka and a StatefulSet for Zookeeper.
  2. Each Kafka broker talks to Zookeeper through port 2181. My configuration is -

--override zookeeper.connect=zookeeper-statefulset-0.zookeeper-headless.default.svc.cluster.local:2181 ...

  3. I also built a headless service so that each Kafka broker can be reached through port 9093.

  4. Zookeeper ports (a quick sketch of the checks I had in mind follows the service list below):

    a. 2181 - client port

    b. 2888 - server port

    c. 3888 - port for leader election

  5. The services which are running -

NAME                 TYPE        CLUSTER-IP      EXTERNAL-IP   PORT(S)             AGE
kafka-headless       ClusterIP   None            <none>        9093/TCP            5d10h
kubernetes           ClusterIP   10.96.0.1       <none>        443/TCP             5d11h
zookeeper-cs         ClusterIP   10.106.99.170   <none>        2181/TCP            5d11h
zookeeper-headless   ClusterIP   None            <none>        2888/TCP,3888/TCP   5d11h
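
These are the checks I had in mind for Zookeeper and for broker registration, but I'm not sure they are the right ones (a sketch - it assumes nc is available in the Zookeeper image and that zookeeper-shell.sh lives under /opt/kafka/bin, next to the script my readiness probe uses) -

# ZooKeeper four-letter-word checks against the client port (2181):
kubectl exec zookeeper-statefulset-0 -- sh -c 'echo ruok | nc localhost 2181'   # expect "imok"
kubectl exec zookeeper-statefulset-0 -- sh -c 'echo stat | nc localhost 2181'   # shows Mode: leader or follower

# Check whether all five brokers registered themselves in Zookeeper:
kubectl exec kafka-statefulset-0 -- /opt/kafka/bin/zookeeper-shell.sh \
  zookeeper-cs.default.svc.cluster.local:2181 ls /brokers/ids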

UPDATE - Adding the Kafka and Zookeeper configuration files. Kafka -

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-statefulset
spec:
  selector:
    matchLabels:
      app: kafka-app
  serviceName: kafka-headless
  replicas: 5
  podManagementPolicy: Parallel
  template:
    metadata:
      labels:
        app: kafka-app
        kafkaMinReplicas: need-supervision
    spec:
      affinity:
        podAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
             - weight: 1
               podAffinityTerm:
                 labelSelector:
                    matchExpressions:
                      - key: "app"
                        operator: In
                        values: 
                        - zookeeper-app
                 topologyKey: "kubernetes.io/hostname"
      terminationGracePeriodSeconds: 300
      containers:
      - name: k8skafka
        imagePullPolicy: Always
        image: gcr.io/google_containers/kubernetes-kafka:1.0-10.2.1 
        resources:
          requests:
            memory: "1Gi"
            cpu: 500m
        ports:
        - containerPort: 9093
          name: server
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
          --override listeners=PLAINTEXT://:9093 \
          --override zookeeper.connect=zookeeper-statefulset-0.zookeeper-headless.default.svc.cluster.local:2181,zookeeper-statefulset-1.zookeeper-headless.default.svc.cluster.local:2181,zookeeper-statefulset-2.zookeeper-headless.default.svc.cluster.local:2181 \
          --override log.dir=/var/lib/kafka \
          --override auto.create.topics.enable=true \
          --override auto.leader.rebalance.enable=true \
          --override background.threads=10 \
          --override compression.type=producer \
          --override delete.topic.enable=false \
          --override leader.imbalance.check.interval.seconds=300 \
          --override leader.imbalance.per.broker.percentage=10 \
          --override log.flush.interval.messages=9223372036854775807 \
          --override log.flush.offset.checkpoint.interval.ms=60000 \
          --override log.flush.scheduler.interval.ms=9223372036854775807 \
          --override log.retention.bytes=-1 \
          --override log.retention.hours=168 \
          --override log.roll.hours=168 \
          --override log.roll.jitter.hours=0 \
          --override log.segment.bytes=1073741824 \
          --override log.segment.delete.delay.ms=60000 \
          --override message.max.bytes=1000012 \
          --override min.insync.replicas=1 \
          --override num.io.threads=8 \
          --override num.network.threads=3 \
          --override num.recovery.threads.per.data.dir=1 \
          --override num.replica.fetchers=1 \
          --override offset.metadata.max.bytes=4096 \
          --override offsets.commit.required.acks=-1 \
          --override offsets.commit.timeout.ms=5000 \
          --override offsets.load.buffer.size=5242880 \
          --override offsets.retention.check.interval.ms=600000 \
          --override offsets.retention.minutes=1440 \
          --override offsets.topic.compression.codec=0 \
          --override offsets.topic.num.partitions=50 \
          --override offsets.topic.replication.factor=3 \
          --override offsets.topic.segment.bytes=104857600 \
          --override queued.max.requests=500 \
          --override quota.consumer.default=9223372036854775807 \
          --override quota.producer.default=9223372036854775807 \
          --override replica.fetch.min.bytes=1 \
          --override replica.fetch.wait.max.ms=500 \
          --override replica.high.watermark.checkpoint.interval.ms=5000 \
          --override replica.lag.time.max.ms=10000 \
          --override replica.socket.receive.buffer.bytes=65536 \
          --override replica.socket.timeout.ms=30000 \
          --override request.timeout.ms=30000 \
          --override socket.receive.buffer.bytes=102400 \
          --override socket.request.max.bytes=104857600 \
          --override socket.send.buffer.bytes=102400 \
          --override unclean.leader.election.enable=true \
          --override zookeeper.session.timeout.ms=30000 \
          --override zookeeper.set.acl=false \
          --override broker.id.generation.enable=true \
          --override connections.max.idle.ms=600000 \
          --override controlled.shutdown.enable=true \
          --override controlled.shutdown.max.retries=3 \
          --override controlled.shutdown.retry.backoff.ms=5000 \
          --override controller.socket.timeout.ms=30000 \
          --override default.replication.factor=1 \
          --override fetch.purgatory.purge.interval.requests=1000 \
          --override group.max.session.timeout.ms=300000 \
          --override group.min.session.timeout.ms=6000 \
          --override inter.broker.protocol.version=0.10.2-IV0 \
          --override log.cleaner.backoff.ms=15000 \
          --override log.cleaner.dedupe.buffer.size=134217728 \
          --override log.cleaner.delete.retention.ms=86400000 \
          --override log.cleaner.enable=true \
          --override log.cleaner.io.buffer.load.factor=0.9 \
          --override log.cleaner.io.buffer.size=524288 \
          --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
          --override log.cleaner.min.cleanable.ratio=0.5 \
          --override log.cleaner.min.compaction.lag.ms=0 \
          --override log.cleaner.threads=1 \
          --override log.cleanup.policy=delete \
          --override log.index.interval.bytes=4096 \
          --override log.index.size.max.bytes=10485760 \
          --override log.message.timestamp.difference.max.ms=9223372036854775807 \
          --override log.message.timestamp.type=CreateTime \
          --override log.preallocate=false \
          --override log.retention.check.interval.ms=300000 \
          --override max.connections.per.ip=2147483647 \
          --override num.partitions=1 \
          --override producer.purgatory.purge.interval.requests=1000 \
          --override replica.fetch.backoff.ms=1000 \
          --override replica.fetch.max.bytes=1048576 \
          --override replica.fetch.response.max.bytes=10485760 \
          --override reserved.broker.max.id=1000 "
        env:
        - name: KAFKA_HEAP_OPTS
          value : "-Xmx512M -Xms512M"
        - name: KAFKA_OPTS
          value: "-Dlogging.level=INFO"
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
            command: 
            - sh 
            - -c 
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
      securityContext:
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: 
      - ReadWriteOnce
      resources:
        requests:
          storage: 1Gi

Zookeeper -

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper-statefulset
spec:
  selector:
    matchLabels:
      app: zookeeper-app
  serviceName: zookeeper-headless
  replicas: 5
  updateStrategy:
    type: RollingUpdate
  podManagementPolicy: OrderedReady
  template:
    metadata:
      labels:
        app: zookeeper-app
    spec:
      containers:
      - name: kubernetes-zookeeper
        imagePullPolicy: Always
        image: "k8s.gcr.io/kubernetes-zookeeper:1.0-3.4.10"
        resources:
          requests:
            memory: "1Gi"
            cpu: "0.5"

        ports:
        # Expose by zookeeper-cs to the client
        - containerPort: 2181
          name: client
        # Expose by zookeeper-headless to the other replicas of the set
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        command:
        - sh
        - -c
        - "start-zookeeper \
          --servers=3 \
          --data_dir=/var/lib/zookeeper/data \
          --data_log_dir=/var/lib/zookeeper/data/log \
          --conf_dir=/opt/zookeeper/conf \
          --client_port=2181 \
          --election_port=3888 \
          --server_port=2888 \
          --tick_time=2000 \
          --init_limit=10 \
          --sync_limit=5 \
          --heap=512M \
          --max_client_cnxns=60 \
          --snap_retain_count=3 \
          --purge_interval=12 \
          --max_session_timeout=40000 \
          --min_session_timeout=4000 \
          --log_level=INFO"
        readinessProbe:
          exec:
            command: ["sh", "-c", "zookeeper-ready 2181"]
          initialDelaySeconds: 60
          timeoutSeconds: 10
        livenessProbe:
          exec:
            command:
            - sh
            - -c
            - "zookeeper-ready 2181"
          initialDelaySeconds: 60
          timeoutSeconds: 10
        volumeMounts:
        - name: zookeeper-volume
          mountPath: /var/lib/zookeeper
      securityContext:
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: zookeeper-volume
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 1Gi 
-- nirkov
apache-kafka
apache-zookeeper
kubernetes

2 Answers

2/17/2020

As you mentioned in a comment:

I ran - / # nslookup headless.default.svc.cluster.local and got - Server: 10.96.0.10 Address 1: 10.96.0.10 kube-dns.kube-system.svc.cluster.local nslookup: can't resolve 'headless.default.svc.cluster.local'

The problem is related to DNS in your environment, since it cannot resolve the headless service. The DNS component should provide the Pods' DNS names.

You should receive something like:

/ # nslookup my-kafka-headless
Server:    10.122.0.10
Address 1: 10.122.0.10 kube-dns.kube-system.svc.cluster.local

Name:      my-kafka-headless
Address 1: 10.56.0.5 my-kafka-0.my-kafka-headless.default.svc.cluster.local

Treat this as a prerequisite if you want to refer to the Pods backing the StatefulSet by DNS name (in your case: headless.default.svc.cluster.local).

Verify that your services have .spec.clusterIP: None set and that the kube-dns-XXXX pod in the kube-system namespace is healthy. Here you can find some information about troubleshooting your DNS issue.
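
A few quick checks (a sketch - the test pod names and image tags below are just examples):

# The headless service must have clusterIP set to None:
kubectl get svc kafka-headless -o jsonpath='{.spec.clusterIP}{"\n"}'

# The DNS pods in kube-system should be Running:
kubectl get pods -n kube-system -l k8s-app=kube-dns

# Resolve a per-pod record from inside the cluster:
kubectl run dns-test --rm -it --restart=Never --image=busybox:1.28 -- \
  nslookup kafka-statefulset-0.kafka-headless.default.svc.cluster.local

# And an end-to-end metadata request through the same DNS name:
kubectl run kcat-test --rm -it --restart=Never --image=edenhill/kafkacat:1.5.0 --command -- \
  kafkacat -L -b kafka-statefulset-0.kafka-headless.default.svc.cluster.local:9093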

Also, as @cricket_007 advised, you can use Helm for deploying Kafka - for example, the Helm chart from this source, which also contains a HowTo.

-- PjoterS
Source: StackOverflow

2/14/2020

kafkacat -L -b kafka-statefulset-0.kafka headless.default.svc.cluster.local:9093

Doesn't work - there is a space in the broker address right after the -b flag.


kafkacat -b 172.17.0.10:9093 -t second_topic -P

Without the correct advertised.listeners returning broker addresses that your client can resolve, this won't work.


kafkacat -L -b 172.17.0.10:9093

This is a step in the right direction, but you're using the Docker network IP rather than a ClusterIP or NodePort of any of the services, so it still isn't portable to machines outside your cluster.
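
You can see which IP belongs to what with something like this (a sketch):

kubectl get pods -o wide              # pod IPs - 172.17.0.10 is one of these, not a service
kubectl get endpoints kafka-headless  # the pod IPs the headless service resolves to
kubectl get svc                       # note there is no NodePort/LoadBalancer for Kafka yet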


installing Kafkacat inside the pod's container and trying to talk with other Kafka's broker

That's a good test to make sure that replication will at least work, but it doesn't solve the external client issue.
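
If you want to check replication explicitly, something like this should work from inside the cluster (a sketch - it assumes the Kafka scripts live under /opt/kafka/bin, as your readiness probe suggests):

# Create a topic that spans three brokers, then check leader/replicas/ISR:
kubectl exec kafka-statefulset-0 -- /opt/kafka/bin/kafka-topics.sh \
  --zookeeper zookeeper-cs.default.svc.cluster.local:2181 \
  --create --topic repl-test --partitions 3 --replication-factor 3
kubectl exec kafka-statefulset-0 -- /opt/kafka/bin/kafka-topics.sh \
  --zookeeper zookeeper-cs.default.svc.cluster.local:2181 \
  --describe --topic repl-test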


I got the right metadata but with errors at the end

That could be a sign that the healthcheck on the broker or Zookeeper is failing and the pod is restarting. Notice that the errors cycle through brokers 0, 2, 4...
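
You can confirm that from the pod events and the previous container's logs, for example (a sketch):

kubectl get pods -l app=kafka-app                         # watch the RESTARTS column
kubectl describe pod kafka-statefulset-0                  # look for failed readiness/liveness probe events
kubectl logs kafka-statefulset-0 --previous | tail -n 50  # logs of the last crashed container, if any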

I also built a headless service so that each Kafka broker can be reached through port 9093

Okay, great, but you need to set advertised.listeners on each broker so that clients can connect on 9093.
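
For example, the broker command in your StatefulSet could explicitly advertise the per-pod DNS name - a sketch of just the relevant overrides (keep the rest of your --override flags as they are):

# Relevant part of the existing "exec kafka-server-start.sh ..." command:
exec kafka-server-start.sh /opt/kafka/config/server.properties \
  --override broker.id=${HOSTNAME##*-} \
  --override listeners=PLAINTEXT://:9093 \
  --override advertised.listeners=PLAINTEXT://${HOSTNAME}.kafka-headless.default.svc.cluster.local:9093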


All in all, I think just using an existing Kafka Helm chart that allows for external connectivity would be your best option.
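
For example (a hypothetical invocation of one commonly used chart - any chart that supports external listeners would do):

# Helm 3 syntax; the chart and release name here are just an example:
helm repo add bitnami https://charts.bitnami.com/bitnami
helm install my-kafka bitnami/kafka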

-- OneCricketeer
Source: StackOverflow