Running simple kafka instance on kubernetes

8/31/2020

I am new to Kubernetes and am trying to deploy a single-replica Kafka instance on a single-node minikube cluster.

Here is the ZooKeeper Service/Deployment YAML:

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster
  labels:
    component: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    component: zookeeper
status:
  loadBalancer: {}

---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: zookeeper      
  template:
    metadata:
      labels:
        component: zookeeper
    spec:
      containers:
      - image: zookeeper:3.4.13
        name: zookeeper
        ports:
        - containerPort: 2181
        resources:
          limits:
            memory: "256Mi"
            cpu: "100m"
        volumeMounts:
        - mountPath: /conf
          name: zookeeper-claim0
        - mountPath: /data
          name: zookeeper-claim1
        - mountPath: /datalog
          name: zookeeper-claim2
      restartPolicy: Always
      volumes:
      - name: zookeeper-claim0
        persistentVolumeClaim:
          claimName: zookeeper-claim0
      - name: zookeeper-claim1
        persistentVolumeClaim:
          claimName: zookeeper-claim1
      - name: zookeeper-claim2
        persistentVolumeClaim:
          claimName: zookeeper-claim2
status: {}

and here is the Kafka Service/Deployment YAML:

apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  labels:
    component: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  selector:
    component: kafka
status:
  loadBalancer: {}

---

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: kafka    
  template:
    metadata:
      labels:
        component: kafka
    spec:
      containers:
      - args:
        - start-kafka.sh
        env:
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: LISTENER_BOB
        - name: KAFKA_ADVERTISED_LISTENERS
          value: LISTENER_BOB://:9092
        - name: KAFKA_LISTENERS
          value: LISTENER_BOB://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: LISTENER_BOB:PLAINTEXT
        - name: KAFKA_BROKER_ID
          value: "1"
        - name: KAFKA_LOG_DIRS
          value: /kafka/kafka-logs
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-cluster:2181
        image: wurstmeister/kafka:2.12-2.4.1
        name: kafka
        ports:
        - containerPort: 9092
        resources:
          limits:
            memory: "256Mi"
            cpu: "200m"
        volumeMounts:
        - mountPath: /kafka/kafka-logs
          name: kafka-claim0
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}

When another application (also running as a Deployment) tries to access Kafka at kafka-cluster:9092, it throws an UnresolvedAddressException, where kafka-6799c65d58-f6tbt is the pod name:

java.io.IOException: Can't resolve address: kafka-6799c65d58-f6tbt:9092
	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
	at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
	at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
	at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035)
	at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
	at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.nio.channels.UnresolvedAddressException: null
	at java.base/sun.nio.ch.Net.checkAddress(Net.java:130)
	at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
	at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
	... 9 common frames omitted 

Am I making a mistake in the configuration, or is there an alternative approach?

-- Nishit Jain
apache-kafka
kubernetes
kubernetes-pod

2 Answers

8/31/2020

There are a couple of problems that I can think of.

  1. The service type has not been explicitly set, so it defaults to ClusterIP, which is only accessible from within the cluster (the Kubernetes internal DNS should still work there). You can use the NodePort or LoadBalancer types if you want to expose it to the outside world.

  2. Even if your application is running in the same Kubernetes cluster, if it is in a different namespace you have to use kafka-cluster.mynamespace as the internal address.

Check here for more info: https://kubernetes.io/docs/concepts/services-networking/
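For illustration, a NodePort variant of the question's Service might look like the sketch below (the nodePort value 30092 is an arbitrary example, not from the question):

```yaml
# Sketch: exposing the broker outside the cluster via a NodePort Service.
# nodePort must fall in the cluster's NodePort range (30000-32767 by default);
# 30092 here is just an illustrative choice.
apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  labels:
    component: kafka
spec:
  type: NodePort
  selector:
    component: kafka
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
    nodePort: 30092
```

For cross-namespace access inside the cluster, the fully qualified form is kafka-cluster.&lt;namespace&gt;.svc.cluster.local.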

-- Pubudu Sitinamaluwa
Source: StackOverflow

9/1/2020

It looks like the Kafka broker is advertising its own hostname (kafka-6799c65d58-f6tbt) as its FQDN, which is the same as the pod name. A Deployment's pod names cannot be resolved by DNS.

If you take a look at any Kafka Helm chart, e.g. this one, you will see that they use StatefulSets. StatefulSets, combined with a headless Service, give each pod a stable, DNS-resolvable name. Take a look at the Kubernetes docs on how this works.
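A minimal sketch of that approach, reusing the image and labels from the question (the headless Service name kafka-headless is an illustrative choice, not from the question):

```yaml
# Sketch: a headless Service plus a StatefulSet gives each broker a stable,
# DNS-resolvable name such as kafka-0.kafka-headless.<namespace>.svc.cluster.local.
# "kafka-headless" is a made-up name for this example.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None            # headless: DNS returns the pod addresses directly
  selector:
    component: kafka
  ports:
  - port: 9092
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless   # ties pod DNS entries to the headless Service
  replicas: 1
  selector:
    matchLabels:
      component: kafka
  template:
    metadata:
      labels:
        component: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka:2.12-2.4.1
        ports:
        - containerPort: 9092
```

The broker could then advertise the stable name (e.g. kafka-0.kafka-headless:9092) instead of the bare pod name.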

You could also try deriving KAFKA_ADVERTISED_LISTENERS from the pod IP:

- name: MY_POD_IP
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: KAFKA_ADVERTISED_LISTENERS
  value: "LISTENER_BOB://$(MY_POD_IP):9092"

But this doesn't scale well when changing the number of replicas.

-- Matt
Source: StackOverflow