Airflow - Conditional retry in KubernetesPodOperator

7/1/2019

I want to write an advanced retry mechanism for my KubernetesPodOperator.

  1. If the first execution fails, rerun with more memory. The example below with Jinja templating does not work :-(
from airflow.contrib.kubernetes import pod
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

opr_kube = KubernetesPodOperator(
    namespace="development",
    image='bla',
    node_selectors={"servertype": "worker", "env": "development"},
    # intent: scale memory with the attempt number (1g, 2g, ...),
    # but the Jinja template is never rendered here
    resources=pod.Resources(request_memory='{{task_instance.try_number}}g',
                            limit_memory='{{task_instance.try_number}}g'),
    name="bla",
    task_id='bla',
    dag=dag
)
  2. I want to understand if the error is caused by an out-of-memory event. Is there an interface to recognize that? I am willing to write an external function that calls the kube CLI (kubectl).

    • The on_retry_callback hook is not relevant here, because it gives no way to trigger the operator again with more memory.
    • When I pass an inline function as a parameter, the scheduler executes it every few seconds while parsing the DAG; I want it to run only once the task has failed. (A possible workaround is sketched after this list.)
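
A minimal sketch of one possible workaround, not confirmed by the thread: since resources is not among the operator's templated fields, the Jinja string is never rendered, but a subclass can pick the memory size at execution time from the attempt number. This assumes the Airflow 1.10 contrib API, where the operator keeps its pod Resources object on self.resources; the class name is a placeholder:

from airflow.contrib.kubernetes.pod import Resources
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

class MemoryScalingPodOperator(KubernetesPodOperator):
    # hypothetical subclass: compute the memory size at execution time,
    # because 'resources' is not a templated field and Jinja in it is
    # never rendered
    def execute(self, context):
        # try_number is 1 on the first attempt, 2 on the first retry, ...
        memory = '{}g'.format(context['ti'].try_number)
        # assumption: the 1.10 contrib operator keeps the pod resources
        # on self.resources (separate from BaseOperator's own kwarg)
        self.resources = Resources(request_memory=memory, limit_memory=memory)
        return super(MemoryScalingPodOperator, self).execute(context)

opr_kube = MemoryScalingPodOperator(
    namespace="development",
    image='bla',
    name="bla",
    task_id='bla',
    retries=2,  # without retries there is no second, larger attempt
    dag=dag,
)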
-- Ohad Mata
airflow
kubernetes
python-3.x

1 Answer

7/3/2019

I want to understand if the error is because of an out-of-memory event:

For the reason behind failed task instances, check the task logs in the Airflow web interface => DAG's Graph View => failed task => View Log
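
If the logs are inconclusive, Kubernetes itself records an OOMKilled termination reason on the pod's container status, which could back the external check mentioned in the question. A minimal sketch that shells out to kubectl (the function name is a placeholder; note that if the container was restarted, the reason moves to .lastState.terminated.reason instead):

import subprocess

def pod_was_oom_killed(pod_name, namespace='development'):
    # returns True if the pod's container terminated with reason
    # 'OOMKilled', which Kubernetes sets when the memory limit is exceeded
    jsonpath = '{.status.containerStatuses[0].state.terminated.reason}'
    reason = subprocess.check_output(
        ['kubectl', 'get', 'pod', pod_name, '-n', namespace,
         '-o', 'jsonpath=' + jsonpath]
    ).decode().strip()
    return reason == 'OOMKilled'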

About the Kubernetes Operator's retries option, there's an example below, but you should first understand the reason behind the failed tasks. There is no point in an incremental-memory retry mechanism without verifying the real cause of the problem. Please make sure upfront that your workloads running through KubernetesPodOperator have sufficient resources anyway (you can also consult the 'Ensuring appropriate resources for your environment' guide, written specifically for the managed version of Airflow).
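For illustration, retries and retry_delay are standard arguments that every operator inherits from BaseOperator, so a plain fixed-memory retry policy needs nothing beyond:

from datetime import timedelta

opr_kube = KubernetesPodOperator(
    namespace="development",
    image='bla',
    name="bla",
    task_id='bla',
    retries=3,                         # re-run the pod up to 3 times on failure
    retry_delay=timedelta(minutes=5),  # wait between attempts
    dag=dag,
)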

-- Nepomucen
Source: StackOverflow