Accessing bitnami/kafka from outside the Kubernetes cluster

10/20/2019

I am currently using the bitnami/kafka image (https://hub.docker.com/r/bitnami/kafka) and deploying it on Kubernetes.

  • Kubernetes master nodes: 1
  • Kubernetes worker nodes: 3

Within the cluster, other applications are able to find Kafka. The problem occurs when trying to access the Kafka container from outside the cluster. From some reading I learned that we need to set the property "advertised.listeners=PLAINTEXT://hostname:port_number" for external Kafka clients.

I am currently referencing "https://github.com/bitnami/charts/tree/master/bitnami/kafka". Inside my values.yaml file I have added:

values.yaml

  • advertisedListeners1: 10.21.0.191

and statefulset.yaml

    - name: KAFKA_CFG_ADVERTISED_LISTENERS
      value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'

For a single Kafka instance it is working fine.
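
For example, an external consumer can then reach the broker with something like this (the topic name is just a placeholder):

    kafka-console-consumer.sh --bootstrap-server 10.21.0.191:9092 \
        --topic test --from-beginning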

But for a 3-node Kafka cluster, I changed some of the configuration as below:

values.yaml

  • advertisedListeners1: 10.21.0.191
  • advertisedListeners2: 10.21.0.192
  • advertisedListeners3: 10.21.0.193

and statefulset.yaml

    - name: KAFKA_CFG_ADVERTISED_LISTENERS
      {{- if $MY_POD_NAME := "kafka-0" }}
      value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'
      {{- else if $MY_POD_NAME := "kafka-1" }}
      value: 'PLAINTEXT://{{ .Values.advertisedListeners2 }}:9092'
      {{- else if $MY_POD_NAME := "kafka-2" }}
      value: 'PLAINTEXT://{{ .Values.advertisedListeners3 }}:9092'
      {{- end }}

The expected result is that each of the 3 Kafka instances gets its advertised.listeners property set to a worker node's IP address.

For example:

  • kafka-0 --> "PLAINTEXT://10.21.0.191:9092"

  • kafka-1 --> "PLAINTEXT://10.21.0.192:9092"

  • kafka-2 --> "PLAINTEXT://10.21.0.193:9092"

Currently only one Kafka pod is up and running, and the other two are going into CrashLoopBackOff state.

The other two pods show the following error:

[2019-10-20 13:09:37,753] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-10-20 13:09:37,786] ERROR [KafkaServer id=1002] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.IllegalArgumentException: requirement failed: Configured end points 10.21.0.191:9092 in advertised listeners are already registered by broker 1001
    at scala.Predef$.require(Predef.scala:224)
    at kafka.server.KafkaServer$$anonfun$createBrokerInfo$2.apply(KafkaServer.scala:399)
    at kafka.server.KafkaServer$$anonfun$createBrokerInfo$2.apply(KafkaServer.scala:397)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at kafka.server.KafkaServer.createBrokerInfo(KafkaServer.scala:397)
    at kafka.server.KafkaServer.startup(KafkaServer.scala:261)
    at kafka.server.KafkaServerStartable.startup(KafkaServerStartable.scala:38)
    at kafka.Kafka$.main(Kafka.scala:84)
    at kafka.Kafka.main(Kafka.scala)

This means the logic applied in statefulset.yaml is not working. Can anyone help me resolve this?

Any help would be appreciated.

The output of kubectl get statefulset kafka -o yaml:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  creationTimestamp: "2019-10-29T07:04:12Z"
  generation: 1
  labels:
    app.kubernetes.io/component: kafka
    app.kubernetes.io/instance: kafka
    app.kubernetes.io/managed-by: Tiller
    app.kubernetes.io/name: kafka
    helm.sh/chart: kafka-6.0.1
  name: kafka
  namespace: default
  resourceVersion: "12189730"
  selfLink: /apis/apps/v1/namespaces/default/statefulsets/kafka
  uid: d40cfd5f-46a6-49d0-a9d3-e3a851356063
spec:
  podManagementPolicy: Parallel
  replicas: 3
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app.kubernetes.io/component: kafka
      app.kubernetes.io/instance: kafka
      app.kubernetes.io/name: kafka
  serviceName: kafka-headless
  template:
    metadata:
      creationTimestamp: null
      labels:
        app.kubernetes.io/component: kafka
        app.kubernetes.io/instance: kafka
        app.kubernetes.io/managed-by: Tiller
        app.kubernetes.io/name: kafka
        helm.sh/chart: kafka-6.0.1
      name: kafka
    spec:
      containers:
      - env:
        - name: MY_POD_IP
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: status.podIP
        - name: MY_POD_NAME
          valueFrom:
            fieldRef:
              apiVersion: v1
              fieldPath: metadata.name
        - name: KAFKA_CFG_ZOOKEEPER_CONNECT
          value: kafka-zookeeper
        - name: KAFKA_PORT_NUMBER
          value: "9092"
        - name: KAFKA_CFG_LISTENERS
          value: PLAINTEXT://:$(KAFKA_PORT_NUMBER)
        - name: KAFKA_CFG_ADVERTISED_LISTENERS
          value: PLAINTEXT://10.21.0.191:9092
        - name: ALLOW_PLAINTEXT_LISTENER
          value: "yes"
        - name: KAFKA_CFG_BROKER_ID
          value: "-1"
        - name: KAFKA_CFG_DELETE_TOPIC_ENABLE
          value: "false"
        - name: KAFKA_HEAP_OPTS
          value: -Xmx1024m -Xms1024m
        - name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MESSAGES
          value: "10000"
        - name: KAFKA_CFG_LOG_FLUSH_INTERVAL_MS
          value: "1000"
        - name: KAFKA_CFG_LOG_RETENTION_BYTES
          value: "1073741824"
        - name: KAFKA_CFG_LOG_RETENTION_CHECK_INTERVALS_MS
          value: "300000"
        - name: KAFKA_CFG_LOG_RETENTION_HOURS
          value: "168"
        - name: KAFKA_CFG_LOG_MESSAGE_FORMAT_VERSION
        - name: KAFKA_CFG_MESSAGE_MAX_BYTES
          value: "1000012"
        - name: KAFKA_CFG_LOG_SEGMENT_BYTES
          value: "1073741824"
        - name: KAFKA_CFG_LOG_DIRS
          value: /bitnami/kafka/data
        - name: KAFKA_CFG_DEFAULT_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM
          value: https
        - name: KAFKA_CFG_TRANSACTION_STATE_LOG_MIN_ISR
          value: "1"
        - name: KAFKA_CFG_NUM_IO_THREADS
          value: "8"
        - name: KAFKA_CFG_NUM_NETWORK_THREADS
          value: "3"
        - name: KAFKA_CFG_NUM_PARTITIONS
          value: "1"
        - name: KAFKA_CFG_NUM_RECOVERY_THREADS_PER_DATA_DIR
          value: "1"
        - name: KAFKA_CFG_SOCKET_RECEIVE_BUFFER_BYTES
          value: "102400"
        - name: KAFKA_CFG_SOCKET_REQUEST_MAX_BYTES
          value: "104857600"
        - name: KAFKA_CFG_SOCKET_SEND_BUFFER_BYTES
          value: "102400"
        - name: KAFKA_CFG_ZOOKEEPER_CONNECTION_TIMEOUT_MS
          value: "6000"
        image: docker.io/bitnami/kafka:2.3.0-debian-9-r88
        imagePullPolicy: IfNotPresent
        livenessProbe:
          failureThreshold: 2
          initialDelaySeconds: 10
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        name: kafka
        ports:
        - containerPort: 9092
          name: kafka
          protocol: TCP
        readinessProbe:
          failureThreshold: 6
          initialDelaySeconds: 5
          periodSeconds: 10
          successThreshold: 1
          tcpSocket:
            port: kafka
          timeoutSeconds: 5
        resources: {}
        terminationMessagePath: /dev/termination-log
        terminationMessagePolicy: File
        volumeMounts:
        - mountPath: /bitnami/kafka
          name: data
      dnsPolicy: ClusterFirst
      restartPolicy: Always
      schedulerName: default-scheduler
      securityContext:
        fsGroup: 1001
        runAsUser: 1001
      terminationGracePeriodSeconds: 30
  updateStrategy:
    type: RollingUpdate
  volumeClaimTemplates:
  - metadata:
      creationTimestamp: null
      name: data
    spec:
      accessModes:
      - ReadWriteOnce
      resources:
        requests:
          storage: 8Gi
      volumeMode: Filesystem
    status:
      phase: Pending
status:
  collisionCount: 0
  currentReplicas: 3
  currentRevision: kafka-56ff499d74
  observedGeneration: 1
  readyReplicas: 1
  replicas: 3
  updateRevision: kafka-56ff499d74
  updatedReplicas: 3
-- Nikhil
apache-kafka
bitnami
kubernetes
kubernetes-helm
kubernetes-statefulset

1 Answer

10/30/2019

I see you are having some trouble passing different environment variables to different pods in a StatefulSet.

You are trying to achieve this using Helm templates:

- name: KAFKA_CFG_ADVERTISED_LISTENERS
  {{- if $MY_POD_NAME := "kafka-0" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners1 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-1" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners2 }}:9092'
  {{- else if $MY_POD_NAME := "kafka-2" }}
  value: 'PLAINTEXT://{{ .Values.advertisedListeners3 }}:9092'
  {{- end }}

In the Helm template guide documentation you can find this explanation:

In Helm templates, a variable is a named reference to another object. It follows the form $name. Variables are assigned with a special assignment operator: :=.

Now let's look at your code:

{{- if $MY_POD_NAME := "kafka-0" }}

This is a variable assignment, not a comparison. After this assignment, the if statement evaluates the expression to true, because a non-empty string is truthy, and that's why in your StatefulSet manifest you see this as the output:

- name: KAFKA_CFG_ADVERTISED_LISTENERS
  value: PLAINTEXT://10.21.0.191:9092
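
You can check this with a minimal, hypothetical template rendered via helm template: whatever string you assign, the first branch is always taken:

{{- if $name := "kafka-1" }}
always: rendered    # assignment succeeds, "kafka-1" is non-empty -> truthy
{{- else }}
never: rendered     # dead branch
{{- end }}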

To make it work as expected, you can't use Helm templating here: the template is rendered once, at install/upgrade time, before any pod exists, so every pod in the StatefulSet receives exactly the same rendered env block and the pod name is never consulted.

One way to do it would be to create a separate environment variable for every Kafka node and pass all of these variables to all pods, like this:

- env:
  - name: MY_POD_NAME
    valueFrom:
      fieldRef:
        apiVersion: v1
        fieldPath: metadata.name
  - name: KAFKA_0
    value: 10.21.0.191
  - name: KAFKA_1
    value: 10.21.0.192
  - name: KAFKA_2
    value: 10.21.0.193
# Kubernetes $(VAR) substitution cannot select one of KAFKA_0/1/2 based on
# MY_POD_NAME, so the mapping has to happen in a startup script instead:
#  - name: KAFKA_CFG_ADVERTISED_LISTENERS
#    value: PLAINTEXT://$MY_POD_NAME:9092

and also create your own Docker image with a modified startup script that exports the KAFKA_CFG_ADVERTISED_LISTENERS variable with the appropriate value depending on MY_POD_NAME.
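
A minimal sketch of that script logic, assuming the environment variables from the block above (the exec path is an assumption; check how the image is built for the real start command):

#!/bin/bash
# Derive the StatefulSet ordinal from the pod name: kafka-2 -> 2
ORDINAL="${MY_POD_NAME##*-}"
# Build the name of the matching address variable (KAFKA_0, KAFKA_1, KAFKA_2)
# and resolve it with bash indirect expansion.
NODE_IP_VAR="KAFKA_${ORDINAL}"
export KAFKA_CFG_ADVERTISED_LISTENERS="PLAINTEXT://${!NODE_IP_VAR}:9092"
# Hand off to the image's original startup logic (assumed path).
exec /run.sh "$@"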

If you don't want to create your own image, you can create a ConfigMap with a modified entrypoint.sh and mount it in place of the old entrypoint.sh (you can also use any other file; just take a look here for more information on how the kafka image is built).

Mounting the ConfigMap looks like this:

apiVersion: v1
kind: Pod
metadata:
  name: test
spec:
  containers:
    - name: test-container
      image: docker.io/bitnami/kafka:2.3.0-debian-9-r88
      volumeMounts:
      - name: config-volume
        mountPath: /entrypoint.sh
        subPath: entrypoint.sh
  volumes:
    - name: config-volume
      configMap:
        # Provide the name of the ConfigMap containing the files you want
        # to add to the container
        name: kafka-entrypoint-config
        defaultMode: 0744 # remember to add proper (executable) permissions

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-entrypoint-config
  namespace: default
data:
  entrypoint.sh: |
    #!/bin/bash
    # Here add modified entrypoint script
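
Then apply the ConfigMap and recreate the Kafka pods so they pick up the mounted script (the file name here is just an example; the labels match the ones from your StatefulSet):

kubectl apply -f kafka-entrypoint-config.yaml
kubectl delete pod -l app.kubernetes.io/name=kafka,app.kubernetes.io/instance=kafka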

Please let me know if it helped.

-- HelloWorld
Source: StackOverflow