I have already uploaded an image with everything I need to run in GCP using KubernetesPodOperator and I get the message below, could anyone help me understand what is going on?
Below is a summary of my script and error message:
import os
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from airflow.contrib.operators.mssql_to_gcs import MsSqlToGoogleCloudStorageOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
import pyarrow
import airflow
from airflow import DAG
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
default_args = {
'owner': 'me',
'start_date': airflow.utils.dates.days_ago(0),
'depends_on_past': False,
'email_on_failure': False,
'email_on_retry': False,
'depends_on_past': False,
'catchup': False,
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG('test_kube', default_args=default_args, description='Kubernetes Operator',
schedule_interval='00 12 01 * *') as dag:
k = KubernetesPodOperator(namespace='kubenm',
image="teste-kube:latest",
name="test",
task_id="test",
is_delete_operator_pod=False,
hostnetwork=False,
dag=dag
)
k
This is the first time I am using this operator and I am wondering if it will meet my needs.
Log:
INFO - Job 11344: Subtask test Traceback (most recent call last):
INFO - Job 11344: Subtask test File "/usr/local/bin/airflow", line 32, in <module>
INFO - Job 11344: Subtask test args.func(args)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/utils/cli.py", line 74, in wrapper
INFO - Job 11344: Subtask test return f(*args, **kwargs)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 522, in run
INFO - Job 11344: Subtask test _run(args, dag, ti)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/bin/cli.py", line 440, in _run
INFO - Job 11344: Subtask test pool=args.pool,
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/utils/db.py", line 74, in wrapper
INFO - Job 11344: Subtask test return func(*args, **kwargs)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 926, in _run_raw_task
INFO - Job 11344: Subtask test result = task_copy.execute(context=context)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/contrib/operators/kubernetes_pod_operator.py", line 111, in execute
INFO - Job 11344: Subtask test config_file=self.config_file)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/contrib/kubernetes/kube_client.py", line 56, in get_kube_client
INFO - Job 11344: Subtask test return _load_kube_config(in_cluster, cluster_context, config_file)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/airflow/contrib/kubernetes/kube_client.py", line 38, in _load_kube_config
INFO - Job 11344: Subtask test config.load_kube_config(config_file=config_file, context=cluster_context)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 645, in load_kube_config
INFO - Job 11344: Subtask test persist_config=persist_config)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 613, in _get_kube_config_loader_for_yaml_file
INFO - Job 11344: Subtask test **kwargs)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 153, in __init__
INFO - Job 11344: Subtask test self.set_active_context(active_context)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 173, in set_active_context
INFO - Job 11344: Subtask test context_name = self._config['current-context']
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 495, in __getitem__
INFO - Job 11344: Subtask test v = self.safe_get(key)
INFO - Job 11344: Subtask test File "/usr/local/lib/python3.7/site-packages/kubernetes/config/kube_config.py", line 491, in safe_get
INFO - Job 11344: Subtask test key in self.value):
INFO - Job 11344: Subtask test TypeError: argument of type 'NoneType' is not iterable
INFO - [[34m2019-09-30 17:18:16,274[0m] {[34mlocal_task_job.py:[0m172} WARNING[0m - State of this instance has been externally set to [1mup_for_retry[0m. Taking the poison pill.[0m
INFO - Sending Signals.SIGTERM to GPID 9
INFO - Process psutil.Process(pid=9, status='terminated') (9) terminated with exit code -15
INFO - [[34m2019-09-30 17:18:16,303[0m] {[34mlocal_task_job.py:[0m105} INFO[0m
I made some adjustments to the script that made the operation work:
with DAG('test_kube', default_args=default_args, description='Kubernetes Operator',
schedule_interval='00 12 01 * *') as dag:
k = KubernetesPodOperator(namespace='kubenm',
image="gcr.io/project/teste-kube:latest", #Image path was incorrect
name="test",
in_cluster=True, #To trigger cluster kubeconfig.
image_pull_policy="Always", #In my case, I need the image update to occur whenever there is an update
task_id="test",
is_delete_operator_pod=False,
hostnetwork=False,
dag=dag
)
k