This might be a hard one to solve, but I'm depending on you! This question has been bothering me for days now and I can't figure it out on my own. Our Airflow instance is deployed using the Kubernetes Executor. The Executor starts worker pods, which in turn start pods with our data-transformation logic. The worker and operator pods all run fine, but Airflow has trouble adopting the worker pods once they reach status.phase: 'Completed'. It tries and tries, but to no avail.
All the pods are running in the airflow-build Kubernetes namespace.
The Airflow workers are created with this template:
pod_template_file.yaml:
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - args: []
      command: []
      resources:
        requests:
          memory: "100Mi"
          cpu: "100m"
        limits:
          memory: "2Gi"
          cpu: "1"
      imagePullPolicy: IfNotPresent
      name: base
      image: dummy_image
      env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: connection-secret
              key: connection
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: fernet-secret
              key: fernet
      volumeMounts:
        - name: airflow-logs
          mountPath: "/opt/airflow/logs"
          subPath: airflow/logs
        - name: airflow-dags
          mountPath: /opt/airflow/dags
          readOnly: true
          subPath: airflow/dags
        - name: airflow-config
          mountPath: "/opt/airflow/airflow.cfg"
          subPath: airflow.cfg
          readOnly: true
        - name: airflow-config
          mountPath: /opt/airflow/pod_template_file.yaml
          subPath: pod_template_file.yaml
          readOnly: true
  restartPolicy: Never
  serviceAccountName: scheduler-serviceaccount
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags-claim
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs-claim
    - name: airflow-config
      configMap:
        name: base-config
My simple DAG looks like this:
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf
from datetime import datetime, timedelta
from kubernetes.client import CoreV1Api, models as k8s

# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG('test-mike',
          start_date=datetime(2019, 1, 1),
          max_active_runs=1,
          default_args=default_args,
          catchup=False)

namespace = conf.get('kubernetes', 'NAMESPACE')

op = KubernetesPodOperator(
    task_id='mike-test-task',
    namespace=namespace,
    service_account_name='scheduler-serviceaccount',
    image='acrdatahubaragdev.azurecr.io/data-test-mike2:latest',
    name='mike-test-name',
    is_delete_operator_pod=True,
    startup_timeout_seconds=300,
    in_cluster=True,
    get_logs=True,
    dag=dag)
Now, everything runs fine. When the operator pod is done, it is cleaned up quickly. However, the worker pod remains in the "Completed" state, like this:
Name: testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3
Namespace: airflow-build
Priority: 0
Node: aks-default-23404167-vmss000000/10.161.132.13
Start Time: Tue, 23 Mar 2021 08:12:57 +0100
Labels: airflow-worker=201
airflow_version=2.0.0
dag_id=test-mike
execution_date=2021-03-23T07_12_54.887218_plus_00_00
kubernetes_executor=True
task_id=mike-test-task
try_number=1
Annotations: dag_id: test-mike
execution_date: 2021-03-23T07:12:54.887218+00:00
task_id: mike-test-task
try_number: 1
Status: Succeeded
IP: 10.161.132.18
IPs:
IP: 10.161.132.18
Containers:
base:
Container ID: docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce
Image: apache/airflow:2.0.0-python3.8
Image ID: docker-pullable://apache/airflow@sha256:76bd7cd6d47ffea98df98f5744680a860663bc26836fd3d67d529b06caaf97a7
Port: <none>
Host Port: <none>
Args:
airflow
tasks
run
test-mike
mike-test-task
2021-03-23T07:12:54.887218+00:00
--local
--pool
default_pool
--subdir
/opt/airflow/dags/rev-5428eb02735b885217bf43fc900c95fb0312c536/test-dag-mike.py
State: Terminated
Reason: Completed
Exit Code: 0
Started: Tue, 23 Mar 2021 08:12:59 +0100
Finished: Tue, 23 Mar 2021 08:13:39 +0100
Ready: False
Restart Count: 0
Limits:
cpu: 1
memory: 2Gi
Requests:
cpu: 100m
memory: 100Mi
Environment:
AIRFLOW__CORE__SQL_ALCHEMY_CONN: <set to the key 'connection' in secret 'connection-secret'> Optional: false
AIRFLOW__CORE__FERNET_KEY: <set to the key 'fernet' in secret 'fernet-secret'> Optional: false
AIRFLOW_IS_K8S_EXECUTOR_POD: True
Mounts:
/opt/airflow/airflow.cfg from airflow-config (ro,path="airflow.cfg")
/opt/airflow/dags from airflow-dags (ro,path="airflow/dags")
/opt/airflow/logs from airflow-logs (rw,path="airflow/logs")
/opt/airflow/pod_template_file.yaml from airflow-config (ro,path="pod_template_file.yaml")
/var/run/secrets/kubernetes.io/serviceaccount from scheduler-serviceaccount-token-wt57g (ro)
Conditions:
Type Status
Initialized True
Ready False
ContainersReady False
PodScheduled True
Volumes:
airflow-dags:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-dags-claim
ReadOnly: false
airflow-logs:
Type: PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
ClaimName: airflow-logs-claim
ReadOnly: false
airflow-config:
Type: ConfigMap (a volume populated by a ConfigMap)
Name: base-config
Optional: false
scheduler-serviceaccount-token-wt57g:
Type: Secret (a volume populated by a Secret)
SecretName: scheduler-serviceaccount-token-wt57g
Optional: false
QoS Class: Burstable
Node-Selectors: <none>
Tolerations: node.kubernetes.io/memory-pressure:NoSchedule
node.kubernetes.io/not-ready:NoExecute for 300s
node.kubernetes.io/unreachable:NoExecute for 300s
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal Scheduled <unknown> Successfully assigned airflow-build/testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3 to aks-default-23404167-vmss000000
Normal Pulled 75s kubelet, aks-default-23404167-vmss000000 Container image "apache/airflow:2.0.0-python3.8" already present on machine
Normal Created 74s kubelet, aks-default-23404167-vmss000000 Created container base
Normal Started 74s kubelet, aks-default-23404167-vmss000000 Started container base
The scheduler tries to adopt the completed worker pod, but the pod is never cleaned up and the scheduler just keeps trying:
[2021-03-23 07:14:46,270] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-03-23 07:14:46,377] {rest.py:228} DEBUG - response body:
{
"kind": "PodList",
"apiVersion": "v1",
"metadata": {
"selfLink": "/api/v1/namespaces/airflow-build/pods",
"resourceVersion": "2190163"
},
"items": [
{
"metadata": {
"name": "testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
"namespace": "airflow-build",
"selfLink": "/api/v1/namespaces/airflow-build/pods/testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
"uid": "24b7341b-99a3-4dae-9266-40e79ac7cd70",
"resourceVersion": "2190012",
"creationTimestamp": "2021-03-23T07:12:57Z",
"labels": {
"airflow-worker": "201",
"airflow_version": "2.0.0",
"dag_id": "test-mike",
"execution_date": "2021-03-23T07_12_54.887218_plus_00_00",
"kubernetes_executor": "True",
"task_id": "mike-test-task",
"try_number": "1"
},
"annotations": {
"dag_id": "test-mike",
"execution_date": "2021-03-23T07:12:54.887218+00:00",
"task_id": "mike-test-task",
"try_number": "1"
},
"managedFields": [
{
"manager": "kubelet",
"operation": "Update",
"apiVersion": "v1",
"time": "2021-03-23T07:13:40Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:status": {
"f:conditions": {
"k:{\"type\":\"ContainersReady\"}": {
".": {},
"f:lastProbeTime": {},
"f:lastTransitionTime": {},
"f:reason": {},
"f:status": {},
"f:type": {}
},
"k:{\"type\":\"Initialized\"}": {
".": {},
"f:lastProbeTime": {},
"f:lastTransitionTime": {},
"f:reason": {},
"f:status": {},
"f:type": {}
},
"k:{\"type\":\"Ready\"}": {
".": {},
"f:lastProbeTime": {},
"f:lastTransitionTime": {},
"f:reason": {},
"f:status": {},
"f:type": {}
}
},
"f:hostIP": {},
"f:phase": {},
"f:podIP": {},
"f:podIPs": {
".": {},
"k:{\"ip\":\"10.161.132.18\"}": {
".": {},
"f:ip": {}
}
},
"f:startTime": {}
}
}
},
{
"manager": "OpenAPI-Generator",
"operation": "Update",
"apiVersion": "v1",
"time": "2021-03-23T07:13:42Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:metadata": {
"f:annotations": {
".": {},
"f:dag_id": {},
"f:execution_date": {},
"f:task_id": {},
"f:try_number": {}
},
"f:labels": {
".": {},
"f:airflow-worker": {},
"f:airflow_version": {},
"f:dag_id": {},
"f:execution_date": {},
"f:kubernetes_executor": {},
"f:task_id": {},
"f:try_number": {}
}
},
<.....>
"status": {
"phase": "Succeeded",
"conditions": [
{
"type": "Initialized",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:12:57Z",
"reason": "PodCompleted"
},
{
"type": "Ready",
"status": "False",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:13:40Z",
"reason": "PodCompleted"
},
{
"type": "ContainersReady",
"status": "False",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:13:40Z",
"reason": "PodCompleted"
},
{
"type": "PodScheduled",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:12:57Z"
}
],
"hostIP": "10.161.132.13",
"podIP": "10.161.132.18",
"podIPs": [
{
"ip": "10.161.132.18"
}
],
"startTime": "2021-03-23T07:12:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"terminated": {
"exitCode": 0,
"reason": "Completed",
"startedAt": "2021-03-23T07:12:59Z",
"finishedAt": "2021-03-23T07:13:39Z",
"containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce"
}
},
"lastState": {},
"ready": false,
"restartCount": 0,
"image": "apache/airflow:2.0.0-python3.8",
"imageID": "docker-pullable://apache/airflow@sha256:76bd7cd6d47ffea98df98f5744680a860663bc26836fd3d67d529b06caaf97a7",
"containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce",
"started": false
}
],
"qosClass": "Burstable"
}
}
]
}
[2021-03-23 07:14:46,382] {kubernetes_executor.py:661} INFO - Attempting to adopt pod testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3
[2021-03-23 07:14:46,463] {rest.py:228} DEBUG - response body:
{
"kind": "Pod",
"apiVersion": "v1",
"metadata": {
"name": "testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
"namespace": "airflow-build",
"selfLink": "/api/v1/namespaces/airflow-build/pods/testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
"uid": "24b7341b-99a3-4dae-9266-40e79ac7cd70",
"resourceVersion": "2190012",
"creationTimestamp": "2021-03-23T07:12:57Z",
"labels": {
"airflow-worker": "201",
"airflow_version": "2.0.0",
"dag_id": "test-mike",
"execution_date": "2021-03-23T07_12_54.887218_plus_00_00",
"kubernetes_executor": "True",
"task_id": "mike-test-task",
"try_number": "1"
},
"annotations": {
"dag_id": "test-mike",
"execution_date": "2021-03-23T07:12:54.887218+00:00",
"task_id": "mike-test-task",
"try_number": "1"
},
"managedFields": [
{
"manager": "kubelet",
"operation": "Update",
"apiVersion": "v1",
"time": "2021-03-23T07:13:40Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:status": {
"f:conditions": {
"k:{\"type\":\"ContainersReady\"}": {
".": {},
"f:lastProbeTime": {},
"f:lastTransitionTime": {},
"f:reason": {},
"f:status": {},
"f:type": {}
},
"k:{\"type\":\"Initialized\"}": {
".": {},
"f:lastProbeTime": {},
"f:lastTransitionTime": {},
"f:reason": {},
"f:status": {},
"f:type": {}
},
"k:{\"type\":\"Ready\"}": {
".": {},
"f:lastProbeTime": {},
"f:lastTransitionTime": {},
"f:reason": {},
"f:status": {},
"f:type": {}
}
},
"f:hostIP": {},
"f:phase": {},
"f:podIP": {},
"f:podIPs": {
".": {},
"k:{\"ip\":\"10.161.132.18\"}": {
".": {},
"f:ip": {}
}
},
"f:startTime": {}
}
}
},
{
"manager": "OpenAPI-Generator",
"operation": "Update",
"apiVersion": "v1",
"time": "2021-03-23T07:13:42Z",
"fieldsType": "FieldsV1",
"fieldsV1": {
"f:metadata": {
"f:annotations": {
".": {},
"f:dag_id": {},
"f:execution_date": {},
"f:task_id": {},
"f:try_number": {}
},
"f:labels": {
".": {},
"f:airflow-worker": {},
"f:airflow_version": {},
"f:dag_id": {},
"f:execution_date": {},
"f:kubernetes_executor": {},
"f:task_id": {},
"f:try_number": {}
}
},
"f:spec": {
<....>
"tolerations": [
{
"key": "node.kubernetes.io/not-ready",
"operator": "Exists",
"effect": "NoExecute",
"tolerationSeconds": 300
},
{
"key": "node.kubernetes.io/unreachable",
"operator": "Exists",
"effect": "NoExecute",
"tolerationSeconds": 300
},
{
"key": "node.kubernetes.io/memory-pressure",
"operator": "Exists",
"effect": "NoSchedule"
}
],
"priority": 0,
"enableServiceLinks": true,
"preemptionPolicy": "PreemptLowerPriority"
},
"status": {
"phase": "Succeeded",
"conditions": [
{
"type": "Initialized",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:12:57Z",
"reason": "PodCompleted"
},
{
"type": "Ready",
"status": "False",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:13:40Z",
"reason": "PodCompleted"
},
{
"type": "ContainersReady",
"status": "False",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:13:40Z",
"reason": "PodCompleted"
},
{
"type": "PodScheduled",
"status": "True",
"lastProbeTime": null,
"lastTransitionTime": "2021-03-23T07:12:57Z"
}
],
"hostIP": "10.161.132.13",
"podIP": "10.161.132.18",
"podIPs": [
{
"ip": "10.161.132.18"
}
],
"startTime": "2021-03-23T07:12:57Z",
"containerStatuses": [
{
"name": "base",
"state": {
"terminated": {
"exitCode": 0,
"reason": "Completed",
"startedAt": "2021-03-23T07:12:59Z",
"finishedAt": "2021-03-23T07:13:39Z",
"containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce"
}
},
"lastState": {},
"ready": false,
"restartCount": 0,
"image": "apache/airflow:2.0.0-python3.8",
"imageID": "docker-pullable://apache/airflow@sha256:76bd7cd6d47ffea98df98f5744680a860663bc26836fd3d67d529b06caaf97a7",
"containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce",
"started": false
}
],
"qosClass": "Burstable"
}
}
Thanks for helping me out!
Did you try playing with these values?
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "True"
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE: "False"
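For reference, a minimal sketch of how these environment variables could be set on the scheduler Deployment. The Deployment and container names here are placeholders, not taken from your setup:

```yaml
# Hypothetical excerpt from the Airflow scheduler Deployment manifest.
# Only the env entries matter; the surrounding names are assumptions.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-scheduler
spec:
  template:
    spec:
      containers:
        - name: scheduler
          image: apache/airflow:2.0.0-python3.8
          env:
            # Delete worker pods once their task has finished
            - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
              value: "True"
            # Keep failed worker pods around for debugging
            - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE
              value: "False"
```

These env vars map to the `delete_worker_pods` and `delete_worker_pods_on_failure` options in the `[kubernetes]` section of airflow.cfg, so setting them there instead should be equivalent.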