Unable to execute Airflow KubernetesExecutor

8/1/2019

Following the project from here, I am trying to integrate the Airflow KubernetesExecutor, using an NFS server as the backing storage for a PV. I have a PV, airflow-pv, which is linked to the NFS server. The Airflow webserver and scheduler use a PVC, airflow-pvc, which is bound to airflow-pv. I've placed my DAG files on the NFS server under /var/nfs/airflow/development/<dags/logs>, and I can see newly added DAGs in the webserver UI as well. However, when I execute a DAG from the UI, the scheduler starts a new pod for the task, but the new worker pod fails to run with:

Unable to mount volumes for pod "tutorialprintdate-3e1a4443363e4c9f81fd63438cdb9873_development(976b1e64-b46d-11e9-92af-025000000001)": timeout expired waiting for volumes to attach or mount for pod "development"/"tutorialprintdate-3e1a4443363e4c9f81fd63438cdb9873". list of unmounted volumes=[airflow-dags]. list of unattached volumes=[airflow-dags airflow-logs airflow-config default-token-hjwth]

Here are my webserver and scheduler deployment files:

apiVersion: v1
kind: Service
metadata:
  name: airflow-webserver-svc
  namespace: development
spec:
  type: NodePort
  ports:
    - name: web
      protocol: TCP
      port: 8080
  selector:
    app: airflow-webserver-app
    namespace: development

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow-webserver-dep
  namespace: development
spec:
  replicas: 1
  selector:
    matchLabels:
      app: airflow-webserver-app
      namespace: development
  template:
    metadata:
      labels:
        app: airflow-webserver-app
        namespace: development
    spec:
      restartPolicy: Always
      containers:
      - name: airflow-webserver-app
        image: airflow:externalConfigs
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 8080
        args: ["-webserver"]
        env:
        - name: AIRFLOW_KUBE_NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: AIRFLOW__CORE__FERNET_KEY
        - name: MYSQL_ROOT_PASSWORD
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: MYSQL_PASSWORD
        - name: MYSQL_PASSWORD
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: MYSQL_PASSWORD
        - name: DB_HOST
          value: mysql-svc.development.svc.cluster.local
        - name: DB_PORT
          value: "3306"
        - name: MYSQL_DATABASE
          value: airflow
        - name: MYSQL_USER
          value: airflow
        - name: MYSQL_PASSWORD
          value: airflow
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
        volumeMounts:
        - name: airflow-config
          mountPath: /usr/local/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-files
          mountPath: /usr/local/airflow/dags
          subPath: airflow/development/dags
        - name: airflow-files
          mountPath: /usr/local/airflow/plugins
          subPath: airflow/development/plugins
        - name: airflow-files
          mountPath: /usr/local/airflow/logs
          subPath: airflow/development/logs
        - name: airflow-files
          mountPath: /usr/local/airflow/temp
          subPath: airflow/development/temp
      volumes:
        - name: airflow-files
          persistentVolumeClaim:
            claimName: airflow-pvc
        - name: airflow-config
          configMap:
            name: airflow-config

The scheduler YAML file is exactly the same, except that the container args are args: ["-scheduler"]. Here is my airflow.cfg file:

apiVersion: v1
kind: ConfigMap
metadata:
  name: "airflow-config"
  namespace: development
data:
  airflow.cfg: |
    [core]
    airflow_home = /usr/local/airflow
    dags_folder = /usr/local/airflow/dags
    base_log_folder = /usr/local/airflow/logs
    executor = KubernetesExecutor
    plugins_folder = /usr/local/airflow/plugins

    load_examples = false

    [scheduler]
    child_process_log_directory = /usr/local/airflow/logs/scheduler

    [webserver]
    rbac = false

    [kubernetes]
    airflow_configmap =
    worker_container_repository = airflow
    worker_container_tag = externalConfigs
    worker_container_image_pull_policy = IfNotPresent
    delete_worker_pods = true
    dags_volume_claim = airflow-pvc
    dags_volume_subpath =
    logs_volume_claim = airflow-pvc
    logs_volume_subpath =

    env_from_configmap_ref = airflow-config
    env_from_secret_ref = airflow-secrets

    in_cluster = true
    namespace = development

    [kubernetes_node_selectors]
    # the key-value pairs to be given to worker pods.
    # the worker pods will be scheduled to the nodes of the specified key-value pairs.
    # should be supplied in the format: key = value

    [kubernetes_environment_variables]
    # the configs below get overwritten by the [kubernetes] configs above
    AIRFLOW__KUBERNETES__DAGS_VOLUME_CLAIM = airflow-pvc
    AIRFLOW__KUBERNETES__DAGS_VOLUME_SUBPATH = var/nfs/airflow/development/dags
    AIRFLOW__KUBERNETES__LOGS_VOLUME_CLAIM = airflow-pvc
    AIRFLOW__KUBERNETES__LOGS_VOLUME_SUBPATH = var/nfs/airflow/development/logs

    [kubernetes_secrets]
    AIRFLOW__CORE__SQL_ALCHEMY_CONN = airflow-secrets=AIRFLOW__CORE__SQL_ALCHEMY_CONN
    AIRFLOW_HOME = airflow-secrets=AIRFLOW_HOME

    [cli]
    api_client = airflow.api.client.json_client
    endpoint_url = https://airflow.crunchanalytics.cloud

    [api]
    auth_backend = airflow.api.auth.backend.default

    [admin]
    # ui to hide sensitive variable fields when set to true
    hide_sensitive_variable_fields = true

After triggering a task manually, the scheduler logs show that KubernetesExecutorConfig() was created with all values set to None. It seems it didn't pick up the configs? I've tried almost everything I know of but cannot make it work. Could someone tell me what I am missing?

[2019-08-01 14:44:22,944] {jobs.py:1341} INFO - Sending ('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1) to executor with priority 3 and queue default
[2019-08-01 14:44:22,944] {base_executor.py:56} INFO - Adding to queue: airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py
[2019-08-01 14:44:22,948] {kubernetes_executor.py:629} INFO - Add task ('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1) with command airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py with executor_config {}
[2019-08-01 14:44:22,949] {kubernetes_executor.py:379} INFO - Kubernetes job is (('kubernetes_sample', 'run_this_first', datetime.datetime(2019, 8, 1, 13, 45, 51, 874679, tzinfo=<Timezone [UTC]>), 1), 'airflow run kubernetes_sample run_this_first 2019-08-01T13:45:51.874679+00:00 --local -sd /usr/local/airflow/dags/airflow/development/dags/k8s_dag.py', KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations={}, volumes=[], volume_mounts=[], tolerations=None))
[2019-08-01 14:44:23,042] {kubernetes_executor.py:292} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 had an event of type ADDED
[2019-08-01 14:44:23,046] {kubernetes_executor.py:324} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 Pending
[2019-08-01 14:44:23,049] {kubernetes_executor.py:292} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 had an event of type MODIFIED
[2019-08-01 14:44:23,049] {kubernetes_executor.py:324} INFO - Event: kubernetessamplerunthisfirst-7fe05ddb34aa4cb9a5604e420d5b60a3 Pending

For reference, here are my PV and PVC:

kind: PersistentVolume
apiVersion: v1
metadata:
  name: airflow-pv
  labels:
    mode: local
    environment: development
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: airflow-pv
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 10.105.225.217
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-pvc
  namespace: development
spec:
  storageClassName: airflow-pv
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
  selector:
    matchLabels:
      mode: local
      environment: development

Using Airflow version: 1.10.3

-- Anum Sheraz
airflow
airflow-scheduler
kubernetes

1 Answer

8/7/2019

Since there is no answer yet, I'll share my findings so far. In airflow.cfg, under the [kubernetes] section, the following values need to be set:

dags_volume_claim = airflow-pvc
dags_volume_subpath = airflow/development/dags
logs_volume_claim = airflow-pvc
logs_volume_subpath = airflow/development/logs

From these settings, the scheduler creates a new pod as follows (showing only the volumes and volumeMounts):

"volumes": [
  {
    "name": "airflow-dags",
    "persistentVolumeClaim": {
      "claimName": "airflow-pvc"
    }
  },
  {
    "name": "airflow-logs",
    "persistentVolumeClaim": {
      "claimName": "airflow-pvc"
    }
  }
],
"containers": [
  {
    ...
    "volumeMounts": [
      {
        "name": "airflow-dags",
        "readOnly": true,
        "mountPath": "/usr/local/airflow/dags",
        "subPath": "airflow/development/dags"
      },
      {
        "name": "airflow-logs",
        "mountPath": "/usr/local/airflow/logs",
        "subPath": "airflow/development/logs"
      }
    ]
    ...
  }
]

In my setup, K8s DOESN'T like multiple volumes in one pod pointing to the same PVC (airflow-pvc). To fix this, I had to create two PVCs (and PVs), one for dags and one for logs, and set dags_volume_claim = airflow-dags-pvc and logs_volume_claim = airflow-log-pvc, which works fine.
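As a rough sketch of that two-claim workaround, the manifests could look like the following. The claim names airflow-dags-pvc and airflow-log-pvc are the ones mentioned above; the PV names and the "purpose" label are assumptions made up for illustration. The NFS server, storage class, and sizes are copied from the PV and PVC in the question. Both PVs keep path "/" so that the existing subpaths stay valid.

```yaml
# PV/PVC pair for DAGs (PV name and "purpose" label are hypothetical)
kind: PersistentVolume
apiVersion: v1
metadata:
  name: airflow-dags-pv
  labels:
    mode: local
    environment: development
    purpose: dags
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: airflow-pv
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 10.105.225.217
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-dags-pvc
  namespace: development
spec:
  storageClassName: airflow-pv
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
  selector:
    matchLabels:
      purpose: dags   # binds this claim to the DAGs PV only
---
# PV/PVC pair for logs (PV name and "purpose" label are hypothetical)
kind: PersistentVolume
apiVersion: v1
metadata:
  name: airflow-logs-pv
  labels:
    mode: local
    environment: development
    purpose: logs
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: airflow-pv
  capacity:
    storage: 4Gi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 10.105.225.217
    path: "/"
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-log-pvc
  namespace: development
spec:
  storageClassName: airflow-pv
  accessModes:
    - ReadWriteMany
  resources:
    requests:
      storage: 1Gi
  selector:
    matchLabels:
      purpose: logs   # binds this claim to the logs PV only
```

The distinct label per PV is there so that each claim's selector can match only its intended volume; without it, either claim could bind to either PV.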

I don't know if this has already been addressed in a newer version of Airflow (I am using 1.10.3). The Airflow scheduler should handle this case: when people use the same PVC for both dags and logs, it should create a pod with a single volume and two volumeMounts referring to that volume, e.g.

"volumes": [
  {
    "name": "airflow-dags-logs",    <-- just an example name
    "persistentVolumeClaim": {
      "claimName": "airflow-pvc"
    }
  }
],
"containers": [
  {
    ...
    "volumeMounts": [
      {
        "name": "airflow-dags-logs",
        "readOnly": true,
        "mountPath": "/usr/local/airflow/dags",
        "subPath": "airflow/development/dags"     <-- taken from configs
      },
      {
        "name": "airflow-dags-logs",
        "mountPath": "/usr/local/airflow/logs",
        "subPath": "airflow/development/logs"     <-- taken from configs
      }
    ]
    ...
  }
]

I deployed a pod with the above configuration and it works!
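For anyone who wants to repeat that check, a standalone test pod along these lines might do. It is only a sketch: the pod name, container name, busybox image, and ls command are assumptions chosen for illustration, while the claim name, mount paths, and subpaths are the ones used above.

```yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-volume-test        # hypothetical name
  namespace: development
spec:
  restartPolicy: Never
  containers:
    - name: volume-test            # hypothetical name
      image: busybox               # assumed; any image with a shell would do
      # List both mount points to confirm that they are populated
      command: ["sh", "-c", "ls /usr/local/airflow/dags /usr/local/airflow/logs"]
      volumeMounts:
        - name: airflow-dags-logs
          readOnly: true
          mountPath: /usr/local/airflow/dags
          subPath: airflow/development/dags
        - name: airflow-dags-logs
          mountPath: /usr/local/airflow/logs
          subPath: airflow/development/logs
  volumes:
    # A single volume for the shared claim, referenced by both mounts
    - name: airflow-dags-logs
      persistentVolumeClaim:
        claimName: airflow-pvc
```

If the pod reaches Completed and its logs list the DAG files, the single-volume layout mounts correctly on that cluster.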

-- Anum Sheraz
Source: StackOverflow