Remotely accessing Kafka running inside Kubernetes

9/25/2019

I have a single-node Kafka broker running inside a pod in a single-node Kubernetes environment. I am using this image for Kafka: https://hub.docker.com/r/wurstmeister/kafka

Kafka version: 1.1.0

The Kubernetes cluster runs inside a VM on a server. The VM's active interface, ens32, has the IP 192.168.3.102.

Kafka.yaml

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  namespace: casb-deployment
  name: kafkaservice
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: kafkaservice
    spec:
      hostname: kafkaservice
      containers:
      - name: kafkaservice
        imagePullPolicy: IfNotPresent
        image: wurstmeister/kafka:1.1.0
        env:
         - name: KAFKA_BROKER_ID
           value: "1"
#         - name: KAFKA_ADVERTISED_HOST_NAME
#           value: "kafkaservice"
         - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
           value: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
         - name: KAFKA_LISTENERS
           value: "INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093"
         - name: KAFKA_ADVERTISED_LISTENERS
           value: "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:9093"
         - name: KAFKA_INTER_BROKER_LISTENER_NAME
           value: "INTERNAL_PLAINTEXT"
         - name: KAFKA_CREATE_TOPICS
           value: "topic-1:100:1,topic-2:1:1"  
         - name: KAFKA_ZOOKEEPER_CONNECT
           value: "zookeeper:2181"
        ports: 
        - name: port9092
          containerPort: 9092
        - name: port9093
          containerPort: 9093
        volumeMounts:
        - mountPath: /kafka/kafka-logs-kafkaservice
          name: kafka-volume
      volumes:
      - name: kafka-volume
        hostPath:
          path: /home/volume/kafka-logs

---

apiVersion: v1
kind: Service
metadata:
  namespace: casb-deployment
  name: kafkaservice
  labels:
    app: kafkaservice
spec:
  selector:
    app: kafkaservice
  ports:
  - name: port9092
    port: 9092
    targetPort: 9092
    protocol: TCP

---

apiVersion: v1
kind: Service
metadata:
  namespace: casb-deployment
  name: kafkaservice-external
  labels:
    app: kafkaservice-external
spec:
  selector:
    app: kafkaservice
  ports:
  - name: port9093
    port: 9093
    protocol: TCP
    nodePort: 30035
  type: NodePort
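For reference, the listener variables in the Deployment work per listener name: KAFKA_LISTENERS is what the broker binds to inside the pod, and KAFKA_ADVERTISED_LISTENERS is the address it hands back to clients in metadata responses. A minimal sketch that lines the two up for the values above; parse_listeners is a hypothetical helper, not part of Kafka or the wurstmeister image:

```python
from __future__ import annotations


# Hypothetical helper for illustration; not part of Kafka or the wurstmeister image.
def parse_listeners(value: str) -> dict[str, tuple[str, int]]:
    """Parse 'NAME://host:port,NAME2://host:port' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, address = entry.strip().split("://", 1)
        host, port = address.rsplit(":", 1)
        listeners[name] = (host, int(port))
    return listeners


# Values copied from the Deployment's env section.
bind = parse_listeners("INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093")
advertised = parse_listeners(
    "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:9093"
)

# Show, per listener, where the broker listens and what it tells clients.
for name, (host, port) in bind.items():
    adv_host, adv_port = advertised[name]
    print(f"{name}: binds {host}:{port}, advertises {adv_host}:{adv_port}")
```

Note that EXTERNAL_PLAINTEXT advertises port 9093, while the kafkaservice-external Service exposes that listener on node port 30035 (targetPort is omitted, so it defaults to the Service port, 9093).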

I can ping the VM (i.e. the Kubernetes node) from my local machine (ping 192.168.3.102), and I am using a NodePort Service to expose Kafka.

I can also telnet to the NodePort (telnet 192.168.3.102 30035), which gives:

Trying 192.168.3.102...
Connected to 192.168.3.102.
Escape character is '^]'.
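The telnet check only proves that 192.168.3.102:30035 accepts TCP connections. A scripted equivalent using only the standard library (can_connect is a hypothetical helper name) makes it easy to run the same probe against every address a Kafka client will actually dial, not just the bootstrap one:

```python
import socket


# Hypothetical helper: a scripted version of the telnet check.
def can_connect(host: str, port: int, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to (host, port) opens within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError (errno 111 / ECONNREFUSED on Linux),
        # timeouts and unreachable networks.
        return False
```

Run from the local machine, can_connect("192.168.3.102", 30035) should mirror the telnet result, while the same call for port 9093 would be expected to return False, consistent with the errno 111 in the kafka-python log below.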

I tried running the Kafka console consumer and producer from my local machine:

Consumer: bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.102:30035 --topic topic-1 --from-beginning

Output:

[2019-09-25 12:30:40,716] WARN [Consumer clientId=consumer-1, groupId=console-consumer-20551] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Producer:

bin/kafka-console-producer.sh --broker-list 192.168.3.102:30035 --topic topic-1

Output:

[2019-09-25 12:32:07,958] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

Netstat:

netstat -tulpn | grep 30035
tcp6       0      0 :::30035                :::*                    LISTEN      113545/kube-proxy

I also tried a Python-based consumer (kafka-python==1.4.2), which gave the following logs:

[2019-09-25T12:15:39+0500] INFO kafka.client Bootstrapping cluster metadata from [('192.168.3.102', 30035, <AddressFamily.AF_INET: 2>)]
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connecting> [IPv4 ('192.168.3.102', 30035)]>: connecting to 192.168.3.102:30035 [('192.168.3.102', 30035) IPv4]
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connecting> [IPv4 ('192.168.3.102', 30035)]>: Connection complete.
[2019-09-25T12:15:39+0500] INFO kafka.client Bootstrap succeeded: found 1 brokers and 26 topics.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connected> [IPv4 ('192.168.3.102', 30035)]>: Closing connection. 
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: connecting to 192.168.3.102:9093 [('192.168.3.102', 9093) IPv4]
[2019-09-25T12:15:39+0500] INFO kafka.conn Probing node 1 broker version
[2019-09-25T12:15:39+0500] ERROR kafka.conn Connect attempt to <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]> returned error 111. Disconnecting.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: Closing connection. ConnectionError: 111 ECONNREFUSED
[2019-09-25T12:15:40+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: connecting to 192.168.3.102:9093 [('192.168.3.102', 9093) IPv4]
[2019-09-25T12:15:40+0500] ERROR kafka.conn Connect attempt to <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]> returned error 111. Disconnecting.
[2019-09-25T12:15:40+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: Closing connection. ConnectionError: 111 ECONNREFUSED
[2019-09-25T12:15:40+0500] INFO Activity URL collector Exception in activity url collector: NoBrokersAvailable

From the logs, it seems the connection was made:

<connecting> [IPv4 ('192.168.3.102', 30035)]>: Connection complete.
Bootstrap succeeded: found 1 brokers and 26 topics.

But then it got disconnected.
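The log sequence follows how Kafka clients connect: the bootstrap address is used only to fetch cluster metadata, and the client then connects to each broker at the address that broker advertises. The following is a simplified model of those two hops, not kafka-python code; the set of reachable endpoints is an assumption based on the netstat output above (only kube-proxy listening on 30035):

```python
from __future__ import annotations

# Simplified model of a Kafka client's two hops; not kafka-python code.
# Assumption: from the local machine, the only open endpoint on the node is the
# NodePort kube-proxy listens on (netstat shows 30035; nothing listens on 9093).
REACHABLE_FROM_LAPTOP = {("192.168.3.102", 30035)}


def simulate(bootstrap: tuple[str, int], advertised: tuple[str, int]) -> list[str]:
    """Return a log of the bootstrap hop and the follow-up hop to broker node 1."""
    log = []
    # Hop 1: connect to the bootstrap address and fetch cluster metadata.
    if bootstrap not in REACHABLE_FROM_LAPTOP:
        log.append(f"bootstrap {bootstrap}: ECONNREFUSED")
        return log
    log.append(f"bootstrap {bootstrap}: ok, metadata says node 1 is at {advertised}")
    # Hop 2: close the bootstrap connection and dial the advertised address instead.
    status = "ok" if advertised in REACHABLE_FROM_LAPTOP else "ECONNREFUSED"
    log.append(f"node 1 {advertised}: {status}")
    return log


# The question's setup: bootstrap via the NodePort, broker advertises port 9093.
for line in simulate(("192.168.3.102", 30035), ("192.168.3.102", 9093)):
    print(line)
```

In this model, advertising ("192.168.3.102", 30035) instead makes the second hop succeed as well.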

Please help me figure out what I am missing and how I can resolve this. Thanks.

-- el323
apache-kafka
docker
kafka-consumer-api
kafka-producer-api
kubernetes

3 Answers

4/3/2020

I would also suggest Strimzi for running Kafka on Kubernetes. For external access, the article https://developers.redhat.com/blog/2019/06/11/accessing-apache-kafka-in-strimzi-part-4-load-balancers/ saved me. My config looks like this:

apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.4.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
        type: loadbalancer
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}

And this command retrieves the external IP:

kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'
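The jsonpath expression reads status.loadBalancer.ingress[0].ip from the Service object. A rough Python equivalent that works on the output of kubectl get service my-cluster-kafka-external-bootstrap -o json could look like this; the helper name is hypothetical, and the fallback to the hostname field (which some cloud load balancers populate instead of ip) is an addition not in the original command:

```python
import json
from typing import Optional


# Hypothetical helper: same lookup as the jsonpath '{.status.loadBalancer.ingress[0].ip}'.
def load_balancer_address(service_json: str) -> Optional[str]:
    """Return the first load-balancer ingress IP (or hostname) of a Service, if any."""
    service = json.loads(service_json)
    ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress") or []
    if not ingress:
        # No ingress entry yet, e.g. while the load balancer is still being provisioned.
        return None
    first = ingress[0]
    # Some providers fill in "hostname" instead of "ip"; fall back to it.
    return first.get("ip") or first.get("hostname")
```

For example, the kubectl JSON output could be piped into a small script that calls load_balancer_address(sys.stdin.read()).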
-- Oana
Source: StackOverflow

11/22/2019

Simple configuration of Kafka/Zookeeper on Kubernetes in AWS/DigitalOcean/GCE/Azure with external access:

https://github.com/StanislavKo/k8s_digitalocean_kafka

You can connect to Kafka from outside AWS/DO/GCE using the regular binary protocol. The connection is either PLAINTEXT or SASL_PLAINTEXT (username/password).

The Kafka cluster is a StatefulSet, so you can scale it easily.
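To illustrate the two connection modes from the client side, here is a sketch that builds keyword arguments for a kafka-python client. The parameter names (bootstrap_servers, security_protocol, sasl_mechanism, sasl_plain_username, sasl_plain_password) are kafka-python's; the client_config helper and the choice of the PLAIN SASL mechanism are assumptions, so check the linked repository for the mechanism it actually configures:

```python
from typing import Optional


# Hypothetical helper: builds keyword arguments for kafka-python's KafkaConsumer/KafkaProducer.
def client_config(bootstrap: str, username: Optional[str] = None,
                  password: Optional[str] = None) -> dict:
    """Return kafka-python client settings for PLAINTEXT or SASL_PLAINTEXT."""
    config = {"bootstrap_servers": bootstrap}
    if username is None:
        # No credentials: plain, unauthenticated TCP.
        config["security_protocol"] = "PLAINTEXT"
    else:
        # Username/password over an unencrypted connection.
        # Assumption: the PLAIN SASL mechanism; the linked repo may use another one.
        config.update(
            security_protocol="SASL_PLAINTEXT",
            sasl_mechanism="PLAIN",
            sasl_plain_username=username,
            sasl_plain_password=password,
        )
    return config
```

With kafka-python installed, the result would be passed along the lines of KafkaConsumer("topic-1", **client_config(...)), using your own external address and credentials.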

-- StanislavKo

9/25/2019

You're bootstrapping to port 30035 and getting the initial connection, but the metadata the broker returns then advertises port 9093 for subsequent connections, not 30035. Nothing on the node is listening on 9093 (netstat only shows kube-proxy on 30035), hence the ECONNREFUSED.

The NodePort and the advertised port need to be the same, or at least both need to be externally routable. In addition, if your code runs on your host machine, you need port forwarding on your VM.
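Applied to the question's manifests, the first option could look like this untested sketch: keep the broker listening on 9093 inside the pod (the kafkaservice-external Service forwards node port 30035 to port 9093, since targetPort defaults to the Service port when omitted) and advertise the NodePort for EXTERNAL_PLAINTEXT instead. The advertise_node_port helper is hypothetical; it only rewrites the KAFKA_ADVERTISED_LISTENERS value:

```python
# Hypothetical helper: swap one listener's advertised port for the NodePort.
def advertise_node_port(advertised: str, listener: str, node_port: int) -> str:
    """Return `advertised` with `listener`'s port replaced by `node_port`."""
    entries = []
    for entry in advertised.split(","):
        name, _, address = entry.partition("://")
        if name == listener:
            # Keep the host, drop the old port (here 9093).
            host, _, _old_port = address.rpartition(":")
            address = f"{host}:{node_port}"
        entries.append(f"{name}://{address}")
    return ",".join(entries)


fixed = advertise_node_port(
    "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:9093",
    listener="EXTERNAL_PLAINTEXT",
    node_port=30035,
)
print(fixed)
```

The printed string is what would go into KAFKA_ADVERTISED_LISTENERS; in-cluster clients keep using kafkaservice:9092 through INTERNAL_PLAINTEXT.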

Note: Confluent and Strimzi Helm charts exist for setting up Kafka on Kubernetes.

-- OneCricketeer