Kafka MongoDB source connector with MongoDB running on Kubernetes

5/15/2020

I am trying to set up the MongoDB source connector [https://www.confluent.io/hub/mongodb/kafka-connect-mongodb] for the Confluent Kafka platform. I was able to establish the flow between MongoDB and Kafka when I used a MongoDB Atlas connection URI. The problem arises when I use MongoDB running on my Azure Kubernetes cluster. I have created a MongoDB StatefulSet with 3 replicas and exposed the MongoDB service to the internet through a load balancer. I can connect to MongoDB on the public IP using Robo 3T and perform CRUD operations. However, when I use the connection URI of the MongoDB instance running in Kubernetes, which looks like "mongodb://load-balance-ip:27017/test?ssl=false&authSource=admin&retryWrites=true&w=majority", I get the following error in the Kafka Connect logs:

INFO Failed to resume change stream: The $changeStream stage is only supported on replica sets 40573 (com.mongodb.kafka.connect.source.MongoSourceTask:253)

The MongoDB StatefulSet YAML looks like this:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: eic-mongo-mongodb
spec:
  serviceName: eic-mongo-mongodb
  replicas: 3
  selector:
    matchLabels:
      app: eic-mongo-mongodb
  template:
    metadata:
      labels:
        app: eic-mongo-mongodb
        selector: eic-mongo-mongodb
    spec:
      affinity:
        # Try to put each MongoDB pod on a different node in the K8s cluster
        podAntiAffinity:
          preferredDuringSchedulingIgnoredDuringExecution:
            - weight: 100
              podAffinityTerm:
                labelSelector:
                  matchExpressions:
                    - key: app
                      operator: In
                      values:
                        - eic-mongo-mongodb
                topologyKey: kubernetes.io/hostname
      containers:
        - name: eic-mongo-mongodb
          image: mongo:4.0.8
          resources:
            limits:
              cpu: 500m
              memory: "1Gi"
            requests:
              cpu: 500m
              memory: "1Gi"
          volumeMounts:
            - name: mongo-volume
              mountPath: /data/db
  volumeClaimTemplates:
    - metadata:
        name: mongo-volume
      spec:
        accessModes:
          - ReadWriteOnce
        volumeMode: Filesystem
        resources:
          requests:
            storage: 3Gi

And the MongoDB Service YAML looks like this:

apiVersion: v1
kind: Service
metadata:
  name: eic-mongo-mongodb
spec:
  ports:
  - name: "27017"
    nodePort: 31683
    port: 27017
    protocol: TCP
    targetPort: 27017
  selector:
    app: eic-mongo-mongodb
  type: LoadBalancer

Can someone please let me know where I am making a mistake?

-- Nitesh Ratnaparkhe
apache-kafka
apache-kafka-connect
kubernetes
mongodb

1 Answer

5/15/2020

I think you need to set up the MongoDB cluster correctly. Please follow this to deploy a MongoDB replica set on Kubernetes using an operator. If you want to set it up with a StatefulSet only, follow this.
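
For the StatefulSet-only route, the likely cause of the error is that the pods in the question start `mongod` with the image defaults, so each of the 3 pods runs as an independent standalone server and change streams are rejected. A minimal sketch of what might change is below. The replica set name `rs0` and the headless Service name `eic-mongo-mongodb-headless` are assumptions for illustration and do not come from the question.

```yaml
# Sketch only: "rs0" and "eic-mongo-mongodb-headless" are assumed names.
# Headless Service so that each pod gets a stable DNS name such as
# eic-mongo-mongodb-0.eic-mongo-mongodb-headless
apiVersion: v1
kind: Service
metadata:
  name: eic-mongo-mongodb-headless
spec:
  clusterIP: None
  selector:
    app: eic-mongo-mongodb
  ports:
    - port: 27017
      targetPort: 27017
---
# Fragment of the StatefulSet from the question; unchanged fields are omitted.
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: eic-mongo-mongodb
spec:
  serviceName: eic-mongo-mongodb-headless
  template:
    spec:
      containers:
        - name: eic-mongo-mongodb
          image: mongo:4.0.8
          # Start mongod as a replica set member instead of a standalone server.
          args: ["--replSet", "rs0", "--bind_ip_all"]
```

Note that `serviceName` cannot be changed on an existing StatefulSet, so it would have to be deleted and recreated. Once the pods are running, the replica set still has to be initiated once, for example with `rs.initiate()` from a mongo shell on one member, and the connection URI would then need a matching `replicaSet=rs0` option. Drivers discover members from the hostnames stored in the replica set configuration, so those hostnames must be resolvable from wherever Kafka Connect runs; a single load balancer IP may not be enough if Kafka Connect sits outside the cluster.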

-- hoque
Source: StackOverflow