Apache Airflow KubernetesPodOperator: how do we pass a ConfigMap value via `value_from` when forming an environment variable in a DAG?

5/15/2020

I am using Apache Airflow, and in one of our DAG tasks we use the KubernetesPodOperator to execute one of our application processes in a Kubernetes pod. The operator itself works fine, and passing environment variables through it also works fine. However, when I try to pass an environment variable whose value should come from a Kubernetes ConfigMap, the pod does not get the value from the ConfigMap.

The code snippet is below. Please focus on the line `'SPARK_CONFIG': '{"valueFrom": {"configMapKeyRef": {"key": "endpoint","name": "spark-config"}}}'`.

pod_process_task = KubernetesPodOperator(
        namespace=cons.K8_NAMESPACE,
        image=cons.UNCOMPRESS_IMAGE_NAME,
        config_file=cons.K8_CONFIG_FILE,
        env_vars={
            'FRT_ID': '{{ dag_run.conf["transaction_id"] }}',
            'FILE_NAME': '{{ dag_run.conf["filename"]}}',
            'FILE_PATH': '{{dag_run.conf["filepath"]}}' + "/" + '{{ dag_run.conf["filename"]}}',
            'LOG_FILE': '{{ ti.xcom_pull(key="process_log_dict")["loglocation"] }}',
            'SPARK_CONFIG': '{"valueFrom": {"configMapKeyRef": {"key": "endpoint","name": "spark-config"}}}'
        },
        name=create_pod_name(),
        # name= 'integrator',
        task_id="decrypt-951",
        retries=3,
        retry_delay=timedelta(seconds=60),
        is_delete_operator_pod=True,
        volumes=[volume_a, volume_for_configuration],
        volume_mounts=[volume_mount_a, volume_mount_config],
        resources=pod_resource_specification,
        startup_timeout_seconds=cons.K8_POD_TIMEOUT,
        get_logs=True,
        on_failure_callback=log_failure_unzip_decrypt,
        dag=dag
    )

When I print the variables from the pod I get the output below. Note that all the other environment variables have been populated except the one where I am trying to reference the ConfigMap. This is the log output we see while the Kubernetes pod is being spawned (in the snippet below, please focus on the entry with 'name': 'SPARK_CONFIG'). The rest of the environment variables have the values I provided in the code snippet above via Jinja templating.

'containers': [{'args': None,
                          'command': None,
                          'env': [{'name': 'FRT_ID',
                                   'value': '20180902_01605',
                                   'value_from': None},
                                  {'name': 'FILE_NAME',
                                   'value': 'transact_2018-09-02_0321_file_MAR.zip',
                                   'value_from': None},
                                  {'name': 'FILE_PATH',
                                   'value': '/etc/data/app/trk0057.zip',
                                   'value_from': None},                                  
                                  {'name': 'LOG_FILE',
                                   'value': 'log-0057_2018-09.log',
                                   'value_from': None},
                                  {'name': 'SPARK_CONFIG',
                                   'value': '{"valueFrom": {"configMapKeyRef": '
                                            '{"key": "endpoint","name": '
                                            '"spark-config"}}}',
                                   'value_from': None}],
                          'env_from': None

...
...
...
...

The question is: how do we pass the ConfigMap value via `value_from` when forming the environment variable in the Apache Airflow KubernetesPodOperator?

-- Shanit
airflow
airflow-operator
environment-variables
kubernetes-pod

1 Answer

5/15/2020

You should be able to accomplish this by using the KubernetesPodOperator `configmaps` parameter. You can see the docstring here: https://github.com/apache/airflow/blob/v1-10-stable/airflow/contrib/operators/kubernetes_pod_operator.py#L104

In this case you would pass `configmaps=["spark-config"]`, presuming your ConfigMap is named `spark-config`.
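For illustration, here is a minimal sketch of how that could look, reusing the placeholders from the question (`cons.*`, `create_pod_name()`, `dag`); as I understand the linked docstring, `configmaps` exposes every key of the named ConfigMap as an environment variable in the pod (via `envFrom`), so the `endpoint` key would appear as an env var named `endpoint` rather than `SPARK_CONFIG`:

from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

pod_process_task = KubernetesPodOperator(
    namespace=cons.K8_NAMESPACE,          # placeholders taken from the question
    image=cons.UNCOMPRESS_IMAGE_NAME,
    config_file=cons.K8_CONFIG_FILE,
    env_vars={
        # plain env vars stay as before; drop the SPARK_CONFIG JSON string
        'FRT_ID': '{{ dag_run.conf["transaction_id"] }}',
        'FILE_NAME': '{{ dag_run.conf["filename"] }}',
    },
    configmaps=["spark-config"],          # inject the whole ConfigMap as env vars
    name=create_pod_name(),
    task_id="decrypt-951",
    is_delete_operator_pod=True,
    get_logs=True,
    dag=dag,
)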

-- chris.mclennon
Source: StackOverflow