How to specify advertised listeners for a Kafka multi-broker setup on Kubernetes and expose the cluster externally?

8/1/2017

I am trying to set up a multi-broker Kafka cluster on a Kubernetes cluster hosted in Azure. I have a single-broker setup working. For the multi-broker setup, I currently have an ensemble of three ZooKeeper nodes that manages the Kafka service. I am deploying the Kafka cluster as a replication controller with three replicas, that is, three brokers. How can I make the three brokers register different IP addresses with ZooKeeper?

I bring up my replication controller after the service is deployed and use the cluster IP in my replication controller YAML file to specify two advertised.listeners, one for SSL and another for PLAINTEXT. However, in this scenario all brokers register with the same IP, and writes to replicas fail. I don't want to deploy each broker as a separate replication controller/pod and service, because scaling becomes an issue. I would really appreciate any thoughts or ideas on this.

Edit 1:

I am additionally trying to expose the cluster to another VPC in the cloud. I have to expose the SSL and PLAINTEXT ports to clients, which I am doing using advertised.listeners. If I use a StatefulSet with three replicas and let Kubernetes expose the canonical host names of the pods as host names, these cannot be resolved from an external client. The only way I got this working was to expose an external service corresponding to each broker. However, this does not scale.
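
For illustration, the per-broker external service described above might look roughly like the following. This is only a sketch under assumptions: the StatefulSet is assumed to be named kafka, the ports 9092 (PLAINTEXT) and 9093 (SSL) and the LoadBalancer type are assumptions, and the Service name kafka-0-external is hypothetical. The selector relies on the statefulset.kubernetes.io/pod-name label that newer Kubernetes versions set on StatefulSet pods.

```yaml
# One Service like this per broker (kafka-0, kafka-1, kafka-2).
# All names and ports here are illustrative assumptions.
apiVersion: v1
kind: Service
metadata:
  name: kafka-0-external
spec:
  type: LoadBalancer
  selector:
    # Label added automatically to each StatefulSet pod; matches exactly one broker.
    statefulset.kubernetes.io/pod-name: kafka-0
  ports:
  - name: plaintext
    port: 9092
    targetPort: 9092
  - name: ssl
    port: 9093
    targetPort: 9093
```

Each broker would then have to advertise the external address of its own Service in advertised.listeners, which is why the number of Services grows with the number of brokers.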

-- Annu
apache-kafka
apache-zookeeper
kubernetes
messagebroker

1 Answer

8/2/2017

Kubernetes has the concept of StatefulSets to solve these issues. Each instance of a StatefulSet has its own stable DNS name, so you can refer to each instance by that name.
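
As a minimal sketch of how those per-instance DNS names come about, a headless Service (clusterIP: None) can govern the StatefulSet. The names kafka-headless and app: kafka, the ports, and the default namespace are assumptions made for illustration, not part of the original setup.

```yaml
# Headless Service: no cluster IP, DNS resolves directly to the pod IPs.
apiVersion: v1
kind: Service
metadata:
  name: kafka-headless
  labels:
    app: kafka
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
  - name: plaintext
    port: 9092
  - name: ssl
    port: 9093
# With a StatefulSet named "kafka" that sets serviceName: kafka-headless,
# and assuming the "default" namespace and the default cluster domain,
# the pods are reachable inside the cluster as:
#   kafka-0.kafka-headless.default.svc.cluster.local
#   kafka-1.kafka-headless.default.svc.cluster.local
#   kafka-2.kafka-headless.default.svc.cluster.local
```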

This concept is described here in more detail. You can also take a look at this complete example:

apiVersion: v1
kind: Service
metadata:
  name: zk-headless
  labels:
    app: zk-headless
spec:
  ports:
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
  clusterIP: None
  selector:
    app: zk
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: zk-config
data:
  ensemble: "zk-0;zk-1;zk-2"
  jvm.heap: "2G"
  tick: "2000"
  init: "10"
  sync: "5"
  client.cnxns: "60"
  snap.retain: "3"
  purge.interval: "1"
---
apiVersion: policy/v1beta1
kind: PodDisruptionBudget
metadata:
  name: zk-budget
spec:
  selector:
    matchLabels:
      app: zk
  minAvailable: 2
---
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: zk
spec:
  serviceName: zk-headless
  replicas: 3
  template:
    metadata:
      labels:
        app: zk
      annotations:
        pod.alpha.kubernetes.io/initialized: "true"

    spec:
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            - labelSelector:
                matchExpressions:
                  - key: "app"
                    operator: In
                    values:
                    - zk
              topologyKey: "kubernetes.io/hostname"
      containers:
      - name: k8szk
        imagePullPolicy: Always
        image: gcr.io/google_samples/k8szk:v1
        resources:
          requests:
            memory: "4Gi"
            cpu: "1"
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        env:
        - name: ZK_ENSEMBLE
          valueFrom:
            configMapKeyRef:
              name: zk-config
              key: ensemble
        - name: ZK_HEAP_SIZE
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: jvm.heap
        - name: ZK_TICK_TIME
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: tick
        - name: ZK_INIT_LIMIT
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: init
        - name: ZK_SYNC_LIMIT
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: sync
        - name: ZK_MAX_CLIENT_CNXNS
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: client.cnxns
        - name: ZK_SNAP_RETAIN_COUNT
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: snap.retain
        - name: ZK_PURGE_INTERVAL
          valueFrom:
            configMapKeyRef:
                name: zk-config
                key: purge.interval
        - name: ZK_CLIENT_PORT
          value: "2181"
        - name: ZK_SERVER_PORT
          value: "2888"
        - name: ZK_ELECTION_PORT
          value: "3888"
        command:
        - sh
        - -c
        - zkGenConfig.sh && zkServer.sh start-foreground
        readinessProbe:
          exec:
            command:
            - "zkOk.sh"
          initialDelaySeconds: 15
          timeoutSeconds: 5
        livenessProbe:
          exec:
            command:
            - "zkOk.sh"
          initialDelaySeconds: 15
          timeoutSeconds: 5
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/zookeeper
      securityContext:
        runAsUser: 1000
        fsGroup: 1000
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 20Gi
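
The same idea could be applied to the Kafka brokers themselves. The following is only a sketch, not a tested deployment. It assumes a headless Service named kafka-headless with selector app: kafka already exists, that ZooKeeper runs in the same namespace as in the example above, and that the cluster uses the default cluster.local domain. The image name and the paths to kafka-server-start.sh and server.properties are placeholders that depend on the image you use. It uses apps/v1 (which requires spec.selector) rather than the older apps/v1beta1 shown above. Storage and the SSL keystore settings are omitted for brevity.

```yaml
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
spec:
  serviceName: kafka-headless   # assumed headless Service (clusterIP: None)
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: example.invalid/kafka:placeholder   # placeholder, substitute your own image
        ports:
        - containerPort: 9092
          name: plaintext
        - containerPort: 9093
          name: ssl
        env:
        # Downward API: expose the pod name (kafka-0, kafka-1, ...) and namespace.
        - name: POD_NAME
          valueFrom:
            fieldRef:
              fieldPath: metadata.name
        - name: POD_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        command:
        - sh
        - -c
        - |
          # The ordinal suffix of the pod name (kafka-0 -> 0) becomes the broker id.
          BROKER_ID="${POD_NAME##*-}"
          # Stable per-pod DNS name provided by the headless Service.
          HOST="${POD_NAME}.kafka-headless.${POD_NAMESPACE}.svc.cluster.local"
          # Script and config paths are assumptions about the image layout.
          exec kafka-server-start.sh /opt/kafka/config/server.properties \
            --override broker.id="${BROKER_ID}" \
            --override listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9093 \
            --override advertised.listeners="PLAINTEXT://${HOST}:9092,SSL://${HOST}:9093" \
            --override zookeeper.connect=zk-0.zk-headless:2181,zk-1.zk-headless:2181,zk-2.zk-headless:2181
```

Because each pod derives its broker id and advertised host from its own name, every broker registers a different address with ZooKeeper, and scaling only requires changing replicas. These names resolve only inside the cluster, so external clients would still need a separately advertised, externally reachable address per broker.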
-- Lukas Eichler
Source: StackOverflow