Airflow allows you to put dependencies (external Python code that the DAG code relies on) in the DAG folder. This means any functions, members, or classes in that external Python code are available for use in the DAG code.
When doing this in the GCS DAG folder of the Cloud Composer environment, however, the dependencies' components are not available to the DAGs. An error similar to the following is displayed in the Airflow web UI: Broken DAG: [/home/airflow/gcs/dags/....py] No module named tester, where tester is a separate Python file in the dags folder.
When testing those tasks using Google's SDK (running actual Airflow commands), the tasks run fine, but it seems that somewhere in Kubernetes, when those container jobs are created, the dependencies are not carried over.
I realise Cloud Composer is in beta, but I was wondering if I am doing something wrong.
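For reference, a minimal sketch of the failing setup (the DAG file name is assumed; tester.py is the helper file mentioned above):

# /home/airflow/gcs/dags/my_dag.py  (file name assumed)
from airflow import DAG

import tester  # tester.py sits right next to this file in the dags folder,
               # yet the web UI reports "No module named tester"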
From the official docs on configuring Airflow:
The first time you run Airflow, it will create a file called airflow.cfg in your $AIRFLOW_HOME directory (~/airflow by default). This file contains Airflow’s configuration and you can edit it to change any of the settings
In this file, set the very first setting
[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /home/airflow/gcs/dags
to the base path of Airflow.
Are you looking for how to install Python dependencies? https://cloud.google.com/composer/docs/how-to/using/installing-python-dependencies
Also, the DAGs folder that lives in your GCS bucket (run gcloud beta composer environments describe [environment] to get this bucket; gs://{composer-bucket}/dags) should map to /home/airflow/gcs/dags in your pods. Have you tried SSHing into a node to find this?
You should put the module in a separate folder that contains an __init__.py file (Airflow doesn't like __init__.py files in its top-level DAGs directory).
For example, if you have the following directory structure:
dags/
  my_dag.py
  my_deps/
    __init__.py
    dep_a.py
    dep_b.py
You can write from my_deps import dep_a, dep_b in my_dag.py.
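A minimal DAG using that layout might look like the sketch below (the callables dep_a.run and dep_b.run are assumed; only the import pattern matters):

# dags/my_dag.py
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

from my_deps import dep_a, dep_b  # resolved because my_deps/ contains __init__.py

dag = DAG("my_dag", start_date=datetime(2018, 1, 1), schedule_interval=None)

task_a = PythonOperator(task_id="task_a", python_callable=dep_a.run, dag=dag)
task_b = PythonOperator(task_id="task_b", python_callable=dep_b.run, dag=dag)
task_a >> task_b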
I had the same issue and had help resolving it on the mailing list. For reference, see the thread here: https://groups.google.com/forum/#!topic/cloud-composer-discuss/wTI7Pbwc6ZY. There's a link to a handy Github Gist with some comments on it as well.
In order to write and import your own dependencies into your DAGs, you'll want to zip your dags and their dependencies as described here: https://airflow.apache.org/concepts.html?highlight=zip#packaged-dags.
You can upload that zip file directly to your Cloud Composer GCS bucket and Airflow will pick it up.
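As an illustration, here is a small Python sketch of building such a zip (reusing the my_dag.py / my_deps layout from the previous answer; any tool that puts the DAG file at the root of the zip works equally well):

# build_zip.py
import os
import zipfile

with zipfile.ZipFile("my_dag.zip", "w") as zf:
    zf.write("my_dag.py")                      # DAG file at the root of the zip
    for root, _, files in os.walk("my_deps"):  # dependency package alongside it
        for name in files:
            zf.write(os.path.join(root, name))

The resulting my_dag.zip is the file you upload to the bucket's dags/ folder.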
Make sure your dependencies are packages, not modules, at the top level of your dags directory.
from foo_dep.foo_dep import my_utility_function will work here:
foo_dag.py
foo_dep/__init__.py
foo_dep/foo_dep.py
from foo_dep import my_utility_function seems like it should work with the following dags directory structure (and will work locally), but it will not work in Airflow:
foo_dag.py
foo_dep.py
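To make the working variant concrete, the package can be as small as this (the function body is just a placeholder):

# foo_dep/__init__.py -- can be left empty

# foo_dep/foo_dep.py
def my_utility_function():
    print("doing the actual work")

# foo_dag.py
from foo_dep.foo_dep import my_utility_function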