I have a simple Airflow DAG with 2 operators (a PythonOperator, and a KubernetesPodOperator):
with DAG(dag_id="dummy", start_date=datetime(2020, 11, 7), catchup=False) as dag:
    logger = logging.getLogger("airflow.task")

    volume_mount = k8s.v1_volume_mount.V1VolumeMount(name='osm-config',
                                                     mount_path=ROOT_PATH,
                                                     sub_path=None,
                                                     read_only=False)
    pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")
    volume = k8s.v1_volume.V1Volume(name="osm-config",
                                    persistent_volume_claim=pvc)

    def do_it():
        logger.debug("do work")

    start = DummyOperator(task_id="start", dag=dag)

    test = PythonOperator(task_id="test",
                          python_callable=do_it,
                          executor_config={
                              "pod_override": k8s.V1Pod(
                                  spec=k8s.V1PodSpec(
                                      containers=[
                                          k8s.V1Container(
                                              name="base",
                                              volume_mounts=[volume_mount]
                                          )
                                      ],
                                      volumes=[volume],
                                  )
                              )
                          },
                          dag=dag)

    download_data = KubernetesPodOperator(task_id="download_data",
                                          namespace="default",
                                          name="openmaptiles_download_data",
                                          image="openmaptiles/openmaptiles-tools",
                                          cmds=["download-osm"],
                                          volumes=[volume],
                                          volume_mounts=[volume_mount],
                                          dag=dag)

    start >> download_data >> test
The goal is to have one persistent volume that's used by both operators. The KubernetesPodOperator gets the volume mounted as expected and downloads everything as required. However, the PythonOperator stays in the queued state forever.
Tailing the scheduler pod shows the following error:
Pod in version "v1" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.VolumeMounts: []v1.VolumeMount: readObjectStart: expect { or n, but found ", error found in #10 byte of ...|-data"}
I suspect this is due to the volume/volume mounts not being set up correctly, as the format looks off:
...
"volumeMounts": [
    {
        "mountPath": "/opt/airflow/dags",
        "name": "dags-data"
    },
    {
        "mountPath": "/opt/airflow/logs",
        "name": "logs-data"
    },
    "{'mount_path': '/osm_config',\n 'mount_propagation': None,\n 'name': 'test',\n 'read_only': False,\n 'sub_path': None,\n 'sub_path_expr': None}"
]
But the configuration I have seems consistent with the Airflow documentation.
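The error text ("expect { or n, but found "") says the API server wanted an object or null and got a string instead, which matches the last volumeMounts entry above. A quick way to confirm this is to walk the pod spec and flag any volumeMounts entry that is not an object. The following is only a rough sketch: the helper name find_string_mounts and the sample spec are made up for illustration, with the sample shaped like the scheduler output.

```python
import json


def find_string_mounts(pod):
    """Return (container_name, entry) pairs for volumeMounts entries that are not JSON objects."""
    bad = []
    for container in pod.get("spec", {}).get("containers", []):
        for mount in container.get("volumeMounts", []):
            # A valid volume mount is a JSON object; anything else will be rejected.
            if not isinstance(mount, dict):
                bad.append((container.get("name"), mount))
    return bad


# Hypothetical sample, shaped like the pod spec printed by the scheduler.
sample = json.loads("""
{
  "spec": {
    "containers": [
      {
        "name": "base",
        "volumeMounts": [
          {"mountPath": "/opt/airflow/dags", "name": "dags-data"},
          {"mountPath": "/opt/airflow/logs", "name": "logs-data"},
          "{'mount_path': '/osm_config', 'name': 'test'}"
        ]
      }
    ]
  }
}
""")

for name, entry in find_string_mounts(sample):
    print(f"container {name!r}: volumeMounts entry is a string, not an object: {entry}")
```

If this reports anything, the mount object was turned into its string representation somewhere before the pod was submitted, rather than being serialized field by field.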
The problem was the type of Volume being passed to the PythonOperator. My original example used k8s.v1_volume.V1Volume and k8s.v1_volume_mount.V1VolumeMount, but switching instead to k8s.V1Volume and k8s.V1VolumeMount creates a pod with the volumes mounted as expected.
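For reference, here is roughly what the working definitions look like. This is a sketch rather than the exact DAG: it assumes k8s is the kubernetes.client.models alias used in the Airflow documentation examples, and ROOT_PATH is set to the mount path that appears in the scheduler output, since its value is not shown in the question.

```python
from kubernetes.client import models as k8s  # assumption: same alias as in the Airflow docs

ROOT_PATH = "/osm_config"  # assumption: taken from the mount path in the scheduler output

# Top-level model classes instead of the k8s.v1_volume / k8s.v1_volume_mount paths.
volume_mount = k8s.V1VolumeMount(name="osm-config",
                                 mount_path=ROOT_PATH,
                                 sub_path=None,
                                 read_only=False)
pvc = k8s.V1PersistentVolumeClaimVolumeSource(claim_name="osm-config-pv-claim")
volume = k8s.V1Volume(name="osm-config", persistent_volume_claim=pvc)

# Passed as executor_config=... to the PythonOperator, unchanged from the question.
executor_config = {
    "pod_override": k8s.V1Pod(
        spec=k8s.V1PodSpec(
            containers=[
                k8s.V1Container(name="base", volume_mounts=[volume_mount])
            ],
            volumes=[volume],
        )
    )
}
```

The same volume and volume_mount objects can still be passed to the KubernetesPodOperator, so both tasks share the one claim.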