I would like to configure a Kafka broker in Kubernetes. The Docker image I am using is confluentinc/cp-kafka:latest. It requires the KAFKA_ADVERTISED_LISTENERS environment variable, which is what Kafka clients use to reach the broker.
The problem is that it is difficult to assign the service endpoint IP to KAFKA_ADVERTISED_LISTENERS. If I use localhost as the value, it only works inside the Kafka broker pod itself; Kafka client pods elsewhere in the Kubernetes cluster cannot communicate with the broker. If I use the service endpoint IP from kubectl get endpoints -l app=kafka, it works, but having a script set this dynamic value every time is extra overhead.
Is there a better way to set this value dynamically in the Kubernetes YAML file, so that I don't need to set the IP programmatically every time?
Here is the YAML file:
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
    - port: 9092
      targetPort: 9092
  selector:
    app: kafka
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: kafka-broker
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      hostname: broker
      containers:
        - name: kafka
          image: confluentinc/cp-kafka:latest
          ports:
            - containerPort: 9092
          env:
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "PLAINTEXT://DYNAMIC_ENDPOINT_IP:9092"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "1"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zoo1:2181
Thanks in advance.
Edit: I tried using the service name, the service host environment variable, the service cluster IP, and the pod IP. Unfortunately, I still get this error in the Kafka log: java.lang.IllegalArgumentException: Error creating broker listeners from 'PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092': Unable to parse PLAINTEXT://$KAFKA_BROKER_SERVICE_HOST:9092 to a broker endpoint
If I run kubectl exec -it kafka-broker-ssfjks env, those environment variables are actually set correctly in the pod. I guess it may be related to a Kafka broker configuration issue?
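The error shows the literal string $KAFKA_BROKER_SERVICE_HOST reaching Kafka unexpanded: Kubernetes does not do shell-style $VAR substitution in env values, it only expands references written as $(VAR), and that expansion should also cover injected service variables as long as the kafka-broker Service exists before the pod starts. A minimal sketch of that form (assuming the Service from the question):

env:
  - name: KAFKA_ADVERTISED_LISTENERS
    # $(...) is expanded by Kubernetes from the injected service variable;
    # shell syntax like $KAFKA_BROKER_SERVICE_HOST is passed through literally
    value: "PLAINTEXT://$(KAFKA_BROKER_SERVICE_HOST):9092"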
Use the service name (kafka-broker) instead of its IP; kube-dns will resolve it for you. If the Kafka client is in the same namespace, you can use just "kafka-broker"; if not, you must use the qualified name "kafka-broker.YOURNAMESPACE.svc".
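For example, the broker's env entry could look roughly like this (a sketch assuming the Service above is named kafka-broker and runs in the default namespace):

env:
  - name: KAFKA_ADVERTISED_LISTENERS
    # clients in the same namespace could also use just "PLAINTEXT://kafka-broker:9092"
    value: "PLAINTEXT://kafka-broker.default.svc.cluster.local:9092"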
@Jakub got me on the right track, so for something like cp-kafka-connect my Dockerfile looks like:
FROM confluentinc/cp-kafka-connect:5.4.0
ENV CONNECT_GROUP_ID='kafkatosql'
ENV CONNECT_CONFIG_STORAGE_TOPIC="kafkatosql-config"
ENV CONNECT_OFFSET_STORAGE_TOPIC="kafkatosql-offsets"
ENV CONNECT_STATUS_STORAGE_TOPIC="kafkatosql-status"
ENV CONNECT_KEY_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_VALUE_CONVERTER="io.confluent.connect.avro.AvroConverter"
ENV CONNECT_LOG4J_ROOT_LOGLEVEL="ERROR"
ENV CONNECT_PLUGIN_PATH="/usr/share/java,/usr/share/confluent-hub-components"
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:5.4.0
WORKDIR /app
COPY start.sh .
CMD exec ./start.sh
and then start.sh looks like:
#!/bin/bash
kafka_connect_host=localhost:8083

# Advertise the pod's IP so the Connect REST API is reachable from other pods
export CONNECT_REST_ADVERTISED_HOST_NAME=$(hostname -I)

# Start the Connect worker in the background
/etc/confluent/docker/run &

wait_counter=0
echo "Waiting for Kafka Connect to start listening on kafka-connect ⏳"
while true; do
  # HTTP 000 means the REST port is not accepting connections yet
  status=$(curl -s -o /dev/null -w %{http_code} http://$kafka_connect_host/connectors)
  if [ $status -eq 000 ]; then
    wait_counter=$((wait_counter+1))
    echo "Kafka Connect listener HTTP status: $status (waiting for 200)"
    if [ $wait_counter = 15 ]; then
      echo 'Waited too long!'
      exit 1
    else
      echo "Retries: $wait_counter"
      sleep 3
    fi
  else
    break
  fi
done

echo -e "\n--\n+> Creating Kafka Connect Postgresql Sink"
curl -X PUT http://$kafka_connect_host/connectors/jdbc_sink_postgresql_00/config -H "Content-Type: application/json" -d '{
  "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
  "tasks.max": 1,
  "topics": "users",
  "connection.url": "jdbc:'"$DB_URL"'",
  "auto.create": false
}'

# ... other stuff

# Keep the container alive and forward termination signals
trap : TERM INT; sleep infinity & wait
You should let your clients connect through the service, so exposing the IP or DNS name of the service should work. By default, services are exposed as environment variables in the pod. If a DNS plugin is configured, DNS can be used. More info: https://kubernetes.io/docs/concepts/services-networking/service/#environment-variables
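As an illustration only (this client deployment and its variable name are hypothetical, not part of the question), a consumer pod in the same namespace could point its bootstrap configuration at the service's DNS name:

env:
  - name: KAFKA_BOOTSTRAP_SERVERS   # hypothetical client-side setting
    value: "kafka-broker:9092"      # resolved by cluster DNS within the same namespace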