Airflow GKEPodOperator xcom_push returns None

10/11/2019

So I've been struggling with this for hours. This is the code of my operator:

task1 = GKEPodOperator(
        task_id="task1",
        project_id="proj",
        location="location",
        cluster_name="cluster_name",
        name="cluster-calculator",
        namespace="default",
        image=Variable.get("cluster_calculator_image"),
        arguments=['--name clustercalculator'],
        env_vars=env_vars,
        xcom_push=True,
        is_delete_operator_pod=True,
        get_logs=True,
        dag=dag
    )

The pod runs a simple Docker container with a Java application inside that does some work and writes its results to the default /airflow/xcom/return.json file.
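For reference, this is roughly what the application inside the container has to produce for the sidecar to pick up: valid JSON at /airflow/xcom/return.json. A temp directory stands in for /airflow/xcom here so the sketch runs anywhere; the real operator always reads that fixed path.

```python
import json
import os
import tempfile

# A temp dir stands in for /airflow/xcom so this snippet runs outside a pod.
xcom_dir = tempfile.mkdtemp()
path = os.path.join(xcom_dir, "return.json")

# The payload must be valid JSON; the sidecar simply cats this file
# and Airflow pushes its contents as the task's XCom.
with open(path, "w") as f:
    json.dump({"clusterSize": 2}, f)

with open(path) as f:
    print(f.read())
```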

This is how I try to obtain the xcom_push result:

def print_xcom_result(*op_args, **kwargs):
    print(op_args)
    print(kwargs['task_instance'].xcom_pull(task_ids='task1'))

test_values = PythonOperator(
    task_id="task1_test",
    python_callable=print_xcom_result,
    provide_context=True,
    op_args=["{{task_instance.xcom_pull(task_ids='task1')}}"],
    dag=dag
)

Whatever I try, it always prints None.

[2019-10-12 00:06:23,061] {{logging_mixin.py:95}} INFO - ('None',)
[2019-10-12 00:06:23,072] {{logging_mixin.py:95}} INFO - None

When I go to XCom in the Airflow UI, it shows nothing. I also tried the example from "Failed to extract xcom from airflow pod - Kubernetes Pod Operator", and it didn't work either.

The sidecar container is created for sure and I see its output in the logs:

Running command... cat /airflow/xcom/return.json
Running command... kill -s SIGINT 1
INFO - {"clusterSize":2}

I even ran the Docker container externally and verified that the result is written to the xcom directory correctly, but I still can't obtain that result during DAG execution.

Airflow is on the latest version; Python 3.7.

If this matters, I have 6 containers running Airflow (webserver, flower, worker, scheduler, Postgres, RabbitMQ). Celery is the executor. Pods run on Google Kubernetes Engine.

There are no errors, both operators succeed.

Does anyone have any ideas? Thank you in advance.

-- Zarial
airflow
apache-airflow-xcom
docker
kubernetes
python

1 Answer

12/11/2019

There is a bug in Airflow where `execute` of the GKEPodOperator doesn't have a return statement:

super(GKEPodOperator, self).execute(context)

should be

return super(GKEPodOperator, self).execute(context)

The bug is fixed in airflow 2.0 (https://issues.apache.org/jira/browse/AIRFLOW-4072)

A workaround is to create your own plugin, e.g. YourOwnGKEOperator, that fixes this problem locally, and deploy it into your plugins folder.
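The effect of the missing `return` can be illustrated without Airflow at all. The classes below are stand-ins, not the real operators: `PodOperatorBase` plays the role of KubernetesPodOperator (whose `execute` returns the XCom value read from the pod), and the two subclasses mirror the buggy and the fixed GKEPodOperator:

```python
class PodOperatorBase:
    """Stand-in for KubernetesPodOperator: execute() returns the XCom value."""
    def execute(self, context):
        # In the real operator this value comes from /airflow/xcom/return.json.
        return {"clusterSize": 2}

class BrokenGKEPodOperator(PodOperatorBase):
    """Mirrors the bug: calls the parent but silently drops its return value."""
    def execute(self, context):
        super(BrokenGKEPodOperator, self).execute(context)  # no `return`!

class FixedGKEPodOperator(PodOperatorBase):
    """Mirrors the one-line fix from AIRFLOW-4072."""
    def execute(self, context):
        return super(FixedGKEPodOperator, self).execute(context)

print(BrokenGKEPodOperator().execute({}))  # -> None: xcom_push gets nothing
print(FixedGKEPodOperator().execute({}))   # -> {'clusterSize': 2}
```

Since Airflow pushes a task's XCom from whatever `execute` returns, the broken version always pushes `None`, which is exactly the behaviour described in the question.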

-- Anders Elton
Source: StackOverflow