Use PythonOperator with mounted PersistentVolumeClaim

8/6/2021

I have a simple Airflow DAG with two operators (a PythonOperator and a KubernetesPodOperator):

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator
from kubernetes.client import models as k8s

ROOT_PATH = "/osm_config"  # matches the mount_path seen in the serialized output below

with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")

    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)

    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")

    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")


    start = DummyOperator(task_id="start", dag=dag)

    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)

    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)


    start >> download_data >> test

The goal is to have one persistent volume that's used by both operators. The KubernetesPodOperator gets the volume mounted as expected and downloads everything as required. However, the PythonOperator task stays in the queued state forever.

Tailing the logs of the scheduler pod shows the following error:

Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.VolumeMounts: []v1.VolumeMount: readObjectStart: expect { or n, but found ", error found in #10 byte of ...|-data"}

I suspect the volume/volume mounts are not being set up correctly, as the serialized format looks off:

...

 "volumeMounts": [ 
   { 
     "mountPath": "/opt/airflow/dags", 
     "name": "dags-data" 
   }, 
   { 
     "mountPath": "/opt/airflow/logs", 
     "name": "logs-data" 
   }, 
   "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}" 
 ] 

But my configuration seems consistent with the Airflow documentation.
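For what it's worth, that stray string looks exactly like the repr() of a kubernetes client model object, suggesting the mount was stringified rather than serialized. A minimal sketch that reproduces it, assuming the standard kubernetes Python client:

from kubernetes.client import models as k8s

vm = k8s.V1VolumeMount(name="test", mount_path="/osm_config", read_only=False)

# The generated client models pretty-print their snake_case attribute dict,
# which matches the quoted string in the volumeMounts array above.
print(repr(vm))
# {'mount_path': '/osm_config',
#  'mount_propagation': None,
#  'name': 'test',
#  'read_only': False,
#  'sub_path': None,
#  'sub_path_expr': None}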

-- Darendal
airflow
airflow-scheduler
kubernetes

1 Answer

8/11/2021

The problem was the type of the Volume and VolumeMount objects being passed to the PythonOperator.

My original example used k8s.v1_volume.V1Volume and k8s.v1_volume_mount.V1VolumeMount; switching to k8s.V1Volume and k8s.V1VolumeMount instead creates a pod with the volumes mounted as expected.
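For reference, a minimal corrected sketch of the two definitions, assuming the from kubernetes.client import models as k8s import style used in the Airflow docs:

from kubernetes.client import models as k8s

# Use the top-level model classes; per the fix above, these serialize
# correctly both inside executor_config["pod_override"] and in the
# KubernetesPodOperator.
volume_mount = k8s.V1VolumeMount(name="osm-config",
                                 mount_path=ROOT_PATH,
                                 sub_path=None,
                                 read_only=False)

volume = k8s.V1Volume(
    name="osm-config",
    persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(
        claim_name="osm-config-pv-claim"))

With ROOT_PATH defined as in the question, the rest of the DAG can stay unchanged.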

-- Darendal
Source: StackOverflow