I have a Kafka broker running in a multi-VM environment (a private cloud with our own 4-node Kubernetes cluster).
I have created a Spring Boot application with a publisher that has to publish a message to a Kafka topic on that broker. Both containers (the Kafka broker and the Spring Boot app) run in the same Kubernetes cluster.
I can't reach the Kafka broker (running in the same k8s cluster) when I set the publisher's bootstrap.servers to the Kafka Service name and port.
Spring Boot publisher configuration:
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-svc:9092"); // not working
Cluster information:
kafka-broker yaml file:
apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: kafka-deploy
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - env:
        - name: KAFKA_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: kafka-svc
        - name: KAFKA_CREATE_TOPICS
          value: "test:1:1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zookeeper-svc:2181
        - name: KAFKA_BROKER_ID
          value: "1"
        name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-svc
spec:
  type: NodePort
  ports:
  - port: 9092
    targetPort: 9092
    name: http
    protocol: TCP
  selector:
    app: kafka
    kafka-broker-id: "1"
When I try to publish a string message to the Kafka topic, I get the following exception:
2020-08-18 13:28:04.525 INFO 1 --- [nio-8080-exec-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1597757284523
2020-08-18 13:29:04.539 ERROR 1 --- [nio-8080-exec-1] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='TEST' to topic test:
org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.
2020-08-18 13:29:04.544 ERROR 1 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet] : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.kafka.KafkaException: Send failed; nested exception is org.apache.kafka.common.errors.TimeoutException: Topic test not present in metadata after 60000 ms.] with root cause
It looks like you need to create the test topic before you can publish anything.
You can check whether the topic exists in your Kafka cluster, for example from one of the Kafka broker pods:
bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic test
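The same TimeoutException can also appear when the producer cannot fetch metadata at all, so it may be worth confirming that the application pod can reach kafka-svc:9092 before looking at the topic. Below is a minimal sketch of such a check using only the JDK; BrokerReachability and canConnect are hypothetical names, not part of the original application. A failure here would point at DNS or the Service (for example a selector that matches no pods; in the manifests above the Service selector includes kafka-broker-id: "1", which the pod template labels do not show) rather than at the topic.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical helper for illustration; not part of the original application. */
final class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            // The address is resolved when it is constructed. If the name cannot
            // be resolved, connect() throws UnknownHostException (an IOException).
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolved names, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // kafka-svc and 9092 are the Service name and port from the question.
        System.out.println("kafka-svc:9092 reachable: "
                + canConnect("kafka-svc", 9092, 3000));
    }
}
```

A successful connection only shows that the Service routes to something listening on that port; the broker must still advertise an address the client can resolve.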
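Or you can create it from your Spring Boot app:
package io.stockgeeks.kafka.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// NewTopic beans are created at startup by a KafkaAdmin bean. Spring Boot
// auto-configures one from spring.kafka.bootstrap-servers.
@Configuration
public class KafkaTopicConfiguration {

    @Bean
    public NewTopic topicExample() {
        return TopicBuilder.name("test")
                .partitions(6)
                // The deployment in the question runs a single broker,
                // so the replication factor cannot be higher than 1.
                .replicas(1)
                .build();
    }
}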
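Whichever way the topic is created, its replication factor cannot exceed the number of brokers, and the deployment in the question runs one broker. That deployment also already requests the topic through KAFKA_CREATE_TOPICS, which the wurstmeister image reads as comma-separated name:partitions:replicas entries (with an optional cleanup policy as a fourth field). The sketch below parses such a value and checks it against a broker count; CreateTopicsSpec and fitsCluster are hypothetical names for illustration, not part of Kafka or Spring.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for reading a wurstmeister-style KAFKA_CREATE_TOPICS value. */
final class CreateTopicsSpec {

    final String name;
    final int partitions;
    final int replicas;

    CreateTopicsSpec(String name, int partitions, int replicas) {
        this.name = name;
        this.partitions = partitions;
        this.replicas = replicas;
    }

    /** Parses entries of the form name:partitions:replicas[:cleanup.policy]. */
    static List<CreateTopicsSpec> parse(String value) {
        List<CreateTopicsSpec> specs = new ArrayList<>();
        for (String entry : value.split(",")) {
            String[] parts = entry.trim().split(":");
            if (parts.length < 3) {
                throw new IllegalArgumentException(
                        "Expected name:partitions:replicas but got: " + entry);
            }
            specs.add(new CreateTopicsSpec(
                    parts[0],
                    Integer.parseInt(parts[1]),
                    Integer.parseInt(parts[2])));
        }
        return specs;
    }

    /** True if the replication factor does not exceed the number of brokers. */
    boolean fitsCluster(int brokerCount) {
        return replicas <= brokerCount;
    }

    public static void main(String[] args) {
        // "test:1:1" is the value from the deployment in the question.
        for (CreateTopicsSpec spec : parse("test:1:1")) {
            System.out.println(spec.name + ": partitions=" + spec.partitions
                    + ", replicas=" + spec.replicas
                    + ", fits one broker=" + spec.fitsCluster(1));
        }
    }
}
```

Read this way, test:1:1 asks for a topic named test with one partition and one replica, which fits a single broker, whereas a replication factor of 3 would not.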
✌️