We have some DAGs that launch pods using KubernetesPodOperator, and I'm trying to get some information inside the pod, such as dag_id, task_id, try_number, environment, etc.
I know I can get this information from the Airflow task context (for example, the kwargs passed to a PythonOperator callable), but is there a way to get that context from inside the pod that was launched?
Thanks!
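For reference, here is a minimal sketch of the PythonOperator approach mentioned in the question, where the context arrives as keyword arguments. The callable name log_task_context is hypothetical. On Airflow 1.10 the operator needs provide_context=True for the kwargs to be filled in; Airflow 2 passes the context automatically to callables that accept **kwargs.

```python
def log_task_context(**kwargs):
    """Hypothetical PythonOperator callable: Airflow passes the task context as kwargs.

    On Airflow 1.10 the operator needs provide_context=True for this;
    Airflow 2 passes the context to callables that accept **kwargs.
    """
    ti = kwargs["ti"]  # the running TaskInstance
    info = {
        "dag_id": ti.dag_id,
        "task_id": ti.task_id,
        "run_id": kwargs["run_id"],
        "try_number": ti.try_number,
    }
    print(info)
    return info
```

On Airflow 1.10 it would be wired up as PythonOperator(task_id="log_context", python_callable=log_task_context, provide_context=True). None of this reaches a pod, though, which is why the question needs another route.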
I found a pretty good solution to this.
I wrote a custom wrapper around the KubernetesPodOperator class that updates env_vars with values from the Airflow task context, so they reach the pod as ordinary environment variables:
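import airflow.configuration as config
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator as AirflowKubernetesPodOperator


class KubernetesPodOperator(AirflowKubernetesPodOperator):
    """KubernetesPodOperator that exposes the Airflow task context to the pod as env vars."""

    def execute(self, context):
        # Reads a 'web_server_name' key from the [webserver] section of airflow.cfg;
        # conf.get raises an AirflowConfigException if that key is not defined.
        environment = config.conf.get('webserver', 'web_server_name')
        ti = context['ti']
        task_env = {
            'ENVIRONMENT': environment,
            'DAG_ID': ti.dag_id,
            'TASK_ID': ti.task_id,
            'RUN_ID': context['run_id'],
            # Env var values must be strings. ti.try_number is the public
            # property; while the task is running it equals ti._try_number.
            'TRY_NUMBER': str(ti.try_number),
        }
        # On the airflow.contrib operator, env_vars is a plain dict (defaults to {}).
        self.env_vars.update(task_env)
        # Return the parent's result so XCom pushing keeps working.
        return super().execute(context)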
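On the pod side nothing Airflow-specific is needed: the container just reads environment variables. Below is a minimal sketch of a Python entrypoint inside the image, assuming the variable names set by the wrapper above. read_task_context and TASK_ENV_KEYS are hypothetical names, not part of Airflow.

```python
import os

# Variable names injected by the wrapper operator (assumed to match it exactly).
TASK_ENV_KEYS = ("ENVIRONMENT", "DAG_ID", "TASK_ID", "RUN_ID", "TRY_NUMBER")


def read_task_context(environ=os.environ):
    """Collect the Airflow task metadata that the operator passed in as env vars.

    Missing variables come back as None, e.g. when the image is run
    outside Airflow for local testing.
    """
    return {key: environ.get(key) for key in TASK_ENV_KEYS}


if __name__ == "__main__":
    ctx = read_task_context()
    print(f"Running {ctx['DAG_ID']}.{ctx['TASK_ID']} "
          f"(run {ctx['RUN_ID']}, try {ctx['TRY_NUMBER']}) in {ctx['ENVIRONMENT']}")
```

Taking environ as a parameter keeps the helper easy to test with a plain dict instead of the real process environment.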