How to pass a Kubernetes service endpoint IP down into the Kafka advertised listeners

4/29/2018

I would like to configure a Kafka broker in Kubernetes. The Docker image I am using is confluentinc/cp-kafka:latest. It requires the KAFKA_ADVERTISED_LISTENERS environment variable, which tells Kafka clients how to reach the broker.

The problem is assigning the service endpoint IP to KAFKA_ADVERTISED_LISTENERS. If I use localhost as the value, it only works inside the Kafka broker pod itself; Kafka client pods elsewhere in the cluster cannot reach the broker. If I use the service endpoint IP reported by kubectl get endpoints -l app=kafka, it works, but it is extra overhead to have a helper script look up and set this dynamic value every time.

Is there a better way to set this value dynamically in the Kubernetes YAML file, so that I don't have to set the IP programmatically every time?

Here is the YAML file:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
  selector:
    app: kafka

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: broker
      containers:
      - name: kafka
        image: confluentinc/cp-kafka:latest
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://DYNAMIC_ENDPOINT_IP:9092"
        - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
          value: "1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zoo1:2181

Thanks in advance.

Edit: I tried the service name, the service host environment variable, the service IP, and the pod IP. Unfortunately, I still get this error in the Kafka log: java.lang.IllegalArgumentException: Error creating broker listeners from 'PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092': Unable to parse PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092 to a broker endpoint

If I run kubectl exec -it kafa-broker-ssfjks env, those environment variables are actually set correctly in the pod. I guess it may be related to a Kafka broker configuration issue?
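
If I understand the Kubernetes docs correctly, a bare $VAR inside an env value is never expanded, which would explain the literal $KAFKA_BROKER_SERVICE_HOST in the error; only the $(VAR) form is substituted, and the pod IP can be injected explicitly through the Downward API. A rough sketch of what the env section could look like (untested; POD_IP is just a placeholder name I picked):

        env:
        # Downward API: expose this pod's IP as an environment variable
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        # $(POD_IP) is expanded because POD_IP is defined earlier in this list;
        # a bare $POD_IP would be passed to the container as a literal string
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(POD_IP):9092"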

-- Xiaohe Dong
apache-kafka
kubernetes

3 Answers

4/29/2018

Use the service name (kafka-broker) instead of its IP; kube-dns will resolve it for you. If the Kafka client is in the same namespace, you can use just "kafka-broker"; if not, you must use the fully qualified name "kafka-broker.YOURNAMESPACE.svc".
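
For example, the advertised listener in the question's Deployment could point at the service name instead of an IP, along these lines (a sketch; it assumes the clients run in the default namespace):

        - name: KAFKA_ADVERTISED_LISTENERS
          # kube-dns resolves the service name; across namespaces use kafka-broker.<namespace>.svc
          value: "PLAINTEXT://kafka-broker.default.svc:9092"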

-- Matheus Neder
Source: StackOverflow

2/26/2020

@Jakub got me on the right track, so for something like cp-kafka-connect, my Dockerfile looks like this:

FROM confluentinc/cp-kafka-connect:5.4.0
ENV CONNECT_GROUP_ID='kafkatosql'
ENV CONNECT_CONFIG_STORAGE_TOPIC="kafkatosql-config"
ENV CONNECT_OFFSET_STORAGE_TOPIC="kafkatosql-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="kafkatosql-status"
ENV CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_LOG4J_ROOT_LOGLEVEL="ERROR"
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.0
WORKDIR /app
COPY start.sh .
CMD exec ./start.sh

and then start.sh looks like:

#!/bin/bash

# Kafka Connect's REST API inside this container
kafka_connect_host=localhost:8083

# advertise this pod's IP so the Connect REST endpoint is reachable from other pods
export CONNECT_REST_ADVERTISED_HOST_NAME=$(hostname -I)

# start Kafka Connect in the background
/etc/confluent/docker/run &

wait_counter=0
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while true; do
  status=$(curl -s -o /dev/null -w %{http_code} http://$kafka_connect_host/connectors)
  if [ $status -eq 000 ]; then
    wait_counter=$((wait_counter+1))
    echo "Kafka Connect listener HTTP status: $status (waiting for 200)"
    if [ $wait_counter = 15 ]; then
      echo 'Waited too long!';
      exit 1;
    else
      echo "Retries: $wait_counter"
      sleep 3
    fi
  else
    break
  fi
done
echo -e "\n--\n+> Creating Kafka Connect Postgresql Sink"
curl -X PUT http://$kafka_connect_host/connectors/jdbc_sink_postgresql_00/config -H "Content-Type: application/json" -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": 1,
  "topics": "users",
  "connection.url": "jdbc:'"$DB_URL"'",
  "auto.create": false
}'

# ... other stuff

trap : TERM INT; sleep infinity & wait
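
For reference, the image built from the Dockerfile above still needs to be pointed at the brokers (and, since the converters are Avro, at a schema registry). One way is via the Deployment's env; here is a trimmed sketch, where the image name, the schema-registry address, the DB_URL value and the kafka-broker service (from the question) are all assumptions:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-connect
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-connect
  template:
    metadata:
      labels:
        app: kafka-connect
    spec:
      containers:
      - name: kafka-connect
        image: myregistry/kafkatosql:latest  # built from the Dockerfile above
        ports:
        - containerPort: 8083
        env:
        # bootstrap through the broker service; the broker must advertise a name clients can resolve
        - name: CONNECT_BOOTSTRAP_SERVERS
          value: "kafka-broker:9092"
        # needed because the converters are Avro (schema-registry service name assumed)
        - name: CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL
          value: "http://schema-registry:8081"
        - name: CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL
          value: "http://schema-registry:8081"
        # consumed by start.sh when it creates the JDBC sink (value is a placeholder)
        - name: DB_URL
          value: "postgresql://db-host:5432/kafkatosql"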
-- rhyek
Source: StackOverflow

4/29/2018

You should let your clients connect through the service, so exposing the IP or the DNS name of the service should work. By default, services are exposed as environment variables inside pods. If a DNS add-on is configured, DNS names can be used as well. More info: https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables
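
For instance, a pod created after the kafka-broker service exists gets variables such as KAFKA_BROKER_SERVICE_HOST and KAFKA_BROKER_SERVICE_PORT injected, and my reading of the substitution rules is that these can be referenced with the $(VAR) syntax; the DNS name avoids the create-the-service-first caveat entirely. A sketch of both options:

        env:
        # option 1: service environment variables injected by kubelet
        # (only present if the service existed before this pod started)
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "PLAINTEXT://$(KAFKA_BROKER_SERVICE_HOST):9092"
        # option 2: the cluster DNS name of the service instead
        # - name: KAFKA_ADVERTISED_LISTENERS
        #   value: "PLAINTEXT://kafka-broker.default.svc:9092"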

-- Jeff
Source: StackOverflow