Duplicate message consumption in Kafka due to auto-downscaling/deletion of pods

4/12/2021

Background

We have a simple producer/consumer style application with Kafka as the message broker and consumer processes running as Kubernetes pods. We have defined two topics, the in-topic and the out-topic. A set of consumer pods that belong to the same consumer group read messages from the in-topic, perform some work, and finally write the same message (key) to the out-topic once the work is complete.
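For illustration, the consume-process-produce loop looks roughly like this (a minimal sketch rather than the actual application code; the bootstrap address and doWork are placeholders):

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class EnrichmentWorker {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // placeholder address
        props.put("group.id", "PortfolioEnrichmentGroup14");
        props.put("enable.auto.commit", "true");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // One Properties object is shared here for brevity; each client ignores
        // the keys it does not recognize (with a warning).
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(Collections.singletonList("in-topic"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String result = doWork(record.value()); // long-running work
                    producer.send(new ProducerRecord<>("out-topic", record.key(), result));
                    // With enable.auto.commit=true, the offset for this record is
                    // only committed on a later poll() -- the window in which a pod
                    // can be killed after producing but before committing.
                }
            }
        }
    }

    private static String doWork(String value) {
        return value; // placeholder for the actual enrichment logic
    }
}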

Issue Description

We noticed that duplicate messages are being written to the out-topic by the consumers running in the Kubernetes pods. In other words, the same message is consumed from the in-topic by two different consumers and is therefore published twice to the out-topic as well. We analyzed the issue and can safely conclude that it only occurs when pods are auto-downscaled/deleted by Kubernetes.

In fact, an interesting observation is that whenever a message is read by two different consumers from the in-topic (and thus published twice to the out-topic), the given message is always the last message consumed by one of the pods that was downscaled. In other words, if a message is consumed twice, the root cause is always the downscaling of a pod.

We can conclude that a pod is getting downscaled after a consumer writes the message to the out-topic but before the offset for the in-topic can be committed.

Consumer configuration

props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "3600000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

Zookeeper/broker logs:

[2021-04-07 02:42:22,708] INFO [GroupCoordinator 0]: Preparing to rebalance group PortfolioEnrichmentGroup14 in state PreparingRebalance with old generation 1 (__consumer_offsets-17) (reason: removing member PortfolioEnrichmentConsumer13-9aa71765-2518-493f-a312-6c1633225015 on heartbeat expiration) (kafka.coordinator.group.GroupCoordinator)
[2021-04-07 02:42:23,331] INFO [GroupCoordinator 0]: Stabilized group PortfolioEnrichmentGroup14 generation 2 (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
[2021-04-07 02:42:23,335] INFO [GroupCoordinator 0]: Assignment received from leader for group PortfolioEnrichmentGroup14 for generation 2 (kafka.coordinator.group.GroupCoordinator)

What we tried

Looking at the logs, it was clear that rebalancing takes place because of heartbeat expiration. We added the following configuration parameters to increase the heartbeat interval and the session timeout:

props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "10000");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "900000");
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "512");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");

However, this did not solve the issue. Looking at the broker logs, we can confirm that the issue is due to the downscaling of pods.

Question: What could be causing this behavior where a message is consumed twice when a pod gets downscaled?

Note: I already understand the root cause of the issue; however, considering that a consumer is a long-lived process running in an infinite loop, how and why is Kubernetes downscaling/killing a pod before the consumer commits the offset? How do I tell Kubernetes not to remove a running pod from a consumer group until all Kafka commits are completed?

-- Chetan Kinger
apache-kafka
horizontal-pod-autoscaling
kafka-consumer-api
kubernetes
producer-consumer

2 Answers

5/26/2021

If you want your consumer to commit messages before exiting, you need to handle the exit signal in your consumer. Most languages support this. Have a look at this thread on how to do it in Java: https://stackoverflow.com/questions/46581674/how-to-finish-kafka-consumer-safetyis-there-meaning-to-call-threadjoin-inside.
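For example, a JVM shutdown hook combined with consumer.wakeup() lets the poll loop commit and close before the process exits. This is the standard pattern from the Kafka documentation; the class name, topic, and bootstrap address below are illustrative:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class GracefulConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka:9092"); // placeholder address
        props.put("group.id", "PortfolioEnrichmentGroup14");
        props.put("enable.auto.commit", "false"); // commit manually below
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("in-topic"));

        // Kubernetes sends SIGTERM on pod deletion, which runs JVM shutdown hooks
        // (unless the process is later SIGKILLed after the grace period expires).
        final Thread mainThread = Thread.currentThread();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            consumer.wakeup(); // makes the blocked poll() throw WakeupException
            try {
                mainThread.join(); // wait for the loop below to commit and close
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // process the record and produce to out-topic here
                }
                consumer.commitSync(); // commit only after the batch is fully processed
            }
        } catch (WakeupException e) {
            // expected on shutdown; the last processed batch is committed below
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }
}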

That being said, please note that there is no 100% guarantee of achieving exactly-once delivery. Your process can be killed forcefully by the OS before it is even given time to run any exit cleanup (kill -9 <process_id>).

-- Rishabh Sharma
Source: StackOverflow

4/12/2021

"What could be causing this behavior where a message is consumed twice when a pod gets downscaled?"

You have provided the answer already yourself: "... that a pod is getting downscaled after a consumer writes the message to the out-topic but before Kafka can commit the offset to the in-topic."

As the message was processed but its offset was not committed, another pod re-processes the same message after the downscaling happens. Remember that adding or removing a consumer from a consumer group always triggers a rebalance. You now have first-hand experience of why this should generally be avoided as much as feasible. Depending on the Kafka version, a rebalance will cause every single consumer in the consumer group to stop consuming until the rebalancing is done.
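A common way to avoid stranding processed-but-uncommitted offsets across a rebalance (a sketch, not part of the original answer) is to disable auto-commit and commit synchronously from a ConsumerRebalanceListener when partitions are revoked. The currentOffsets map here is illustrative and must be updated after each record is fully processed:

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Assumes enable.auto.commit=false and a consumer/poll loop like the one
// sketched in the question.
final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

consumer.subscribe(Collections.singletonList("in-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Runs before the rebalance completes: commit what has been processed so
        // the next owner of these partitions does not re-read the same messages.
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do when partitions are (re)assigned
    }
});

// Inside the poll loop, after a record is processed and written to out-topic:
currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                   new OffsetAndMetadata(record.offset() + 1)); // next offset to read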

To solve your issue, I see two options:

  • Only remove running pods from the consumer group when they are idle
  • Reduce the consumer configuration auto.commit.interval.ms, which defaults to 5 seconds, to 1 (see the snippet below). This will only work if you set enable.auto.commit to true.
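For the second option, the change is one consumer property (a sketch; the value is in milliseconds):

// Shrink the auto-commit window from the default 5000 ms to 1 ms. This narrows,
// but does not close, the window in which a processed message can remain
// uncommitted when the pod is killed.
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1");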
-- Michael Heil
Source: StackOverflow