I am new to Kubernetes and I am trying to deploy a single-replica Kafka instance on a single-node minikube cluster.
Here is the ZooKeeper service/deployment YAML:
apiVersion: v1
kind: Service
metadata:
  name: zookeeper-cluster
  labels:
    component: zookeeper
spec:
  ports:
    - name: "2181"
      port: 2181
      targetPort: 2181
  selector:
    component: zookeeper
status:
  loadBalancer: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: zookeeper
  template:
    metadata:
      labels:
        component: zookeeper
    spec:
      containers:
        - image: zookeeper:3.4.13
          name: zookeeper
          ports:
            - containerPort: 2181
          resources:
            limits:
              memory: "256Mi"
              cpu: "100m"
          volumeMounts:
            - mountPath: /conf
              name: zookeeper-claim0
            - mountPath: /data
              name: zookeeper-claim1
            - mountPath: /datalog
              name: zookeeper-claim2
      restartPolicy: Always
      volumes:
        - name: zookeeper-claim0
          persistentVolumeClaim:
            claimName: zookeeper-claim0
        - name: zookeeper-claim1
          persistentVolumeClaim:
            claimName: zookeeper-claim1
        - name: zookeeper-claim2
          persistentVolumeClaim:
            claimName: zookeeper-claim2
status: {}
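The manifests also reference PersistentVolumeClaims (zookeeper-claim0/1/2 above and, further below, kafka-claim0) that I have omitted here; one of them, assuming minikube's default StorageClass and an arbitrary 1Gi size, looks roughly like this (the others follow the same pattern):

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-claim0
spec:
  accessModes:
    - ReadWriteOnce          # single-node minikube, so one writer is enough
  resources:
    requests:
      storage: 1Gi           # assumed size; adjust as needed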
And here is the Kafka service/deployment YAML:
apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  labels:
    component: kafka
spec:
  ports:
    - name: "9092"
      port: 9092
      targetPort: 9092
  selector:
    component: kafka
status:
  loadBalancer: {}
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    component: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:
    matchLabels:
      component: kafka
  template:
    metadata:
      labels:
        component: kafka
    spec:
      containers:
        - args:
            - start-kafka.sh
          env:
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: LISTENER_BOB
            - name: KAFKA_ADVERTISED_LISTENERS
              value: LISTENER_BOB://:9092
            - name: KAFKA_LISTENERS
              value: LISTENER_BOB://:9092
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: LISTENER_BOB:PLAINTEXT
            - name: KAFKA_BROKER_ID
              value: "1"
            - name: KAFKA_LOG_DIRS
              value: /kafka/kafka-logs
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-cluster:2181
          image: wurstmeister/kafka:2.12-2.4.1
          name: kafka
          ports:
            - containerPort: 9092
          resources:
            limits:
              memory: "256Mi"
              cpu: "200m"
          volumeMounts:
            - mountPath: /kafka/kafka-logs
              name: kafka-claim0
      restartPolicy: Always
      volumes:
        - name: kafka-claim0
          persistentVolumeClaim:
            claimName: kafka-claim0
status: {}
When another application (also running as a Deployment) tries to access Kafka at kafka-cluster:9092, it throws an UnresolvedAddressException, where kafka-6799c65d58-f6tbt is the pod name:
java.io.IOException: Can't resolve address: kafka-6799c65d58-f6tbt:9092
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:235)
at org.apache.kafka.common.network.Selector.connect(Selector.java:214)
at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:864)
at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:64)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1035)
at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:920)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:508)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163)
at java.base/java.lang.Thread.run(Thread.java:835)
Caused by: java.nio.channels.UnresolvedAddressException: null
at java.base/sun.nio.ch.Net.checkAddress(Net.java:130)
at java.base/sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:675)
at org.apache.kafka.common.network.Selector.doConnect(Selector.java:233)
... 9 common frames omitted
Am I making a mistake in the configuration, or is there an alternative approach?
There are a couple of problems that I can think of.
The Service type has not been explicitly set, so it defaults to ClusterIP, which is only reachable from within the cluster (the internal K8s DNS name should still work there). You can use the NodePort or LoadBalancer types if you want to expose it to the outside world.
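For illustration, a NodePort variant of the kafka-cluster Service could look like the sketch below (the nodePort value 30092 is an arbitrary example from the default 30000-32767 range):

apiVersion: v1
kind: Service
metadata:
  name: kafka-cluster
  labels:
    component: kafka
spec:
  type: NodePort             # explicit type instead of the default ClusterIP
  ports:
    - name: "9092"
      port: 9092
      targetPort: 9092
      nodePort: 30092        # arbitrary example port in the 30000-32767 range
  selector:
    component: kafka

On minikube, running "minikube service kafka-cluster --url" should then print an address that is reachable from the host.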
Also, if your application is running in the same k8s cluster but in a different namespace, you have to use kafka-cluster.mynamespace (or the fully qualified form kafka-cluster.mynamespace.svc.cluster.local) as the internal address.
Check here for more info: https://kubernetes.io/docs/concepts/services-networking/
It looks like the Kafka broker is advertising its own hostname (kafka-6799c65d58-f6tbt), which is the pod name, as if it were a resolvable FQDN. The names of a Deployment's pods cannot be resolved by DNS.
If you take a look at any Kafka Helm chart (for example, this one), you will see that they use StatefulSets. A StatefulSet, together with a headless Service, gives each pod a stable, DNS-resolvable hostname. Take a look here at the k8s docs on how this works.
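A minimal sketch of that pattern, using an illustrative headless Service named kafka-headless (neither name is taken from a specific chart, and the broker env is omitted for brevity):

apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
spec:
  clusterIP: None               # headless: gives every pod its own DNS record
  ports:
    - port: 9092
  selector:
    component: kafka
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless   # ties the pods' DNS names to the headless Service
  replicas: 1
  selector:
    matchLabels:
      component: kafka
  template:
    metadata:
      labels:
        component: kafka
    spec:
      containers:
        - name: kafka
          image: wurstmeister/kafka:2.12-2.4.1
          ports:
            - containerPort: 9092

With this, the broker pod is reachable as kafka-0.kafka-headless (or fully qualified, kafka-0.kafka-headless.<namespace>.svc.cluster.local), which DNS can actually resolve, so that name can safely be used in KAFKA_ADVERTISED_LISTENERS.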
You could also try setting KAFKA_ADVERTISED_LISTENERS to the pod IP, via the downward API:

- name: MY_POD_IP
  valueFrom:
    fieldRef:
      fieldPath: status.podIP
- name: KAFKA_ADVERTISED_LISTENERS
  value: "LISTENER_BOB://$(MY_POD_IP):9092"
But this doesn't scale well when changing the number of replicas.
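Wired into the Deployment from the question, the env section would look roughly like this (only the relevant entries are shown; note that MY_POD_IP has to be declared before any variable that references it, because $(VAR) expansion only sees previously defined entries):

          env:
            - name: MY_POD_IP                 # downward API: the pod's cluster-internal IP
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_ADVERTISED_LISTENERS
              value: LISTENER_BOB://$(MY_POD_IP):9092
            - name: KAFKA_LISTENERS
              value: LISTENER_BOB://:9092
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: LISTENER_BOB
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: LISTENER_BOB:PLAINTEXT
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-cluster:2181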