I'm running a 4-broker Kafka cluster in Kubernetes. The replication factor is 3 and min.insync.replicas is 2.
In addition, there's a producer service (running Spring Cloud Stream) generating messages and a consumer service reading from the topic. I tried to upgrade the Kafka cluster with a rolling update, hoping for no downtime, but during the update the producer's log filled up with this error:
org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.
By my calculation, one broker being down shouldn't be a problem, because min.insync.replicas is 2 and two of the three replicas would still be available. However, it seems the producer service is unaware of the rolling update and keeps sending messages to the same broker...
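To spell out the calculation, here is a minimal sketch in Python. The function name `can_accept_writes` is made up for illustration, and it assumes the producer uses `acks=all` (which is when min.insync.replicas is enforced) and that every broker that is down hosted a replica of the partition. It only models the in-sync replica count; it says nothing about leadership moving to another broker, which is what the exception is about.

```python
def can_accept_writes(replication_factor, min_insync_replicas, brokers_down):
    """Return True if a partition can still satisfy min.insync.replicas.

    Worst case: every broker that is down hosted a replica of this partition.
    """
    replicas_left = replication_factor - min(brokers_down, replication_factor)
    return replicas_left >= min_insync_replicas


# My setup: replication factor 3, min.insync.replicas 2.
print(can_accept_writes(3, 2, brokers_down=1))  # one broker restarting
print(can_accept_writes(3, 2, brokers_down=2))  # two brokers down at once
```

With one broker restarting at a time the check passes, which is why I expected the rolling update to be safe.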
Any ideas how to solve it?
This is my kafka.yaml:
apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: kafka
  namespace: default
  labels:
    app: kafka
spec:
  serviceName: kafka
  replicas: 4
  updateStrategy:
    type: RollingUpdate
  template:
    metadata:
      labels:
        app: kafka
      annotations:
        prometheus.io/scrape: "true"
        prometheus.io/port: "9308"
    spec:
      nodeSelector:
        middleware.node: "true"
      imagePullSecrets:
        - name: nexus-registry
      terminationGracePeriodSeconds: 300
      containers:
        - name: kafka
          image: kafka:2.12-2.1.0
          imagePullPolicy: IfNotPresent
          resources:
            limits:
              cpu: 3000m
              memory: 1800Mi
            requests:
              cpu: 2000m
              memory: 1800Mi
          env:
            # Replication
            - name: KAFKA_DEFAULT_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_MIN_INSYNC_REPLICAS
              value: "2"
            # Protocol Version
            - name: KAFKA_INTER_BROKER_PROTOCOL_VERSION
              value: "2.1"
            - name: KAFKA_LOG_MESSAGE_FORMAT_VERSION
              value: "2.1"
            - name: ENABLE_AUTO_EXTEND
              value: "true"
            - name: KAFKA_DELETE_TOPIC_ENABLE
              value: "true"
            - name: KAFKA_RESERVED_BROKER_MAX_ID
              value: "999999999"
            - name: KAFKA_AUTO_CREATE_TOPICS_ENABLE
              value: "true"
            - name: KAFKA_PORT
              value: "9092"
            - name: KAFKA_ADVERTISED_PORT
              value: "9092"
            - name: KAFKA_NUM_RECOVERY_THREADS_PER_DATA_DIR
              value: "10"
            - name: KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR
              value: "3"
            - name: KAFKA_LOG_RETENTION_BYTES
              value: "1800000000000"
            - name: KAFKA_ADVERTISED_HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: status.podIP
            - name: KAFKA_OFFSETS_RETENTION_MINUTES
              value: "10080"
            - name: KAFKA_ZOOKEEPER_CONNECT
              valueFrom:
                configMapKeyRef:
                  name: zk-config
                  key: zk.endpoints
            - name: KAFKA_LOG_DIRS
              value: /kafka/kafka-logs
          ports:
            - name: kafka
              containerPort: 9092
            - name: prometheus
              containerPort: 7071
          volumeMounts:
            - name: data
              mountPath: /kafka
          readinessProbe:
            tcpSocket:
              port: 9092
            timeoutSeconds: 1
            failureThreshold: 12
            initialDelaySeconds: 10
            periodSeconds: 30
            successThreshold: 1
        - name: kafka-exporter
          image: danielqsj/kafka-exporter:latest
          resources:
            requests:
              cpu: 100m
              memory: 100Mi
            limits:
              cpu: 500m
              memory: 500Mi
          ports:
            - containerPort: 9308
  volumeClaimTemplates:
    - metadata:
        name: data
        labels:
          app: kafka
      spec:
        accessModes:
          - ReadWriteOnce
        resources:
          requests:
            storage: 2000Gi