Listening to Kafka on Kubernetes from outside the cluster

7/16/2019

I'm running a Kubernetes cluster on Google Cloud Platform and installed Kafka (https://hub.kubeapps.com/charts/bitnami/kafka) on it using Helm charts. I also have another deployment that runs a Python pod. I've exposed Kafka and ZooKeeper using LoadBalancers. Here's what I get when I run kubectl get all (IP addresses have been changed):

kubectl get all

NAME                                    READY   STATUS    RESTARTS   AGE
pod/my-kafka-0                          1/1     Running   1          3h2m
pod/my-kafka-zookeepe-0                 1/1     Running   0          3h2m
pod/my-python-6c746645f5-5xvsb          1/1     Running   0          34m

NAME                                        TYPE           CLUSTER-IP    EXTERNAL-IP     PORT(S)                                        AGE
service/kubernetes                          ClusterIP      10.10.0.1     <none>          443/TCP                                        3h16m
service/my-kafka                            LoadBalancer   10.10.0.110   35.35.135.150   9092:30769/TCP                                 3h2m
service/my-kafka-headless                   ClusterIP      None          <none>          9092/TCP                                       3h2m
service/my-kafka-zookeepe                   LoadBalancer   10.10.0.45    35.35.135.160   2181:32740/TCP,2888:31095/TCP,3888:30057/TCP   3h2m
service/my-kafka-zookeepe-headless          ClusterIP      None          <none>          2181/TCP,2888/TCP,3888/TCP                     3h2m
service/my-python                           ClusterIP      10.10.10.80   <none>          9999/TCP                                       171m

NAME                              DESIRED   CURRENT   UP-TO-DATE   AVAILABLE   AGE
deployment.apps/my-python         1         1         1            1           136m

NAME                                         DESIRED   CURRENT   READY   AGE
replicaset.apps/my-python-6c746645f5         1         1         1       35m
replicaset.apps/my-python-848f769cd          0         0         0       136m

NAME                                        DESIRED   CURRENT   AGE
statefulset.apps/my-kafka                   1         1         3h2m
statefulset.apps/my-kafka-zookeepe          1         1         3h2m

If I open a terminal in the Python pod, I can access the Kafka services: I can create topics, producers, and consumers using Python, and everything works without any issue. The following are the pieces of code I used to test this.

kubectl exec -it my-python-6c746645f5-5xvsb /bin/bash

# Create the topic
from kafka.admin import KafkaAdminClient, NewTopic

admin_client = KafkaAdminClient(bootstrap_servers="my-kafka-headless.default.svc.cluster.local:9092", client_id='test')

topic_list = []
topic_list.append(NewTopic(name="test-topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)

# Produce a message
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')

producer.send('test-topic', b'message')

# Consume messages
from kafka import KafkaConsumer

while True:

    consumer = KafkaConsumer('test-topic',
                            bootstrap_servers='my-kafka-headless.default.svc.cluster.local:9092')
    for msg in consumer:
        print (msg)

I took the bootstrap server value from the Kafka configuration YAML file:

- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)
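
For context, a Kafka client uses the bootstrap address only for its first connection; after that it connects to whatever address the broker advertises. Below is a minimal sketch of what the value above expands to and why that matters. It assumes `MY_POD_NAME` is `my-kafka-0` and `KAFKA_PORT_NUMBER` is `9092` (both inferred from the `kubectl get all` output, not confirmed by the chart). `parse_listener` and `is_cluster_internal` are hypothetical helpers, not part of any Kafka library.

```python
def parse_listener(listener):
    """Split 'PROTOCOL://host:port' into (protocol, host, port)."""
    protocol, _, address = listener.partition("://")
    host, _, port = address.rpartition(":")
    return protocol, host, int(port)


def is_cluster_internal(host):
    """True for names under the default Kubernetes service DNS suffix."""
    return host.endswith(".svc.cluster.local")


# Assumed expansion of the advertised listener for the single broker pod.
advertised = "PLAINTEXT://my-kafka-0.my-kafka-headless.default.svc.cluster.local:9092"

protocol, host, port = parse_listener(advertised)
print(protocol, host, port)
print("cluster-internal name:", is_cluster_internal(host))
```

A name under `.svc.cluster.local` is normally resolvable only through the cluster's own DNS, so a client outside the cluster would be handed an address it cannot look up.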

Up to this point everything seems to work fine. Now, if I try to access the Kafka brokers from outside the cluster using the external IP, it doesn't seem to work. I can list the topics with the following:

import kafka

c = kafka.KafkaConsumer(bootstrap_servers=["35.35.135.150:9092"])
c.topics()
set(['test-topic'])

However, I can't see any messages using the following.

from kafka import KafkaConsumer

while True:

    consumer = KafkaConsumer('test-topic',
                            bootstrap_servers=["35.35.135.150:9092"])
    for msg in consumer:
        print (msg)

I don't get any errors either. I can't seem to figure out what I'm doing wrong.
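
One way to narrow down a silent consumer like this is to test, from the machine outside the cluster, whether each address involved accepts TCP connections at all: the bootstrap address passed to the client, and the address the broker advertises in its configuration. The sketch below uses only the standard library. `can_connect` is a hypothetical helper, and a successful TCP connection only shows that the port is reachable, not that Kafka is healthy.

```python
import socket


def can_connect(host, port, timeout_s=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout_s."""
    try:
        # create_connection resolves the name and connects; DNS failures,
        # refused connections and timeouts all surface as OSError subclasses.
        with socket.create_connection((host, port), timeout=timeout_s):
            return True
    except OSError:
        return False
```

For example, `can_connect("35.35.135.150", 9092)` checks the LoadBalancer address, and `can_connect("my-kafka-0.my-kafka-headless.default.svc.cluster.local", 9092)` checks the advertised one, assuming the pod name `my-kafka-0`. If the first succeeds and the second fails, the client can fetch metadata but cannot reach the broker address it is told to use for reading messages.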

-- rasthiya
apache-kafka
google-kubernetes-engine
kafka-python
kubernetes

1 Answer

7/17/2019

Thanks to cricket_007's comment I was able to fix the issue. I modified the configuration file to set the external endpoint as the advertised listener and moved the internal service name to the listeners setting:

- name: KAFKA_CFG_LISTENERS
  value: PLAINTEXT://$(MY_POD_NAME).my-kafka-headless.default.svc.cluster.local:$(KAFKA_PORT_NUMBER)
- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: PLAINTEXT://35.35.135.150:9092
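
To check the change from outside the cluster, one option is a small produce-and-consume round trip against the external address. This is a sketch rather than part of the original fix: `round_trip` is a hypothetical helper, and it assumes kafka-python is installed and that `test-topic` already exists. `auto_offset_reset="earliest"` is set because a consumer without committed offsets otherwise starts at the end of the topic and only sees messages produced after it subscribes.

```python
def round_trip(bootstrap, topic="test-topic", payload=b"ping", timeout_s=10):
    """Produce one message, then read the topic back; True if payload is seen."""
    # Imported here so this module still loads where kafka-python is absent.
    from kafka import KafkaConsumer, KafkaProducer

    producer = KafkaProducer(bootstrap_servers=bootstrap)
    try:
        # .get() blocks until the broker acknowledges the write, or raises.
        producer.send(topic, payload).get(timeout=timeout_s)
    finally:
        producer.close()

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=bootstrap,
        auto_offset_reset="earliest",          # start from the oldest message
        enable_auto_commit=False,              # leave no offsets behind
        consumer_timeout_ms=timeout_s * 1000,  # stop iterating when idle
    )
    try:
        # Iteration ends at the first match or once the idle timeout elapses.
        return any(msg.value == payload for msg in consumer)
    finally:
        consumer.close()
```

Calling `round_trip("35.35.135.150:9092")` from the outside machine should return `True` once the advertised address is reachable from there. An exception from the producer or a `False` result suggests the client still cannot reach the advertised address.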

-- rasthiya
Source: StackOverflow