KubernetesExecutor does not spawn KubernetesPodOperator pods

8/15/2021

I can't figure out where to look. I am running Airflow on GKE. It was running fine but recently started failing: the DAG starts, then its tasks fail, even though they ran fine a week ago.

It seems like something changed in the cluster, but I can't tell what from the logs.

My KubernetesExecutor stopped spawning KubernetesPodOperator pods, and there are no errors in the logs. If I apply the template I use for the operator directly (kubectl apply -f), the pod runs successfully.
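Since the same template runs fine with kubectl apply, one suspect is RBAC on the service account the worker pods use. A minimal sketch of a check that can be run from inside a worker pod, assuming the kubernetes Python client is installed and Airflow runs in a namespace called airflow (adjust to your setup):

from kubernetes import client, config

# Use the credentials mounted via automountServiceAccountToken
config.load_incluster_config()

# Ask the API server whether the current identity may create pods,
# the Python equivalent of `kubectl auth can-i create pods`
review = client.V1SelfSubjectAccessReview(
    spec=client.V1SelfSubjectAccessReviewSpec(
        resource_attributes=client.V1ResourceAttributes(
            namespace="airflow",  # assumption: the namespace Airflow runs in
            verb="create",
            resource="pods",
        )
    )
)
response = client.AuthorizationV1Api().create_self_subject_access_review(review)
print("allowed:", response.status.allowed)

If this prints allowed: False, the cluster change was likely to a Role or RoleBinding.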

Airflow 2.1.2

kubectl version output

Client Version: version.Info{Major:"1", Minor:"20", GitVersion:"v1.20.7", GitCommit:"132a687512d7fb058d0f5890f07d4121b3f0a2e2", GitTreeState:"clean", BuildDate:"2021-05-12T12:40:09Z", GoVersion:"go1.15.12", Compiler:"gc", Platform:"darwin/amd64"}
Server Version: version.Info{Major:"1", Minor:"20+", GitVersion:"v1.20.8-gke.900", GitCommit:"28ab8501be88ea42e897ca8514d7cd0b436253d9", GitTreeState:"clean", BuildDate:"2021-06-30T09:23:36Z", GoVersion:"go1.15.13b5", Compiler:"gc", Platform:"linux/amd64"}

Executor Template

apiVersion: v1
kind: Pod
metadata:
  ...
spec:
  restartPolicy: Never
  serviceAccountName: airflow # this account has rights to create pods
  automountServiceAccountToken: true
  volumes:
    - name: dags
      emptyDir: {}
    - name: logs
      emptyDir: {}
    - name: airflow-git-sync-configmap
      configMap:
        name: airflow-git-sync-configmap
  initContainers:
    - name: git-sync-clone
      securityContext:
        runAsUser: 65533
        runAsGroup: 65533
      image: k8s.gcr.io/git-sync/git-sync:v3.3.1
      imagePullPolicy: Always
      volumeMounts:
        - mountPath: /tmp/git
          name: dags
      resources:
         ...
      args: ["--one-time"]
      envFrom:
      - configMapRef:
          name: airflow-git-sync-configmap
      - secretRef:
          name: airflow-git-sync-secret
  containers:
    - name: base
      image: <artifactory_url>/airflow:latest
      volumeMounts:
        - name: dags
          mountPath: /opt/airflow/dags
        - name: logs
          mountPath: /opt/airflow/logs
      imagePullPolicy: Always
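KubernetesExecutor picks this template up from the [kubernetes] pod_template_file setting (or the AIRFLOW__KUBERNETES__POD_TEMPLATE_FILE environment variable). As a sanity check that the scheduler can still parse it after whatever changed in the cluster, the file can be deserialized the same way the executor does. A sketch, assuming Airflow 2.1's internal PodGenerator API and a hypothetical path:

from airflow.kubernetes.pod_generator import PodGenerator

# Deserialize the executor pod template exactly as KubernetesExecutor does;
# a YAML or schema problem here can make workers fail without useful logs.
pod = PodGenerator.deserialize_model_file(
    "/opt/airflow/pod_templates/executor_template.yaml"  # hypothetical path
)
print(pod.spec.service_account_name)  # expect "airflow"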

Pod template

apiVersion: v1
kind: Pod
metadata:
   ....
spec:
  serviceAccountName: airflow
  automountServiceAccountToken: true
  volumes:
    - name: sql
      emptyDir: {}
  initContainers:
    - name: git-sync
      image: k8s.gcr.io/git-sync/git-sync:v3.3.1
      imagePullPolicy: Always
      args: ["--one-time"]
      volumeMounts:
        - name: sql
          mountPath: /tmp/git/
      resources:
        requests:
          memory: 300Mi
          cpu: 500m
        limits:
          memory: 600Mi
          cpu: 1000m
      envFrom:
      - configMapRef:
          name: git-sync-configmap
      - secretRef:
          name: airflow-git-sync-secret
  containers:
    - name: base
      imagePullPolicy: Always
      image: <artifactory_url>/clickhouse-client-gcloud:20.6.4.44
      volumeMounts:
        - name: sql
          mountPath: /opt/sql
      resources:
         ....
      env:
        - name: GS_SERVICE_ACCOUNT
          valueFrom:
            secretKeyRef:
              name: gs-service-account
              key: service_account.json
        - name: DB_CREDENTIALS
          valueFrom:
            secretKeyRef:
              name: estimation-db-secret
              key: db_cred.json
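The kubectl apply check can also be reproduced from Python, which helps rule out kubeconfig or context differences between the workstation and the cluster Airflow talks to. A sketch, assuming the kubernetes client and a hypothetical local copy of this template:

from kubernetes import client, config, utils

# Same credentials kubectl would use from the workstation
config.load_kube_config()

# Rough equivalent of `kubectl apply -f template.yaml` (create, not patch)
k8s_client = client.ApiClient()
utils.create_from_yaml(
    k8s_client,
    "template.yaml",      # hypothetical local copy of the pod template
    namespace="airflow",  # assumption: Airflow's namespace
)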

DAG code

from textwrap import dedent

from airflow import DAG
from airflow.utils.dates import days_ago
# Deprecated contrib import path (kept as in the original setup; the
# DeprecationWarnings in the executor log below come from its backcompat shims)
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

TEMPLATE_PATH = "/opt/airflow/dags/airflow-dags.git/pod_templates"

args = {
    ...
}


def create_pipeline(dag_: DAG):

    task_startup_client = KubernetesPodOperator(
        name="clickhouse-client",
        task_id="clickhouse-client",
        labels={"application": "clickhouse-client-gsutil"},
        pod_template_file=f"{TEMPLATE_PATH}/template.yaml",
        cmds=["sleep", "60000"],
        reattach_on_restart=True,
        is_delete_operator_pod=False,
        get_logs=True,
        log_events_on_failure=True,
        dag=dag_,
    )
  
    task_startup_client  # single task, so there are no dependencies to chain


with DAG(
    dag_id="MANUAL-GKE-clickhouse-client",
    default_args=args,
    schedule_interval=None,
    max_active_runs=1,
    start_date=days_ago(2),
    tags=["utility"],
) as dag:

    create_pipeline(dag)
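Probably unrelated to the hang, but the import above is the deprecated contrib path; on Airflow 2.x the operator comes from the cncf.kubernetes provider. A sketch of the replacement import (the module moved again to operators.pod in later provider releases, so check the installed provider version):

# Provider import for Airflow 2.x with older cncf.kubernetes provider releases;
# newer releases expose it as airflow.providers.cncf.kubernetes.operators.pod
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)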

I ran Airflow with DEBUG logging and there is nothing suspicious; everything reports successful completion:

Scheduler log

...
Event: manualgkeclickhouseclientaticlickhouseclient.9959fa1fd13a4b6fbdaf40549a09d2f9 Succeeded
...

Executor log

[2021-08-15 18:40:27,045] {settings.py:208} DEBUG - Setting up DB connection pool (PID 1)
[2021-08-15 18:40:27,046] {settings.py:276} DEBUG - settings.prepare_engine_args(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=1
[2021-08-15 18:40:27,095] {cli_action_loggers.py:40} DEBUG - Adding <function default_action_log at 0x7f0556c5e280> to pre execution callback
[2021-08-15 18:40:28,070] {cli_action_loggers.py:66} DEBUG - Calling callbacks: [<function default_action_log at 0x7f0556c5e280>]
[2021-08-15 18:40:28,106] {settings.py:208} DEBUG - Setting up DB connection pool (PID 1)
[2021-08-15 18:40:28,107] {settings.py:244} DEBUG - settings.prepare_engine_args(): Using NullPool
[2021-08-15 18:40:28,109] {dagbag.py:496} INFO - Filling up the DagBag from /opt/airflow/dags/ati-airflow-dags.git/dag_clickhouse-client.py
[2021-08-15 18:40:28,110] {dagbag.py:311} DEBUG - Importing /opt/airflow/dags/ati-airflow-dags.git/dag_clickhouse-client.py
/usr/local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
/usr/local/lib/python3.9/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
[2021-08-15 18:40:28,135] {dagbag.py:461} DEBUG - Loaded DAG <DAG: MANUAL-GKE-clickhouse-client>
[2021-08-15 18:40:28,176] {plugins_manager.py:281} DEBUG - Loading plugins
[2021-08-15 18:40:28,176] {plugins_manager.py:225} DEBUG - Loading plugins from directory: /opt/airflow/plugins
[2021-08-15 18:40:28,177] {plugins_manager.py:205} DEBUG - Loading plugins from entrypoints
[2021-08-15 18:40:28,238] {plugins_manager.py:418} DEBUG - Integrate DAG plugins
Running <TaskInstance: MANUAL-GKE-clickhouse-client.clickhouse-client 2021-08-15T18:39:38.150950+00:00 [queued]> on host manualgkeclickhouseclientclickhouseclient.9959fa1fd13a4b6fbd
[2021-08-15 18:40:28,670] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-08-15 18:40:28,670] {settings.py:302} DEBUG - Disposing DB connection pool (PID 1)
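Since the executor log ends cleanly, the next place I know to look is the Kubernetes events in the namespace, which often record scheduling, quota, or RBAC failures even when Airflow stays silent. A sketch, assuming in-cluster credentials and the airflow namespace:

from kubernetes import client, config

config.load_incluster_config()  # or config.load_kube_config() from a workstation

# Recent events in the namespace; a KubernetesPodOperator pod that never
# appears usually leaves a Failed/Forbidden/FailedCreate event behind.
events = client.CoreV1Api().list_namespaced_event(namespace="airflow")
for e in events.items:
    print(e.last_timestamp, e.involved_object.name, e.reason, e.message)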
-- Simon Osipov
airflow
kubernetes

0 Answers