Airflow KubernetesPodOperator in GKE/GCP does not start custom pods

6/3/2019

We are running a self-managed Airflow 1.10.2 with the KubernetesExecutor on a GKE cluster in GCP. All built-in operators work fine so far, except the KubernetesPodOperator, which we would like to use to run our custom Docker images. It seems that the Airflow workers don't have the privileges to start other pods inside the Kubernetes cluster. The DAG does not appear to do anything after it is started. This is what we initially found in the logs:

FileNotFoundError: [Errno 2] No such file or directory: '/root/.kube/config'
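
The path in this error is the kubeconfig file that out-of-cluster clients read, whereas a process running inside a pod normally authenticates with the service account files that Kubernetes mounts into the container. A minimal sketch for checking which of these sources a worker can actually see, using only the standard library. The helper name `describe_kube_credentials` is hypothetical and not part of Airflow or the Kubernetes client, and it assumes the default mount path and the default automounting of the service account token:

```python
import os

# Default location where Kubernetes mounts a pod's service account credentials.
SA_DIR = "/var/run/secrets/kubernetes.io/serviceaccount"


def describe_kube_credentials(environ=None, sa_dir=SA_DIR, kubeconfig="~/.kube/config"):
    """Report which Kubernetes credential sources are visible to this process.

    Hypothetical diagnostic helper; it only checks for the presence of
    environment variables and files and does not contact the API server.
    """
    environ = os.environ if environ is None else environ
    return {
        # Normally set inside a pod; in-cluster configuration relies on both.
        "service_env": "KUBERNETES_SERVICE_HOST" in environ
        and "KUBERNETES_SERVICE_PORT" in environ,
        # Token and CA certificate of the pod's service account.
        "token_file": os.path.isfile(os.path.join(sa_dir, "token")),
        "ca_file": os.path.isfile(os.path.join(sa_dir, "ca.crt")),
        # File read by out-of-cluster configuration (the path from the error).
        "kubeconfig_file": os.path.isfile(os.path.expanduser(kubeconfig)),
    }


if __name__ == "__main__":
    for source, present in describe_kube_credentials().items():
        print(f"{source}: {'found' if present else 'missing'}")
```

Run inside a worker pod, this would show whether the in-cluster credentials exist at all. If they do and the kubeconfig file is still being looked up, the operator is probably not taking the in-cluster code path.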

Next, we set in_cluster=True on the KubernetesPodOperator, which did not seem to help. After that, we tried setting this parameter in the [kubernetes] section of airflow.cfg:

gcp_service_account_keys = kubernetes-executor-private-key:/var/tmp/private/kubernetes_executor_private_key.json

and the error message changed to: TypeError: a bytes-like object is required, not 'str'. This is the parameter definition from GitHub:

# GCP Service Account Keys to be provided to tasks run on Kubernetes Executors
# Should be supplied in the format: key-name-1:key-path-1,key-name-2:key-path-2
gcp_service_account_keys =

We have already tried various kinds of parentheses and quotes here, with no success.
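
For reference, a minimal sketch of how a value in the documented `key-name:key-path` format can be split, and of one common way this exact TypeError arises in Python 3: passing a `str` to `base64.b64encode`, for example after reading a key file in text mode. The helper names `parse_key_spec` and `encode_key_file` are hypothetical, and whether Airflow 1.10.2 hits the error this way internally is an assumption, not something confirmed here:

```python
import base64


def parse_key_spec(value):
    """Split 'name-1:path-1,name-2:path-2' into a {name: path} dict.

    Hypothetical helper for illustration; Airflow's own parsing may differ.
    """
    pairs = {}
    for item in value.split(","):
        item = item.strip()
        if not item:
            continue  # tolerate an empty value or a trailing comma
        name, sep, path = item.partition(":")
        if not sep or not name.strip() or not path.strip():
            raise ValueError(f"expected 'key-name:key-path', got {item!r}")
        pairs[name.strip()] = path.strip()
    return pairs


def encode_key_file(path):
    """Base64-encode a key file, which must be opened in binary mode."""
    with open(path, "rb") as handle:  # "rb" yields bytes, "r" would yield str
        return base64.b64encode(handle.read()).decode("ascii")


spec = "kubernetes-executor-private-key:/var/tmp/private/kubernetes_executor_private_key.json"
print(parse_key_spec(spec))

try:
    base64.b64encode('{"type": "service_account"}')  # str instead of bytes
except TypeError as exc:
    print(f"TypeError: {exc}")
```

The value from the question splits cleanly into one name and one path, which suggests that quoting the setting differently is unlikely to change the outcome if the error comes from a str/bytes mismatch further down.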

DAG code:

from datetime import datetime, timedelta

from airflow import DAG  # needed for the DAG(...) call below
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator


default_args = {
    'owner': 'xxx',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                          image="python:3.6",  # image names must be lowercase
                          cmds=["python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          in_cluster=True,
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='default',
                          image="ubuntu:1604",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          in_cluster=True,
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)
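
If the suspicion about missing privileges is right, the service account used by the worker pods would need RBAC permissions to manage pods in the target namespace. A minimal sketch that builds such a Role manifest as JSON. The role name `airflow-pod-launcher` and the exact verb list are assumptions for illustration; the namespace matches the `namespace='default'` used in the DAG:

```python
import json


def pod_launcher_role(namespace="default", name="airflow-pod-launcher"):
    """Build a Role manifest that allows managing pods in one namespace.

    The role name and the verb list are assumptions for illustration.
    """
    return {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": name, "namespace": namespace},
        "rules": [
            {
                "apiGroups": [""],  # "" is the core API group, where pods live
                "resources": ["pods"],
                "verbs": ["create", "get", "list", "watch", "delete"],
            },
            {
                "apiGroups": [""],
                "resources": ["pods/log"],  # reading pod logs, as with get_logs=True
                "verbs": ["get"],
            },
        ],
    }


print(json.dumps(pod_launcher_role(), indent=2))
```

The printed JSON could be saved to a file and applied with `kubectl apply -f`. A RoleBinding that attaches the role to the workers' service account would be needed as well; that account's name is not given in the question.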

Is anyone else facing the same problem? Am I missing something here?

-- Palko
airflow
google-cloud-platform
google-kubernetes-engine
kubernetes
