Python Flask 502 Bad Gateway on a server-sent event stream deployed in GKE

12/22/2019

I'm having a problem with my Python Flask server deployed in Google Kubernetes Engine (GKE). The code below is a simple Flask server that supports text/event-stream. The problem is that after exactly 60 seconds of inactivity from the server (no messages on the stream), the client shows a 502 Bad Gateway error.

Error: Server Error The server encountered a temporary error and could not complete your request. Please try again in 30 seconds.

The client no longer receives any data from the server whenever this happens. I've already tried adding timeouts, as you can see in the Kubernetes config file below.

I tried spinning up a Google Compute Engine instance without Kubernetes, deployed the same code on it, and added a domain. To my surprise it works: it doesn't show any 502 error even if I leave the browser open.

It probably has something to do with the Kubernetes config I'm running. I'd appreciate any help or ideas.

Update 1

I tried changing the Kubernetes Service type to LoadBalancer instead of NodePort.

Accessing the generated IP endpoint works perfectly, with no 502 error even after 60 seconds of inactivity.

Update 2

Here are the errors from the load balancer's Stackdriver logs:

{
 httpRequest: {
  referer: "http://sse-dev.[REDACTED]/test"   
  remoteIp: "[REDACTED]"   
  requestMethod: "GET"   
  requestSize: "345"   
  requestUrl: "http://sse-dev.[REDACTED]/stream"   
  responseSize: "488"   
  serverIp: "[REDACTED]"   
  status: 502   
  userAgent: "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.108 Safari/537.36"   
 }
 insertId: "ptb7kfg2w2zz01"  
 jsonPayload: {
  @type: "type.googleapis.com/google.cloud.loadbalancing.type.LoadBalancerLogEntry"   
  statusDetails: "backend_timeout"   
 }
 logName: "projects/[REDACTED]-assist-dev/logs/requests"  
 receiveTimestamp: "2020-01-03T06:27:44.361706996Z"  
 resource: {
  labels: {
   backend_service_name: "k8s-be-30808--17630a0e8199e99b"    
   forwarding_rule_name: "k8s-fw-default-[REDACTED]-dev-ingress--17630a0e8199e99b"    
   project_id: "[REDACTED]-assist-dev"    
   target_proxy_name: "k8s-tp-default-[REDACTED]-dev-ingress--17630a0e8199e99b"    
   url_map_name: "k8s-um-default-[REDACTED]-dev-ingress--17630a0e8199e99b"    
   zone: "global"    
  }
  type: "http_load_balancer"   
 }
 severity: "WARNING"  
 spanId: "4b0767cace9b9500"  
 timestamp: "2020-01-03T06:26:43.381613Z"  
 trace: "projects/[REDACTED]-assist-dev/traces/d467f39f76b94c02d9a8e6998fdca17b"  
}

sse.py

from typing import Iterator
import random
import string

from collections import deque
from flask import Response, request
from gevent.queue import Queue
import gevent

def generate_id(size=6, chars=string.ascii_lowercase + string.digits):
    return ''.join(random.choice(chars) for _ in range(size))


class ServerSentEvent(object):
    """Class to handle server-sent events."""
    def __init__(self, data, event):
        self.data = data
        self.event = event
        self.event_id = generate_id()
        self.retry = 5000
        self.desc_map = {
            self.data: "data",
            self.event: "event",
            self.event_id: "id",
            self.retry: "retry"
        }

    def encode(self) -> str:
        """Encodes events as a string."""
        if not self.data:
            return ""
        lines = ["{}: {}".format(name, key)
                 for key, name in self.desc_map.items() if key]

        return "{}\n\n".format("\n".join(lines))


class Channel(object):
    def __init__(self, history_size=32):
        self.subscriptions = []
        self.history = deque(maxlen=history_size)
        self.history.append(ServerSentEvent('start_of_history', None))

    def notify(self, message):
        """Notify all subscribers with message."""
        for sub in self.subscriptions[:]:
            sub.put(message)

    def event_generator(self, last_id) -> Iterator[ServerSentEvent]:
        """Yields encoded ServerSentEvents."""
        q = Queue()
        self._add_history(q, last_id)
        self.subscriptions.append(q)
        try:
            while True:
                yield q.get()
        except GeneratorExit:
            self.subscriptions.remove(q)

    def subscribe(self):
        def gen(last_id) -> Iterator[str]:
            for sse in self.event_generator(last_id):
                yield sse.encode()
        return Response(
            gen(request.headers.get('Last-Event-ID')),
            mimetype="text/event-stream",
            headers={
                "Cache-Control": "no-cache",
                "Connection": "keep-alive",
                "Content-Type": "text/event-stream"
            })

    def _add_history(self, q, last_id):
        add = False
        for sse in self.history:
            if add:
                q.put(sse)
            if sse.event_id == last_id:
                add = True

    def publish(self, message, event=None):
        sse = ServerSentEvent(str(message), event)
        self.history.append(sse)
        gevent.spawn(self.notify, sse)

    def get_last_id(self) -> str:
        return self.history[-1].event_id
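One way to avoid long idle gaps on the stream (the 60 seconds of inactivity described above) is to publish a periodic heartbeat so the connection never sits silent for the whole timeout window. Below is a minimal sketch built on the Channel class above; the start_keepalive helper, the 'keepalive' event name, and the 30-second interval are assumptions and not part of the original code:

import gevent

def start_keepalive(channel, interval=30):
    # Publish a heartbeat on `channel` every `interval` seconds so the SSE
    # connection never idles long enough to hit a proxy/load-balancer timeout.
    def _loop():
        while True:
            gevent.sleep(interval)
            channel.publish('ping', event='keepalive')  # clients can simply ignore this event
    return gevent.spawn(_loop)

It could be started once, right after flask_channel = Channel() in service.py below.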

service.py

import json
import os

import requests
from app.controllers.sse import Channel
from flask import send_file, \
    jsonify, request, Blueprint, Response
from typing import Iterator

blueprint = Blueprint(__name__, __name__, url_prefix='')
flask_channel = Channel()

@blueprint.route("/stream")
def stream():
    return flask_channel.subscribe()


@blueprint.route('/sample/create', methods=['GET'])
def sample_create():
    branch_id = request.args.get('branch_id', None)
    params = request.get_json()
    if not params:
        params = {
            'id': 'sample_id',
            'description': 'sample_description'
        }
    flask_channel.publish(json.dumps(params), event=branch_id)
    return jsonify({'success': True}), 200
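For reference, one way to consume the /stream endpoint from Python and watch when the connection drops. This is a hedged sketch: the URL is a placeholder, and it simply prints raw SSE lines until the server (or a proxy in between) closes the connection:

import requests

def consume_stream(url='http://localhost:5000/stream'):
    # Stream the response and read it line by line; each SSE event ends with a blank line.
    with requests.get(url, stream=True, headers={'Accept': 'text/event-stream'}) as resp:
        for line in resp.iter_lines(decode_unicode=True):
            if line:  # skip the blank separators between events
                print(line)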

kubernetes-config.yaml

---
apiVersion: v1
kind: Service
metadata:
  name: sse-service
  labels:
    app: sse-service
spec:
  ports:
  - port: 80
    targetPort: 5000
    protocol: TCP
    name: http
  selector:
    app: sse-service
  sessionAffinity: ClientIP
  type: NodePort
---
apiVersion: "extensions/v1beta1"
kind: "Deployment"
metadata:
  name: "sse-service"
  namespace: "default"
  labels:
    app: "sse-service"
spec:
  replicas: 1
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 25%
  selector:
    matchLabels:
      app: "sse-service"
  template:
    metadata:
      labels:
        app: "sse-service"
    spec:
      containers:
      - name: "sse-service"
        image: "{{IMAGE_NAME}}"
        imagePullPolicy: Always
        ports:
        - containerPort: 5000
        livenessProbe:
          httpGet:
            path: /health/check
            port: 5000
          initialDelaySeconds: 25
          periodSeconds: 15
        readinessProbe:
          httpGet:
            path: /health/check
            port: 5000
          initialDelaySeconds: 25
          periodSeconds: 15
---
apiVersion: "autoscaling/v2beta1"
kind: "HorizontalPodAutoscaler"
metadata:
  name: "sse-service-hpa"
  namespace: "default"
  labels:
    app: "sse-service"
spec:
  scaleTargetRef:
    kind: "Deployment"
    name: "sse-service"
    apiVersion: "apps/v1beta1"
  minReplicas: 1
  maxReplicas: 7
  metrics:
  - type: "Resource"
    resource:
      name: "cpu"
      targetAverageUtilization: 80
---
apiVersion: cloud.google.com/v1beta1
kind: BackendConfig
metadata:
  name: sse-service
spec:
  timeoutSec: 120
  connectionDraining:
    drainingTimeoutSec: 3600

Dockerfile

FROM python:3.6.5-jessie

ENV GUNICORN_PORT=5000
ENV PYTHONUNBUFFERED=TRUE
ENV GOOGLE_APPLICATION_CREDENTIALS=/opt/creds/account.json

COPY requirements.txt /opt/app/requirements.txt
COPY app /opt/app
COPY creds/account.json /opt/creds/account.json

WORKDIR /opt/app
RUN pip install -r requirements.txt

EXPOSE ${GUNICORN_PORT}
CMD gunicorn -b :${GUNICORN_PORT} wsgi:create_app\(\) --reload --timeout=300000 --config=config.py
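The gunicorn command above loads wsgi:create_app(), but the wsgi module itself is not shown in the post. A hypothetical reconstruction of what it might look like; the module paths are guesses inferred from the "from app.controllers.sse import Channel" import in service.py, and the /stream and /health/check routes only exist if their blueprints are registered here:

# wsgi.py (hypothetical; not part of the original post)
from flask import Flask

from app.controllers import base, service  # assumed module layout


def create_app():
    app = Flask(__name__)
    # Without these registrations neither /stream nor /health/check is served.
    app.register_blueprint(base.blueprint)
    app.register_blueprint(service.blueprint)
    return app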

Base.py

from flask import jsonify, Blueprint

blueprint = Blueprint(__name__, __name__)
@blueprint.route('/health/check', methods=['GET'])
def check_health():
    response = {
        'message': 'pong!',
        'status': 'success'
    }

    return jsonify(response), 200

bitbucket-pipelines.yml

options:
  docker: true

pipelines:
  branches:
    dev:
      - step:
          name: Build - Push - Deploy to Dev environment
          image: google/cloud-sdk:latest
          caches:
            - docker
            - pip
          deployment: development
          script:
            # Export all bitbucket credentials to the environment
            - echo $GOOGLE_APPLICATION_CREDENTIALS | base64 -di > ./creds/account.json
            - echo $CONTAINER_CREDENTIALS | base64 -di > ./creds/gcr.json
            - export CLOUDSDK_CONFIG=`pwd`/creds/account.json
            - export GOOGLE_APPLICATION_CREDENTIALS=`pwd`/creds/account.json

            # Configure docker to use gcp service account
            - gcloud auth activate-service-account $KUBERNETES_SERVICE_ACCOUNT --key-file=creds/gcr.json
            - gcloud config list
            - gcloud auth configure-docker -q

            # # Build docker image with name and tag
            - export IMAGE_NAME=$HOSTNAME/$PROJECT_ID/$IMAGE:v0.1.$BITBUCKET_BUILD_NUMBER
            - docker build -t $IMAGE_NAME .

            # # Push image to Google Container Repository
            - docker push $IMAGE_NAME

            # Initialize configs for kubernetes
            - gcloud config set project $PROJECT_ID
            - gcloud config set compute/zone $PROJECT_ZONE
            - gcloud container clusters get-credentials $PROJECT_CLUSTER

            # Run kubernetes configs
            - cat kubernetes-config.yaml | sed "s#{{IMAGE_NAME}}#$IMAGE_NAME#g" | kubectl apply -f -

ingress.yaml

apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  annotations:
    ingress.kubernetes.io/backends: '{"k8s-be-30359--17630a0e8199e99b":"HEALTHY","k8s-be-30599--17630a0e8199e99b":"HEALTHY","k8s-be-30808--17630a0e8199e99b":"HEALTHY","k8s-be-30991--17630a0e8199e99b":"HEALTHY","k8s-be-31055--17630a0e8199e99b":"HEALTHY","k8s-be-31467--17630a0e8199e99b":"HEALTHY","k8s-be-31596--17630a0e8199e99b":"HEALTHY","k8s-be-31948--17630a0e8199e99b":"HEALTHY","k8s-be-32702--17630a0e8199e99b":"HEALTHY"}'
    ingress.kubernetes.io/forwarding-rule: k8s-fw-default-[REDACTED]-dev-ingress--17630a0e8199e99b
    ingress.kubernetes.io/https-forwarding-rule: k8s-fws-default-[REDACTED]-dev-ingress--17630a0e8199e99b
    ingress.kubernetes.io/https-target-proxy: k8s-tps-default-[REDACTED]-dev-ingress--17630a0e8199e99b
    ingress.kubernetes.io/ssl-cert: k8s-ssl-d6db2a7a17456a7b-64a79e74837f68e3--17630a0e8199e99b
    ingress.kubernetes.io/static-ip: k8s-fw-default-[REDACTED]-dev-ingress--17630a0e8199e99b
    ingress.kubernetes.io/target-proxy: k8s-tp-default-[REDACTED]-dev-ingress--17630a0e8199e99b
    ingress.kubernetes.io/url-map: k8s-um-default-[REDACTED]-dev-ingress--17630a0e8199e99b
    kubectl.kubernetes.io/last-applied-configuration: |
      {"apiVersion":"extensions/v1beta1","kind":"Ingress","metadata":{"annotations":{},"name":"[REDACTED]-dev-ingress","namespace":"default"},"spec":{"rules":[{"host":"bot-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"bot-service","servicePort":80}}]}},{"host":"client-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"client-service","servicePort":80}}]}},{"host":"team-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"team-service","servicePort":80}}]}},{"host":"chat-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"chat-service","servicePort":80}}]}},{"host":"chatb-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"chat-builder-service","servicePort":80}}]}},{"host":"action-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"action-service","servicePort":80}}]}},{"host":"message-dev.[REDACTED]","http":{"paths":[{"backend":{"serviceName":"message-service","servicePort":80}}]}}],"tls":[{"hosts":["bots-dev.[REDACTED]","client-dev.[REDACTED]","team-dev.[REDACTED]","chat-dev.[REDACTED]","chatb-dev.[REDACTED]","message-dev.[REDACTED]"],"secretName":"[REDACTED]-ssl"}]}}
  creationTimestamp: "2019-08-09T09:19:14Z"
  generation: 7
  name: [REDACTED]-dev-ingress
  namespace: default
  resourceVersion: "73975381"
  selfLink: /apis/extensions/v1beta1/namespaces/default/ingresses/[REDACTED]-dev-ingress
  uid: c176cc8c-ba86-11e9-89d6-42010a940181
spec:
  rules:
  - host: bot-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: bot-service
          servicePort: 80
  - host: client-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: client-service
          servicePort: 80
  - host: team-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: team-service
          servicePort: 80
  - host: chat-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: chat-service
          servicePort: 80
  - host: chatb-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: chat-builder-service
          servicePort: 80
  - host: action-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: action-service
          servicePort: 80
  - host: message-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: message-service
          servicePort: 80
  - host: sse-dev.[REDACTED]
    http:
      paths:
      - backend:
          serviceName: sse-service
          servicePort: 80
  tls:
  - hosts:
    - bots-dev.[REDACTED]
    - client-dev.[REDACTED]
    - team-dev.[REDACTED]
    - chat-dev.[REDACTED]
    - chatb-dev.[REDACTED]
    - message-dev.[REDACTED]
    - sse-dev.[REDACTED]
    secretName: [REDACTED]-ssl
status:
  loadBalancer:
    ingress:
    - ip: [REDACTED]
-- Ric Mercado
flask
kubernetes
python

1 Answer

12/23/2019

The health check for your load balancer comes from the readinessProbe configured in the Deployment. You configured the path to be /health/check; however, your Flask app has nothing listening on that path. This means the readinessProbe is likely failing, and the health check from your load balancer is failing as well.

With the health checks failing, your load balancer does not see any healthy backends, so it returns a 502 error.

You can verify this in three ways:

  1. Check the Stackdriver logs. You will see the 502 responses logged; check the log details for more information about the 502. You will likely see that there are no healthy backends.

  2. Check the status of your pods using kubectl get po | grep sse-service; the pods are likely not Ready.

  3. Test the check from another pod in the cluster. (Note: you will need a pod that has curl installed or lets you install it; if you don't have one, use a busybox or nginx base image.)
     a. Run kubectl get po -o wide | grep sse-service and note the IP of one of the pods.
     b. Run kubectl exec [test_pod] -- curl [sse-service_cluster_ip]/health/check. This curls one of your sse-service pods from inside the cluster and checks whether anything is replying on /health/check. There likely is not.

To address this, you should have a route like @blueprint.route('/health/check', methods=['GET']). Define the function to just return 200.
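A minimal sketch of such an endpoint; note it only takes effect if its blueprint is actually registered on the Flask app that gunicorn serves, which is an assumption here:

from flask import Blueprint

blueprint = Blueprint(__name__, __name__)

@blueprint.route('/health/check', methods=['GET'])
def check_health():
    # A plain 200 response is enough for the load balancer health check.
    return 'ok', 200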

-- Patrick W
Source: StackOverflow