Kafka listeners are wrong in the Confluent Kubernetes setup

4/16/2020

I can't connect to my kafka cluster from the outside. There seems to be a problem with the listeners and advertised listeners.

Any suggestions?

When I try to connect from the outside on port 30092, I always get a reference back to kafka-svc:9092.

  • Cluster name: dev-docker-x02
  • How I test: default kafka for windows: .\bin\windows\kafka-topics.bat --list --bootstrap-server dev-docker-x02:30092
  • Requirement: use confluentinc/cp-kafka:5.4.0-1-ubi8

My setup:

[image: diagram of the Kubernetes setup]

My broker configuration (the problem seems to be in the (advertised) listeners):

apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-pod
  template:
    metadata:
      labels:
        app: kafka-pod
    spec:
      containers:
      - name: kafka-ctr         # Container name
        image: confluentinc/cp-kafka:5.4.0-1-ubi8
        ports:
        - containerPort: 9092   # Port exposed by the container
        env:
        - name: KAFKA_BROKER_ID
          value: "0"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-svc:2181
        - name: KAFKA_LISTENERS
          value: "LISTENER_INTERNAL://:9092,LISTENER_EXTERNAL://:30092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "LISTENER_INTERNAL://kafka-svc:9092,LISTENER_EXTERNAL://dev-kube-x02:30092"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "LISTENER_EXTERNAL"
        - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
          value: "false"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS
          value: "100"
-- Dimitri Dewaele
apache-kafka
confluent
docker
kubernetes
service

2 Answers

4/17/2020

If you use the Confluent Helm Charts and read through the docs there, you can configure working options for remote (external) listeners.

Also, I'd suggest using an Operator rather than a plain Deployment: https://operatorhub.io/?keyword=kafka

Alternatively, use Docker Compose if you are just on a single machine.
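For reference, here is a sketch (untested, assumption-laden) of how the listener env vars in the question's Deployment are usually arranged for dual internal/external access. Two things stand out in the original: the external advertised host (dev-kube-x02) doesn't match the host the test command dials (dev-docker-x02), and inter-broker traffic is pointed at the external listener:

```yaml
# Sketch only: listener env vars for the question's Deployment.
- name: KAFKA_LISTENERS
  value: "LISTENER_INTERNAL://0.0.0.0:9092,LISTENER_EXTERNAL://0.0.0.0:30092"
- name: KAFKA_ADVERTISED_LISTENERS
  # The external host/port must be exactly what clients dial; the question's
  # test uses dev-docker-x02:30092, so advertise that, not dev-kube-x02.
  value: "LISTENER_INTERNAL://kafka-svc:9092,LISTENER_EXTERNAL://dev-docker-x02:30092"
- name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
  value: "LISTENER_INTERNAL:PLAINTEXT,LISTENER_EXTERNAL:PLAINTEXT"
- name: KAFKA_INTER_BROKER_LISTENER_NAME
  value: "LISTENER_INTERNAL"  # keep broker-to-broker traffic on the in-cluster listener
```

A NodePort Service mapping node port 30092 to container port 30092 would also be needed so external traffic actually reaches the LISTENER_EXTERNAL socket.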

-- OneCricketeer
Source: StackOverflow

4/17/2020

A Kafka deployment has a lot of components (headless Services, StatefulSets, etc.), and each one has a distinct role. For that reason I'd also suggest using the Confluent Kafka Helm chart.

This guide is based on the Helm chart, since you mentioned in the comments that you'd use it, but the concepts here extend to any application that uses headless services and needs external access.

From what you provided, I believe you are facing difficulties because you are referencing a headless service externally, which will not work: a headless service has no cluster IP for external traffic to be routed to.

The Headless Service is created alongside the StatefulSet. The created service will not be given a clusterIP, but will instead simply include a list of Endpoints. These Endpoints are then used to generate instance-specific DNS records in the form of: <StatefulSet>-<Ordinal>.<Service>.<Namespace>.svc.cluster.local

It creates a DNS name for each pod, e.g:

[ root@curl:/ ]$ nslookup my-confluent-cp-kafka-headless
Server:    10.0.0.10
Address 1: 10.0.0.10 kube-dns.kube-system.svc.cluster.local

Name:      my-confluent-cp-kafka-headless
Address 1: 10.8.0.23 my-confluent-cp-kafka-1.my-confluent-cp-kafka-headless.default.svc.cluster.local
Address 2: 10.8.1.21 my-confluent-cp-kafka-0.my-confluent-cp-kafka-headless.default.svc.cluster.local
Address 3: 10.8.3.7 my-confluent-cp-kafka-2.my-confluent-cp-kafka-headless.default.svc.cluster.local
  • This is what makes this services connect to each other inside the cluster.

  • You can't, therefore, expose cp-kafka:9092, which is the headless service and, as explained above, only usable inside the cluster.

  • In order to get outside access you have to set the parameter nodeport.enabled to true as stated here: External Access Parameters.

  • It adds one service for each kafka-N pod during chart deployment.
  • Note that the created service has the selector statefulset.kubernetes.io/pod-name: demo-cp-kafka-0; this is how the service identifies the single pod it is intended to connect to.
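As a sketch (names taken from the demo release deployed below; the chart's generated manifest may differ in detail), such a per-pod Service looks like:

```yaml
apiVersion: v1
kind: Service
metadata:
  name: demo-cp-kafka-0-nodeport   # one Service per broker pod
spec:
  type: NodePort
  selector:
    app: cp-kafka
    statefulset.kubernetes.io/pod-name: demo-cp-kafka-0  # pins the Service to a single pod
  ports:
  - name: external-broker
    port: 19092        # servicePort from values.yaml
    nodePort: 31090    # firstListenerPort + broker ordinal
    targetPort: 31090  # the broker's external listener port
```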

Reproduction:

  • git clone https://github.com/confluentinc/cp-helm-charts.git
  • edit the file cp-helm-charts/cp-kafka/values.yaml, changing nodeport.enabled from false to true, and adjust the ports as you'd like:
nodeport:
  enabled: true
  servicePort: 19092
  firstListenerPort: 31090
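With firstListenerPort: 31090, each broker ordinal gets its own node port (31090, 31091, 31092 in the service listing further down). A trivial shell sketch of that mapping (the additive rule is an assumption inferred from the chart's generated services):

```shell
#!/bin/sh
# Print the node port each broker pod gets: firstListenerPort + StatefulSet ordinal.
first_listener_port=31090
for ordinal in 0 1 2; do
  echo "demo-cp-kafka-${ordinal}-nodeport -> node port $((first_listener_port + ordinal))"
done
```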
  • Deploy the chart:
$ helm install demo cp-helm-charts
$ kubectl get pods
NAME                                       READY   STATUS    RESTARTS   AGE
demo-cp-control-center-6d79ddd776-ktggw    1/1     Running   3          113s
demo-cp-kafka-0                            2/2     Running   1          113s
demo-cp-kafka-1                            2/2     Running   0          94s
demo-cp-kafka-2                            2/2     Running   0          84s
demo-cp-kafka-connect-79689c5c6c-947c4     2/2     Running   2          113s
demo-cp-kafka-rest-56dfdd8d94-79kpx        2/2     Running   1          113s
demo-cp-ksql-server-c498c9755-jc6bt        2/2     Running   2          113s
demo-cp-schema-registry-5f45c498c4-dh965   2/2     Running   3          113s
demo-cp-zookeeper-0                        2/2     Running   0          112s
demo-cp-zookeeper-1                        2/2     Running   0          93s
demo-cp-zookeeper-2                        2/2     Running   0          74s

$ kubectl get svc
NAME                         TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
demo-cp-control-center       ClusterIP   10.0.13.134   <none>        9021/TCP            50m
demo-cp-kafka                ClusterIP   10.0.15.71    <none>        9092/TCP            50m
demo-cp-kafka-0-nodeport     NodePort    10.0.7.101    <none>        19092:31090/TCP     50m
demo-cp-kafka-1-nodeport     NodePort    10.0.4.234    <none>        19092:31091/TCP     50m
demo-cp-kafka-2-nodeport     NodePort    10.0.3.194    <none>        19092:31092/TCP     50m
demo-cp-kafka-connect        ClusterIP   10.0.3.217    <none>        8083/TCP            50m
demo-cp-kafka-headless       ClusterIP   None          <none>        9092/TCP            50m
demo-cp-kafka-rest           ClusterIP   10.0.14.27    <none>        8082/TCP            50m
demo-cp-ksql-server          ClusterIP   10.0.7.150    <none>        8088/TCP            50m
demo-cp-schema-registry      ClusterIP   10.0.7.84     <none>        8081/TCP            50m
demo-cp-zookeeper            ClusterIP   10.0.9.119    <none>        2181/TCP            50m
demo-cp-zookeeper-headless   ClusterIP   None          <none>        2888/TCP,3888/TCP   50m
  • My node is on IP 35.226.189.123 and I'll connect to the demo-cp-kafka-0-nodeport service, which is on port 31090. Now let's try to connect from outside the cluster. For that I'll connect to another VM where I have a minikube, so I can use a kafka-client pod to test:
user@minikube:~$ kubectl get pods
NAME           READY   STATUS    RESTARTS   AGE
kafka-client   1/1     Running   0          17h

user@minikube:~$ kubectl exec kafka-client -it -- bin/bash

root@kafka-client:/# kafka-console-consumer --bootstrap-server 35.226.189.123:31090 --topic demo-topic --from-beginning --timeout-ms 8000 --max-messages 1
Wed Apr 15 18:19:48 UTC 2020
Processed a total of 1 messages
root@kafka-client:/# 

As you can see, I was able to access Kafka from outside the cluster.

  • Using this method, the Helm chart will create one external service for each replica you define.
  • If you need external access to Zookeeper, it's not automatically provisioned like it is for the Kafka brokers, but I'll leave a service template for you:

zookeeper-external-0.yaml

apiVersion: v1
kind: Service
metadata:
  labels:
    app: cp-zookeeper
    pod: demo-cp-zookeeper-0
  name: demo-cp-zookeeper-0-nodeport
  namespace: default
spec:
  externalTrafficPolicy: Cluster
  ports:
  - name: external-broker
    nodePort: 31181
    port: 12181
    protocol: TCP
    targetPort: 31181
  selector:
    app: cp-zookeeper
    statefulset.kubernetes.io/pod-name: demo-cp-zookeeper-0
  sessionAffinity: None
  type: NodePort
  • Applying it creates the service:
NAME                           TYPE        CLUSTER-IP    EXTERNAL-IP   PORT(S)             AGE
demo-cp-zookeeper-0-nodeport   NodePort    10.0.5.67     <none>        12181:31181/TCP     2s
  • Test it with your external IP:
pod/zookeeper-client created
user@minikube:~$ kubectl exec -it zookeeper-client -- /bin/bash
root@zookeeper-client:/# zookeeper-shell 35.226.189.123:31181
Connecting to 35.226.189.123:31181
Welcome to ZooKeeper!
JLine support is disabled

If you have any doubts, let me know in the comments!

-- willrof
Source: StackOverflow