How to create kubeconfig for Kubernetes Airflow worker pod launching KubernetesPodOperator

1/24/2019

I'm setting up Airflow in Kubernetes Engine, and I now have the following (running) pods:

  • postgres (with a mounted PersistentVolumeClaim)
  • flower
  • web (airflow dashboard)
  • rabbitmq
  • scheduler
  • worker

From Airflow, I'd like to run a task that starts a pod which, in this case, downloads a file from an SFTP server. However, the KubernetesPodOperator in Airflow that should start this new pod can't run, because the kubeconfig cannot be found.

The Airflow worker is configured as follows. The other Airflow pods are configured identically, apart from their args.

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  name: worker
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: airflow
        tier: worker
    spec:
      restartPolicy: Always
      containers:
        - name: worker
          image: my-gcp-project/kubernetes-airflow-in-container-registry:v1
          imagePullPolicy: IfNotPresent
          env:
            - name: AIRFLOW_HOME
              value: "/usr/local/airflow"
          args: ["worker"]

The KubernetesPodOperator is configured as follows:

maybe_download = KubernetesPodOperator(
    task_id='maybe_download_from_sftp',
    image='some/image:v1',
    namespace='default',
    name='maybe-download-from-sftp',
    arguments=['sftp_download'],
    image_pull_policy='IfNotPresent',
    dag=dag,
    trigger_rule='dummy',
)

The following error shows there's no kubeconfig on the pod.

[2019-01-24 12:37:04,706] {models.py:1789} INFO - All retries failed; marking task as FAILED
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp Traceback (most recent call last):
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/bin/airflow", line 32, in <module>
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     args.func(args)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return f(*args, **kwargs)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 490, in run
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     _run(args, dag, ti)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 406, in _run
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     pool=args.pool,
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 74, in wrapper
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return func(*args, **kwargs)
[2019-01-24 12:37:04,722] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1659, in _run_raw_task
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     result = task_copy.execute(context=context)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 90, in execute
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config_file=self.config_file)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 51, in get_kube_client
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     return _load_kube_config(in_cluster, cluster_context, config_file)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/lib/python3.6/site-packages/airflow/contrib/kubernetes/kube_client.py", line 38, in _load_kube_config
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config.load_kube_config(config_file=config_file, context=cluster_context)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/airflow/.local/lib/python3.6/site-packages/kubernetes/config/kube_config.py", line 537, inload_kube_config
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     config_persister=config_persister)
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp   File "/usr/local/airflow/.local/lib/python3.6/site-packages/kubernetes/config/kube_config.py", line 494, in_get_kube_config_loader_for_yaml_file
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp     with open(filename) as f:
[2019-01-24 12:37:04,723] {base_task_runner.py:101} INFO - Job 8: Subtask maybe_download_from_sftp FileNotFoundError: [Errno 2] No such file or directory: '/usr/local/airflow/.kube/config'
[2019-01-24 12:37:08,300] {logging_mixin.py:95} INFO - [2019-01-24 12:37:08,299] {jobs.py:2627} INFO - Task exited with return code 1

I'd like the started pod to "automatically" pick up the context of the Kubernetes cluster it's running in, if that makes sense. I feel like I'm missing something fundamental. Could anyone help?

-- bartcode
airflow
google-cloud-platform
kubernetes

1 Answer

1/26/2019

As is described in The Fine Manual, you will want in_cluster=True to advise KPO that it is, in fact, in-cluster.
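
Applied to the operator from your question, it's a one-line change; everything else stays as it was:

maybe_download = KubernetesPodOperator(
    task_id='maybe_download_from_sftp',
    image='some/image:v1',
    namespace='default',
    name='maybe-download-from-sftp',
    arguments=['sftp_download'],
    image_pull_policy='IfNotPresent',
    in_cluster=True,  # use the ServiceAccount credentials mounted into the worker pod
    dag=dag,
    trigger_rule='dummy',
)

With in_cluster=True, the kubernetes client uses the token and CA certificate that Kubernetes mounts into every pod, instead of looking for a file at /usr/local/airflow/.kube/config.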

I would actually recommend filing a bug with Airflow, because Airflow can trivially detect that it is running inside a cluster and should have a much saner default than the behavior you experienced.
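
For context, that detection is nearly a one-liner with the same kubernetes client Airflow already wraps. A minimal sketch (not Airflow's actual code):

import os

from kubernetes import config

def load_config():
    # Kubernetes injects KUBERNETES_SERVICE_HOST into every pod's environment,
    # so its presence is a reliable in-cluster signal.
    if os.getenv('KUBERNETES_SERVICE_HOST'):
        # Uses the ServiceAccount token and CA certificate mounted at
        # /var/run/secrets/kubernetes.io/serviceaccount/
        config.load_incluster_config()
    else:
        # Fall back to a kubeconfig file (~/.kube/config by default)
        config.load_kube_config()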

-- mdaniel
Source: StackOverflow