Airflow on Kubernetes: worker pod completed but Web UI can't get the status

2/23/2022

While setting up Airflow on my Kubernetes infrastructure I ran into a problem. I followed this blog and changed some settings for my situation, and I think everything is set up correctly. But when I run a DAG, either manually or on a schedule, the worker pods seem to run fine (I think), yet the Web UI never updates the task status; it stays stuck in running and queued. I want to know what is wrong.

Here are my settings.

Version info

AWS EKS ( kubernetes version: 1.21 )
Airflow ( 2.2.3 )
Python 3.8

container.yaml

#  Licensed to the Apache Software Foundation (ASF) under one   *
#  or more contributor license agreements.  See the NOTICE file *
#  distributed with this work for additional information        *
#  regarding copyright ownership.  The ASF licenses this file   *
#  to you under the Apache License, Version 2.0 (the            *
#  "License"); you may not use this file except in compliance   *
#  with the License.  You may obtain a copy of the License at   *
#                                                               *
#    http://www.apache.org/licenses/LICENSE-2.0                 *
#                                                               *
#  Unless required by applicable law or agreed to in writing,   *
#  software distributed under the License is distributed on an  *
#  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
#  KIND, either express or implied.  See the License for the    *
#  specific language governing permissions and limitations      *
#  under the License.                                           *

# Note: The airflow image used in this example is obtained by   *
# building the image from the local docker subdirectory.        *
---
apiVersion: v1
kind: ServiceAccount
metadata:
  name: airflow
  namespace: airflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  namespace: airflow
  name: airflow
rules:
  - apiGroups: [""] # "" indicates the core API group
    resources: ["pods"]
    verbs: ["get", "list", "watch", "create", "update", "delete"]
  - apiGroups: [ "" ]
    resources: [ "pods/log" ]
    verbs: [ "get", "list" ]
  - apiGroups: [ "" ]
    resources: [ "pods/exec" ]
    verbs: [ "create", "get" ]
  - apiGroups: ["batch", "extensions"]
    resources: ["jobs"]
    verbs: ["get", "list", "watch", "create", "update", "patch", "delete"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: airflow
  namespace: airflow
subjects:
  - kind: ServiceAccount
    name: airflow # Name of the ServiceAccount
    namespace: airflow
roleRef:
  kind: Role # This must be Role or ClusterRole
  name: airflow # This must match the name of the Role
                #   or ClusterRole you wish to bind to
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: airflow
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: lifecycle
                operator: NotIn
                values:
                - Ec2Spot
      initContainers:
      - name: "init"
        image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
        imagePullPolicy: Always
        volumeMounts:
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/dags
        env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command: ["/bin/bash", "-c"]
        args:
          - /tmp/airflow-test-env-init.sh {{INIT_GIT_SYNC}};
      containers:
      - name: webserver
        image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
        imagePullPolicy: Always
        ports:
        - name: webserver
          containerPort: 8080
        args: ["webserver"]
        env:
        - name: AIRFLOW_KUBE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/dags
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/logs
      - name: scheduler
        image: {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}}
        imagePullPolicy: Always
        args: ["scheduler"]
        env:
        - name: AIRFLOW_KUBE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /opt/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/dags
        - name: {{POD_AIRFLOW_VOLUME_NAME}}
          mountPath: /opt/airflow/logs
      - name: git-sync
        image: k8s.gcr.io/git-sync/git-sync:v3.4.0
        imagePullPolicy: IfNotPresent
        envFrom:
          - configMapRef:
              name: airflow-gitsync
          - secretRef:
              name: airflow-secrets
        volumeMounts:
          - name: {{POD_AIRFLOW_VOLUME_NAME}}
            mountPath: /git
      volumes:
      - name: {{INIT_DAGS_VOLUME_NAME}}
        emptyDir: {}
      - name: {{POD_AIRFLOW_VOLUME_NAME}}
        persistentVolumeClaim:
          claimName: airflow-efs-pvc
      - name: airflow-dags-fake
        emptyDir: {}
      - name: airflow-configmap
        configMap:
          name: airflow-configmap
      securityContext:
        runAsUser: 50000
        fsGroup: 0
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: airflow
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-type: "nlb"
    service.beta.kubernetes.io/aws-load-balancer-backend-protocol: tcp
    service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443"
    service.beta.kubernetes.io/aws-load-balancer-ssl-cert: {{AOK_SSL_ENDPOINT}}
spec:
  type: LoadBalancer
  ports:
    - protocol: TCP
      port: 80
      targetPort: 8080
      nodePort: 30031
      name: http
    - protocol: TCP
      port: 443
      targetPort: 8080
      nodePort: 30032
      name: https
  selector:
    name: airflow

airflow.cfg

---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-gitsync
  namespace: airflow
data:
  GIT_SYNC_REPO: GITREPO
  GIT_SYNC_BRANCH: xxx
  GIT_SYNC_ROOT: xxx
  GIT_SYNC_DEST: xxx
  GIT_SYNC_DEPTH: xxx
  GIT_SYNC_ONE_TIME: "false"
  GIT_SYNC_WAIT: "60"
  GIT_SYNC_USERNAME: xxx
  GIT_SYNC_PERMISSIONS: xxx
  GIT_KNOWN_HOSTS: "false"
  GIT_SYNC_PASSWORD: xxx
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  namespace: airflow
data:
  airflow.cfg: |
    [core]
    dags_folder = {{CONFIGMAP_DAGS_FOLDER}}
    executor = KubernetesExecutor
    parallelism = 32
    load_examples = True
    plugins_folder = /opt/airflow/plugins
    sql_alchemy_conn = $SQL_ALCHEMY_CONN
    hide_sensitive_variable_fields = True

    [logging]
    logging_level = DEBUG
    base_log_folder = /opt/airflow/logs
    remote_logging = False 
    remote_log_conn_id = my_s3_conn
    remote_base_log_folder = s3://airflow/logs

    [scheduler]
    dag_dir_list_interval = 300
    child_process_log_directory = /opt/airflow/logs/scheduler
    # Task instances listen for external kill signal (when you clear tasks
    # from the CLI or the UI), this defines the frequency at which they should
    # listen (in seconds).
    job_heartbeat_sec = 5
    max_threads = 2

    # The scheduler constantly tries to trigger new tasks (look at the
    # scheduler section in the docs for more information). This defines
    # how often the scheduler should run (in seconds).
    scheduler_heartbeat_sec = 5

    # after how much time a new DAGs should be picked up from the filesystem
    min_file_process_interval = 0

    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow

    # How many seconds to wait between file-parsing loops to prevent the logs from being spammed.
    min_file_parsing_loop_time = 1

    print_stats_interval = 30
    scheduler_zombie_task_threshold = 300
    max_tis_per_query = 0
    authenticate = False

    catchup_by_default = True

    [webserver]
    base_url = http://0.0.0.0:8080
    rbac=True

    # The ip specified when starting the web server
    web_server_host = 0.0.0.0

    # The port on which to run the web server
    web_server_port = 8080

    # Paths to the SSL certificate and key for the web server. When both are
    # provided SSL will be enabled. This does not change the web server port.
    web_server_ssl_cert =
    web_server_ssl_key =

    # Number of seconds the webserver waits before killing gunicorn master that doesn't respond
    web_server_master_timeout = 120

    # Number of seconds the gunicorn webserver waits before timing out on a worker
    web_server_worker_timeout = 120

    # Number of workers to refresh at a time. When set to 0, worker refresh is
    # disabled. When nonzero, airflow periodically refreshes webserver workers by
    # bringing up new ones and killing old ones.
    worker_refresh_batch_size = 1

    # Number of seconds to wait before refreshing a batch of workers.
    worker_refresh_interval = 30

    # Secret key used to run your flask app
    secret_key = xxxxxxxxxxx

    # Number of workers to run the Gunicorn web server
    workers = 4

    # The worker class gunicorn should use. Choices include
    # sync (default), eventlet, gevent
    worker_class = sync

    # Log files for the gunicorn webserver. '-' means log to stderr.
    access_logfile = -
    error_logfile = -

    # Expose the configuration file in the web server
    expose_config = False

    # Default DAG view.  Valid values are:
    # tree, graph, duration, gantt, landing_times
    dag_default_view = tree

    # Default DAG orientation. Valid values are:
    # LR (Left->Right), TB (Top->Bottom), RL (Right->Left), BT (Bottom->Top)
    dag_orientation = LR

    # Puts the webserver in demonstration mode; blurs the names of Operators for
    # privacy.
    demo_mode = False

    # The amount of time (in secs) webserver will wait for initial handshake
    # while fetching logs from other worker machine
    log_fetch_timeout_sec = 5

    # By default, the webserver shows paused DAGs. Flip this to hide paused
    # DAGs by default
    hide_paused_dags_by_default = False

    # Consistent page size across all listing views in the UI
    page_size = 100

    [metrics]
    statsd_on = False

    [operators]
    # Default queue that tasks get assigned to and that worker listen on.
    default_queue = default

    [smtp]
    # If you want airflow to send emails on retries, failure, and you want to use
    # the airflow.utils.email.send_email_smtp function, you have to configure an
    # smtp server here
    smtp_host = localhost
    smtp_starttls = True
    smtp_ssl = False
    # Uncomment and set the user/pass settings if you want to use SMTP AUTH
    # smtp_user = airflow
    # smtp_password = airflow
    smtp_port = 25
    smtp_mail_from = airflow@example.com

    [kubernetes]
    run_as_user = 50000
    airflow_configmap = airflow-configmap
    worker_container_repository = {{AIRFLOW_IMAGE}}
    worker_container_tag = {{AIRFLOW_TAG}}
    worker_container_image_pull_policy = IfNotPresent
    worker_service_account_name = airflow
    worker_dags_folder=/opt/airflow/dags
    namespace = airflow
    pod_template_file = /opt/airflow/yamls/pod_template.yaml
    delete_worker_pods = False
    delete_worker_pods_on_failure = False
    dags_in_image = False
    git_repo = xxxx
    git_branch = xxx
    git_subpath = airflow/contrib/example_dags/
    git_sync_depth = xxx
    git_user = xxx
    git_password = {{CONFIGMAP_GIT_PASSWORD}}
    git_sync_root = /git
    git_sync_dest = projects
    git_dags_folder_mount_point = {{CONFIGMAP_GIT_DAGS_FOLDER_MOUNT_POINT}}
    dags_volume_claim = {{CONFIGMAP_DAGS_VOLUME_CLAIM}}
    dags_volume_subpath =
    logs_volume_claim = {{CONFIGMAP_DAGS_VOLUME_CLAIM}} 
    logs_volume_subpath =
    dags_volume_host =
    logs_volume_host =
    in_cluster = True
    gcp_service_account_keys =
    enable_tcp_keepalive = True

    # Example affinity and toleration definitions.
    affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"weight":1,"preference":[{"matchExpressions":[{"key":"lifecycle","operator":"In","value":"Ec2Spot"}]}],"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
    tolerations = [{ "key": "spotInstance", "operator": "Equal", "value": "true", "effect": "PreferNoSchedule" },{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]
    # affinity = {"nodeAffinity":{"requiredDuringSchedulingIgnoredDuringExecution":{"nodeSelectorTerms":[{"matchExpressions":[{"key":"kubernetes.io/hostname","operator":"NotIn","values":["4e5e6a99-e28a-450b-bba9-e0124853de9b"]}]}]}}}
    # tolerations = [{ "key": "dedicated", "operator": "Equal", "value": "airflow", "effect": "NoSchedule" }, { "key": "prod", "operator": "Exists" }]

    # For cloning DAGs from git repositories into volumes: https://github.com/kubernetes/git-sync
    git_sync_container_repository = k8s.gcr.io/git-sync
    git_sync_container_tag = v3.3.0
    git_sync_init_container_name = git-sync-clone

    [kubernetes_node_selectors]
    # The Key-value pairs to be given to worker pods.
    # The worker pods will be scheduled to the nodes of the specified key-value pairs.
    # Should be supplied in the format: key = value

    [kubernetes_annotations]
    # The Key-value annotations pairs to be given to worker pods.
    # Should be supplied in the format: key = value

    [kubernetes_secrets]
    SQL_ALCHEMY_CONN = airflow-secrets=sql_alchemy_conn

    [hive]
    # Default mapreduce queue for HiveOperator tasks
    default_hive_mapred_queue =

    [celery]
    # This section only applies if you are using the CeleryExecutor in
    # [core] section above

    # The app name that will be used by celery
    celery_app_name = airflow.executors.celery_executor

    worker_concurrency = 16

    worker_log_server_port = 8793

    broker_url = sqla+mysql://airflow:airflow@localhost:3306/airflow
    # result backend settings
    result_backend = db+mysql://airflow:airflow@localhost:3306/airflow

    flower_host = 0.0.0.0

    # The root URL for Flower
    # Ex: flower_url_prefix = /flower
    flower_url_prefix =

    # This defines the port that Celery Flower runs on
    flower_port = 5555

    # Securing Flower with Basic Authentication
    # Accepts user:password pairs separated by a comma
    # Example: flower_basic_auth = user1:password1,user2:password2
    flower_basic_auth =


    # How many processes CeleryExecutor uses to sync task state.
    # 0 means to use max(1, number of cores - 1) processes.
    sync_parallelism = 0

    # Import path for celery configuration options
    celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

    visibility_timeout = 21600



    [ldap]
    # set this to ldaps://<your.ldap.server>:<port>
    uri =
    user_filter = objectClass=*
    user_name_attr = uid
    group_member_attr = memberOf
    superuser_filter =
    data_profiler_filter =
    bind_user = cn=Manager,dc=example,dc=com
    bind_password = insecure
    basedn = dc=example,dc=com
    cacert = /etc/ca/ldap_ca.crt
    search_scope = LEVEL

    [kerberos]
    ccache = /tmp/airflow_krb5_ccache
    # gets augmented with fqdn
    principal = airflow
    reinit_frequency = 3600
    kinit_path = kinit
    keytab = airflow.keytab

    [cli]
    api_client = airflow.api.client.json_client
    endpoint_url = http://0.0.0.0:8080

    [api]
    auth_backend = airflow.api.auth.backend.default

    [github_enterprise]
    api_rev = v3

    [admin]
    # UI to hide sensitive variable fields when set to True

    [elasticsearch]
    elasticsearch_host =

    [tests]
    unit_test_mode = False

volume.yaml

apiVersion: storage.k8s.io/v1
kind: CSIDriver
metadata:
  name: efs.csi.aws.com
spec:
  attachRequired: false
---
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: efs-sc
provisioner: efs.csi.aws.com
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: airflow-efs-pv
spec:
  capacity:
    storage: 100Gi
  volumeMode: Filesystem
  accessModes:
    - ReadWriteMany
  persistentVolumeReclaimPolicy: Retain
  storageClassName: efs-sc
  csi:
    driver: efs.csi.aws.com
    volumeHandle: {{AOK_EFS_FS_ID}}::{{AOK_EFS_AP}}
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-efs-pvc
  namespace: airflow
spec:
  accessModes:
    - ReadWriteMany
  storageClassName: efs-sc
  resources:
    requests:
      storage: 3Gi

pod_template.yaml

---
apiVersion: v1
kind: Pod
metadata:
  name: airflow
  namespace: airflow
spec:
  serviceAccountName: airflow # this account has rights to create pods
  automountServiceAccountToken: true
  initContainers:
  - name: git-sync-clone
    image: xxx
    imagePullPolicy: IfNotPresent
    envFrom:
      - configMapRef:
          name: airflow-gitsync
      - secretRef:
          name: airflow-secrets
    volumeMounts:
      - name: airflow-dags
        mountPath: /opt/airflow/dags
      - name: airflow-configmap
        mountPath: /opt/airflow/airflow.cfg
        subPath: airflow.cfg
    env:
    - name: SQL_ALCHEMY_CONN
      valueFrom:
        secretKeyRef:
          name: airflow-secrets
          key: sql_alchemy_conn
    - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
      value: "false" # k8s env values must be strings
  containers:
  - name: base
    image: xxx
    imagePullPolicy: IfNotPresent
    volumeMounts:
      - name: airflow-dags
        mountPath: /opt/airflow/logs
      - name: airflow-dags
        mountPath: /opt/airflow/dags
        readOnly: true
      - name: airflow-configmap
        mountPath: /opt/airflow/airflow.cfg
        readOnly: true
        subPath: airflow.cfg
    env:
      - name: AIRFLOW_HOME
        value: /opt/airflow
      - name: AIRFLOW__CORE__EXECUTOR
        value: LocalExecutor
      - name: AIRFLOW__KUBERNETES__DELETE_WORKER_PODS
        value: "false" # k8s env values must be strings
      - name: AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE
        value: "true"
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 0
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: airflow-efs-pvc
    - name: airflow-logs
      emptyDir: {}
    - name: airflow-configmap
      configMap:
        name: airflow-configmap

Here is the kubectl get pods output:

NAME                                                               READY   STATUS      RESTARTS   AGE
airflow-85fdc74b9d-88gvn                                           3/3     Running     0          9m40s
examplebashoperatoralsorunthis.3190630823b84cc4a6eba09544c303a2    0/1     Completed   0          26s
examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d         0/1     Completed   0          28s
examplebashoperatorrunme1.a16ac8f949414b93bd8f39d9e67cee3a         0/1     Completed   0          29s
examplebashoperatorrunme2.a79ebbb01cd149b688285a857ed01a17         0/1     Completed   0          29s
examplebashoperatorthiswillskip.41b60a9693b2463f97af35b72a32b082   0/1     Completed   0          27s

And here is my Web UI status (screenshot): the tasks stay stuck in queued/running.

I want to know why this status is not synced, and why I can't get the worker pod logs.

-- jihwan Kim
airflow
amazon-eks
amazon-web-services
kubernetes

1 Answer

3/15/2022

The issue is with the Airflow Docker image you are using.

The ENTRYPOINT is a custom .sh file you have written, and it only decides whether to run the webserver or the scheduler.

The Airflow scheduler submits a pod for each task with args like the following:

containers:
  - args:
    - airflow
    - tasks
    - run
    - hello-world-dag
    - print_date_dag
    - scheduled__2022-03-13T00:00:00+00:00
    - --local
    - --subdir
    - DAGS_FOLDER/sample_dags/hello_world.py

The Airflow image you are using doesn't know what to do with this command, because the .sh file has no case to handle it: the first arg is neither scheduler nor webserver.
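In other words, the entrypoint presumably behaves like this hypothetical sketch (an assumption about your script, not your actual file):

#!/usr/bin/env bash
# hypothetical sketch of the failure mode: only webserver/scheduler are handled;
# any other command ("airflow tasks run ...") falls through and the container exits 0
if [ "$1" = "webserver" ]
then
	exec airflow webserver
elif [ "$1" = "scheduler" ]
then
	exec airflow scheduler
fi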

So the scheduler submits the pod, and the pod exits without doing anything because it never ran the task. As far as Kubernetes is concerned the pod completed successfully, but Airflow never learns the task status, so the task ends up recorded with a None status in the task instance table of the Airflow metadata DB.
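You can confirm this by inspecting one of the completed worker pods (they are still around since you set delete_worker_pods = False). The pod name below is taken from your kubectl get pods output; adjust it to any of yours:

kubectl -n airflow get pod examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d \
  -o jsonpath='{.spec.containers[0].args}'
# shows the "airflow tasks run ..." args the scheduler passed to the pod

kubectl -n airflow logs examplebashoperatorrunme0.3babf6b0543f4f3ca9b231c12ae54d7d
# if the entrypoint ignored those args, there is no task-run output here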

Please use the following entrypoint file to make it work

#!/usr/bin/env bash

# launch the appropriate process

if [ "$1" = "webserver" ]
then
	exec airflow webserver
elif [ "$1" = "scheduler" ]
then
	exec airflow scheduler
else
	exec "$@"
fi
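
After rebuilding the image with this entrypoint, a quick sanity check (the image name and tag are placeholders for yours) is that an arbitrary command now falls through to exec "$@":

docker run --rm {{AIRFLOW_IMAGE}}:{{AIRFLOW_TAG}} airflow version
# with the fixed entrypoint this prints the Airflow version;
# with the old one the container exited silently with code 0, exactly like your worker pods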
-- Sai Krishna
Source: StackOverflow