Does Kubernetes 1.9.6 support proper connection draining?

7/30/2018

I've been trying to get Kubernetes to do zero-downtime deployments. I've tried Services of type LoadBalancer on AWS, the nginx ingress controller, and NodePort directly (I've also tried IPVS).

For the most part it works; most requests succeed. To test, I continually redeploy an app that serves a sequence number and check that no requests are dropped. The monitoring application also runs on the same Kubernetes cluster but goes through an AWS ELB to reach the dummy app.

After running for a couple of hours there are always a few requests where the connection was dropped: 504s from the ELB or connection timeouts. If I make the app take 500 milliseconds longer to respond, there are a lot more bad requests.

It seems like Kubernetes isn't draining connections from the old pods and is instead just cutting the cord.

I've been digging through the Kubernetes codebase to figure out whether there is any connection draining, but I've had no luck. In pkg/proxy/iptables/proxier.go, syncProxyRules seems to set up all the iptables rules, but it doesn't look like connection draining is handled at that level, at least.

TL;DR

I can't get Kubernetes to deploy a new version without occasionally dropping connections. Am I missing some connection-draining option, or does Kubernetes just not support it?

My setup

Kubernetes QoS app

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "os"
    "os/signal"
    "strconv"
    "syscall"
    "time"

    "github.com/DataDog/datadog-go/statsd"
    apiv1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/rest"
    "k8s.io/client-go/util/retry"
)

func ApiCheck(address string, ddclient *statsd.Client) {
    // creates the in-cluster config
    config, err := rest.InClusterConfig()
    if err != nil {
        panic(err.Error())
    }
    // creates the clientset
    clientset, err := kubernetes.NewForConfig(config)
    if err != nil {
        panic(err.Error())
    }

    deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)

    // if we just came up, wait for other app to go down before updating deployment...
    time.Sleep(30 * time.Second)

    for {
        fmt.Println("Starting deployment")
        start := time.Now()
        var prevValue int
        retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
            // get latest version
            result, getErr := deploymentsClient.Get("k8s-qos-dummy", metav1.GetOptions{})
            if getErr != nil {
                fmt.Printf("Failed to get latest version of Deployment: %v", getErr)
                return getErr
            }
            var perr error
            prevValue, perr = strconv.Atoi(result.Spec.Template.Spec.Containers[0].Args[1])
            if perr != nil {
                fmt.Printf("Cannot parse previous value %s, using 0 instead\n", result.Spec.Template.Spec.Containers[0].Args[1])
                prevValue = 0
            }
            valstring := fmt.Sprintf("%d", prevValue + 1)
            result.Spec.Template.Spec.Containers[0].Args[1] = valstring
            fmt.Printf("Trying to update to %s\n", valstring)
            _, updateErr := deploymentsClient.Update(result)
            return updateErr
        })
        if retryErr != nil {
            fmt.Println("Update failed: %v", retryErr)
            CheckDDError(ddclient.Incr("qos.k8s.deploy.update_error", nil, 1))
            continue
        }
        fmt.Printf("Updated successfully\n")
        // now wait for server to respond properly
        for {
            client := &http.Client{
                Timeout: time.Second * 5,
            }
            response, err := client.Get(address)
            if err != nil {
                fmt.Printf("Failed qos deploy with http error: %s\n", err)
                CheckDDError(ddclient.Incr("qos.k8s.deploy.http_error", nil, 1))
            } else {
                contents, err := ioutil.ReadAll(response.Body)
                // close the body explicitly; a defer here would never run because this function loops forever
                response.Body.Close()
                if err != nil {
                    fmt.Printf("Failed qos deploy with io error: %s\n", err)
                    CheckDDError(ddclient.Incr("qos.k8s.deploy.io_error", nil, 1))
                } else {
                    if response.StatusCode >= 200 && response.StatusCode <= 299 {
                        // let's check the value
                        new_value, perr := strconv.Atoi(string(contents))
                        if perr != nil {
                            fmt.Printf("Failed to parse response for deploy %s\n", perr.Error())
                        } else {
                            if new_value == prevValue + 1 {
                                fmt.Println("Deployment confirmed!")
                                elapsed := time.Since(start)
                                CheckDDError(ddclient.Timing("qos.k8s.deploy.time", elapsed, nil, 1))
                                time.Sleep(30 * time.Second)
                                break
                            } else {
                                fmt.Printf("Got bad value: %d, wanted %d\n", new_value, prevValue + 1)
                                elapsed := time.Since(start)
                                if elapsed > time.Second * 80 {
                                    CheckDDError(ddclient.Incr("qos.k8s.deploy.timeout_err", nil, 1))
                                    CheckDDError(ddclient.Timing("qos.k8s.deploy.time", elapsed, nil, 1))
                                    time.Sleep(30 * time.Second)
                                    break
                                }
                            }
                        }
                    } else {
                        fmt.Printf("Failed qos deploy with http status error: %d %s\n", response.StatusCode, string(contents))
                        CheckDDError(ddclient.Incr("qos.k8s.deploy.http_status_error", nil, 1))
                    }
                }
            }
            time.Sleep(1 * time.Second)
        }
    }
}

func CheckDDError(derr error) {
    if derr != nil {
        fmt.Println("datadogs not working, got: %s", derr.Error())
    }
}

func DummyCheck(address string, ddclient *statsd.Client) {
    for {
        client := &http.Client{
            Timeout: time.Second * 5,
        }
        response, err := client.Get(address)
        if err != nil {
            fmt.Printf("Failed qos check with http error: %s\n", err)CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
            CheckDDError(ddclient.Incr("qos.k8s.check.dummy_http_error", nil, 1))
        } else {
            contents, err := ioutil.ReadAll(response.Body)
            // close the body explicitly; a defer here would never run because this function loops forever
            response.Body.Close()
            if err != nil {
                fmt.Printf("Failed qos check with io error: %s\n", err)
                CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
                CheckDDError(ddclient.Incr("qos.k8s.check.dummy_io_error", nil, 1))
            } else {
                if response.StatusCode >= 200 && response.StatusCode <= 299 {
                    fmt.Printf("Passed qos check with status: %d received: %s\n", response.StatusCode, string(contents))
                    CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 1, nil, 1))
                } else {
                    fmt.Printf("Failed qos check with http status error: %d %s\n", response.StatusCode, string(contents))
                    CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
                    CheckDDError(ddclient.Incr("qos.k8s.check.dummy_http_status_error", nil, 1))
                }
            }
        }
        time.Sleep(1 * time.Second)
    }
}

func WebServer(resp string, ddclient *statsd.Client) {

    srv := &http.Server{
        Addr: ":7070",
        IdleTimeout: 61 * time.Second, // the ELB default idle timeout is 60 seconds
    }

    http.HandleFunc("/", func (w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, resp)
    })

    fmt.Printf("current idle timeout: %v\n", srv.IdleTimeout)

    // signal.Notify requires a buffered channel
    c := make(chan os.Signal, 1)
    signal.Notify(c, os.Interrupt, syscall.SIGTERM)
    done := make(chan struct{})
    go func() {
        <-c
        fmt.Printf("Got SIGTERM, shutting down\n")
        // Shutdown blocks until in-flight requests have finished
        srv.Shutdown(context.Background())
        close(done)
    }()

    srv.ListenAndServe()
    // wait for Shutdown to finish draining before returning and exiting the process
    <-done
}

func main() {
    if len(os.Args) < 2 {
        fmt.Println("usage: k8s-qos [deploy|dummy]")
        return
    }

    ddSock := fmt.Sprintf("%s:8125", os.Getenv("HOST_IP"))
    ddc, err := statsd.New(ddSock)
    if err != nil {
        fmt.Printf("failed to create statsd client: %s\n", err)
        return
    }

    if os.Args[1] == "deploy" {
        if len(os.Args) < 3 {
            fmt.Println("usage: k8s-qos dummy [address]")
            return
        }
        pingAddress := os.Args[2]
        go WebServer(
            fmt.Sprintf(
                "Hey this is the deployer for qos, this app pings %s to make sure it works",
                pingAddress),
            ddc)
        go ApiCheck(pingAddress, ddc)
        DummyCheck(pingAddress, ddc)
        return
    }
    if os.Args[1] == "dummy" {
        if len(os.Args) < 3 {
            fmt.Println("usage: k8s-qos dummy [response-string]")
            return
        }
        WebServer(os.Args[2], ddc)
        return
    }
    fmt.Println("no usage specified")
    return
}

Kubernetes cluster

Set up with kops version 1.9.0 on AWS

kops cluster config:

apiVersion: kops/v1alpha2
kind: Cluster
metadata:
  name: test-k8s.example.com
spec:
  additionalPolicies:
    node: |
      [
        {
          "Effect": "Allow",
          "Action": ["sts:AssumeRole"],
          "Resource": ["*"]
        }
      ]
  api:
    loadBalancer:
      type: Internal
  authorization:
    rbac: {}
  channel: stable
  cloudProvider: aws
  configBase: s3://test-k8s/test-k8s.example.com
  etcdClusters:
  - etcdMembers:
    - instanceGroup: master-us-west-2a
      name: a
    - instanceGroup: master-us-west-2b
      name: b
    - instanceGroup: master-us-west-2c
      name: c
    name: main
  - etcdMembers:
    - instanceGroup: master-us-west-2a
      name: a
    - instanceGroup: master-us-west-2b
      name: b
    - instanceGroup: master-us-west-2c
      name: c
    name: events
  iam:
    allowContainerRegistry: true
    legacy: false
  kubernetesApiAccess:
  - 0.0.0.0/0
  kubernetesVersion: 1.9.6
  masterInternalName: api.internal.test-k8s.example.com
  masterPublicName: api.test-k8s.example.com
  networkCIDR: XX.XX.0.0/16
  networkID: vpc-XXXXXXX
  networking:
    weave:
      mtu: 8912
  nonMasqueradeCIDR: XX.XX.0.0/10
  sshAccess:
  - XX.XX.XX.XX/32
  subnets:
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: us-west-2a
    type: Private
    zone: us-west-2a
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: us-west-2b
    type: Private
    zone: us-west-2b
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: us-west-2c
    type: Private
    zone: us-west-2c
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: utility-us-west-2a
    type: Utility
    zone: us-west-2a
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: utility-us-west-2b
    type: Utility
    zone: us-west-2b
  - cidr: XX.XX.XX.XX/24
    id: subnet-XXXXXXX
    name: utility-us-west-2c
    type: Utility
    zone: us-west-2c
  topology:
    dns:
      type: Private
    masters: private
    nodes: private

kops node config:

apiVersion: kops/v1alpha2
kind: InstanceGroup
metadata:
  labels:
    kops.k8s.io/cluster: test-k8s.example.com
  name: nodes
spec:
  image: XXXXXXXX/normal-kops-image-but-with-portmap-cni
  machineType: t2.medium
  maxSize: 3
  minSize: 3
  nodeLabels:
    kops.k8s.io/instancegroup: nodes
  role: Node
  subnets:
  - us-west-2a
  - us-west-2b
  - us-west-2c

Kubernetes app config

"dummy" app config aka the app being redeployed:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-qos-dummy
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 1
      maxSurge: 1
  minReadySeconds: 1
  replicas: 3
  selector:
    matchLabels:
      app: k8s-qos-dummy
  template:
    metadata:
      name: k8s-qos-dummy
      labels:
        app: k8s-qos-dummy
    spec:
      containers:
      - name: k8s-qos-dummy
        image: XXXXXX
        command: ["k8s-qos"]
        args: [ "dummy", "1" ]
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        ports:
        - containerPort: 7070
        livenessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 20
          periodSeconds: 2
        readinessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 5
          periodSeconds: 5
          successThreshold: 1
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sleep", "61"]
        resources:
          limits:
            memory: "200Mi"
            cpu: ".25"
          requests:
            cpu: ".25"
            memory: "200Mi"

---
apiVersion: v1
kind: Service
metadata:
  name: k8s-qos-dummy
  annotations:
    service.beta.kubernetes.io/aws-load-balancer-backend-protocol: http
    service.beta.kubernetes.io/aws-load-balancer-ssl-cert: XXXXXX
    service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443"
    service.beta.kubernetes.io/aws-load-balancer-internal: 0.0.0.0/0
    service.beta.kubernetes.io/aws-load-balancer-extra-security-groups: "sg-XXXXX"
spec:
  ports:
    - port: 80
      name: http
      targetPort: 7070
    - port: 443
      name: https
      targetPort: 7070
  selector:
    app: k8s-qos-dummy
  type: LoadBalancer
  loadBalancerSourceRanges:
    - 127.0.0.0/32
---
# when testing with ingress
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
  name: k8s-qos-dummy-ingress
spec:
  rules:
  - host: k8s-qos-dummy.example.com
    http:
      paths:
      - backend:
          serviceName: k8s-qos-dummy
          servicePort: 443

redeployer/monitor app config:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: k8s-qos-role
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  namespace: default
  name: k8s-qos-role
rules:
- apiGroups: ["apps"]
  resources: ["deployments"]
  verbs: ["get", "watch", "list", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: k8s-qos-role
subjects:
  - kind: ServiceAccount
    namespace: default
    name: k8s-qos-role
roleRef:
  kind: Role
  name: k8s-qos-role
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: k8s-qos
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxUnavailable: 0
      maxSurge: 1
  minReadySeconds: 5
  replicas: 1
  selector:
    matchLabels:
      app: k8s-qos
  template:
    metadata:
      name: k8s-qos
      labels:
        app: k8s-qos
    spec:
      serviceAccountName: k8s-qos-role
      containers:
      - name: k8s-qos
        image: XXXXXX
        command: ["k8s-qos"]
        args: [ "deploy", "https://k8s-qos-dummy.example.com/"]
        env:
        - name: HOST_IP
          valueFrom:
            fieldRef:
              fieldPath: status.hostIP
        ports:
        - containerPort: 7070
        livenessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 20
          periodSeconds: 2
        readinessProbe:
          httpGet:
            path: /
            port: 7070
          initialDelaySeconds: 0
          periodSeconds: 2
        resources:
          limits:
            memory: "400Mi"
            cpu: ".5"
          requests:
            cpu: ".25"
            memory: "200Mi"
-- Patrick Riordan
kubernetes
kubernetes-ingress

1 Answer

7/31/2018

Kubernetes does support connection draining - it is called graceful termination.
In this Stack Overflow question you will find a comprehensive answer explaining what it is and how it works. To be clear, this is the intended behavior, as described in this GitHub issue: after a pod is deleted, Kubernetes waits the grace period before killing it. The pod only needs to catch SIGTERM and start failing its readiness probes; at that point the load balancer should stop sending traffic to the pod. If the pod is not taken out of rotation in time, then when it dies it kills all of its current connections. I think in your case you would have to look for a solution inside the application or try some external tooling - if I remember correctly, Istio has features that would help, but I am not experienced enough with it to point you in the right direction.
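
For reference, here is a minimal sketch in Go (not taken from the question's app) of the pattern this implies: on SIGTERM the readiness endpoint starts returning 503 so the Service endpoints and the ELB take the pod out of rotation, the server keeps serving during a drain window, and only then shuts down and waits for in-flight requests to finish. The /healthz path, the 70-second drain window, and the 10-second shutdown timeout are illustrative assumptions, not values from the question; the drain window has to stay below terminationGracePeriodSeconds for this to complete before the pod is killed.

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "sync/atomic"
    "syscall"
    "time"
)

func main() {
    var terminating int32 // set to 1 once SIGTERM arrives

    mux := http.NewServeMux()
    // Readiness endpoint: start failing after SIGTERM so the endpoints controller
    // and the load balancer stop sending new traffic to this pod.
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if atomic.LoadInt32(&terminating) == 1 {
            http.Error(w, "shutting down", http.StatusServiceUnavailable)
            return
        }
        fmt.Fprint(w, "ok")
    })
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, "hello")
    })

    srv := &http.Server{Addr: ":7070", Handler: mux}

    sigs := make(chan os.Signal, 1)
    signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)

    done := make(chan struct{})
    go func() {
        <-sigs
        atomic.StoreInt32(&terminating, 1)
        // Keep serving while readiness probes fail and the ELB drains.
        // This window must be shorter than terminationGracePeriodSeconds.
        time.Sleep(70 * time.Second)
        ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
        defer cancel()
        srv.Shutdown(ctx) // waits for in-flight requests to finish
        close(done)
    }()

    if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        fmt.Println("server error:", err)
        return
    }
    <-done // don't exit before Shutdown has finished draining
}

In the question's deployment the preStop sleep plays a similar role (delaying SIGTERM while endpoints are being removed); the sketch above just adds the explicit readiness flip described here, so the pod actively signals that it should be taken out of rotation instead of relying on endpoint-removal timing alone.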

-- aurelius
Source: StackOverflow