How do I configure static hostnames for multiple replicas of a Flink TaskManager Deployment in Kubernetes and reference them in a Prometheus ConfigMap?

9/24/2020

I have a Flink JobManager with a single TaskManager running on Kubernetes. For the TaskManager I use a Service and a Deployment with replicas: 1.

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager
spec:
  type: ClusterIP
  ports:
  - name: prometheus
    port: 9250
  selector:
    app: flink
    component: taskmanager

The Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      hostname: flink-taskmanager
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tpch-dbgen-data
        persistentVolumeClaim:
          claimName: tpch-dbgen-data-pvc
      - name: tpch-dbgen-datarate
        persistentVolumeClaim:
          claimName: tpch-dbgen-datarate-pvc
      containers:
      - name: taskmanager
        image: felipeogutierrez/explore-flink:1.11.1-scala_2.12
        # imagePullPolicy: Always
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 9250
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: tpch-dbgen-data
          mountPath: /opt/tpch-dbgen/data
          subPath: data
        - mountPath: /tmp
          name: tpch-dbgen-datarate
          subPath: tmp
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary

Prometheus then scrapes metrics from the Flink TaskManager. I run Prometheus on Kubernetes with one Service, one ConfigMap, and one Deployment, and configure it to fetch metrics from the TaskManager.

apiVersion: v1
kind: Service
metadata:
  name: prometheus-service
spec:
  type: ClusterIP
  ports:
  - name: promui
    protocol: TCP
    port: 9090
    targetPort: 9090
  selector:
    app: flink
    component: prometheus

The ConfigMap is where I set the Flink hosts to scrape: - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager:9250']. The host flink-taskmanager matches the name of the Kubernetes Service for the Flink TaskManager:

apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  labels:
    app: flink
data:
  prometheus.yml: |+
    global:
      scrape_interval: 15s

    scrape_configs:
      - job_name: 'prometheus'
        scrape_interval: 5s
        static_configs:
          - targets: ['localhost:9090']
      - job_name: 'flink'
        scrape_interval: 5s
        static_configs:
          - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager:9250']
        metrics_path: /

The Deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: prometheus
  template:
    metadata:
      labels:
        app: flink
        component: prometheus
    spec:
      hostname: prometheus
      volumes:
      - name: prometheus-config-volume
        configMap:
          name: prometheus-config
          items:
          - key: prometheus.yml
            path: prometheus.yml
      containers:
      - name: prometheus
        image: prom/prometheus
        ports:
        - containerPort: 9090
        volumeMounts:
          - name: prometheus-config-volume
            mountPath: /etc/prometheus/prometheus.yml
            subPath: prometheus.yml

This works well and I can query TaskManager metrics in the Prometheus web UI. But as soon as I change replicas: 1 to replicas: 3, for instance, I can no longer query data from the TaskManagers. I guess this is because the configuration - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager:9250'] is no longer valid when there is more than one TaskManager replica. Since Kubernetes manages the creation of new TaskManager replicas, I don't know what to put in this Prometheus option. I guess it should be something dynamic, such as a wildcard (*) or a regular expression that matches all TaskManagers. Does anyone have an idea of how to configure it?

-- Knoblauch
apache-flink
kubernetes
prometheus

1 Answer

9/24/2020

I got it working based on this answer https://stackoverflow.com/a/55139221/2096986 and the documentation. First, I had to use a StatefulSet instead of a Deployment. A StatefulSet gives each Pod a stable name and DNS hostname (the Pod IPs themselves can still change). What was not obvious is that I also had to make the Service headless by setting clusterIP: None, instead of using a regular type: ClusterIP Service with a virtual IP. Here is my Service:

apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager
  labels:
    app: flink-taskmanager
spec:
  clusterIP: None # headless Service: no virtual IP, DNS resolves to the individual Pods
  ports:
  - name: prometheus
    port: 9250
  selector:
    app: flink-taskmanager

and here is my StatefulSet:

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-taskmanager
spec:
  replicas: 3
  serviceName: flink-taskmanager
  selector:
    matchLabels:
      app: flink-taskmanager # has to match .spec.template.metadata.labels
  template:
    metadata:
      labels:
        app: flink-taskmanager # has to match .spec.selector.matchLabels
    spec:
      hostname: flink-taskmanager
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tpch-dbgen-data
        persistentVolumeClaim:
          claimName: tpch-dbgen-data-pvc
      - name: tpch-dbgen-datarate
        persistentVolumeClaim:
          claimName: tpch-dbgen-datarate-pvc
      containers:
      - name: taskmanager
        image: felipeogutierrez/explore-flink:1.11.1-scala_2.12
        # imagePullPolicy: Always
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 9250
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: tpch-dbgen-data
          mountPath: /opt/tpch-dbgen/data
          subPath: data
        - mountPath: /tmp
          name: tpch-dbgen-datarate
          subPath: tmp
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary

and in the Prometheus config file prometheus.yml I listed the hosts using the pattern StatefulSetName-{0..N-1}.ServiceName.default.svc.cluster.local, where default is the namespace:

apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  labels:
    app: flink
data:
  prometheus.yml: |+
    global:
      scrape_interval: 15s

    scrape_configs:
      - job_name: 'prometheus'
        scrape_interval: 5s
        static_configs:
          - targets: ['localhost:9090']
      - job_name: 'flink'
        scrape_interval: 5s
        static_configs:
          - targets:
            - 'flink-jobmanager:9250'
            - 'flink-jobmanager:9251'
            - 'flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local:9250'
            - 'flink-taskmanager-1.flink-taskmanager.default.svc.cluster.local:9250'
            - 'flink-taskmanager-2.flink-taskmanager.default.svc.cluster.local:9250'
        metrics_path: /
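
The static list above has to be edited every time the replica count changes. A possible alternative, not part of the setup described here, is Prometheus DNS-based service discovery (dns_sd_configs), which re-resolves a DNS name periodically and scrapes every address it returns. The fragment below is a minimal sketch that would go under scrape_configs in prometheus.yml. It assumes the headless flink-taskmanager Service above lives in the default namespace and that the metrics reporter listens on 9250 in every Pod. The job name flink-taskmanagers is made up for illustration.

```yaml
# Sketch only: discover all TaskManager Pods through the headless Service.
- job_name: 'flink-taskmanagers'   # hypothetical job name
  scrape_interval: 5s
  metrics_path: /
  dns_sd_configs:
    - names:
        - 'flink-taskmanager.default.svc.cluster.local'  # assumes namespace "default"
      type: A                # a headless Service returns one A record per ready Pod
      port: 9250             # must be set explicitly for A records
      refresh_interval: 30s  # how often the name is resolved again
```

With this approach the targets show up in Prometheus by Pod IP rather than by the stable flink-taskmanager-N hostname, so the per-replica names from the static list are the better fit if the instance label should identify a specific replica.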
-- Knoblauch
Source: StackOverflow