Airflow with Kubernetes Executor unable to adopt and remove worker pods

3/23/2021

This might be a hard one to solve, but I'm depending on you! This question has been bothering me for days and I can't figure it out on my own. Our Airflow instance is deployed using the Kubernetes Executor. The Executor starts worker pods, which in turn start pods with our data-transformation logic. The worker and operator pods all run fine, but Airflow has trouble adopting the completed worker pods (status.phase: 'Succeeded', reason 'Completed'). It tries and tries, but to no avail.

All the pods are running in the airflow-build Kubernetes namespace.

The Airflow workers are created with this template:

pod_template_file.yaml:
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:

  containers:
    - args: []
      command: []
      resources: 
        requests:
          memory: "100Mi"
          cpu: "100m"
        limits:
          memory: "2Gi"
          cpu: "1"

      imagePullPolicy: IfNotPresent
      name: base
      image: dummy_image 

      env:
        - name: AIRFLOW__CORE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: connection-secret
              key: connection
              
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: fernet-secret
              key: fernet

     
      volumeMounts:
        - name: airflow-logs
          mountPath: "/opt/airflow/logs"
          subPath: airflow/logs
        - name: airflow-dags
          mountPath: /opt/airflow/dags
          readOnly: true
          subPath: airflow/dags
        - name: airflow-config
          mountPath: "/opt/airflow/airflow.cfg"
          subPath: airflow.cfg
          readOnly: true
        - name: airflow-config
          mountPath: /opt/airflow/pod_template_file.yaml
          subPath: pod_template_file.yaml
          readOnly: true

  restartPolicy: Never

  serviceAccountName: scheduler-serviceaccount

  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-dags-claim
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs-claim
    - name: airflow-config
      configMap:
        name: base-config

My simple DAG looks like this:

from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow import configuration as conf
from datetime import datetime, timedelta
from kubernetes.client import CoreV1Api, models as k8s


# Default settings applied to all tasks
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}


dag = DAG('test-mike',
         start_date=datetime(2019, 1, 1),
         max_active_runs=1,
         default_args=default_args,
         catchup=False 
         ) 

# Read the namespace from the [kubernetes] section of airflow.cfg
namespace = conf.get('kubernetes', 'NAMESPACE')

op=KubernetesPodOperator(
        task_id='mike-test-task',
        namespace=namespace,
        service_account_name='scheduler-serviceaccount',
        image='acrdatahubaragdev.azurecr.io/data-test-mike2:latest',
        name='mike-test-name',
        is_delete_operator_pod=True,
        startup_timeout_seconds=300,
        in_cluster=True,
        get_logs=True,
        dag=dag)

Now, everything runs fine. When the operator pod is done, it is cleaned up quickly. However, the worker pod remains in the "Completed" status, like this:

Name:         testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3
Namespace:    airflow-build
Priority:     0
Node:         aks-default-23404167-vmss000000/10.161.132.13
Start Time:   Tue, 23 Mar 2021 08:12:57 +0100
Labels:       airflow-worker=201
              airflow_version=2.0.0
              dag_id=test-mike
              execution_date=2021-03-23T07_12_54.887218_plus_00_00
              kubernetes_executor=True
              task_id=mike-test-task
              try_number=1
Annotations:  dag_id: test-mike
              execution_date: 2021-03-23T07:12:54.887218+00:00
              task_id: mike-test-task
              try_number: 1
Status:       Succeeded
IP:           10.161.132.18
IPs:
  IP:  10.161.132.18
Containers:
  base:
    Container ID:  docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce
    Image:         apache/airflow:2.0.0-python3.8
    Image ID:      docker-pullable://apache/airflow@sha256:76bd7cd6d47ffea98df98f5744680a860663bc26836fd3d67d529b06caaf97a7
    Port:          <none>
    Host Port:     <none>
    Args:
      airflow
      tasks
      run
      test-mike
      mike-test-task
      2021-03-23T07:12:54.887218+00:00
      --local
      --pool
      default_pool
      --subdir
      /opt/airflow/dags/rev-5428eb02735b885217bf43fc900c95fb0312c536/test-dag-mike.py
    State:          Terminated
      Reason:       Completed
      Exit Code:    0
      Started:      Tue, 23 Mar 2021 08:12:59 +0100
      Finished:     Tue, 23 Mar 2021 08:13:39 +0100
    Ready:          False
    Restart Count:  0
    Limits:
      cpu:     1
      memory:  2Gi
    Requests:
      cpu:     100m
      memory:  100Mi
    Environment:
      AIRFLOW__CORE__SQL_ALCHEMY_CONN:  <set to the key 'connection' in secret 'connection-secret'>  Optional: false
      AIRFLOW__CORE__FERNET_KEY:        <set to the key 'fernet' in secret 'fernet-secret'>          Optional: false
      AIRFLOW_IS_K8S_EXECUTOR_POD:      True
    Mounts:
      /opt/airflow/airflow.cfg from airflow-config (ro,path="airflow.cfg")
      /opt/airflow/dags from airflow-dags (ro,path="airflow/dags")
      /opt/airflow/logs from airflow-logs (rw,path="airflow/logs")
      /opt/airflow/pod_template_file.yaml from airflow-config (ro,path="pod_template_file.yaml")
      /var/run/secrets/kubernetes.io/serviceaccount from scheduler-serviceaccount-token-wt57g (ro)
Conditions:
  Type              Status
  Initialized       True 
  Ready             False 
  ContainersReady   False 
  PodScheduled      True 
Volumes:
  airflow-dags:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-dags-claim
    ReadOnly:   false
  airflow-logs:
    Type:       PersistentVolumeClaim (a reference to a PersistentVolumeClaim in the same namespace)
    ClaimName:  airflow-logs-claim
    ReadOnly:   false
  airflow-config:
    Type:      ConfigMap (a volume populated by a ConfigMap)
    Name:      base-config
    Optional:  false
  scheduler-serviceaccount-token-wt57g:
    Type:        Secret (a volume populated by a Secret)
    SecretName:  scheduler-serviceaccount-token-wt57g
    Optional:    false
QoS Class:       Burstable
Node-Selectors:  <none>
Tolerations:     node.kubernetes.io/memory-pressure:NoSchedule
                 node.kubernetes.io/not-ready:NoExecute for 300s
                 node.kubernetes.io/unreachable:NoExecute for 300s
Events:
  Type    Reason     Age        From                                      Message
  ----    ------     ----       ----                                      -------
  Normal  Scheduled  <unknown>                                            Successfully assigned airflow-build/testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3 to aks-default-23404167-vmss000000
  Normal  Pulled     75s        kubelet, aks-default-23404167-vmss000000  Container image "apache/airflow:2.0.0-python3.8" already present on machine
  Normal  Created    74s        kubelet, aks-default-23404167-vmss000000  Created container base
  Normal  Started    74s        kubelet, aks-default-23404167-vmss000000  Started container base

The scheduler tries to adopt the completed worker pod, but the pod is never cleaned up and the scheduler keeps trying:

[2021-03-23 07:14:46,270] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-03-23 07:14:46,377] {rest.py:228} DEBUG - response body: 
{
    "kind": "PodList",
    "apiVersion": "v1",
    "metadata": {
        "selfLink": "/api/v1/namespaces/airflow-build/pods",
        "resourceVersion": "2190163"
    },
    "items": [
        {
            "metadata": {
                "name": "testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
                "namespace": "airflow-build",
                "selfLink": "/api/v1/namespaces/airflow-build/pods/testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
                "uid": "24b7341b-99a3-4dae-9266-40e79ac7cd70",
                "resourceVersion": "2190012",
                "creationTimestamp": "2021-03-23T07:12:57Z",
                "labels": {
                    "airflow-worker": "201",
                    "airflow_version": "2.0.0",
                    "dag_id": "test-mike",
                    "execution_date": "2021-03-23T07_12_54.887218_plus_00_00",
                    "kubernetes_executor": "True",
                    "task_id": "mike-test-task",
                    "try_number": "1"
                },
                "annotations": {
                    "dag_id": "test-mike",
                    "execution_date": "2021-03-23T07:12:54.887218+00:00",
                    "task_id": "mike-test-task",
                    "try_number": "1"
                },
                "managedFields": [
                    {
                        "manager": "kubelet",
                        "operation": "Update",
                        "apiVersion": "v1",
                        "time": "2021-03-23T07:13:40Z",
                        "fieldsType": "FieldsV1",
                        "fieldsV1": {
                            "f:status": {
                                "f:conditions": {
                                    "k:{\"type\":\"ContainersReady\"}": {
                                        ".": {},
                                        "f:lastProbeTime": {},
                                        "f:lastTransitionTime": {},
                                        "f:reason": {},
                                        "f:status": {},
                                        "f:type": {}
                                    },
                                    "k:{\"type\":\"Initialized\"}": {
                                        ".": {},
                                        "f:lastProbeTime": {},
                                        "f:lastTransitionTime": {},
                                        "f:reason": {},
                                        "f:status": {},
                                        "f:type": {}
                                    },
                                    "k:{\"type\":\"Ready\"}": {
                                        ".": {},
                                        "f:lastProbeTime": {},
                                        "f:lastTransitionTime": {},
                                        "f:reason": {},
                                        "f:status": {},
                                        "f:type": {}
                                    }
                                },
                                "f:hostIP": {},
                                "f:phase": {},
                                "f:podIP": {},
                                "f:podIPs": {
                                    ".": {},
                                    "k:{\"ip\":\"10.161.132.18\"}": {
                                        ".": {},
                                        "f:ip": {}
                                    }
                                },
                                "f:startTime": {}
                            }
                        }
                    },
                    {
                        "manager": "OpenAPI-Generator",
                        "operation": "Update",
                        "apiVersion": "v1",
                        "time": "2021-03-23T07:13:42Z",
                        "fieldsType": "FieldsV1",
                        "fieldsV1": {
                            "f:metadata": {
                                "f:annotations": {
                                    ".": {},
                                    "f:dag_id": {},
                                    "f:execution_date": {},
                                    "f:task_id": {},
                                    "f:try_number": {}
                                },
                                "f:labels": {
                                    ".": {},
                                    "f:airflow-worker": {},
                                    "f:airflow_version": {},
                                    "f:dag_id": {},
                                    "f:execution_date": {},
                                    "f:kubernetes_executor": {},
                                    "f:task_id": {},
                                    "f:try_number": {}
                                }
                            },
                          <.....>
            "status": {
                "phase": "Succeeded",
                "conditions": [
                    {
                        "type": "Initialized",
                        "status": "True",
                        "lastProbeTime": null,
                        "lastTransitionTime": "2021-03-23T07:12:57Z",
                        "reason": "PodCompleted"
                    },
                    {
                        "type": "Ready",
                        "status": "False",
                        "lastProbeTime": null,
                        "lastTransitionTime": "2021-03-23T07:13:40Z",
                        "reason": "PodCompleted"
                    },
                    {
                        "type": "ContainersReady",
                        "status": "False",
                        "lastProbeTime": null,
                        "lastTransitionTime": "2021-03-23T07:13:40Z",
                        "reason": "PodCompleted"
                    },
                    {
                        "type": "PodScheduled",
                        "status": "True",
                        "lastProbeTime": null,
                        "lastTransitionTime": "2021-03-23T07:12:57Z"
                    }
                ],
                "hostIP": "10.161.132.13",
                "podIP": "10.161.132.18",
                "podIPs": [
                    {
                        "ip": "10.161.132.18"
                    }
                ],
                "startTime": "2021-03-23T07:12:57Z",
                "containerStatuses": [
                    {
                        "name": "base",
                        "state": {
                            "terminated": {
                                "exitCode": 0,
                                "reason": "Completed",
                                "startedAt": "2021-03-23T07:12:59Z",
                                "finishedAt": "2021-03-23T07:13:39Z",
                                "containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce"
                            }
                        },
                        "lastState": {},
                        "ready": false,
                        "restartCount": 0,
                        "image": "apache/airflow:2.0.0-python3.8",
                        "imageID": "docker-pullable://apache/airflow@sha256:76bd7cd6d47ffea98df98f5744680a860663bc26836fd3d67d529b06caaf97a7",
                        "containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce",
                        "started": false
                    }
                ],
                "qosClass": "Burstable"
            }
        }
    ]
}

[2021-03-23 07:14:46,382] {kubernetes_executor.py:661} INFO - Attempting to adopt pod testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3

[2021-03-23 07:14:46,463] {rest.py:228} DEBUG - response body: 
{
    "kind": "Pod",
    "apiVersion": "v1",
    "metadata": {
        "name": "testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
        "namespace": "airflow-build",
        "selfLink": "/api/v1/namespaces/airflow-build/pods/testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
        "uid": "24b7341b-99a3-4dae-9266-40e79ac7cd70",
        "resourceVersion": "2190012",
        "creationTimestamp": "2021-03-23T07:12:57Z",
        "labels": {
            "airflow-worker": "201",
            "airflow_version": "2.0.0",
            "dag_id": "test-mike",
            "execution_date": "2021-03-23T07_12_54.887218_plus_00_00",
            "kubernetes_executor": "True",
            "task_id": "mike-test-task",
            "try_number": "1"
        },
        "annotations": {
            "dag_id": "test-mike",
            "execution_date": "2021-03-23T07:12:54.887218+00:00",
            "task_id": "mike-test-task",
            "try_number": "1"
        },
        "managedFields": [
            {
                "manager": "kubelet",
                "operation": "Update",
                "apiVersion": "v1",
                "time": "2021-03-23T07:13:40Z",
                "fieldsType": "FieldsV1",
                "fieldsV1": {
                    "f:status": {
                        "f:conditions": {
                            "k:{\"type\":\"ContainersReady\"}": {
                                ".": {},
                                "f:lastProbeTime": {},
                                "f:lastTransitionTime": {},
                                "f:reason": {},
                                "f:status": {},
                                "f:type": {}
                            },
                            "k:{\"type\":\"Initialized\"}": {
                                ".": {},
                                "f:lastProbeTime": {},
                                "f:lastTransitionTime": {},
                                "f:reason": {},
                                "f:status": {},
                                "f:type": {}
                            },
                            "k:{\"type\":\"Ready\"}": {
                                ".": {},
                                "f:lastProbeTime": {},
                                "f:lastTransitionTime": {},
                                "f:reason": {},
                                "f:status": {},
                                "f:type": {}
                            }
                        },
                        "f:hostIP": {},
                        "f:phase": {},
                        "f:podIP": {},
                        "f:podIPs": {
                            ".": {},
                            "k:{\"ip\":\"10.161.132.18\"}": {
                                ".": {},
                                "f:ip": {}
                            }
                        },
                        "f:startTime": {}
                    }
                }
            },
            {
                "manager": "OpenAPI-Generator",
                "operation": "Update",
                "apiVersion": "v1",
                "time": "2021-03-23T07:13:42Z",
                "fieldsType": "FieldsV1",
                "fieldsV1": {
                    "f:metadata": {
                        "f:annotations": {
                            ".": {},
                            "f:dag_id": {},
                            "f:execution_date": {},
                            "f:task_id": {},
                            "f:try_number": {}
                        },
                        "f:labels": {
                            ".": {},
                            "f:airflow-worker": {},
                            "f:airflow_version": {},
                            "f:dag_id": {},
                            "f:execution_date": {},
                            "f:kubernetes_executor": {},
                            "f:task_id": {},
                            "f:try_number": {}
                        }
                    },
                    "f:spec": {
                       <....>
        "tolerations": [
            {
                "key": "node.kubernetes.io/not-ready",
                "operator": "Exists",
                "effect": "NoExecute",
                "tolerationSeconds": 300
            },
            {
                "key": "node.kubernetes.io/unreachable",
                "operator": "Exists",
                "effect": "NoExecute",
                "tolerationSeconds": 300
            },
            {
                "key": "node.kubernetes.io/memory-pressure",
                "operator": "Exists",
                "effect": "NoSchedule"
            }
        ],
        "priority": 0,
        "enableServiceLinks": true,
        "preemptionPolicy": "PreemptLowerPriority"
    },
    "status": {
        "phase": "Succeeded",
        "conditions": [
            {
                "type": "Initialized",
                "status": "True",
                "lastProbeTime": null,
                "lastTransitionTime": "2021-03-23T07:12:57Z",
                "reason": "PodCompleted"
            },
            {
                "type": "Ready",
                "status": "False",
                "lastProbeTime": null,
                "lastTransitionTime": "2021-03-23T07:13:40Z",
                "reason": "PodCompleted"
            },
            {
                "type": "ContainersReady",
                "status": "False",
                "lastProbeTime": null,
                "lastTransitionTime": "2021-03-23T07:13:40Z",
                "reason": "PodCompleted"
            },
            {
                "type": "PodScheduled",
                "status": "True",
                "lastProbeTime": null,
                "lastTransitionTime": "2021-03-23T07:12:57Z"
            }
        ],
        "hostIP": "10.161.132.13",
        "podIP": "10.161.132.18",
        "podIPs": [
            {
                "ip": "10.161.132.18"
            }
        ],
        "startTime": "2021-03-23T07:12:57Z",
        "containerStatuses": [
            {
                "name": "base",
                "state": {
                    "terminated": {
                        "exitCode": 0,
                        "reason": "Completed",
                        "startedAt": "2021-03-23T07:12:59Z",
                        "finishedAt": "2021-03-23T07:13:39Z",
                        "containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce"
                    }
                },
                "lastState": {},
                "ready": false,
                "restartCount": 0,
                "image": "apache/airflow:2.0.0-python3.8",
                "imageID": "docker-pullable://apache/airflow@sha256:76bd7cd6d47ffea98df98f5744680a860663bc26836fd3d67d529b06caaf97a7",
                "containerID": "docker://abe9e33c356de398af865736e0054b0eaaa6f3b99c44a6d021b4ca3a981161ce",
                "started": false
            }
        ],
        "qosClass": "Burstable"
    }
}
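
To make a response like the ones above easier to read, here is a small sketch that picks out the executor worker pods that have already finished. The sample payload is a trimmed, hand-written stand-in for the PodList body (only the fields used here, with values copied from the log), and `finished_worker_pods` is a hypothetical helper name, not something from Airflow or the Kubernetes client:

```python
import json

# Trimmed stand-in for the PodList response body; only the fields used below.
SAMPLE = """
{
  "kind": "PodList",
  "items": [
    {
      "metadata": {
        "name": "testmikemiketesttask-f312cd42164b4907af4214e8ee7af8b3",
        "labels": {"kubernetes_executor": "True", "airflow-worker": "201"}
      },
      "status": {"phase": "Succeeded"}
    }
  ]
}
"""


def finished_worker_pods(pod_list: dict) -> list:
    """Return (name, airflow-worker label) for executor pods in phase Succeeded."""
    result = []
    for item in pod_list.get("items", []):
        labels = item["metadata"].get("labels", {})
        # Skip pods that were not launched by the Kubernetes Executor.
        if labels.get("kubernetes_executor") != "True":
            continue
        if item.get("status", {}).get("phase") == "Succeeded":
            result.append((item["metadata"]["name"], labels.get("airflow-worker")))
    return result


print(finished_worker_pods(json.loads(SAMPLE)))
```

Feeding it the full response body from the scheduler's DEBUG log should work the same way, as long as the JSON is complete (the excerpts above are elided).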

Thanks for helping me out!

-- Mike Pieters
airflow
airflow-scheduler
kubernetes

1 Answer

4/21/2021

Did you try to play with these values?

AIRFLOW__KUBERNETES__DELETE_WORKER_PODS: "True"
AIRFLOW__KUBERNETES__DELETE_WORKER_PODS_ON_FAILURE: "False"
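
For reference, Airflow reads configuration overrides from environment variables named AIRFLOW__{SECTION}__{KEY}, with double underscores before and after the section name. A minimal sketch of that mapping, assuming both options live in the [kubernetes] section as they do in Airflow 2.0; `airflow_env_var` is a hypothetical helper name and not part of Airflow:

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name that overrides [section] key."""
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Hypothetical helper usage: the two settings mentioned above.
overrides = {
    airflow_env_var("kubernetes", "delete_worker_pods"): "True",
    airflow_env_var("kubernetes", "delete_worker_pods_on_failure"): "False",
}

for name, value in overrides.items():
    print(f"{name}={value}")
```

The same values can also be set in airflow.cfg under [kubernetes]; whichever you choose, the scheduler has to see them, since it is the component that deletes the worker pods.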

-- KIRY4
Source: StackOverflow