Spring Boot Kafka listener is inconsistent

11/28/2020

I am trying to connect several Spring Cloud microservices to a Kafka/ZooKeeper cluster, all running inside Kubernetes. The microservices use org.springframework.kafka:spring-kafka as both consumers and producers of events.

All of the services connect to Kafka fine, and the topics are created; however, the consumers in each service behave very inconsistently.

For example, on one startup all of the consumers receive the message and the listener method is invoked. But when I restart everything (Kafka and ZooKeeper included), either nothing is consumed at all, or only the consumers in some of the services work.

Here is some of my configuration. I don't have any Java-based config; it is all in application.yml:

spring:

  ....

  kafka:
    consumer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: api-event
      enable-auto-commit: false

    producer:
      bootstrap-servers: api-kafka.default.svc.cluster.local:9092
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
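      # ack-mode is a listener-container setting (spring.kafka.listener.ack-mode), not a producer property
      ack-mode: manual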

...

And my main class:

@EnableCaching
@SpringBootApplication
@EnableJpaRepositories
@EnableDiscoveryClient
@EnableKafka /* <<<<<<<------------- ENABLED HERE */
public class ExampleServiceApplication {

  public static void main(String[] args) {
    SpringApplication.run(ExampleServiceApplication.class, args);
  }

  .....
}

And finally, my consumer:

@Component
public class MessageListener {

  @KafkaListener(
      topics = "myTopic")
  public void eventListener(String serializedMessage) {
    try {
....

The messages are getting sent to the broker fine, but just aren't consumed by the other services.

I realize that none of the services' properties map to the topics. How do I do this via application.yml?

I bet I'm making a really noob mistake! I'd really appreciate any comments or help.

-- Ben Neighbour
apache-kafka
java
kubernetes
spring-boot
spring-cloud

1 Answer

11/28/2020

You can read more here about the relationship between the number of partitions and the number of parallel consumers (consumers with the same group id):

https://docs.confluent.io/platform/current/streams/architecture.html

Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 applications instances. These instances will collaboratively process the topic’s data. If you run a larger number of app instances than partitions of the input topic, the “excess” app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former’s work. We provide a more detailed explanation and example in the FAQ.
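To make this concrete for the question's setup, here is a small, self-contained Java model. It is a simplified sketch, not Kafka client code: it assumes a single topic, range-style assignment similar in spirit to Kafka's RangeAssignor (members sorted by id, partitions split into contiguous ranges), and hypothetical service names. It shows that every consumer group receives each record, but within one group only the member that owns the record's partition does. So if every service inherits group-id: api-event from the shared YAML, each record reaches only one of the services, and with more members than partitions the extras sit idle.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified model of how one topic's partitions are shared inside consumer groups.
 * NOT Kafka client code: it only mimics a range-style strategy for a single topic.
 */
public class ConsumerGroupModel {

    // Sort member ids, give each numPartitions / n partitions, and one extra
    // partition to each of the first numPartitions % n members.
    public static Map<String, List<Integer>> rangeAssign(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (members.isEmpty()) {
            return assignment; // nobody to assign to
        }
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted);
        int n = sorted.size();
        int base = numPartitions / n;
        int extra = numPartitions % n;
        int next = 0;
        for (int i = 0; i < n; i++) {
            int count = base + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++);
            }
            assignment.put(sorted.get(i), partitions); // empty list = idle member
        }
        return assignment;
    }

    // Each group independently sees every record; inside a group only the
    // member that owns the record's partition receives it.
    public static List<String> receivers(Map<String, List<String>> groups, int numPartitions, int partition) {
        List<String> result = new ArrayList<>();
        for (List<String> members : groups.values()) {
            for (Map.Entry<String, List<Integer>> e : rangeAssign(members, numPartitions).entrySet()) {
                if (e.getValue().contains(partition)) {
                    result.add(e.getKey());
                }
            }
        }
        return result;
    }

    // Number of members in one group that end up with no partitions at all.
    public static long idleCount(List<String> members, int numPartitions) {
        return rangeAssign(members, numPartitions).values().stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // Hypothetical service names, all sharing group "api-event" as in the question's YAML.
        List<String> services = List.of("example-service", "billing-service", "notification-service");
        Map<String, List<String>> shared = new LinkedHashMap<>();
        shared.put("api-event", services);
        System.out.println(receivers(shared, 1, 0)); // [billing-service]

        // One group per service: every service receives the record.
        Map<String, List<String>> separate = new LinkedHashMap<>();
        for (String s : services) {
            separate.put(s, List.of(s));
        }
        System.out.println(receivers(separate, 1, 0)); // [example-service, billing-service, notification-service]

        // The 5-partition example from the quote: 7 instances in one group leave 2 idle.
        List<String> instances = new ArrayList<>();
        for (int i = 1; i <= 7; i++) {
            instances.add("instance-" + i);
        }
        System.out.println(idleCount(instances, 5)); // 2
    }
}
```

In the real client, member ids are generated by the group coordinator rather than being service names, so which service ends up owning a partition can change from one restart to the next. Giving each service its own group (for example a different spring.kafka.consumer.group-id per service, or the groupId attribute of @KafkaListener) lets every service see every message.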

-- Octavian R.
Source: StackOverflow