I have deployed Apache Airflow on Azure Kubernetes Service (AKS).
Helm chart of Apache Airflow: https://github.com/apache/airflow/tree/master/chart
AKS version: 1.16.13
Once Airflow was deployed, I tested it with this DAG:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.utcnow(),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'kubernetes_sample', default_args=default_args, schedule_interval=timedelta(minutes=10))

start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='default',
                                image="python:3.8-slim-buster",
                                cmds=["python3", "-c"],
                                arguments=["print('hello world')"],
                                labels={"foo": "bar"},
                                name="passing-test",
                                task_id="passing-task",
                                get_logs=True,
                                dag=dag
                                )

passing.set_upstream(start)
It works fine. Now I want to use my own images. For that, I am using Azure Container Registry, and following this guide: https://airflow.readthedocs.io/en/latest/howto/operator/kubernetes.html, I use this command to create the secret for accessing my Azure registry:
kubectl create secret docker-registry testquay \
--docker-server=quay.io \
--docker-username=<Profile name> \
--docker-password=<password>
I build my image, test it locally, and it works. I upload the image to Azure Container Registry and write the following DAG:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.contrib.kubernetes import secret
import logging
import os
import sys
import traceback

try:
    env_var_secret = secret.Secret(
        deploy_type='env',
        deploy_target='VERSION_NUMBER',
        secret='myregistrykey',
        key='VERSION_NUMBER',
    )

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime.utcnow(),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }

    dag = DAG(
        'jordi_test_get_secert2', default_args=default_args, schedule_interval=timedelta(minutes=10))

    start = DummyOperator(task_id='run_this_first', dag=dag)

    quay_k8s = KubernetesPodOperator(
        namespace='default',
        name="passing-test7",
        image='docker.io/test-pai-1',
        image_pull_secrets=env_var_secret,
        task_id="passing-task6",
        get_logs=True,
        dag=dag
    )

    start >> quay_k8s

except Exception as e:
    error_message = {
        "message": "An internal error occurred",
        "error": str(e),
        "error information": str(sys.exc_info()),
        "traceback": str(traceback.format_exc())
    }
    logging.info(error_message)
And it gives me this error:
File "/home/airflow/.local/lib/python3.6/site-packages/airflow/kubernetes/pod_generator.py", line 272, in __init__
for image_pull_secret in image_pull_secrets.split(','):
AttributeError: 'Secret' object has no attribute 'split'
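The traceback shows the root cause: in Airflow 1.10.x, pod_generator.py assumes image_pull_secrets is a plain comma-separated string of secret names and calls .split(',') on it, so anything else (a Secret object, a list) fails before the pod is ever created. A minimal stand-alone sketch of that behaviour (plain Python, no Airflow required; parse_image_pull_secrets is a hypothetical stand-in for the internal logic, not Airflow's actual code):

```python
# Hypothetical mimic of the Airflow 1.10.x internals: image_pull_secrets
# is expected to be a comma-separated string of secret names.
def parse_image_pull_secrets(image_pull_secrets):
    return [name.strip() for name in image_pull_secrets.split(',')]

# A plain string works:
print(parse_image_pull_secrets('secret_a,secret_b'))  # ['secret_a', 'secret_b']

# Anything without a .split() method (a Secret object, a list, ...) raises
# the same AttributeError seen in the traceback:
try:
    parse_image_pull_secrets(['secret_a'])
except AttributeError as e:
    print(e)  # 'list' object has no attribute 'split'
```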
Following this guide: https://airflow.readthedocs.io/en/latest/howto/operator/kubernetes.html, I also tried:
from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
from kubernetes.client import models as k8s
import logging
import os
import sys
import traceback

try:
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime.utcnow(),
        'email': ['airflow@example.com'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    }

    dag = DAG(
        'jordi_test2', default_args=default_args, schedule_interval=timedelta(minutes=10))

    start = DummyOperator(task_id='run_this_first', dag=dag)

    quay_k8s = KubernetesPodOperator(
        namespace='default',
        name="passing-test7",
        image='docker.io/test-pai-1',
        image_pull_secrets=[k8s.V1LocalObjectReference('myregistrykey')],
        task_id="passing-task6",
        get_logs=True,
        dag=dag
    )

    start >> quay_k8s

except Exception as e:
    error_message = {
        "message": "An internal error occurred",
        "error": str(e),
        "error information": str(sys.exc_info()),
        "traceback": str(traceback.format_exc())
    }
    logging.info(error_message)
But it gives me this error:
for image_pull_secret in image_pull_secrets.split(','):
AttributeError: 'list' object has no attribute 'split'
If I go to the Airflow documentation for the KubernetesPodOperator: https://airflow.apache.org/docs/stable/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html
it says:
image_pull_secrets (str) – Any image pull secrets to be given to the pod. If more than one secret is required, provide a comma separated list: secret_a,secret_b
What is the correct way to write it?
I would ask for clarification as a comment, but I don't have the reputation, so I'm providing a solution with some assumptions.
Here your secret is named testquay:
kubectl create secret docker-registry testquay \
--docker-server=quay.io \
--docker-username=<Profile name> \
--docker-password=<password>
However, you reference it as myregistrykey in the code you posted that is meant to follow the example you linked:
image_pull_secrets=[k8s.V1LocalObjectReference('myregistrykey')],
It should reference the secret name, matching the example:
image_pull_secrets=[k8s.V1LocalObjectReference('testquay')],
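Separately, the 1.10.x docs you quoted type image_pull_secrets as a plain str, so on that version you can also skip the k8s model object and pass the secret name (or several, comma-separated) as a string, which is exactly what the .split(',') in your traceback expects. A stdlib-only sketch of how such a string ends up as imagePullSecrets entries in the pod spec (the helper is a hypothetical mimic for illustration, not Airflow's actual code):

```python
# Hypothetical mimic: how a comma-separated image_pull_secrets string maps
# onto the pod spec's imagePullSecrets list, e.g. image_pull_secrets='testquay'.
def to_image_pull_secrets(names):
    # 'testquay,other-key' -> [{'name': 'testquay'}, {'name': 'other-key'}]
    return [{'name': n.strip()} for n in names.split(',')]

pod_spec_fragment = {'imagePullSecrets': to_image_pull_secrets('testquay')}
print(pod_spec_fragment)  # {'imagePullSecrets': [{'name': 'testquay'}]}
```

Either form must name the secret you actually created (testquay), and the secret must live in the same namespace as the pod.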
Also, for the first DAG code: I don't believe you can pass a Secret object in that manner. Those are meant to be injected into the k8s pod as either a volume or an environment variable at runtime (via the operator's secrets parameter, not image_pull_secrets): https://github.com/apache/airflow/blob/v1-10-stable/airflow/kubernetes/secret.py#L35-L40
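For reference, here is roughly what your Secret(deploy_type='env', ...) declaration is for: at runtime it becomes an env var backed by a secretKeyRef in the container spec, not an image pull secret. A stdlib-only sketch of that mapping (the function is a hypothetical illustration of the resulting shape, not Airflow's code):

```python
# Hypothetical illustration: an 'env' deploy-type Secret maps to a k8s env
# entry with a secretKeyRef, i.e. the container sees VERSION_NUMBER populated
# from key VERSION_NUMBER of the k8s secret 'myregistrykey'.
def env_secret_to_pod_env(deploy_target, secret_name, key):
    return {
        'name': deploy_target,
        'valueFrom': {'secretKeyRef': {'name': secret_name, 'key': key}},
    }

print(env_secret_to_pod_env('VERSION_NUMBER', 'myregistrykey', 'VERSION_NUMBER'))
```

So the Secret object configures what the container can read once it is running, while imagePullSecrets is what lets the kubelet pull the image in the first place; they are separate mechanisms.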