Create Kafka cluster on Kubernetes

10/30/2018

I'm trying to deploy a Kafka cluster on Kubernetes. I have the following configuration:

Kafka Service:

apiVersion: v1
kind: Service
metadata:
  name: kafka
  labels:
    app: kafka
  namespace: kafka
spec:
  ports:
  - name: kafka-port
    port: 9093
    protocol: TCP
  selector:
    app: kafka
  type: NodePort

Kafka StatefulSet:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka
  namespace: kafka
  labels:
    app: kafka
spec:
  replicas: 1
  serviceName: kafka
  podManagementPolicy: Parallel
  updateStrategy:
    type: RollingUpdate
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        imagePullPolicy: Always
        image: wurstmeister/kafka
        ports:
        - containerPort: 9093
        command:
        - sh
        - -c
        - "exec kafka-server-start.sh /opt/kafka/config/server.properties --override broker.id=${HOSTNAME##*-} \
                  --override listeners=PLAINTEXT://:9093 \
                  --override zookeeper.connect=zookeeper:2181 \
                  --override log.dir=/var/lib/kafka \
                  --override auto.create.topics.enable=true \
                  --override auto.leader.rebalance.enable=true \
                  --override background.threads=10 \
                  --override compression.type=producer \
                  --override delete.topic.enable=false \
                  --override leader.imbalance.check.interval.seconds=300 \
                  --override leader.imbalance.per.broker.percentage=10 \
                  --override log.flush.interval.messages=9223372036854775807 \
                  --override log.flush.offset.checkpoint.interval.ms=60000 \
                  --override log.flush.scheduler.interval.ms=9223372036854775807 \
                  --override log.retention.bytes=-1 \
                  --override log.retention.hours=168 \
                  --override log.roll.hours=168 \
                  --override log.roll.jitter.hours=0 \
                  --override log.segment.bytes=1073741824 \
                  --override log.segment.delete.delay.ms=60000 \
                  --override message.max.bytes=1000012 \
                  --override min.insync.replicas=1 \
                  --override num.io.threads=8 \
                  --override num.network.threads=3 \
                  --override num.recovery.threads.per.data.dir=1 \
                  --override num.replica.fetchers=1 \
                  --override offset.metadata.max.bytes=4096 \
                  --override offsets.commit.required.acks=-1 \
                  --override offsets.commit.timeout.ms=5000 \
                  --override offsets.load.buffer.size=5242880 \
                  --override offsets.retention.check.interval.ms=600000 \
                  --override offsets.retention.minutes=1440 \
                  --override offsets.topic.compression.codec=0 \
                  --override offsets.topic.num.partitions=50 \
                  --override offsets.topic.replication.factor=1 \
                  --override offsets.topic.segment.bytes=104857600 \
                  --override queued.max.requests=500 \
                  --override quota.consumer.default=9223372036854775807 \
                  --override quota.producer.default=9223372036854775807 \
                  --override replica.fetch.min.bytes=1 \
                  --override replica.fetch.wait.max.ms=500 \
                  --override replica.high.watermark.checkpoint.interval.ms=5000 \
                  --override replica.lag.time.max.ms=10000 \
                  --override replica.socket.receive.buffer.bytes=65536 \
                  --override replica.socket.timeout.ms=30000 \
                  --override request.timeout.ms=30000 \
                  --override socket.receive.buffer.bytes=102400 \
                  --override socket.request.max.bytes=104857600 \
                  --override socket.send.buffer.bytes=102400 \
                  --override unclean.leader.election.enable=true \
                  --override zookeeper.session.timeout.ms=6000 \
                  --override zookeeper.set.acl=false \
                  --override broker.id.generation.enable=true \
                  --override connections.max.idle.ms=600000 \
                  --override controlled.shutdown.enable=true \
                  --override controlled.shutdown.max.retries=3 \
                  --override controlled.shutdown.retry.backoff.ms=5000 \
                  --override controller.socket.timeout.ms=30000 \
                  --override default.replication.factor=1 \
                  --override fetch.purgatory.purge.interval.requests=1000 \
                  --override group.max.session.timeout.ms=300000 \
                  --override group.min.session.timeout.ms=6000 \
                  --override inter.broker.protocol.version=0.11.0 \
                  --override log.cleaner.backoff.ms=15000 \
                  --override log.cleaner.dedupe.buffer.size=134217728 \
                  --override log.cleaner.delete.retention.ms=86400000 \
                  --override log.cleaner.enable=true \
                  --override log.cleaner.io.buffer.load.factor=0.9 \
                  --override log.cleaner.io.buffer.size=524288 \
                  --override log.cleaner.io.max.bytes.per.second=1.7976931348623157E308 \
                  --override log.cleaner.min.cleanable.ratio=0.5 \
                  --override log.cleaner.min.compaction.lag.ms=0 \
                  --override log.cleaner.threads=1 \
                  --override log.cleanup.policy=delete \
                  --override log.index.interval.bytes=4096 \
                  --override log.index.size.max.bytes=10485760 \
                  --override log.message.timestamp.difference.max.ms=9223372036854775807 \
                  --override log.message.timestamp.type=CreateTime \
                  --override log.preallocate=false \
                  --override log.retention.check.interval.ms=300000 \
                  --override max.connections.per.ip=2147483647 \
                  --override num.partitions=1 \
                  --override producer.purgatory.purge.interval.requests=1000 \
                  --override replica.fetch.backoff.ms=1000 \
                  --override replica.fetch.max.bytes=1048576 \
                  --override replica.fetch.response.max.bytes=10485760 \
                  --override reserved.broker.max.id=1000 "
        volumeMounts:
        - name: datadir
          mountPath: /var/lib/kafka
        readinessProbe:
          exec:
            command:
            - sh
            - -c
            - "/opt/kafka/bin/kafka-broker-api-versions.sh --bootstrap-server=localhost:9093"
  volumeClaimTemplates:
  - metadata:
      name: datadir
    spec:
      accessModes: [ "ReadWriteOnce" ]
      resources:
        requests:
          storage: 10Gi

ZooKeeper Service:

apiVersion: v1
kind: Service
metadata:
  name: zookeeper
  namespace: kafka
  labels:
    name: zookeeper
spec:
  ports:
  - name: client
    port: 2181
    protocol: TCP
  - name: follower
    port: 2888
    protocol: TCP
  - name: leader
    port: 3888
    protocol: TCP
  selector:
    name: zookeeper

ZooKeeper Deployment:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: zookeeper
  namespace: kafka
spec:
  replicas: 1
  template:
    metadata:
      labels:
        name: zookeeper
    spec:
      containers:
      - env:
        - name: ZOOKEEPER_ID
          value: "1"
        - name: ZOOKEEPER_SERVER_1
          value: zookeeper
        name: zookeeper
        image: digitalwonderland/zookeeper
        ports:
        - containerPort: 2181

With this configuration everything works well, but I want to add more Kafka replicas. If I try to add another replica, I receive this error:

Error connecting to node kafka-1.kafka.kafka.svc.cluster.local:9093 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
java.io.IOException: Can't resolve address: kafka-1.kafka.kafka.svc.cluster.local:9093

I can't figure out where "kafka-1.kafka.kafka.svc.cluster.local" is set or how I can get around it. One solution would be to create a service for every pod, but with that approach the cluster doesn't scale easily.

Can I configure my Kafka StatefulSet so the brokers discover each other without any additional services?

EDIT:

The configuration with one replica works only from inside the cluster; if I try to access the broker from outside, I receive the exception:

Can't resolve address: kafka-0.kafka.kafka.svc.cluster.local:9093

Thanks

-- Dorin
apache-kafka
kubernetes

1 Answer

10/30/2018

You need to set the .metadata.name in the Service definition and the .spec.serviceName in the StatefulSet definition to the same name, and the Service should be a headless service, i.e. with clusterIP: None. With that in place, in your setup kafka-0.kafka.kafka.svc.cluster.local will resolve to the pod IP.
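For illustration, here is what the question's Kafka Service could look like as a headless service (a minimal sketch based on the manifests above; note that clusterIP: None cannot be combined with type: NodePort, so the type is dropped here and external access would need a separate Service):

apiVersion: v1
kind: Service
metadata:
  name: kafka          # must match .spec.serviceName in the StatefulSet
  labels:
    app: kafka
  namespace: kafka
spec:
  clusterIP: None      # headless: DNS returns per-pod records instead of a virtual IP
  ports:
  - name: kafka-port
    port: 9093
    protocol: TCP
  selector:
    app: kafka

Once the pods are running, you can verify the per-pod DNS records from inside the cluster, e.g. from any pod that has nslookup available:

nslookup kafka-0.kafka.kafka.svc.cluster.local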

-- Kun Li
Source: StackOverflow