I just deployed a Cloud Composer environment using all defaults, then followed this guide to try out a simple KubernetesPodOperator. When I copy the script into my DAGs directory and let it run, I get an error: No SSH tunnels currently open. Were the targets able to accept an ssh-key for user...

I copy/pasted the snippet directly from the example code:
import datetime

from airflow import models
from airflow.contrib.kubernetes import pod
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    kubernetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below can be templated with Jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables
        # Entrypoint of the container; if not specified, the Docker image's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # {{ ds }} in Jinja is the execution date as YYYY-MM-DD, so this
        # task will echo the execution date. Arguments are passed to the
        # entrypoint; the Docker image's CMD is used if this is not
        # provided. The arguments parameter is templated.
        arguments=['{{ ds }}'])
*** Reading remote log from gs://[BUCKET]/logs/composer_sample_kubernetes_pod/ex-kube-templates/2019-02-25T15:46:57.225930+00:00/1.log.
[2019-02-26 15:47:08,609] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_sample_kubernetes_pod.ex-kube-templates 2019-02-25T15:46:57.225930+00:00 [queued]>
[2019-02-26 15:47:08,617] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_sample_kubernetes_pod.ex-kube-templates 2019-02-25T15:46:57.225930+00:00 [queued]>
[2019-02-26 15:47:08,617] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of
-------------------------------------------------------------------------------
[2019-02-26 15:47:08,659] {models.py:1595} INFO - Executing <Task(KubernetesPodOperator): ex-kube-templates> on 2019-02-25T15:46:57.225930+00:00
[2019-02-26 15:47:08,660] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run composer_sample_kubernetes_pod ex-kube-templates 2019-02-25T15:46:57.225930+00:00 --job_id 2 --raw -sd DAGS_FOLDER/dag_pod_helloworld.py --cfg_path /tmp/tmp6xuy0jka']
[2019-02-26 15:47:10,302] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,302] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-02-26 15:47:10,672] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,671] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2019-02-26 15:47:10,673] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,673] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-02-26 15:47:10,750] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,750] {app.py:51} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2019-02-26 15:47:10,758] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,758] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-02-26 15:47:10,766] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,766] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-02-26 15:47:10,875] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,875] {models.py:271} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dag_pod_helloworld.py
[2019-02-26 15:47:12,293] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:12,292] {cli.py:484} INFO - Running <TaskInstance: composer_sample_kubernetes_pod.ex-kube-templates 2019-02-25T15:46:57.225930+00:00 [running]> on host airflow-worker-584cf75548-rbf7x
[2019-02-26 15:47:12,829] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:12,827] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:13,836] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:13,836] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:14,843] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:14,843] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:15,850] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:15,850] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:16,858] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:16,857] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Succeeded
[2019-02-26 15:47:16,859] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:16,858] {pod_launcher.py:184} INFO - Event with job id ex-kube-templates-c87ea4f8 Succeeded
[2019-02-26 15:47:16,896] {models.py:1760} ERROR - (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '03bf375f-a5b3-4514-9903-a17cc3a3f886', 'Content-Type': 'application/json', 'Date': 'Tue, 26 Feb 2019 15:47:16 GMT', 'Content-Length': '322'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Get https://10.142.15.217:10250/containerLogs/default/ex-kube-templates-c87ea4f8/base?follow=true\\u0026tailLines=10: No SSH tunnels currently open. Were the targets able to accept an ssh-key for user \\"gke-9d36bbaf757434bb291e\\"?","code":500}\n'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    get_logs=self.get_logs)
  File "/usr/local/lib/airflow/airflow/contrib/kubernetes/pod_launcher.py", line 90, in run_pod
    return self._monitor_pod(pod, get_logs)
  File "/usr/local/lib/airflow/airflow/contrib/kubernetes/pod_launcher.py", line 102, in _monitor_pod
    _preload_content=False)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py", line 18583, in read_namespaced_pod_log
    (data) = self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py", line 18689, in read_namespaced_pod_log_with_http_info
    collection_formats=collection_formats)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 321, in call_api
    _return_http_data_only, collection_formats, _preload_content, _request_timeout)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 155, in __call_api
    _request_timeout=_request_timeout)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 342, in request
    headers=headers)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/rest.py", line 231, in GET
    query_params=query_params)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/rest.py", line 222, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '03bf375f-a5b3-4514-9903-a17cc3a3f886', 'Content-Type': 'application/json', 'Date': 'Tue, 26 Feb 2019 15:47:16 GMT', 'Content-Length': '322'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Get https://10.142.15.217:10250/containerLogs/default/ex-kube-templates-c87ea4f8/base?follow=true\\u0026tailLines=10: No SSH tunnels currently open. Were the targets able to accept an ssh-key for user \\"gke-9d36bbaf757434bb291e\\"?","code":500}\n'
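Note that in the log above the pod itself reached Succeeded; the 500 is raised only when the operator streams the container logs back through the Kubernetes API server (the get_logs=self.get_logs call in the traceback). While debugging the network path, one option is to disable log streaming so the task result depends only on the pod's final phase. A minimal sketch, assuming the same Composer/Airflow contrib operator as the sample (the dag_id and task_id here are illustrative):

```python
import datetime

from airflow import models
from airflow.contrib.operators import kubernetes_pod_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

with models.DAG(
        dag_id='composer_sample_kubernetes_pod_no_logs',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    # get_logs=False skips the containerLogs request that raised the 500
    # above; container output is then only visible via kubectl, not in
    # the Airflow task log.
    kubernetes_no_logs_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-no-logs',
        name='ex-kube-no-logs',
        namespace='default',
        image='bash',
        cmds=['echo'],
        arguments=['{{ ds }}'],
        get_logs=False)
```

This is a workaround for the symptom only; it does not fix the underlying tunnel problem.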
User error! It was a GCP firewall rule priority issue. We had a firewall rule blocking all traffic at a priority whose number was lower (i.e. a higher priority) than the default priority 1000 assigned to the Cloud Composer SSH firewall rule that lets the GKE master talk to the nodes.

Once I raised the priority of the Cloud Composer firewall rule above our deny rule, it worked perfectly.
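For anyone hitting the same symptom, a rough way to inspect and correct the ordering with gcloud (the rule name "deny-all-ingress" below is a hypothetical placeholder; substitute whatever broad deny rule your project actually has):

```shell
# List firewall rules sorted by priority (lower number = evaluated first),
# to spot any deny rule that shadows the Composer/GKE SSH-tunnel allow rule.
gcloud compute firewall-rules list \
    --sort-by=priority \
    --format="table(name,priority,direction,sourceRanges.list())"

# Example fix: push the broad deny rule's priority number above 1000 so the
# master-to-node SSH allow rule is evaluated first.
gcloud compute firewall-rules update deny-all-ingress --priority=65000
```

Alternatively, lower the priority number of the allow rule itself instead of moving the deny rule, whichever fits your policy.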