Running KubernetesPodOperator

10/10/2019

I’m running Airflow on Kubernetes and using the “Kubernetes Operator”. When I run a BashOperator or PythonOperator, it works fine using:

executor_config = {
    "KubernetesExecutor": {
        "image": "image_with_airflow_and_my_code:latest"
    }
}

When I try to run a DAG with KubernetesPodOperator, it fails.

For example:

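# Airflow 1.10.x import path for the operator
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

k = KubernetesPodOperator(
    namespace="default",
    image="ubuntu:18.04",
    cmds=["bash", "-cx"],
    # bash -c treats only its first argument as the script, so pass the whole command as one string
    arguments=["echo 10"],
    name="test",
    task_id="task",
    is_delete_operator_pod=False,
    dag=dag,  # the DAG object defined earlier in this file
)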
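With KubernetesPodOperator, `cmds` becomes the container's command (its entrypoint) and `arguments` becomes its args; Kubernetes runs them as one argument vector, command first. The sketch below is a local illustration, not Airflow code: `run_like_container` is a hypothetical helper, and it assumes bash is installed. It reproduces that vector with `subprocess` to show what the example task actually executes for each form of `arguments`.

```python
import subprocess


def run_like_container(cmds, arguments):
    """Hypothetical helper: run cmds + arguments locally as a single argv,
    the way a container's command and args are concatenated. Returns stdout."""
    argv = list(cmds) + list(arguments)
    result = subprocess.run(argv, capture_output=True, text=True, check=True)
    return result.stdout  # the -x trace goes to stderr, so it is not included here


# ["echo", "10"]: "echo" becomes the -c script and "10" becomes $0,
# so echo runs with no arguments and prints only a newline.
print(repr(run_like_container(["bash", "-cx"], ["echo", "10"])))  # '\n'

# ["echo 10"]: the whole command is the script, so 10 is printed.
print(repr(run_like_container(["bash", "-cx"], ["echo 10"])))  # '10\n'
```

This does not explain the wrong image on its own, but it rules out a second, quieter problem in the example task.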

I see that the Docker container that was created does not use the image I specified above (ubuntu:18.04), but the default image from the configuration (AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY).

In the scheduler log I see:

[2019-10-06 12:59:56,279] {{scheduler_job.py:921}} INFO - 1 tasks up for execution:
[2019-10-06 12:59:56,325] {{scheduler_job.py:953}} INFO - Figuring out tasks to run in Pool(name=default_pool) with 128 open slots and 1 task instances ready to be queued
[2019-10-06 12:59:56,326] {{scheduler_job.py:981}} INFO - DAG koperator has 0/16 running and queued tasks
[2019-10-06 12:59:56,361] {{scheduler_job.py:1031}} INFO - Setting the following tasks to queued state:
[2019-10-06 12:59:56,398] {{scheduler_job.py:1107}} INFO - Setting the following 1 tasks to queued state:
[2019-10-06 12:59:56,401] {{scheduler_job.py:1143}} INFO - Sending ('koperator', 'task', datetime.datetime(2019, 10, 6, 12, 59, 50, 146375, tzinfo=), 1) to executor with priority 1 and queue default
[2019-10-06 12:59:56,403] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'koperator', 'task', '2019-10-06T12:59:50.146375+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/KubernetesPodOperator.py']
[2019-10-06 12:59:56,405] {{kubernetes_executor.py:764}} INFO - Add task ('koperator', 'task', datetime.datetime(2019, 10, 6, 12, 59, 50, 146375, tzinfo=), 1) with command ['airflow', 'run', 'koperator', 'task', '2019-10-06T12:59:50.146375+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/KubernetesPodOperator.py'] with executor_config {}
[2019-10-06 12:59:56,417] {{kubernetes_executor.py:441}} INFO - Kubernetes job is (('koperator', 'task', datetime.datetime(2019, 10, 6, 12, 59, 50, 146375, tzinfo=), 1), ['airflow', 'run', 'koperator', 'task', '2019-10-06T12:59:50.146375+00:00', '--local', '--pool', 'default_pool', '-sd', '/usr/local/airflow/dags/KubernetesPodOperator.py'], KubernetesExecutorConfig(image=None, image_pull_policy=None, request_memory=None, request_cpu=None, limit_memory=None, limit_cpu=None, limit_gpu=None, gcp_service_account_key=None, node_selectors=None, affinity=None, annotations={}, volumes=[], volume_mounts=[], tolerations=None, labels={}))
[2019-10-06 12:59:56,498] {{kubernetes_executor.py:353}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d had an event of type ADDED
[2019-10-06 12:59:56,509] {{kubernetes_executor.py:385}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d Pending
[2019-10-06 12:59:56,528] {{kubernetes_executor.py:353}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d had an event of type MODIFIED
[2019-10-06 12:59:56,529] {{kubernetes_executor.py:385}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d Pending
[2019-10-06 12:59:56,543] {{kubernetes_executor.py:353}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d had an event of type MODIFIED
[2019-10-06 12:59:56,544] {{kubernetes_executor.py:385}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d Pending
[2019-10-06 12:59:59,492] {{kubernetes_executor.py:353}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d had an event of type MODIFIED
[2019-10-06 12:59:59,492] {{kubernetes_executor.py:393}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d is Running
[2019-10-06 13:00:10,873] {{kubernetes_executor.py:353}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d had an event of type MODIFIED
[2019-10-06 13:00:10,874] {{kubernetes_executor.py:390}} INFO - Event: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d Succeeded
[2019-10-06 13:00:12,236] {{kubernetes_executor.py:493}} INFO - Attempting to finish pod; pod_id: koperatortask-2f35f3b347a149bcb2133ef58cf9e77d; state: None; labels: {'airflow-worker': 'b46fd37e-959c-4844-81e1-dff9df2e98e2', 'dag_id': 'koperator', 'execution_date': '2019-10-06T12_59_50.146375_plus_00_00', 'task_id': 'task', 'try_number': '1'}
[2019-10-06 13:00:12,245] {{kubernetes_executor.py:616}} INFO - Checking 1 task instances.
[2019-10-06 13:00:12,247] {{kubernetes_executor.py:626}} INFO - Found matching task koperator-task (2019-10-06 12:59:50.146375+00:00) with current state of up_for_retry
[2019-10-06 13:00:12,253] {{kubernetes_executor.py:783}} INFO - Changing state of (('koperator', 'task', datetime.datetime(2019, 10, 6, 12, 59, 50, 146375, tzinfo=tzlocal()), 1), None, 'koperatortask-2f35f3b347a149bcb2133ef58cf9e77d', '34894988') to None
[2019-10-06 13:00:12,273] {{scheduler_job.py:1283}} INFO - Executor reports execution of koperator.task execution_date=2019-10-06 12:59:50.146375+00:00 exited with status None for

The log of the pod that was launched:

[2019-10-06 12:02:11,961] {{init.py:51}} INFO - Using executor LocalExecutor
[2019-10-06 12:02:12,844] {{dagbag.py:90}} INFO - Filling up the DagBag from /usr/local/airflow/dags/KubernetesPodOperator.py
[2019-10-06 12:02:13,571] {{cli.py:516}} INFO - Running on host koperatortask-bd0c81d6039c4b329ae8dd2292c0c566
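The scheduler log shows the KubernetesPodOperator task being queued `with executor_config {}`, which the executor turns into `KubernetesExecutorConfig(image=None, ...)`, and the pod log shows Airflow's own CLI (`cli.py`) running inside the launched pod. Under the KubernetesExecutor every task first runs in an Airflow worker pod; the operator's `image="ubuntu:18.04"` is used for a second pod that the operator creates from inside that worker. The sketch below models only the worker-image fallback the log suggests. `resolve_worker_image` and the repository name `my-airflow-repo` are hypothetical, and treating the default as `worker_container_repository:worker_container_tag` is an assumption about the 1.10.x configuration.

```python
def resolve_worker_image(executor_config, repository, tag):
    """Hypothetical helper: use the per-task KubernetesExecutor image if one is
    set, otherwise fall back to the configured default worker image
    (assumed here to be repository:tag)."""
    override = (executor_config or {}).get("KubernetesExecutor", {}).get("image")
    return override or "{}:{}".format(repository, tag)


# BashOperator/PythonOperator tasks from the question carry an image override.
print(resolve_worker_image(
    {"KubernetesExecutor": {"image": "image_with_airflow_and_my_code:latest"}},
    "my-airflow-repo", "latest"))  # image_with_airflow_and_my_code:latest

# The KubernetesPodOperator task was queued "with executor_config {}".
print(resolve_worker_image({}, "my-airflow-repo", "latest"))  # my-airflow-repo:latest
```

If this reading is right, seeing the default image on the `koperatortask-...` pod is expected, and the logs of the second (ubuntu) pod are where the actual failure would show up.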

What am I doing wrong?

How can I run a DAG on Kubernetes with KubernetesPodOperator?

Thanks, Aviad

-- Aviad
airflow
airflow-operator
kubernetes

1 Answer

3/24/2020

Unfortunately, I don't see enough information to determine what is wrong. Add the parameter

"get_logs": True

to the KubernetesPodOperator.

That way, the run will combine the stdout of both the KubernetesExecutor worker pod and the KubernetesPodOperator pod into a single Airflow task log.

It should give you a much clearer idea of what is going on.
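For reference, here is a minimal sketch of the question's operator call with `get_logs` set to True, written as a plain keyword-argument dict so it runs without Airflow installed. The `arguments` value is passed as the single string `echo 10`, since `bash -c` treats only its first argument as the script. The import path in the comments is the one used by Airflow 1.10.x and is an assumption for other releases.

```python
# Keyword arguments for KubernetesPodOperator, taken from the question,
# plus the get_logs flag suggested in this answer.
pod_kwargs = {
    "namespace": "default",
    "image": "ubuntu:18.04",
    "cmds": ["bash", "-cx"],
    "arguments": ["echo 10"],  # one string, so bash -c receives the whole command
    "name": "test",
    "task_id": "task",
    "is_delete_operator_pod": False,
    "get_logs": True,  # stream the launched pod's stdout into the Airflow task log
}

# Inside the DAG file (Airflow 1.10.x):
# from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
# k = KubernetesPodOperator(dag=dag, **pod_kwargs)
```

Keeping `is_delete_operator_pod=False` as in the question also leaves the ubuntu pod around afterwards, so it can be inspected directly with kubectl if the task log is still not enough.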

-- Grant T
Source: StackOverflow