Cannot run Airflow's KubernetesPodOperator using Default Cloud Composer Environment

2/26/2019

Overview

I just deployed a Cloud Composer environment using all defaults. I then followed this guide to try out a simple KubernetesPodOperator. When I copy the script into my DAGs directory and let it run, the task fails with an error that includes: No SSH tunnels currently open. Were the targets able to accept an ssh-key for user....

Source Code

I copied the snippet below directly from the example code.

import datetime
from airflow import models
from airflow.contrib.kubernetes import pod
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator

YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
with models.DAG(
        dag_id='composer_sample_kubernetes_pod',
        schedule_interval=datetime.timedelta(days=1),
        start_date=YESTERDAY) as dag:
    kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-templates',
        name='ex-kube-templates',
        namespace='default',
        image='bash',
        # All parameters below are able to be templated with jinja -- cmds,
        # arguments, env_vars, and config_file. For more information visit:
        # https://airflow.apache.org/code.html#default-variables

        # Entrypoint of the container, if not specified the Docker container's
        # entrypoint is used. The cmds parameter is templated.
        cmds=['echo'],
        # DS in jinja is the execution date as YYYY-MM-DD, this docker image
        # will echo the execution date. Arguments to the entrypoint. The docker
        # image's CMD is used if this is not provided. The arguments parameter
        # is templated.
        arguments=['{{ ds }}'])

Error Trace

*** Reading remote log from gs://[BUCKET]/logs/composer_sample_kubernetes_pod/ex-kube-templates/2019-02-25T15:46:57.225930+00:00/1.log.
[2019-02-26 15:47:08,609] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_sample_kubernetes_pod.ex-kube-templates 2019-02-25T15:46:57.225930+00:00 [queued]>
[2019-02-26 15:47:08,617] {models.py:1361} INFO - Dependencies all met for <TaskInstance: composer_sample_kubernetes_pod.ex-kube-templates 2019-02-25T15:46:57.225930+00:00 [queued]>
[2019-02-26 15:47:08,617] {models.py:1573} INFO -
-------------------------------------------------------------------------------
Starting attempt 1 of 
-------------------------------------------------------------------------------

[2019-02-26 15:47:08,659] {models.py:1595} INFO - Executing <Task(KubernetesPodOperator): ex-kube-templates> on 2019-02-25T15:46:57.225930+00:00
[2019-02-26 15:47:08,660] {base_task_runner.py:118} INFO - Running: ['bash', '-c', 'airflow run composer_sample_kubernetes_pod ex-kube-templates 2019-02-25T15:46:57.225930+00:00 --job_id 2 --raw -sd DAGS_FOLDER/dag_pod_helloworld.py --cfg_path /tmp/tmp6xuy0jka']
[2019-02-26 15:47:10,302] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,302] {settings.py:176} INFO - setting.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800
[2019-02-26 15:47:10,672] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,671] {default_celery.py:80} WARNING - You have configured a result_backend of redis://airflow-redis-service:6379/0, it is highly recommended to use an alternative result_backend (i.e. a database).
[2019-02-26 15:47:10,673] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,673] {__init__.py:51} INFO - Using executor CeleryExecutor
[2019-02-26 15:47:10,750] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,750] {app.py:51} WARNING - Using default Composer Environment Variables. Overrides have not been applied.
[2019-02-26 15:47:10,758] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,758] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-02-26 15:47:10,766] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,766] {configuration.py:516} INFO - Reading the config from /etc/airflow/airflow.cfg
[2019-02-26 15:47:10,875] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:10,875] {models.py:271} INFO - Filling up the DagBag from /home/airflow/gcs/dags/dag_pod_helloworld.py
[2019-02-26 15:47:12,293] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:12,292] {cli.py:484} INFO - Running <TaskInstance: composer_sample_kubernetes_pod.ex-kube-templates 2019-02-25T15:46:57.225930+00:00 [running]> on host airflow-worker-584cf75548-rbf7x
[2019-02-26 15:47:12,829] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:12,827] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:13,836] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:13,836] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:14,843] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:14,843] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:15,850] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:15,850] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Pending
[2019-02-26 15:47:16,858] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:16,857] {pod_launcher.py:121} INFO - Event: ex-kube-templates-c87ea4f8 had an event of type Succeeded
[2019-02-26 15:47:16,859] {base_task_runner.py:101} INFO - Job 2: Subtask ex-kube-templates [2019-02-26 15:47:16,858] {pod_launcher.py:184} INFO - Event with job id ex-kube-templates-c87ea4f8 Succeeded
[2019-02-26 15:47:16,896] {models.py:1760} ERROR - (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '03bf375f-a5b3-4514-9903-a17cc3a3f886', 'Content-Type': 'application/json', 'Date': 'Tue, 26 Feb 2019 15:47:16 GMT', 'Content-Length': '322'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Get https://10.142.15.217:10250/containerLogs/default/ex-kube-templates-c87ea4f8/base?follow=true\\u0026tailLines=10: No SSH tunnels currently open. Were the targets able to accept an ssh-key for user \\"gke-9d36bbaf757434bb291e\\"?","code":500}\n'

Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1659, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/usr/local/lib/airflow/airflow/contrib/operators/kubernetes_pod_operator.py", line 123, in execute
    get_logs=self.get_logs)
  File "/usr/local/lib/airflow/airflow/contrib/kubernetes/pod_launcher.py", line 90, in run_pod
    return self._monitor_pod(pod, get_logs)
  File "/usr/local/lib/airflow/airflow/contrib/kubernetes/pod_launcher.py", line 102, in _monitor_pod
    _preload_content=False)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py", line 18583, in read_namespaced_pod_log
    (data) = self.read_namespaced_pod_log_with_http_info(name, namespace, **kwargs)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/apis/core_v1_api.py", line 18689, in read_namespaced_pod_log_with_http_info
    collection_formats=collection_formats)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 321, in call_api
    _return_http_data_only, collection_formats, _preload_content, _request_timeout)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 155, in __call_api
    _request_timeout=_request_timeout)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/api_client.py", line 342, in request
    headers=headers)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/rest.py", line 231, in GET
    query_params=query_params)
  File "/opt/python3.6/lib/python3.6/site-packages/kubernetes/client/rest.py", line 222, in request
    raise ApiException(http_resp=r)
kubernetes.client.rest.ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'Audit-Id': '03bf375f-a5b3-4514-9903-a17cc3a3f886', 'Content-Type': 'application/json', 'Date': 'Tue, 26 Feb 2019 15:47:16 GMT', 'Content-Length': '322'})
HTTP response body: b'{"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Get https://10.142.15.217:10250/containerLogs/default/ex-kube-templates-c87ea4f8/base?follow=true\\u0026tailLines=10: No SSH tunnels currently open. Were the targets able to accept an ssh-key for user \\"gke-9d36bbaf757434bb291e\\"?","code":500}\n'
-- Tom
airflow
google-cloud-composer
kubernetes

1 Answer

2/26/2019

Fix - Improper Configuration

User error! It was a GCP firewall rule priority issue. We had a firewall rule blocking all traffic at a priority whose number was lower (and therefore of higher precedence) than the default priority of 1000 assigned to the Cloud Composer SSH firewall rule that lets the Kubernetes master talk to the nodes; a quick way to check is sketched below.
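
For context, GCP evaluates firewall rules with a lower numeric priority first, so a deny-all rule at, say, priority 900 overrides an allow rule at the default 1000. Something like the following lists rules in precedence order so you can spot a deny rule sitting above the GKE/Composer SSH tunnel rule (this is only a diagnostic sketch, not a command from the original post):

# List firewall rules sorted by priority: lower number = evaluated first.
# Look for a deny-all rule with a smaller priority number than the
# GKE/Composer SSH tunnel rule (created at the default priority 1000).
gcloud compute firewall-rules list --sort-by=priority \
    --format="table(name,priority,direction,denied,allowed)"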

Once I elevated the priority of the Cloud Composer firewall rule (i.e., gave it a lower number than our block-all rule), it worked perfectly.
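
Concretely, the fix amounts to giving the SSH tunnel rule a smaller priority number than the blocking rule. A sketch of what that might look like, with an assumed placeholder rule name (the real rule is usually named along the lines of gke-<cluster>-<hash>-ssh; take the name from the list output above):

# Placeholder rule name -- substitute the SSH tunnel rule from your project.
# Giving it a lower (stronger) priority number than the deny-all rule lets
# the Kubernetes master reach the kubelet on the nodes again.
gcloud compute firewall-rules update gke-example-cluster-abcd1234-ssh \
    --priority=900

Raising the number of the block-all rule above 1000 would work just as well; either way, the master-to-node connectivity that the KubernetesPodOperator's log fetching relies on is restored.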

-- Tom
Source: StackOverflow