I'm running a Kubernetes cluster on Google Cloud Platform and installed Kafka (https://hub.kubeapps.com/charts/bitnami/kafka) on it using Helm charts. I also have another deployment that runs a Python pod. I've exposed Kafka and ZooKeeper using LoadBalancers. Here's what I get when I run kubectl get all (IP addresses have been changed):
kubectl get all
NAME READY STATUS RESTARTS AGE
pod/my-kafka-0 1/1 Running 1 3h2m
pod/my-kafka-zookeepe-0 1/1 Running 0 3h2m
pod/my-python-6c746645f5-5xvsb 1/1 Running 0 34m
NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.10.0.1 <none> 443/TCP 3h16m
service/my-kafka LoadBalancer 10.10.0.110 35.35.135.150 9092:30769/TCP 3h2m
service/my-kafka-headless ClusterIP None <none> 9092/TCP 3h2m
service/my-kafka-zookeepe LoadBalancer 10.10.0.45 35.35.135.160 2181:32740/TCP,2888:31095/TCP,3888:30057/TCP 3h2m
service/my-kafka-zookeepe-headless ClusterIP None <none> 2181/TCP,2888/TCP,3888/TCP 3h2m
service/my-python ClusterIP 10.10.10.80 <none> 9999/TCP 171m
NAME DESIRED CURRENT UP-TO-DATE AVAILABLE AGE
deployment.apps/my-python 1 1 1 1 136m
NAME DESIRED CURRENT READY AGE
replicaset.apps/my-python-6c746645f5 1 1 1 35m
replicaset.apps/my-python-848f769cd 0 0 0 136m
NAME DESIRED CURRENT AGE
statefulset.apps/my-kafka 1 1 3h2m
statefulset.apps/my-kafka-zookeepe 1 1 3h2m
If I open a terminal into the Python pod I can access the Kafka services: I can create topics, producers and consumers using Python and it all works without any issue. Here are the pieces of code I used to test this.
kubectl exec -it my-python-6c746645f5-5xvsb /bin/bash
from kafka.admin import KafkaAdminClient, NewTopic

# create a test topic through the in-cluster headless service
admin_client = KafkaAdminClient(bootstrap_servers="my-kafka-headless.default.svc.cluster.local:9092", client_id='test')
topic_list = []
topic_list.append(NewTopic(name="test-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
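Just to double-check that the topic was actually created, listing the topics from the same pod works too (a small sketch using the same kafka-python client and the same in-cluster address as above):

from kafka import KafkaConsumer

check = KafkaConsumer(bootstrap_servers="my-kafka-headless.default.svc.cluster.local:9092")
print(check.topics())   # should include 'test-topic'
check.close()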
from kafka import KafkaProducer
producer = KafkaProducer(bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')
producer.send('test-topic', b'message')
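Note that producer.send() is asynchronous in kafka-python, so for a short test it helps to block on the returned future or flush before the script exits (a small sketch; the timeout value is arbitrary):

future = producer.send('test-topic', b'message')
future.get(timeout=10)   # block until the broker acknowledges the write
producer.flush()         # push out anything still buffered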
from kafka import KafkaConsumer

while True:
    consumer = KafkaConsumer('test-topic',
                             bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')
    for msg in consumer:
        print(msg)
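By default kafka-python starts reading at the latest offset, so messages produced before the consumer started won't appear. A variant that replays the topic from the beginning and stops once it goes idle (the values here are just examples):

from kafka import KafkaConsumer

consumer = KafkaConsumer('test-topic',
                         bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092',
                         auto_offset_reset='earliest',   # start from the beginning of the topic
                         consumer_timeout_ms=10000)      # stop iterating after 10s without messages
for msg in consumer:
    print(msg)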
The value of the bootstrap server was taken from the Kafka configuration YAML file:
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)
Up to this point everything seems to work fine. Now, if I try to access the Kafka brokers from outside using the external IP, it doesn't work. I can see the topics with the following:
import kafka

c = kafka.KafkaConsumer(bootstrap_servers=["35.35.135.150:9092"])
c.topics()   # returns set(['test-topic'])
However, I can't see any messages using the following.
from kafka import KafkaConsumer

while True:
    consumer = KafkaConsumer('test-topic',
                             bootstrap_servers=["35.35.135.150:9092"])
    for msg in consumer:
        print(msg)
I don't get any errors either. I can't seem to figure out what I'm doing wrong.
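One way to see what is going on is to check whether the machine outside the cluster can resolve the hostname the broker advertises (the value of KAFKA_CFG_ADVERTISED_LISTENERS above, which for this StatefulSet expands to my-kafka-0.my-kafka-headless.default.svc.cluster.local). A minimal check run from the outside machine:

import socket

# The bootstrap connection to 35.35.135.150 works (hence topics() succeeds), but the broker
# address returned in the metadata is the in-cluster name, which doesn't resolve out here.
try:
    socket.getaddrinfo('my-kafka-0.my-kafka-headless.default.svc.cluster.local', 9092)
except socket.gaierror as err:
    print('advertised listener is not resolvable from outside the cluster:', err)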
Thanks to cricket_007's comment I was able to fix the issue: I modified the configuration so that the external endpoint is the advertised listener and moved the internal service name to the listeners.
- name: KAFKA_CFG_LISTENERS
value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: PLAINTEXT://35.35.135.150:9092
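With the external IP advertised, clients outside the cluster can both produce and consume. A quick end-to-end check from an outside machine (a sketch; the offset reset and timeout are just there so the script terminates on its own):

from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers=["35.35.135.150:9092"])
producer.send('test-topic', b'message from outside')
producer.flush()

consumer = KafkaConsumer('test-topic',
                         bootstrap_servers=["35.35.135.150:9092"],
                         auto_offset_reset='earliest',   # also pick up messages produced earlier
                         consumer_timeout_ms=10000)      # exit once the topic goes quiet
for msg in consumer:
    print(msg.value)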