I'm planning to use Google Cloud Composer (Apache Airflow) to manage our data pipelines. Certain processing steps are defined in a Docker image that I'd like to run on Google Kubernetes Engine. These steps are often resource-intensive jobs, and I'm wondering what the best way is to schedule them.
I looked into the KubernetesPodOperator to run a Docker image hosted on Google Container Registry. However, it's my understanding that this workload will be created within the existing Cloud Composer Kubernetes cluster, so the resources available to run it are limited by the resources allocated to the Cloud Composer cluster. It seems wasteful to allocate a huge amount of resources to the Cloud Composer cluster that would only be used when this particular task runs. Is there any type of autoscaling at the Cloud Composer cluster level that could handle this?
As an alternative, I was thinking that Cloud Composer could have a DAG that creates an external Kubernetes cluster with the proper resources to run this step, and then tears it down when completed. Does this sound like a valid approach? What would be the best way to implement this? I was thinking of using the BashOperator with gcloud and kubectl commands, roughly as in the sketch below.
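This is just an untested sketch of the idea; the DAG id, cluster name, zone, machine type, and the path to the Kubernetes Job manifest are all placeholders:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

with DAG('heavy_processing', schedule_interval='@daily', start_date=days_ago(1)) as dag:
    # Spin up a dedicated GKE cluster sized for the heavy job.
    create_cluster = BashOperator(
        task_id='create_cluster',
        bash_command='gcloud container clusters create processing-cluster '
                     '--zone us-central1-a --machine-type n1-highmem-8 --num-nodes 3',
    )
    # Fetch credentials, submit the Kubernetes Job, and wait for it to complete.
    run_job = BashOperator(
        task_id='run_job',
        bash_command='gcloud container clusters get-credentials processing-cluster --zone us-central1-a '
                     '&& kubectl apply -f /home/airflow/gcs/dags/processing_job.yaml '
                     '&& kubectl wait --for=condition=complete --timeout=2h job/processing-job',
    )
    # Tear the cluster down whether the job succeeded or not.
    delete_cluster = BashOperator(
        task_id='delete_cluster',
        bash_command='gcloud container clusters delete processing-cluster --zone us-central1-a --quiet',
        trigger_rule='all_done',
    )

    create_cluster >> run_job >> delete_cluster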
TLDR: Is it a valid pattern to use Cloud Composer to manage external Kubernetes clusters as a way to handle resource-intensive processing steps?
I think it's good practice to run your own pods on different nodes than the existing Airflow pods (which run on the default node pool of your Cloud Composer Kubernetes cluster). That way, you won't interfere with the existing Airflow pods in any way.
If you don't want to use an external Kubernetes cluster, you can create a node pool directly inside your Cloud Composer Kubernetes cluster, with a minimum of 0 nodes and autoscaling enabled. When no pod is running, there will be no node in the node pool (so you won't pay for it). When you start a pod (using node affinity), a node will automatically be started. Another advantage is that you can choose the machine type of the node pool's nodes depending on your needs.
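For example, a node pool like that can be created with a single gcloud command (the pool name, cluster, zone and machine type below are placeholders, pick what fits your workload; if creating it with 0 nodes is rejected, create it with 1 node and let the autoscaler scale it down):

gcloud container node-pools create heavy-processing-pool \
    --cluster <your_composer_gke_cluster> \
    --zone <your_zone> \
    --machine-type n1-highmem-8 \
    --num-nodes 0 \
    --enable-autoscaling --min-nodes 0 --max-nodes 3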
To schedule a pod on a specific node pool, use the KubernetesPodOperator's affinity parameter:
# Airflow 1.10 / Cloud Composer-era import; in newer Airflow versions the operator
# lives in the cncf.kubernetes provider package.
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

KubernetesPodOperator(
    task_id=task_id,
    namespace='default',
    image=image,
    arguments=arguments,
    name=task_id.replace('_', '-'),
    # Pin the pod to the dedicated node pool so it never lands on the default (Airflow) nodes.
    affinity={
        'nodeAffinity': {
            'requiredDuringSchedulingIgnoredDuringExecution': {
                'nodeSelectorTerms': [{
                    'matchExpressions': [{
                        'key': 'cloud.google.com/gke-nodepool',
                        'operator': 'In',
                        'values': [
                            '<name_of_your_node_pool>',
                        ]
                    }]
                }]
            }
        }
    },
    is_delete_operator_pod=True,
    hostnetwork=False,
)
I'm using this in production and it works correctly.