I've set up Airflow in a Kubernetes cluster. To run tasks, I'm using the KubernetesPodOperator.
When I run a task and take a look at kubectl get pods, I see a pod is created correctly and it also completes. However, when I look at Airflow, I see the state isn't updated and it says it's still in the running state.
[2019-01-27 12:43:56,580] {models.py:1595} INFO - Executing <Task(KubernetesPodOperator): xxx> on 2019-01-20T00:00:00+00:00
[2019-01-27 12:43:56,581] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run xxx xxx 2019-01-20T00:00:00+00:00 --job_id 15 --raw -sd DAGS_FOLDER/xxx.py --cfg_path /tmp/tmpxx39wldz']
[2019-01-27 12:45:21,603] {models.py:1355} INFO - Dependencies not met for <TaskInstance: xxx.xxx 2019-01-20T00:00:00+00:00 [running]>, dependency 'Task Instance Not Already Running' FAILED: Task is already running, it started on 2019-01-27 12:43:56.565328+00:00.
[2019-01-27 12:45:21,639] {models.py:1355} INFO - Dependencies not met for <TaskInstance: xxx.xxx 2019-01-20T00:00:00+00:00 [running]>, dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.
[2019-01-27 12:45:21,641] {logging_mixin.py:95} INFO - [2019-01-27 12:45:21,641] {jobs.py:2614} INFO - Task is not able to be run
Is there anything specific I should do to return the pod's state back to Airflow? The KubernetesPodOperator is defined as follows:
do_something = KubernetesPodOperator(
    task_id='xxx',
    image='gcr.io/project/image',
    namespace='default',
    name='xxx',
    arguments=['dummy'],
    xcom_push=True,
    in_cluster=True,
    image_pull_policy='Always',
    trigger_rule='dummy',
    dag=dag,
)
Edit: It appears that the base container has completed, but airflow-xcom-sidecar is still running. Is there anything specific I should do to stop that one?
Hard to tell exactly without looking at your setup, but it looks like the pod is done and it's trying to do an XCom push to your main Airflow instance and isn't able to connect. I would check the logs for airflow-xcom-sidecar. Something like:
$ kubectl logs <airflow-job-pod> -c airflow-xcom-sidecar
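To confirm which container is keeping the pod alive, you could also look at the pod's containerStatuses. Here is a minimal sketch in Python, assuming the JSON comes from `kubectl get pod <airflow-job-pod> -o json`; the helper name running_containers and the sample data are made up for illustration and are not part of Airflow or kubectl:

```python
import json


def running_containers(pod):
    """Return the names of containers whose state is still 'running'.

    `pod` is the dict parsed from `kubectl get pod <pod> -o json`.
    """
    statuses = pod.get("status", {}).get("containerStatuses", [])
    return [s["name"] for s in statuses if "running" in s.get("state", {})]


# Hypothetical sample mirroring the situation in the question:
# the base container has terminated, the sidecar has not.
sample_pod = json.loads("""
{
  "status": {
    "containerStatuses": [
      {"name": "base", "state": {"terminated": {"exitCode": 0}}},
      {"name": "airflow-xcom-sidecar", "state": {"running": {}}}
    ]
  }
}
""")

print(running_containers(sample_pod))
```

If only airflow-xcom-sidecar shows up here, the base container finished and the sidecar is what Airflow is still waiting on.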
You can also try running your KubernetesPodOperator with xcom_push=False:
do_something = KubernetesPodOperator(
    task_id='xxx',
    image='gcr.io/project/image',
    namespace='default',
    name='xxx',
    arguments=['dummy'],
    xcom_push=False,
    in_cluster=True,
    image_pull_policy='Always',
    trigger_rule='dummy',
    dag=dag,
)
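If you do need the XCom value and want to keep xcom_push=True, keep in mind that the operator reads the result from /airflow/xcom/return.json, a file your own container is expected to write before it exits. Below is a rough sketch of what the image's entrypoint could do, assuming it runs Python; the function name write_xcom_result and the payload are hypothetical, and the demo writes to a temporary directory rather than the real path:

```python
import json
import os
import tempfile


def write_xcom_result(value, xcom_dir="/airflow/xcom"):
    """Serialize `value` as JSON into <xcom_dir>/return.json and return the path."""
    os.makedirs(xcom_dir, exist_ok=True)
    path = os.path.join(xcom_dir, "return.json")
    with open(path, "w") as f:
        json.dump(value, f)
    return path


# Local demonstration against a temporary directory; inside the pod you
# would call write_xcom_result(result) with the default directory instead.
demo_dir = tempfile.mkdtemp()
demo_path = write_xcom_result({"status": "ok"}, xcom_dir=demo_dir)
with open(demo_path) as f:
    print(json.load(f))
```

Whether a missing return.json is what leaves your sidecar running is a guess on my part, so check the sidecar logs first.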