I am currently running Airflow on Kubernetes in Google Cloud Platform (GCP). I based my project on docker-airflow. I can start the UI, but when I try to create a connection for Google Cloud and submit it, I get the following errors.
ValueError: Fernet key must be 32 url-safe base64-encoded bytes.
[2018-09-21 19:45:13,345] AirflowException: Could not create Fernet object: Fernet key must be 32 url-safe base64-encoded bytes.
The first thing the docs recommend is to make sure cryptography is installed, which I did. I installed it both ways: the Airflow extra and the standard package from PyPI.
pip3 install apache-airflow[kubernetes,crypto]
and also tried
pip install cryptography
I also ran the commands for generating and storing the Fernet key as explained in the documentation, found here (and shown below):
1) Either generate a fernet key manually and add it to airflow.cfg, or
2) Set the environment variable and restart the server (an example is shown below the key).
python -c "from cryptography.fernet import Fernet;
print(Fernet.generate_key().decode())"
Example Key:81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=
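For the environment-variable route, the override Airflow reads is AIRFLOW__CORE__FERNET_KEY, so the attempt looked roughly like this (using the example key above, not my real one):
export AIRFLOW__CORE__FERNET_KEY=81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=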
With Kubernetes I am unable to restart the server using the typical method of killing the process ID, since it's tied to the container. I also tried putting a generated key (above) in the configmaps.yaml file of the Kubernetes cluster (which becomes airflow.cfg when deployed).
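The relevant part of that ConfigMap (mounted as airflow.cfg) ends up looking roughly like this, with the generated key under [core]:
[core]
fernet_key = 81HqDtbqAywKSOumSha3BhWNOdQ26slT6K0YaZeZyPs=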
I tried creating the GCP connection through a DAG, via the UI, and manually with the airflow command line client. All three methods returned the same error. I am including a picture of the UI submission here, along with the full stack trace.
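The command line attempt was along these lines (the conn id and project are placeholders, and the exact flags may differ slightly by Airflow version):
airflow connections --add --conn_id=google_cloud_default --conn_type=google_cloud_platform --conn_extra='{"extra__google_cloud_platform__project": "my-project"}'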
Thanks for the help.
-RR
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 159, in get_fernet
    _fernet = Fernet(configuration.conf.get('core', 'FERNET_KEY').encode('utf-8'))
  File "/usr/local/lib/python3.6/site-packages/cryptography/fernet.py", line 37, in __init__
    "Fernet key must be 32 url-safe base64-encoded bytes."
ValueError: Fernet key must be 32 url-safe base64-encoded bytes.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1982, in wsgi_app
    response = self.full_dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1614, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1517, in handle_user_exception
    reraise(exc_type, exc_value, tb)
  File "/usr/local/lib/python3.6/site-packages/flask/_compat.py", line 33, in reraise
    raise value
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1612, in full_dispatch_request
    rv = self.dispatch_request()
  File "/usr/local/lib/python3.6/site-packages/flask/app.py", line 1598, in dispatch_request
    return self.view_functions[rule.endpoint](**req.view_args)
  File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/security/decorators.py", line 26, in wraps
    return f(self, *args, **kwargs)
  File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/views.py", line 524, in edit
    widgets = self._edit(pk)
  File "/usr/local/lib/python3.6/site-packages/flask_appbuilder/baseviews.py", line 965, in _edit
    form.populate_obj(item)
  File "/usr/local/lib/python3.6/site-packages/wtforms/form.py", line 96, in populate_obj
    field.populate_obj(obj, name)
  File "/usr/local/lib/python3.6/site-packages/wtforms/fields/core.py", line 330, in populate_obj
    setattr(obj, name, self.data)
  File "<string>", line 1, in __set__
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 731, in set_extra
    fernet = get_fernet()
  File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 163, in get_fernet
    raise AirflowException("Could not create Fernet object: {}".format(ve))
airflow.exceptions.AirflowException: Could not create Fernet object: Fernet key must be 32 url-safe base64-encoded bytes.
This is the YAML for the underlying persistent volume claims.
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-dags
  namespace: data
spec:
  accessModes:
  - ReadOnlyMany
  storageClassName: standard
  resources:
    requests:
      storage: 8Gi
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-logs
  namespace: data
spec:
  accessModes:
  - ReadOnlyMany
  storageClassName: standard
  resources:
    requests:
      storage: 8Gi
This is the Airflow deployment and service YAML.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflow
  namespace: data
  labels:
    name: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
  template:
    metadata:
      labels:
        name: airflow
    spec:
      serviceAccountName: spark-service-account
      automountServiceAccountToken: true
      initContainers:
      - name: "init"
        image: <image_name>
        imagePullPolicy: Always
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        # - name: test-volume
        #   mountPath: /root/test_volume
        env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command:
        - "bash"
        args:
        - "-cx"
        - "airflow initdb || true && airflow create_user -u airflow -l airflow -f jon -e airflow@apache.org -r Admin -p airflow || true"
      containers:
      - name: webserver
        image: <image_name>
        imagePullPolicy: IfNotPresent
        ports:
        - name: webserver
          containerPort: 8080
        env:
        - name: <namespace_name>
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command: ["/bin/sh", "-c"]
        args: ["airflow webserver"]
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        - name: airflow-logs
          mountPath: /root/airflow/logs
        # readinessProbe:
        #   initialDelaySeconds: 5
        #   timeoutSeconds: 5
        #   periodSeconds: 5
        #   httpGet:
        #     path: /login
        #     port: 8080
        # livenessProbe:
        #   initialDelaySeconds: 5
        #   timeoutSeconds: 5
        #   failureThreshold: 5
        #   httpGet:
        #     path: /login
        #     port: 8080
      - name: scheduler
        image: image-name
        imagePullPolicy: IfNotPresent
        env:
        - name: namespace_name
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command: ["/bin/sh", "-c"]
        args: ["cp ./dags/* /root/airflow/dags/; airflow scheduler"]
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        - name: airflow-logs
          mountPath: /root/airflow/logs
      volumes:
      - name: airflow-configmap
        configMap:
          name: airflow-configmap
      - name: airflow-dags
        persistentVolumeClaim:
          claimName: airflow-dags
      - name: airflow-logs
        persistentVolumeClaim:
          claimName: airflow-logs
---
apiVersion: v1
kind: Service
metadata:
  name: airflow
  namespace: data
spec:
  type: NodePort
  ports:
  - port: 8080
    nodePort: 30809
  selector:
    name: airflow
Restart the worker and webserver.
Your worker and webserver are still operating on the old Fernet key. You changed the key in your config, so all newly stored or modified Connections are encrypted with the new key, but the webserver/worker are still using the old one. They will never match, and this error will keep appearing until they are restarted.
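On Kubernetes that means recreating the pods rather than killing a process. A sketch, assuming the Deployment, namespace, and labels from your YAML (the Deployment recreates the pod, which picks up the current airflow.cfg from the ConfigMap):
kubectl delete pod -n data -l name=airflow
kubectl get pods -n data -w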