Airflow + Kubernetes worker database connection

1/7/2021

I've managed to set up an Airflow deployment in Kubernetes (webserver and scheduler) configured with Kubernetes Executor and a Postgres schema with Airflow metadata.

I'm trying to make the example_bash_operator DAG working but the worker pod keeps failing because of the following SQLAlchemy exception:

2021-01-07 10:29:33,091] {cli_action_loggers.py:105} WARNING - Failed to log action with (sqlite3.OperationalError) no such table: log SQL: INSERT INTO log (dttm, dag_id, task_id, event, execution_date, owner, extra) VALUES (?, ?, ?, ?, ?, ?, ?)parameters: ('2021-01-07 10:29:33.082018', 'example_bash_operator', 'runme_0', 'cli_task_run', '2021-01-07 09:11:29.947161', 'airflow', '{"host_name": "examplebashoperatorrunme0-495d985192ab4269a3dce023851a34b7", "full_command": "\'/usr/local/bin/airflow\', \'tasks\', \'run\', \'examp ... (70 characters truncated) ... \'--local\', \'--pool\', \'default_pool\', \'--subdir\', \'/usr/local/lib/python3.8/site-packages/airflow/example_dags/example_bash_operator.py\'"}')] (Background on this error at: http://sqlalche.me/e/13/e3q8) 2021-01-07 10:29:33,097 {dagbag.py:440} INFO - Filling up the DagBag from /usr/local/lib/python3.8/site-packages/airflow/example_dags/example_bash_operator.py 2021-01-07 10:29:33,197 {dagbag.py:297} ERROR - Failed to import: /usr/local/lib/python3.8/site-packages/airflow/example_dags/example_subdag_operator.py Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/base.py", line 1276, in _execute_context self.dialect.do_execute( File "/usr/local/lib/python3.8/site-packages/sqlalchemy/engine/default.py", line 609, in do_execute cursor.execute(statement, parameters) sqlite3.OperationalError: no such table: slot_pool

To my understanding, the worker pod is trying to fetch metadata from the sqlite db, but the database doesn't hold any of these tables. I already have a database for metadata and I'd like the workers to use the Postgres metadata database I set up as the default one.

A worker container image is the following:

FROM python:3.8.7-slim

ARG AIRFLOW_USER_HOME=/var/lib/airflow
ARG AIRFLOW_USER="airflow"
ARG AIRFLOW_UID="1000"
ARG AIRFLOW_GID="100"
ENV AIRFLOW_HOME=$AIRFLOW_USER_HOME
ENV AIRFLOW__CORE__SQL_ALCHEMY_CONN="postgresql://user:pass@databaseip:5432/airflow_metadata"
RUN mkdir $AIRFLOW_USER_HOME && \
  useradd -ms /bin/bash -u $AIRFLOW_UID airflow && \
  chown $AIRFLOW_USER:$AIRFLOW_GID $AIRFLOW_USER_HOME && \
  buildDeps='freetds-dev libkrb5-dev libsasl2-dev libssl-dev libffi-dev libpq-dev' \
  apt-get update && \
  apt-get install -yqq --no-install-recommends $buildDeps build-essential default-libmysqlclient-dev && \
  pip install --no-cache-dir 'apache-airflow[crypto,kubernetes,postgres]' && \
  apt-get purge --auto-remove -yqq $buildDeps && \
  apt-get autoremove -yqq --purge && \
  rm -rf /var/lib/apt/lists/*

USER $AIRFLOW_UID

WORKDIR $AIRFLOW_USER_HOME

It's clear I'm missing something very obvious in how airflow manages its worker. Can anybody please explain to me how Airflow manages this?

-- berrur
airflow
kubernetes
postgresql
python

1 Answer

1/7/2021

I finally managed to get the worker pods complete their runs on Kubernetes. Indeed if you have a metadata db and you get sqlite3 error, it is possible that your worker pod doesn't have the database environment variable (AIRFLOW__CORE__SQL_ALCHEMY_CONN) properly set. In my case the configuration was correct but Docker tagging system messed up and didn't properly sync the changes I was making to the worker dockerfile. I finally rebuilt the image with a new tag and pushed it to my private container registry. Then i edited my airflow deployment, by explicitly telling to grab the newly tagged version and finally it worked:

- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_REPOSITORY
  value: 'my-container-registry/airflow-k8s/airflow-worker'
- name: AIRFLOW__KUBERNETES__WORKER_CONTAINER_TAG
  value: '0.1'
-- berrur
Source: StackOverflow