How do I ensure the same environment for all my workers (containers) in Airflow?

5/3/2019

I have a config for deploying 4 pods (hence, 4 workers) for Airflow on Kubernetes using Docker. However, all of a sudden, worker-0 is unable to make a certain curl request, whereas the other workers can. This is causing pipelines to fail.

I have read about mismatched configs and StatefulSets, but in my case there is a single config for all the workers, and it is the only source of truth.
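
To narrow this down, one thing I can do is run the same curl from every worker pod and compare the results, which would confirm whether the pods themselves behave differently. Here is a rough sketch using the official kubernetes Python client; the namespace, pod names, and target URL below are placeholders, not values from my chart:

# Rough debugging sketch (all names are placeholders, not part of the chart below):
# exec the same curl in every worker pod and print the HTTP status code each one gets.
from kubernetes import client, config
from kubernetes.stream import stream

NAMESPACE = "airflow"                                # placeholder namespace
PODS = [f"airflow-worker-{i}" for i in range(4)]     # placeholder pod names
TARGET_URL = "https://example.com/health"            # placeholder for the failing curl target

config.load_kube_config()
v1 = client.CoreV1Api()

for pod in PODS:
    cmd = ["curl", "-sS", "-o", "/dev/null", "-w", "%{http_code}",
           "--max-time", "10", TARGET_URL]
    # Stream the exec call and capture curl's stdout (the status code).
    out = stream(
        v1.connect_get_namespaced_pod_exec,
        pod,
        NAMESPACE,
        command=cmd,
        stderr=True, stdin=False, stdout=True, tty=False,
    )
    print(f"{pod}: {out.strip()}")

The same loop can run something like env | sort instead of curl to diff the environment variables across workers.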

The statefulsets-workers.yaml file is as follows:

# Workers are not in a Deployment, but in a StatefulSet, to allow each worker to expose a mini-server
# that serves only logs, which is used by the web server.

apiVersion: apps/v1beta1
kind: StatefulSet
metadata:
  name: {{ template "airflow.fullname" . }}-worker
  labels:
    app: {{ template "airflow.name" . }}-worker
    chart: {{ template "airflow.chart" . }}
    release: {{ .Release.Name }}
    heritage: {{ .Release.Service }}
spec:
  serviceName: "{{ template "airflow.fullname" . }}-worker"
  updateStrategy:
    type: RollingUpdate
  # Use experimental burst mode for faster StatefulSet scaling
  #   https://github.com/kubernetes/kubernetes/commit/****
  podManagementPolicy: Parallel
  replicas: {{ .Values.celery.num_workers }}
  template:
    metadata:
      {{- if .Values.airflow.pallet.config_path }}
      annotations:
        checksum/config: {{ include (print $.Template.BasePath "/configmap.yaml") . | sha256sum }}
      {{- end }}
      labels:
        app: {{ template "airflow.name" . }}-worker
        release: {{ .Release.Name }}
    spec:
      restartPolicy: Always
      terminationGracePeriodSeconds: 30
      securityContext:
        runAsUser: 1002
        fsGroup: 1002
      containers:
        - name: {{ .Chart.Name }}-worker
          imagePullPolicy: {{ .Values.airflow.image_pull_policy }}
          image: "{{ .Values.airflow.image }}:{{ .Values.airflow.imageTag }}"
          volumeMounts:
            {{- if .Values.airflow.storage.enabled }}
            - name: google-cloud-key
              mountPath: /var/secrets/google
              readOnly: true
            {{- end }}
            - name: worker-logs
              mountPath: /usr/local/airflow/logs
            - name: data
              mountPath: /usr/local/airflow/rootfs
          env:
            {{- if .Values.airflow.storage.enabled }}
            - name: GOOGLE_APPLICATION_CREDENTIALS
              value: /var/secrets/google/key.json
            {{- end }}
            {{- range $setting, $option := .Values.airflow.config }}
            - name: {{ $setting }}
              value: {{ $option }}
            {{- end }}
          securityContext:
            allowPrivilegeEscalation: false
          envFrom:
            - configMapRef:
                name: pallet-env-file
          args: ["worker"]
          ports:
            - name: wlog
              containerPort: 8793
              protocol: TCP
      {{- if .Values.airflow.image_pull_secret }}
      imagePullSecrets:
        - name: {{ .Values.airflow.image_pull_secret }}
      {{- end }}
      {{- if .Values.airflow.storage.enabled }}
      volumes:
        - name: google-cloud-key
          secret:
            secretName: {{ .Values.airflow.storage.secretName }}
      {{- end }}
  volumeClaimTemplates:
    - metadata:
        name: worker-logs
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 50Gi
    - metadata:
        name: data
      spec:
        accessModes: [ "ReadWriteOnce" ]
        resources:
          requests:
            storage: 50Gi

I expect all the workers to be able to connect to the service to which I am making the curl request.

-- Aviral Srivastava
airflow
docker
kubernetes

1 Answer

5/6/2019

It turns out that the environment was indeed the same; however, the receiving machine didn't have the node's new IP whitelisted.

When all the pods crashed, they took the node down with them, and restarting the node gave it a new IP. Hence, the connection timed out for the worker on that node.
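
For anyone else debugging this: a quick way to see which IP the receiving machine actually needs to whitelist is to list the nodes' external IPs and compare them with the allow list. A rough sketch with the kubernetes Python client (assuming the kubectl context already points at the cluster):

# Rough sketch: print each node's external IP(s) so they can be checked
# against the receiving service's whitelist after a node restart.
from kubernetes import client, config

config.load_kube_config()
v1 = client.CoreV1Api()

for node in v1.list_node().items:
    external_ips = [a.address for a in node.status.addresses if a.type == "ExternalIP"]
    print(node.metadata.name, external_ips)

A longer-term option is to route the workers' egress traffic through a stable address (for example a NAT gateway), so a node restart doesn't invalidate the whitelist.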

-- Aviral Srivastava
Source: StackOverflow