Client app tries to produce to a topic on Kafka but gets stuck and returns neither an error message nor 200 OK

9/29/2019

We deployed Kafka and ZooKeeper pods on a Kubernetes cluster, and the two are connected to each other properly. But when we try to produce to a topic through a client app, the PUT request stays pending and, even after a long time, no response is returned. How can I debug this situation? The .yaml files for kafka, zookeeper, and the client app are below:

kafka.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null

  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.42:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
        resources: {}
        volumeMounts:
        - mountPath: /var/run/docker.sock
          name: kafka-claim0
      hostname: kafka
      restartPolicy: Always
      volumes:
      - name: kafka-claim0
        persistentVolumeClaim:
          claimName: kafka-claim0
status: {}

zookeeper.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  replicas: 1
  strategy: {}
  template:
    metadata:
      creationTimestamp: null
      labels:
        io.kompose.service: zookeeper
    spec:
      containers:
      - env:
        - name: ALLOW_ANONYMOUS_LOGIN
          value: "yes"
        image: wurstmeister/zookeeper
        name: zookeeper
        ports:
        - containerPort: 2181
        resources: {}
      hostname: zookeeper
      restartPolicy: Always
status: {}

app.yaml:

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  labels:
    service: broker-service
  name: broker-app
spec:
  replicas: 1
  template:
    metadata:
      labels:
        service: broker-service
    spec:
      imagePullSecrets:
      - name: pullsecret
      containers:
      - env:
        - name: OHH_COMMON_REDEPLOY
          value: THIS_WILL_BE_REPLACED
        - name: ASPNETCORE_ENVIRONMENT
          value: docker
        image: localgitlabregistry/broker.app:v0.01
        name: broker-app
        imagePullPolicy: "Always"
        ports:
        - containerPort: 80
        - containerPort: 443
      nodeSelector:
        role: slave1
      restartPolicy: Always

And the services are like below:

kafka-service.yaml:

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  ports:
  - name: "9092"
    port: 9092
    targetPort: 9092
  - name: "9094"
    port: 9094
    targetPort: 9094
  clusterIP: None
#  type: NodePort
  selector:
    io.kompose.service: kafka
status:
  loadBalancer: {}

zookeeper-service.yaml

apiVersion: v1
kind: Service
metadata:
  annotations:
    kompose.cmd: kompose convert
    kompose.version: 1.18.0 (06a2e56)
  creationTimestamp: null
  labels:
    io.kompose.service: zookeeper
  name: zookeeper
spec:
  ports:
  - name: "2181"
    port: 2181
    targetPort: 2181
  selector:
    io.kompose.service: zookeeper
status:
  loadBalancer: {}

app-service.yaml

apiVersion: v1
kind: Service
metadata:
  labels:
    service: broker-service
  name: broker-service
spec:
  ports:
  - name: "57270"
    port: 80
    targetPort: 80
  - name: "44348"
    port: 443
    targetPort: 443
  selector:
    service: broker-service
  type: NodePort

The log from kafka pod is like below:

waiting for kafka to be ready
[Configuring] 'advertised.port' in '/opt/kafka/config/server.properties'
Excluding KAFKA_HOME from broker config
[Configuring] 'advertised.host.name' in '/opt/kafka/config/server.properties'
[Configuring] 'port' in '/opt/kafka/config/server.properties'
[Configuring] 'broker.id' in '/opt/kafka/config/server.properties'
Excluding KAFKA_VERSION from broker config
[Configuring] 'zookeeper.connect' in '/opt/kafka/config/server.properties'
[Configuring] 'log.dirs' in '/opt/kafka/config/server.properties'
[Configuring] 'zookeeper.connect.timeout.ms' in '/opt/kafka/config/server.properties'
[2019-09-29 08:06:56,783] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
[2019-09-29 08:06:57,767] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
[2019-09-29 08:06:57,768] INFO starting (kafka.server.KafkaServer)
[2019-09-29 08:06:57,769] INFO Connecting to zookeeper on 192.168.88.42:30573 (kafka.server.KafkaServer)
[2019-09-29 08:06:57,796] INFO [ZooKeeperClient Kafka server] Initializing a new session to 

.
.
.
[2019-09-29 08:06:57,804] INFO Client environment:java.io.tmpdir=/tmp (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:java.compiler=<NA> (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.name=Linux (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.arch=amd64 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:os.version=4.4.0-116-generic (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.name=root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.home=/root (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,804] INFO Client environment:user.dir=/ (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,806] INFO Initiating client connection, connectString=192.168.88.42:30573 sessionTimeout=6000 watcher=kafka.zookeeper.ZooKeeperClient$ZooKeeperClientWatcher$@2667f029 (org.apache.zookeeper.ZooKeeper)
[2019-09-29 08:06:57,822] INFO [ZooKeeperClient Kafka server] Waiting until connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:57,847] INFO Opening socket connection to server 192.168.88.42/192.168.88.42:30573. Will not attempt to authenticate using SASL (unknown error) (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,865] INFO Socket connection established to 192.168.88.42/192.168.88.42:30573, initiating session (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,880] INFO Session establishment complete on server 192.168.88.42/192.168.88.42:30573, sessionid = 0x10005366a620042, negotiated timeout = 6000 (org.apache.zookeeper.ClientCnxn)
[2019-09-29 08:06:57,886] INFO [ZooKeeperClient Kafka server] Connected. (kafka.zookeeper.ZooKeeperClient)
[2019-09-29 08:06:58,448] INFO Cluster ID = b8bTvrC2T6iidAcNqD482A (kafka.server.KafkaServer)
[2019-09-29 08:06:58,455] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:58,632] INFO KafkaConfig values: 
    advertised.host.name = kafka
    advertised.listeners = null
    advertised.port = 9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true
.
.
.
    zookeeper.connect = 192.168.88.42:30573
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-09-29 08:06:58,659] INFO KafkaConfig values: 
    advertised.host.name = kafka
    advertised.listeners = null
    advertised.port = 9092
    alter.config.policy.class.name = null
    alter.log.dirs.replication.quota.window.num = 11
    alter.log.dirs.replication.quota.window.size.seconds = 1
    authorizer.class.name = 
    auto.create.topics.enable = true
    auto.leader.rebalance.enable = true

    kafka.metrics.reporters = []
    leader.imbalance.check.interval.seconds = 300
    leader.imbalance.per.broker.percentage = 10
    listener.security.protocol.map = PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
    listeners = null
    log.cleaner.backoff.ms = 15000
    log.cleaner.dedupe.buffer.size = 134217728
    log.cleaner.delete.retention.ms = 86400000
    log.cleaner.enable = true
    log.cleaner.io.buffer.load.factor = 0.9
    log.cleaner.io.buffer.size = 524288
    log.cleaner.io.max.bytes.per.second = 1.7976931348623157E308
    log.cleaner.max.compaction.lag.ms = 9223372036854775807
    log.cleaner.min.cleanable.ratio = 0.5
    log.cleaner.min.compaction.lag.ms = 0
    log.cleaner.threads = 1
    log.cleanup.policy = [delete]
    log.dir = /tmp/kafka-logs
        .
.
.
    unclean.leader.election.enable = false
    zookeeper.connect = 192.168.88.42:30573
    zookeeper.connection.timeout.ms = 6000
    zookeeper.max.in.flight.requests = 10
    zookeeper.session.timeout.ms = 6000
    zookeeper.set.acl = false
    zookeeper.sync.time.ms = 2000
 (kafka.server.KafkaConfig)
[2019-09-29 08:06:58,721] INFO [ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,722] INFO [ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,724] INFO [ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2019-09-29 08:06:58,797] INFO Log directory /kafka/kafka-logs-kafka not found, creating it. (kafka.log.LogManager)
[2019-09-29 08:06:58,814] INFO Loading logs. (kafka.log.LogManager)
[2019-09-29 08:06:58,834] INFO Logs loading complete in 20 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,869] INFO Starting log cleanup with a period of 300000 ms. (kafka.log.LogManager)
[2019-09-29 08:06:58,877] INFO Starting log flusher with a default period of 9223372036854775807 ms. (kafka.log.LogManager)
[2019-09-29 08:06:59,505] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2019-09-29 08:06:59,549] INFO [SocketServer brokerId=1033] Created data-plane acceptor and processors for endpoint : EndPoint(null,9092,ListenerName(PLAINTEXT),PLAINTEXT) (kafka.network.SocketServer)
[2019-09-29 08:06:59,550] INFO [SocketServer brokerId=1033] Started 1 acceptor threads for data-plane (kafka.network.SocketServer)
[2019-09-29 08:06:59,587] INFO [ExpirationReaper-1033-Produce]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-Fetch]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,590] INFO [ExpirationReaper-1033-DeleteRecords]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,600] INFO [ExpirationReaper-1033-ElectPreferredLeader]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,614] INFO [LogDirFailureHandler]: Starting (kafka.server.ReplicaManager$LogDirFailureHandler)
[2019-09-29 08:06:59,716] INFO Creating /brokers/ids/1033 (is it secure? false) (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,743] INFO Stat of the created znode at /brokers/ids/1033 is: 776,776,1569744419734,1569744419734,1,0,0,72063325309108290,180,0,776
 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,745] INFO Registered broker 1033 at path /brokers/ids/1033 with addresses: ArrayBuffer(EndPoint(kafka,9092,ListenerName(PLAINTEXT),PLAINTEXT)), czxid (broker epoch): 776 (kafka.zk.KafkaZkClient)
[2019-09-29 08:06:59,748] WARN No meta.properties file under dir /kafka/kafka-logs-kafka/meta.properties (kafka.server.BrokerMetadataCheckpoint)
[2019-09-29 08:06:59,882] INFO [ExpirationReaper-1033-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,888] INFO [ExpirationReaper-1033-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,895] INFO [ExpirationReaper-1033-Rebalance]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
[2019-09-29 08:06:59,940] INFO [GroupCoordinator 1033]: Starting up. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,949] INFO [GroupCoordinator 1033]: Startup complete. (kafka.coordinator.group.GroupCoordinator)
[2019-09-29 08:06:59,961] INFO [GroupMetadataManager brokerId=1033] Removed 0 expired offsets in 17 milliseconds. (kafka.coordinator.group.GroupMetadataManager)
[2019-09-29 08:06:59,990] INFO [ProducerId Manager 1033]: Acquired new producerId block (brokerId:1033,blockStartProducerId:21000,blockEndProducerId:21999) by writing to Zk with path version 22 (kafka.coordinator.transaction.ProducerIdManager)
[2019-09-29 08:07:00,044] INFO [TransactionCoordinator id=1033] Starting up. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,056] INFO [Transaction Marker Channel Manager 1033]: Starting (kafka.coordinator.transaction.TransactionMarkerChannelManager)
[2019-09-29 08:07:00,061] INFO [TransactionCoordinator id=1033] Startup complete. (kafka.coordinator.transaction.TransactionCoordinator)
[2019-09-29 08:07:00,207] INFO [/config/changes-event-process-thread]: Starting (kafka.common.ZkNodeChangeNotificationListener$ChangeEventProcessThread)
[2019-09-29 08:07:00,289] INFO [SocketServer brokerId=1033] Started data-plane processors for 1 acceptors (kafka.network.SocketServer)
[2019-09-29 08:07:00,326] INFO Kafka version: 2.3.0 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka commitId: fc1aaa116b661c8a (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,326] INFO Kafka startTimeMs: 1569744420299 (org.apache.kafka.common.utils.AppInfoParser)
[2019-09-29 08:07:00,341] INFO [KafkaServer id=1033] started (kafka.server.KafkaServer)
creating topics: newsrawdata:1:1

The log from zookeeper pod:

2019-09-29 08:06:58,003 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@653] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:create cxid:0xd zxid:0x306 txntype:-1 reqpath:n/a Error Path:/config/brokers Error:KeeperErrorCode = NodeExists for /config/brokers
2019-09-29 08:07:00,421 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@596] - Got user-level KeeperException when processing sessionid:0x10005366a620042 type:multi cxid:0x3f zxid:0x30d txntype:-1 reqpath:n/a aborting remaining multi ops. Error Path:/admin/preferred_replica_election Error:KeeperErrorCode = NoNode for /admin/preferred_replica_election
2019-09-29 08:07:07,512 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxnFactory@215] - Accepted socket connection from /10.44.0.0:39244
2019-09-29 08:07:07,519 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:ZooKeeperServer@949] - Client attempting to establish new session at /10.44.0.0:39244
2019-09-29 08:07:07,521 [myid:] - INFO  [SyncThread:0:ZooKeeperServer@694] - Established session 0x10005366a620043 with negotiated timeout 30000 for client /10.44.0.0:39244
2019-09-29 08:07:08,034 [myid:] - INFO  [ProcessThread(sid:0 cport:2181)::PrepRequestProcessor@487] - Processed session termination for sessionid: 0x10005366a620043
2019-09-29 08:07:08,045 [myid:] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2181:NIOServerCnxn@1056] - Closed socket connection for client /10.44.0.0:39244 which had sessionid 0x10005366a620043
2019-09-29 08:07:13,180 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 08:07:13,181 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.
2019-09-29 09:07:13,180 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@138] - Purge task started.
2019-09-29 09:07:13,182 [myid:] - INFO  [PurgeTask:DatadirCleanupManager$PurgeTask@144] - Purge task completed.

The log from the client app:

Kafka Ip Server:kafka:9092
warn: Microsoft.AspNetCore.DataProtection.KeyManagement.XmlKeyManager[35]
      No XML encryptor configured. Key {16b9a9aa-732a-47ab-bd31-ce341be7f812} may be persisted to storage in unencrypted form.
Hosting environment: docker
Content root path: /app
Now listening on: http://[::]:80
Application started. Press Ctrl+C to shut down.

We set "BootstrapServers" to "kafka:9092" in the client app. It seems that the client can resolve kafka inside the cluster and see the IP of the kafka pod, but nothing happens when we send a PUT request. It is worth noting that the same config works as expected outside the Kubernetes cluster with docker-compose. What is wrong with this configuration?

-- Majid Rajabi
apache-kafka
apache-zookeeper
kubeadm
kubernetes

2 Answers

10/7/2019

Actually, the problem is related to the Kafka listener configuration (KAFKA_ADVERTISED_LISTENERS, KAFKA_INTER_BROKER_LISTENER_NAME, KAFKA_LISTENERS). The deployment.yaml config below is working properly for me now, without any change to the service files.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
  labels:
    io.kompose.service: kafka
  name: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        io.kompose.service: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_CREATE_TOPICS
          value: newsrawdata:1:1
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
        - name: KAFKA_LISTENERS
          value: INSIDE://:9092
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 192.168.88.207:30573
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ZOOKEEPER_CONNECT_TIMEOUT_MS
          value: "1000"
        image: wurstmeister/kafka
        name: kafka
        ports:
        - containerPort: 9092
        - containerPort: 9094
      hostname: kafka
      restartPolicy: Always

The Kafka instance now connects to ZooKeeper, and client apps can produce to and consume from Kafka properly.
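
For anyone adapting this, here is a variant of the listener settings with the advertised host spelled out. This is only a sketch, not what I tested above: it assumes the headless Service name kafka from the question resolves to the broker pod, and that clients bootstrap with kafka:9092. In Kafka itself, advertised.host.name and advertised.port are deprecated and only used when the listener settings are not set, so once the listener variables are in place they are the ones that decide which address is handed back to clients.

```yaml
        # Sketch only: listener-related env entries for the kafka container.
        # Assumption: "kafka" is the headless Service name and resolves to this pod.
        - name: KAFKA_LISTENERS
          value: INSIDE://:9092            # bind on all interfaces inside the pod
        - name: KAFKA_ADVERTISED_LISTENERS
          value: INSIDE://kafka:9092       # address returned to clients in metadata
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: INSIDE:PLAINTEXT
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: INSIDE
```

Whatever address is advertised must be reachable from the client pod, because the producer reconnects to that address after the initial bootstrap.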

-- Majid Rajabi
Source: StackOverflow

9/30/2019

First, make sure that your nodes have the proper label: the node that should run the broker deployment must carry the label role: slave1 to match its nodeSelector. Otherwise, just delete the nodeSelector lines from the broker deployment file.

Then add these lines to the spec of your deployment configuration files:

  selector:
    matchLabels:
      io.kompose.service: kafka

this one is for kafka.yaml
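
To show where that block sits, here is a trimmed sketch of the kafka Deployment. It assumes the manifest is also moved to apps/v1 (an assumption on my side; the files in the question use extensions/v1beta1, which Kubernetes 1.16 and later no longer serve for Deployments). In apps/v1, spec.selector is required and must match the pod template labels. The env entries are left out for brevity.

```yaml
apiVersion: apps/v1          # assumption: migrated from extensions/v1beta1
kind: Deployment
metadata:
  name: kafka
  labels:
    io.kompose.service: kafka
spec:
  replicas: 1
  strategy:
    type: Recreate
  selector:                  # must match template.metadata.labels below
    matchLabels:
      io.kompose.service: kafka
  template:
    metadata:
      labels:
        io.kompose.service: kafka
    spec:
      hostname: kafka
      restartPolicy: Always
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        # env entries as in the question's kafka.yaml go here
```

The zookeeper and broker-app deployments would need the same kind of selector, using their own labels (io.kompose.service: zookeeper and service: broker-service).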

You don't have to label your services; specifying selectors is enough, so you can delete the labels field from the service configuration files.

Then, in the kafka deployment configuration file, change these lines:

        - name: KAFKA_ZOOKEEPER_CONNECT
          #value: 192.168.88.42:30573
          value: your_zookeeper_service_ip:2181

The value should contain the IP of your zookeeper service and port 2181. If your zookeeper service has the IP 192.168.88.42, that address is correct.
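
As an alternative to a hard-coded IP, the Service name can usually be used instead. A small sketch, assuming the kafka and zookeeper pods run in the same namespace and cluster DNS is working, so that the Service named zookeeper from zookeeper-service.yaml resolves by name:

```yaml
        - name: KAFKA_ZOOKEEPER_CONNECT
          # assumption: same namespace as the "zookeeper" Service,
          # which exposes port 2181 in zookeeper-service.yaml
          value: zookeeper:2181
```

This avoids depending on a node IP and NodePort that can change when the cluster is rebuilt.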

-- MaggieO
Source: StackOverflow