I have a single-node Kafka broker running inside a pod on a single-node Kubernetes cluster. I am using this image for Kafka: https://hub.docker.com/r/wurstmeister/kafka
Kafka version: 1.1.0
The Kubernetes cluster runs inside a VM on a server. The VM's active interface, ens32, has the IP 192.168.3.102.
Kafka.yaml:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  namespace: casb-deployment
  name: kafkaservice
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: kafkaservice
    spec:
      hostname: kafkaservice
      containers:
      - name: kafkaservice
        imagePullPolicy: IfNotPresent
        image: wurstmeister/kafka:1.1.0
        env:
        - name: KAFKA_BROKER_ID
          value: "1"
        # - name: KAFKA_ADVERTISED_HOST_NAME
        #   value: "kafkaservice"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INTERNAL_PLAINTEXT:PLAINTEXT,EXTERNAL_PLAINTEXT:PLAINTEXT"
        - name: KAFKA_LISTENERS
          value: "INTERNAL_PLAINTEXT://0.0.0.0:9092,EXTERNAL_PLAINTEXT://0.0.0.0:9093"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "INTERNAL_PLAINTEXT://kafkaservice:9092,EXTERNAL_PLAINTEXT://192.168.3.102:9093"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INTERNAL_PLAINTEXT"
        - name: KAFKA_CREATE_TOPICS
          value: "topic-1:100:1,topic-2:1:1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: "zookeeper:2181"
        ports:
        - name: port9092
          containerPort: 9092
        - name: port9093
          containerPort: 9093
        volumeMounts:
        - mountPath: /kafka/kafka-logs-kafkaservice
          name: kafka-volume
      volumes:
      - name: kafka-volume
        hostPath:
          path: /home/volume/kafka-logs
---
apiVersion: v1
kind: Service
metadata:
  namespace: casb-deployment
  name: kafkaservice
  labels:
    app: kafkaservice
spec:
  selector:
    app: kafkaservice
  ports:
  - name: port9092
    port: 9092
    targetPort: 9092
    protocol: TCP
---
apiVersion: v1
kind: Service
metadata:
  namespace: casb-deployment
  name: kafkaservice-external
  labels:
    app: kafkaservice-external
spec:
  selector:
    app: kafkaservice
  ports:
  - name: port9093
    port: 9093
    protocol: TCP
    nodePort: 30035
  type: NodePort
I can ping the VM (i.e. the Kubernetes node) from my local machine:
ping 192.168.3.102
I am using a NodePort service to expose Kafka, and I can also connect to it with telnet:
telnet 192.168.3.102 30035
which gives:
Trying 192.168.3.102...
Connected to 192.168.3.102.
Escape character is '^]'.
I tried running kafka console consumer and producer from my local machine:
Consumer: bin/kafka-console-consumer.sh --bootstrap-server 192.168.3.102:30035 --topic topic-1 --from-beginning
Output:
[2019-09-25 12:30:40,716] WARN [Consumer clientId=consumer-1, groupId=console-consumer-20551] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Producer:
bin/kafka-console-producer.sh --broker-list 192.168.3.102:30035 --topic topic-1
Output:
[2019-09-25 12:32:07,958] WARN [Producer clientId=console-producer] Connection to node 1 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
Netstat:
netstat -tulpn | grep 30035
tcp6 0 0 :::30035 :::* LISTEN 113545/kube-proxy
I also tried running a Python-based consumer (kafka-python==1.4.2), which gave me the following logs:
[2019-09-25T12:15:39+0500] INFO kafka.client Bootstrapping cluster metadata from [('192.168.3.102', 30035, <AddressFamily.AF_INET: 2>)]
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connecting> [IPv4 ('192.168.3.102', 30035)]>: connecting to 192.168.3.102:30035 [('192.168.3.102', 30035) IPv4]
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connecting> [IPv4 ('192.168.3.102', 30035)]>: Connection complete.
[2019-09-25T12:15:39+0500] INFO kafka.client Bootstrap succeeded: found 1 brokers and 26 topics.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=bootstrap host=192.168.3.102:30035 <connected> [IPv4 ('192.168.3.102', 30035)]>: Closing connection.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: connecting to 192.168.3.102:9093 [('192.168.3.102', 9093) IPv4]
[2019-09-25T12:15:39+0500] INFO kafka.conn Probing node 1 broker version
[2019-09-25T12:15:39+0500] ERROR kafka.conn Connect attempt to <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]> returned error 111. Disconnecting.
[2019-09-25T12:15:39+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: Closing connection. ConnectionError: 111 ECONNREFUSED
[2019-09-25T12:15:40+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: connecting to 192.168.3.102:9093 [('192.168.3.102', 9093) IPv4]
[2019-09-25T12:15:40+0500] ERROR kafka.conn Connect attempt to <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]> returned error 111. Disconnecting.
[2019-09-25T12:15:40+0500] INFO kafka.conn <BrokerConnection node_id=1 host=192.168.3.102:9093 <connecting> [IPv4 ('192.168.3.102', 9093)]>: Closing connection. ConnectionError: 111 ECONNREFUSED
[2019-09-25T12:15:40+0500] INFO Activity URL collector Exception in activity url collector: NoBrokersAvailable
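From the logs, it seems the bootstrap connection to port 30035 was made, i.e.
<connecting> [IPv4 ('192.168.3.102', 30035)]>: Connection complete.
Bootstrap succeeded: found 1 brokers and 26 topics.
But then the client closed that connection, and its follow-up connection to node 1 at 192.168.3.102:9093 was refused.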
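To narrow this down, the two endpoints that appear in the log can be probed separately. The following is only a minimal sketch using the Python standard library; the helper name can_connect is made up for illustration and is not part of kafka-python.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a plain TCP connection to host:port succeeds.

    Hypothetical helper for illustration. It only checks that something
    accepts the TCP handshake, not that a Kafka broker answers.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and unreachable hosts.
        return False
```

Calling can_connect("192.168.3.102", 30035) and can_connect("192.168.3.102", 9093) from the local machine should mirror what telnet and the kafka-python log already suggest: the NodePort accepts connections, while port 9093 on the node does not.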
Please help me figure out what I am missing and how I can resolve this. Thanks.
I would also suggest Strimzi for running Kafka on Kubernetes. For external access, this article saved me: https://developers.redhat.com/blog/2019/06/11/accessing-apache-kafka-in-strimzi-part-4-load-balancers/
My config looks like:
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 2.4.0
    replicas: 1
    listeners:
      plain: {}
      tls: {}
      external:
        type: loadbalancer
        tls: false
    config:
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
      log.message.format.version: "2.4"
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}
and I use this to retrieve the external IP of the bootstrap service:
kubectl get service my-cluster-kafka-external-bootstrap -o=jsonpath='{.status.loadBalancer.ingress[0].ip}{"\n"}'
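If you would rather do the same lookup in a script, here is a rough Python sketch that reads the JSON printed by `kubectl get service my-cluster-kafka-external-bootstrap -o json`. The function name and the sample document are hypothetical, and the fallback to `hostname` is my addition, since some cloud load balancers report a hostname instead of an IP.

```python
import json


def load_balancer_address(service):
    """Return the first load balancer IP (or hostname) of a Service, else None.

    `service` is the parsed JSON of a Kubernetes Service object.
    """
    ingress = service.get("status", {}).get("loadBalancer", {}).get("ingress") or []
    if not ingress:
        # The load balancer has not been provisioned yet.
        return None
    first = ingress[0]
    return first.get("ip") or first.get("hostname")


# Made-up sample shaped like `kubectl get service ... -o json` output.
SAMPLE = json.loads("""
{
  "metadata": {"name": "my-cluster-kafka-external-bootstrap"},
  "status": {"loadBalancer": {"ingress": [{"ip": "203.0.113.10"}]}}
}
""")

print(load_balancer_address(SAMPLE))  # 203.0.113.10
```

Clients then bootstrap against that address on the port of the external listener.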
Here is a simple configuration of Kafka/ZooKeeper on Kubernetes in AWS/DigitalOcean/GCE/Azure with external access:
https://github.com/StanislavKo/k8s_digitalocean_kafka
You can connect to Kafka from outside AWS/DO/GCE using the regular Kafka binary protocol. The connection is PLAINTEXT or SASL_PLAINTEXT (username/password).
The Kafka cluster is a StatefulSet, so you can scale it easily.
You're bootstrapping to port 30035 and getting the initial connection, but the broker then returns its advertised listener, 192.168.3.102:9093, for all subsequent connections. That port, unlike 30035, is not exposed on the node, hence the ECONNREFUSED.
The NodePort and the advertised port need to be the same, or at least both need to be externally routable. If your code is running on your host machine, you additionally need port forwarding on your VM.
Note: Confluent or Strimzi Helm charts exist for setting up Kafka on Kubernetes.
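To make the mismatch concrete, here is a minimal Python sketch that parses the KAFKA_ADVERTISED_LISTENERS value from the question and checks whether the external listener matches the address a client actually dials. The helper names are hypothetical and not part of Kafka or kafka-python, and the "fixed" value is only one possible choice, assuming the VM IP is reachable from the client.

```python
def parse_listeners(value):
    """Parse 'NAME://host:port,NAME2://host2:port2' into {NAME: (host, port)}."""
    listeners = {}
    for entry in value.split(","):
        name, _, address = entry.strip().partition("://")
        host, _, port = address.rpartition(":")
        listeners[name] = (host, int(port))
    return listeners


def external_listener_matches(advertised, listener_name, node_ip, node_port):
    """Return True if the named advertised listener equals node_ip:node_port."""
    return parse_listeners(advertised)[listener_name] == (node_ip, node_port)


# Value taken from the question's Deployment.
ADVERTISED = ("INTERNAL_PLAINTEXT://kafkaservice:9092,"
              "EXTERNAL_PLAINTEXT://192.168.3.102:9093")
# Assumed alternative: advertise the NodePort instead of the container port.
FIXED = ("INTERNAL_PLAINTEXT://kafkaservice:9092,"
         "EXTERNAL_PLAINTEXT://192.168.3.102:30035")

print(external_listener_matches(ADVERTISED, "EXTERNAL_PLAINTEXT", "192.168.3.102", 30035))  # False
print(external_listener_matches(FIXED, "EXTERNAL_PLAINTEXT", "192.168.3.102", 30035))  # True
```

With the question's values the check fails because clients are told to reconnect on 9093. Advertising 192.168.3.102:30035 on the external listener is one way to make the two agree; the listener can keep binding to 9093 inside the pod, as long as the Service forwards the NodePort to it.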