Using kubernetes-kafka as a starting point with minikube.
This uses a StatefulSet and a headless service for service discovery within the cluster.
The goal is to expose the individual Kafka brokers externally, which are internally addressed as:
kafka-0.broker.kafka.svc.cluster.local:9092
kafka-1.broker.kafka.svc.cluster.local:9092
kafka-2.broker.kafka.svc.cluster.local:9092
The constraint is that this external service must be able to address the brokers specifically.
What's the right (or one possible) way of going about this? Is it possible to expose an external service per kafka-x.broker.kafka.svc.cluster.local:9092?
We have solved this in 1.7 by changing the headless service to type: NodePort and setting externalTrafficPolicy: Local. This bypasses the internal load balancing of a Service, so traffic destined for a specific node on that node port will only work if a Kafka pod is running on that node.
apiVersion: v1
kind: Service
metadata:
  name: broker
spec:
  externalTrafficPolicy: Local
  ports:
  - nodePort: 30000
    port: 30000
    protocol: TCP
    targetPort: 9092
  selector:
    app: broker
  type: NodePort
For example, say we have two nodes, nodeA and nodeB, and nodeB is running a Kafka pod. nodeA:30000 will not connect, but nodeB:30000 will connect to the Kafka pod running on nodeB.
https://kubernetes.io/docs/tutorials/services/source-ip/#source-ip-for-services-with-typenodeport
Note this was also available in 1.5 and 1.6 as a beta annotation; more on feature availability can be found here: https://kubernetes.io/docs/tasks/access-application-cluster/create-external-load-balancer/#preserving-the-client-source-ip
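One way to see this behavior from outside the cluster (a hypothetical check, not part of our setup; substitute your nodes' external addresses and assume nc is available):
# With externalTrafficPolicy: Local, only nodes hosting a Kafka pod answer on the node port.
nc -vz <nodeA-external-ip> 30000   # expected to fail: no Kafka pod on nodeA
nc -vz <nodeB-external-ip> 30000   # expected to succeed: Kafka pod on nodeB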
Note also that while this ties a Kafka pod to a specific external network identity, it does not guarantee that your storage volume will be tied to that network identity. If you are using volumeClaimTemplates in a StatefulSet, then your volumes are tied to the pod, while Kafka expects the volume to be tied to the network identity.
For example, if the kafka-0 pod restarts and comes up on nodeC instead of nodeA, kafka-0's PVC (if using volumeClaimTemplates) still holds data saying it belongs to nodeA, and the broker running in kafka-0 starts rejecting requests, thinking it is nodeA rather than nodeC.
To fix this, we are looking forward to Local Persistent Volumes, but right now we have a single PVC for our Kafka StatefulSet, and data is stored under $NODENAME on that PVC to tie volume data to a particular node.
https://github.com/kubernetes/features/issues/121 https://kubernetes.io/docs/concepts/storage/volumes/#local
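For illustration, a rough sketch of what storing data under $NODENAME can look like (assumptions on my part: the init script has NODENAME injected, e.g. via the downward API, the shared PVC is mounted at /var/lib/kafka/data, and server.properties already contains a log.dirs line; the paths are placeholders):
# Hypothetical init.sh fragment: keep each broker's log dir node-specific,
# so the on-disk data follows the node's network identity rather than the pod.
mkdir -p "/var/lib/kafka/data/$NODENAME"
sed -i "s|^log.dirs=.*|log.dirs=/var/lib/kafka/data/$NODENAME|" /etc/kafka/server.properties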
The solutions so far weren't quite satisfying enough for me, so I'm going to post an answer of my own. Starting with Yolean/kubernetes-kafka, the only thing missing is exposing the service externally, and there are two challenges in doing so.
Per-pod labels and external services:
To generate labels per pod, this issue was really helpful. Using it as a guide, we add the following line to the init.sh property in 10broker-config.yml:
kubectl label pods ${HOSTNAME} kafka-set-component=${HOSTNAME}
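To confirm the labels were applied (assuming the kafka namespace from the manifests), each pod should now show a kafka-set-component label matching its own name:
kubectl -n kafka get pods --show-labels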
We keep the existing headless service, but we also generate an external Service per pod using the label (I added them to 20dns.yml):
apiVersion: v1
kind: Service
metadata:
  name: broker-0
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: 30093
  selector:
    kafka-set-component: kafka-0
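I added these statically to 20dns.yml; if you would rather generate them, a loop along these lines should work (a sketch, assuming three brokers and nodePorts 30093-30095):
# Hypothetical generator for the broker-0..broker-2 NodePort Services.
for i in 0 1 2; do
cat <<EOF | kubectl apply -f -
apiVersion: v1
kind: Service
metadata:
  name: broker-$i
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9093
    nodePort: $((30093 + i))
  selector:
    kafka-set-component: kafka-$i
EOF
done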
Configure Kafka with internal/external listeners
I found this issue incredibly useful in trying to understand how to configure Kafka.
This again requires updating the init.sh and server.properties entries in 10broker-config.yml with the following:
Add the following to server.properties to update the security protocols (currently using PLAINTEXT):
listener.security.protocol.map=INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT
inter.broker.listener.name=INTERNAL_PLAINTEXT
Dynamically determine the external IP and the external port for each pod in init.sh:
EXTERNAL_LISTENER_IP=<your external addressable cluster ip>
EXTERNAL_LISTENER_PORT=$((30093 + ${HOSTNAME##*-}))
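For example (assuming three brokers and the nodePorts of the per-pod Services above), the shell arithmetic works out to:
# HOSTNAME=kafka-0  ->  ${HOSTNAME##*-} = 0  ->  EXTERNAL_LISTENER_PORT=30093  (Service broker-0)
# HOSTNAME=kafka-2  ->  ${HOSTNAME##*-} = 2  ->  EXTERNAL_LISTENER_PORT=30095  (Service broker-2)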
Then configure the listeners and advertised.listeners IPs for EXTERNAL_LISTENER and INTERNAL_LISTENER (also in the init.sh property):
sed -i "s/#listeners=PLAINTEXT:\/\/:9092/listeners=INTERNAL_PLAINTEXT:\/\/0.0.0.0:9092,EXTERNAL_PLAINTEXT:\/\/0.0.0.0:9093/" /etc/kafka/server.properties
sed -i "s/#advertised.listeners=PLAINTEXT:\/\/your.host.name:9092/advertised.listeners=INTERNAL_PLAINTEXT:\/\/$HOSTNAME.broker.kafka.svc.cluster.local:9092,EXTERNAL_PLAINTEXT:\/\/$EXTERNAL_LISTENER_IP:$EXTERNAL_LISTENER_PORT/" /etc/kafka/server.properties
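As a sanity check (my own addition, not part of the original init.sh), you can confirm the substitutions took effect before the broker starts:
grep -E '^(listeners|advertised\.listeners|inter\.broker\.listener\.name)=' /etc/kafka/server.properties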
Obviously, this is not a full solution for production (for example, it does not address security for the externally exposed brokers), and I'm still refining my understanding of how to let internal producers/consumers also communicate with the brokers.
However, so far this is the best approach given my understanding of Kubernetes and Kafka.
I'd read this question and answer three times before, trying to wrap my head around what Headless Services were and what the point of them was (and I never fully understood Headless Services, or what this Q&A was about).
On the fourth read (revisiting it after further educating myself) it finally clicked.
So the purpose of this answer is to restate Nadir's question, problem, and answer as if explaining it to a grade schooler, so that others who stumble upon this will get the significance of Nadir's awesome solution on the first read.
Useful Background Knowledge:
There exists a Service of type ExternalName.
An ExternalName Service simply points to a DNS address.
There are 2 flavors of ExternalName Service: without a Cluster IP, and with a Cluster IP.
The flavor with a Cluster IP is the version of an ExternalName Service that is key to the solution.
A StatefulSet has 3 parts to its identity; the one that matters here is a persistent and predictable inner-cluster DNS name (which it gets from the requirement that it be shipped with a Headless Service).
The key thing to remember about kube-proxy: it is responsible for mapping traffic that comes in on NodePorts to a corresponding Kubernetes Service with a static Cluster IP. This is very important to the requirement that the stateful services be externally exposed, because NodePorts are always involved when it comes to externally exposing services.
The key thing to remember about a Headless Service: it has no virtual static Cluster IP of its own, so kube-proxy cannot map a NodePort to it directly.
Now that we understand the problem better, let's go back to the question: how can a Headless Service (which points to an individual member of a StatefulSet) be externally exposed?
Solution Part 1:
Any pod in the cluster can talk to the members of the StatefulSet, because the StatefulSet generates a headless service with a predictable inner-cluster DNS address of the form:
statefulsetname-#.associatedheadlessservice.namespace.svc.cluster.local:port
kafka-0.broker.kafka.svc.cluster.local:9092
kafka-1.broker.kafka.svc.cluster.local:9092
kafka-2.broker.kafka.svc.cluster.local:9092
broker.kafka.svc.cluster.local:9092 can also be used to refer to whichever broker happens to be available.
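You can verify this from any pod inside the cluster (a hypothetical check; whether nslookup is present depends on the image):
nslookup kafka-0.broker.kafka.svc.cluster.local   # resolves to that pod's IP
nslookup broker.kafka.svc.cluster.local           # the headless name returns the whole set of pod IPs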
Solution Part 2:
You allow external traffic to talk to members of the StatefulSet by introducing a second service that can accept external traffic, and then redirecting traffic from that service to the headless service, which can only accept internal traffic.
For each pod in the StatefulSet, a Service of type ExternalName with a virtual static Cluster IP address managed by kube-proxy is created. Each of these ExternalName Services points to (redirects traffic to) one of the predictable static inner-cluster DNS addresses identified in Solution Part 1, and because the ExternalName Service has a virtual static Cluster IP managed via kube-proxy, there can be a mapping from NodePorts to it.
Change the service from a headless ClusterIP into a NodePort, which forwards requests arriving at any of the nodes on a set port (30092 in my example) to port 9092 on the Kafkas. You would hit one of the pods at random, but I guess that is fine.
20dns.yml becomes (something like this):
# A no longer headless service to create DNS records
---
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    nodePort: 30092
  # [podname].broker.kafka.svc.cluster.local
  selector:
    app: kafka
Disclaimer: You might need two services, one headless for the internal DNS names and one NodePort for the external access. I haven't tried this myself.
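A sketch of that two-service variant (untested; the name broker-external is only an illustration): keep a headless service for the per-pod DNS names and add a NodePort service for external access:
# Headless service: provides [podname].broker.kafka.svc.cluster.local records.
apiVersion: v1
kind: Service
metadata:
  name: broker
  namespace: kafka
spec:
  clusterIP: None
  ports:
  - port: 9092
  selector:
    app: kafka
---
# NodePort service: reachable on any node at port 30092, load balancing across brokers.
apiVersion: v1
kind: Service
metadata:
  name: broker-external
  namespace: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    nodePort: 30092
  selector:
    app: kafka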
From the kubernetes-kafka documentation:
Outside access with hostport
An alternative is to use the hostport for outside access. When using this, only one Kafka broker can run on each host, which is a good idea anyway.
In order to switch to hostport, the Kafka advertised address needs to be switched to the ExternalIP or ExternalDNS name of the node running the broker. In kafka/10broker-config.yml switch to:
OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="ExternalIP")].address}')
OUTSIDE_PORT=${OutsidePort}
and in kafka/50kafka.yml add the hostport:
- name: outside
  containerPort: 9094
  hostPort: 9094
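The snippet above assumes $NODE_NAME is available inside the pod; one way to provide it (my assumption, not quoted from the docs) is the downward API in the container spec of kafka/50kafka.yml:
# Expose the node's name to the init script so OUTSIDE_HOST can be looked up.
env:
- name: NODE_NAME
  valueFrom:
    fieldRef:
      fieldPath: spec.nodeName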