I have a Flink JobManager with only one TaskManager running on top of Kubernetes. For this I use a Service and a Deployment for the TaskManager with replicas: 1.
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager
spec:
  type: ClusterIP
  ports:
  - name: prometheus
    port: 9250
  selector:
    app: flink
    component: taskmanager
The Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      hostname: flink-taskmanager
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tpch-dbgen-data
        persistentVolumeClaim:
          claimName: tpch-dbgen-data-pvc
      - name: tpch-dbgen-datarate
        persistentVolumeClaim:
          claimName: tpch-dbgen-datarate-pvc
      containers:
      - name: taskmanager
        image: felipeogutierrez/explore-flink:1.11.1-scala_2.12
        # imagePullPolicy: Always
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 9250
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: tpch-dbgen-data
          mountPath: /opt/tpch-dbgen/data
          subPath: data
        - name: tpch-dbgen-datarate
          mountPath: /tmp
          subPath: tmp
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
Then I export metrics from the Flink TaskManager to Prometheus. I use one Service, one ConfigMap, and one Deployment to set up Prometheus on Kubernetes and make it scrape the Flink TaskManager.
apiVersion: v1
kind: Service
metadata:
  name: prometheus-service
spec:
  type: ClusterIP
  ports:
  - name: promui
    protocol: TCP
    port: 9090
    targetPort: 9090
  selector:
    app: flink
    component: prometheus
The ConfigMap is where I set the Flink TaskManager host, - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager:9250'], which matches the Kubernetes Service for Flink (flink-taskmanager):
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  labels:
    app: flink
data:
  prometheus.yml: |+
    global:
      scrape_interval: 15s
    scrape_configs:
    - job_name: 'prometheus'
      scrape_interval: 5s
      static_configs:
      - targets: ['localhost:9090']
    - job_name: 'flink'
      scrape_interval: 5s
      static_configs:
      - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager:9250']
      metrics_path: /
The Deployment:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: prometheus-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: prometheus
  template:
    metadata:
      labels:
        app: flink
        component: prometheus
    spec:
      hostname: prometheus
      volumes:
      - name: prometheus-config-volume
        configMap:
          name: prometheus-config
          items:
          - key: prometheus.yml
            path: prometheus.yml
      containers:
      - name: prometheus
        image: prom/prometheus
        ports:
        - containerPort: 9090
        volumeMounts:
        - name: prometheus-config-volume
          mountPath: /etc/prometheus/prometheus.yml
          subPath: prometheus.yml
This works well, and I can query the Flink TaskManager metrics in the Prometheus web UI. But as soon as I change replicas: 1 to, for instance, replicas: 3, I cannot query the TaskManager metrics anymore. I suppose the configuration - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager:9250'] is no longer valid once there is more than one Flink TaskManager replica. But since Kubernetes manages the creation of the new TaskManager replicas, I don't know what to put in this Prometheus option. I guess it should be something dynamic, or use a wildcard or regular expression that matches all TaskManagers. Does someone have an idea of how to configure it?
I got it to work based on this answer https://stackoverflow.com/a/55139221/2096986 and the documentation. The first thing is that I had to use a StatefulSet instead of a Deployment. With this, each pod gets a stable, predictable network identity. Something that was not clear is that I also had to make the Service headless, using clusterIP: None instead of type: ClusterIP. So here is my Service:
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager
  labels:
    app: flink-taskmanager
spec:
  clusterIP: None # instead of type: ClusterIP
  ports:
  - name: prometheus
    port: 9250
  selector:
    app: flink-taskmanager
and here is my StatefulSet:
apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: flink-taskmanager
spec:
  replicas: 3
  serviceName: flink-taskmanager
  selector:
    matchLabels:
      app: flink-taskmanager # has to match .spec.template.metadata.labels
  template:
    metadata:
      labels:
        app: flink-taskmanager # has to match .spec.selector.matchLabels
    spec:
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: tpch-dbgen-data
        persistentVolumeClaim:
          claimName: tpch-dbgen-data-pvc
      - name: tpch-dbgen-datarate
        persistentVolumeClaim:
          claimName: tpch-dbgen-datarate-pvc
      containers:
      - name: taskmanager
        image: felipeogutierrez/explore-flink:1.11.1-scala_2.12
        # imagePullPolicy: Always
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        - containerPort: 9250
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        - name: tpch-dbgen-data
          mountPath: /opt/tpch-dbgen/data
          subPath: data
        - name: tpch-dbgen-datarate
          mountPath: /tmp
          subPath: tmp
      securityContext:
        runAsUser: 9999 # refers to user _flink_ from official flink image, change if necessary
In the Prometheus config file prometheus.yml, I then listed the hosts following the pattern StatefulSetName-{0..N-1}.ServiceName.default.svc.cluster.local:
apiVersion: v1
kind: ConfigMap
metadata:
  name: prometheus-config
  labels:
    app: flink
data:
  prometheus.yml: |+
    global:
      scrape_interval: 15s
    scrape_configs:
    - job_name: 'prometheus'
      scrape_interval: 5s
      static_configs:
      - targets: ['localhost:9090']
    - job_name: 'flink'
      scrape_interval: 5s
      static_configs:
      - targets: ['flink-jobmanager:9250', 'flink-jobmanager:9251', 'flink-taskmanager-0.flink-taskmanager.default.svc.cluster.local:9250', 'flink-taskmanager-1.flink-taskmanager.default.svc.cluster.local:9250', 'flink-taskmanager-2.flink-taskmanager.default.svc.cluster.local:9250']
      metrics_path: /
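Since the per-replica hostnames follow a fixed pattern, the static target list can also be generated rather than written by hand whenever replicas changes. A minimal sketch (the taskmanager_targets helper is mine, not part of any tool; the names, namespace, and port are the ones from my setup):

```python
# StatefulSet pods behind a headless Service are addressable as
# <statefulset>-<ordinal>.<service>.<namespace>.svc.cluster.local,
# with ordinals 0..replicas-1.
def taskmanager_targets(statefulset, service, namespace, replicas, port):
    return [
        f"{statefulset}-{i}.{service}.{namespace}.svc.cluster.local:{port}"
        for i in range(replicas)
    ]

# Reproduces the three TaskManager targets in the ConfigMap above.
targets = taskmanager_targets(
    statefulset="flink-taskmanager",
    service="flink-taskmanager",
    namespace="default",
    replicas=3,
    port=9250,
)
for t in targets:
    print(t)
```

Bumping replicas in the StatefulSet then only requires regenerating the list with the new count, instead of editing each DNS name manually.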