Airflow DAG getting psycopg2.OperationalError when creating tasks with KubernetesPodOperator

9/29/2019
    ERROR - Scheduler heartbeat got an exception: (psycopg2.OperationalError) could not translate host name "airflow-prod-postgresql" to address: Temporary failure in name resolution  (Background on this error at: http://sqlalche.me/e/e3q8)

I am currently running a job on Airflow that has about 10 tasks running in parallel, and for the sake of simplicity we can assume that these tasks all connect to Google BigQuery (GBQ), run a SQL query, and then return the output as a pandas dataframe. I have Airflow set up such that each task is its own Docker container inside a Kubernetes pod generated by the KubernetesPodOperator. The Kubernetes cluster is set up with Amazon EKS and currently has two nodes (EC2 instances), with auto-scaling enabled up to a maximum of four nodes.
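For reference, here is a stripped-down sketch of the DAG (the image name, namespace, and per-task queries are placeholders, not my real values):

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

    dag = DAG(
        dag_id="gbq_extract",
        start_date=datetime(2019, 9, 1),
        schedule_interval="@daily",
    )

    # Each task launches its own pod, whose container runs a GBQ query
    # and returns the result as a pandas dataframe.
    for i in range(10):
        KubernetesPodOperator(
            task_id=f"gbq_query_{i}",
            name=f"gbq-query-{i}",
            namespace="airflow",
            image="my-registry/gbq-query:latest",  # placeholder image
            arguments=[f"query_{i}.sql"],          # placeholder per-task query
            dag=dag,
        )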

However, when running this Airflow DAG, only about 2 of the 10 tasks finish successfully, while the rest fail with the psycopg2.OperationalError above. Some of the failed tasks instead report a No host supplied message along with Log file does not exist.

Across runs of the DAG, the set of tasks that fail is not consistent. It seems like the root of this problem has to do with the database connection, given the psycopg2 error message. From the SQLAlchemy docs regarding an OperationalError:

Exception raised for errors that are related to the database’s operation and not necessarily under the control of the programmer, e.g. an unexpected disconnect occurs, the data source name is not found, a transaction could not be processed, a memory allocation error occurred during processing, etc.
This error is a DBAPI Error and originates from the database driver (DBAPI), not SQLAlchemy itself.
The OperationalError is the most common (but not the only) error class used by drivers in the context of the database connection being dropped, or not being able to connect to the database. For tips on how to deal with this, see the section Dealing with Disconnects.
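If I'm reading the "Dealing with Disconnects" section right, the pessimistic approach it describes boils down to something like this (a minimal standalone SQLAlchemy sketch with a placeholder connection string, not my actual Airflow setup):

    from sqlalchemy import create_engine

    engine = create_engine(
        "postgresql+psycopg2://airflow:***@airflow-prod-postgresql:5432/airflow",
        pool_pre_ping=True,   # test each pooled connection (SELECT 1) before handing it out
        pool_size=5,
        pool_recycle=1800,    # discard connections older than 30 minutes
    )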

But to be frank, I'm not well-versed enough to determine whether the solution is some configuration at the Airflow core (global) level or at the DAG definition (local) level. Alternatively, the issue could be with the Kubernetes cluster setup, such as the Amazon EKS auto-scaling not working properly.

For more context, I have pool_size=5 and pool_recycle=1800 set in airflow.cfg, and I am using the CeleryExecutor.
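If it helps, the relevant part of my airflow.cfg looks roughly like this (assuming I have the key names right for my Airflow 1.10 version):

    [core]
    executor = CeleryExecutor
    sql_alchemy_pool_size = 5
    sql_alchemy_pool_recycle = 1800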

On a separate trial, I set concurrency=4 in the DAG definition, and I'm seeing similar failures, except the main psycopg2.OperationalError reads slightly differently:

    ERROR - Scheduler heartbeat got an exception: (psycopg2.OperationalError) could not connect to server: Connection timed out
        Is the server running on host "airflow-prod-postgresql" and accepting
        TCP/IP connections on port 5432?
     (Background on this error at: http://sqlalche.me/e/e3q8)
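For completeness, the concurrency cap was set directly in the DAG constructor, along these lines (same placeholder DAG as in the sketch above):

    from datetime import datetime

    from airflow import DAG

    dag = DAG(
        dag_id="gbq_extract",
        start_date=datetime(2019, 9, 1),
        schedule_interval="@daily",
        concurrency=4,  # at most 4 task instances of this DAG running at once
    )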

Additional error messages from other failed tasks:

On the first attempt:

  1. No host supplied
  2. Failed to fetch log file from worker

On the second attempt:

  1. dependency 'Task Instance Not Already Running' FAILED: Task is already running
  2. Dependencies not met for TaskInstance: dependency 'Task Instance State' FAILED: Task is in the 'running' state which is not a valid state for execution. The task must be cleared in order to be run.

I'm expecting all 10 tasks in this Airflow DAG to run successfully and with high availability, as opposed to only the roughly 2 that currently succeed.

Thank you so much in advance!!

-- Phillip
airflow
kubernetes
psycopg2
python
sqlalchemy
