I've been trying to get Kubernetes to do zero-downtime deployments. I've tried Services of type LoadBalancer on AWS, an NGINX Ingress, and NodePort directly (I've also tried IPVS).
For the most part it works: most requests go through without failure. To test, I continually redeploy an app that serves a sequence number and check that no requests are dropped. The monitoring application also runs on the same Kubernetes cluster, but goes out through an AWS ELB to get back to the dummy app.
After running for a couple of hours there are always a few requests where the connection was dropped, either a 504 from the ELB or a connection timeout. If I make the request take 500 milliseconds longer to respond, there are a lot more bad requests.
It seems like Kubernetes isn't doing connection draining from the old nodes and is instead just cutting the cord.
I've been trying to dig through the Kubernetes codebase to figure out whether there is any connection draining, but I've had no luck. In pkg/proxy/iptables/proxier.go, syncProxyRules seems to set up all the iptables rules, but it doesn't look like it's aware of connection draining, at that level at least.
I can't get Kubernetes to deploy a new version without sometimes dropping connections. Am I missing some connection-draining option, or does Kubernetes just not support it? Here's the test app (it runs as both the redeployer/monitor and the dummy app):
package main
import (
"os"
"os/signal"
"syscall"
"context"
"fmt"
"net/http"
"io/ioutil"
"github.com/DataDog/datadog-go/statsd"
"time"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/util/retry"
"strconv"
)
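// ApiCheck repeatedly increments the k8s-qos-dummy deployment's sequence argument
// and polls the given address until the new value is served, reporting deploy
// timings and errors to Datadog.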
func ApiCheck(address string, ddclient *statsd.Client) {
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
panic(err.Error())
}
// creates the clientset
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
// if we just came up, wait for other app to go down before updating deployment...
time.Sleep(30 * time.Second)
for {
fmt.Println("Starting deployment")
start := time.Now()
var prevValue int
retryErr := retry.RetryOnConflict(retry.DefaultRetry, func() error {
// get latest version
result, getErr := deploymentsClient.Get("k8s-qos-dummy", metav1.GetOptions{})
if getErr != nil {
fmt.Printf("Failed to get latest version of Deployment: %v", getErr)
return getErr
}
var perr error
prevValue, perr = strconv.Atoi(result.Spec.Template.Spec.Containers[0].Args[1])
if perr != nil {
fmt.Printf("Cannot parse previous value %s, using 0 instead\n", result.Spec.Template.Spec.Containers[0].Args[1])
prevValue = 0
}
valstring := fmt.Sprintf("%d", prevValue + 1)
result.Spec.Template.Spec.Containers[0].Args[1] = valstring
fmt.Printf("Trying to update to %s\n", valstring)
_, updateErr := deploymentsClient.Update(result)
return updateErr
})
if retryErr != nil {
fmt.Println("Update failed: %v", retryErr)
CheckDDError(ddclient.Incr("qos.k8s.deploy.update_error", nil, 1))
continue
}
fmt.Printf("Updated successfully\n")
// now wait for server to respond properly
for {
client := &http.Client{
Timeout: time.Second * 5,
}
response, err := client.Get(address)
if err != nil {
fmt.Printf("Failed qos deploy with http error: %s\n", err)
CheckDDError(ddclient.Incr("qos.k8s.deploy.http_error", nil, 1))
} else {
contents, err := ioutil.ReadAll(response.Body)
response.Body.Close() // close explicitly; a defer here would never run inside this endless loop
if err != nil {
fmt.Printf("Failed qos deploy with io error: %s\n", err)
CheckDDError(ddclient.Incr("qos.k8s.deploy.io_error", nil, 1))
} else {
if response.StatusCode >= 200 && response.StatusCode <= 299 {
// let's check the value
new_value, perr := strconv.Atoi(string(contents))
if perr != nil {
fmt.Printf("Failed to parse response for deploy %s\n", perr.Error())
} else {
if new_value == prevValue + 1 {
fmt.Println("Deployment confirmed!")
elapsed := time.Since(start)
CheckDDError(ddclient.Timing("qos.k8s.deploy.time", elapsed, nil, 1))
time.Sleep(30 * time.Second)
break
} else {
fmt.Printf("Got bad value: %d, wanted %d\n", new_value, prevValue + 1)
elapsed := time.Since(start)
if elapsed > time.Second * 80 {
CheckDDError(ddclient.Incr("qos.k8s.deploy.timeout_err", nil, 1))
CheckDDError(ddclient.Timing("qos.k8s.deploy.time", elapsed, nil, 1))
time.Sleep(30 * time.Second)
break
}
}
}
} else {
fmt.Printf("Failed qos deploy with http status error: %d %s\n", response.StatusCode, string(contents))
CheckDDError(ddclient.Incr("qos.k8s.deploy.http_status_error", nil, 1))
}
}
}
time.Sleep(1 * time.Second)
}
}
}
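// CheckDDError logs any error returned by the Datadog statsd client.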
func CheckDDError(derr error) {
if derr != nil {
fmt.Println("datadogs not working, got: %s", derr.Error())
}
}
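// DummyCheck polls the dummy app every second and reports a pass/fail gauge
// plus error counters to Datadog.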
func DummyCheck(address string, ddclient *statsd.Client) {
for {
client := &http.Client{
Timeout: time.Second * 5,
}
response, err := client.Get(address)
if err != nil {
fmt.Printf("Failed qos check with http error: %s\n", err)CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
CheckDDError(ddclient.Incr("qos.k8s.check.dummy_http_error", nil, 1))
} else {
contents, err := ioutil.ReadAll(response.Body)
response.Body.Close() // close explicitly; a defer here would never run inside this endless loop
if err != nil {
fmt.Printf("Failed qos check with io error: %s\n", err)
CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
CheckDDError(ddclient.Incr("qos.k8s.check.dummy_io_error", nil, 1))
} else {
if response.StatusCode >= 200 && response.StatusCode <= 299 {
fmt.Printf("Passed qos check with status: %d received: %s\n", response.StatusCode, string(contents))
CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 1, nil, 1))
} else {
fmt.Printf("Failed qos check with http status error: %d %s\n", response.StatusCode, string(contents))
CheckDDError(ddclient.Gauge("qos.k8s.check.dummy_response", 0, nil, 1))
CheckDDError(ddclient.Incr("qos.k8s.check.dummy_http_status_error", nil, 1))
}
}
}
time.Sleep(1 * time.Second)
}
}
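// WebServer serves the given response string on :7070 and shuts the server
// down gracefully when SIGINT/SIGTERM arrives.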
func WebServer(resp string, ddclient *statsd.Client) {
srv := &http.Server{
Addr: ":7070",
IdleTimeout: 61 * time.Second, //ELB is default to 60 seconds idle timeout
}
http.HandleFunc("/", func (w http.ResponseWriter, r *http.Request) {
fmt.Fprint(w, resp)
})
fmt.Printf("current idle timeout: %v\n", srv.IdleTimeout)
c := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
signal.Notify(c, os.Interrupt, syscall.SIGTERM)
go func() {
<-c
fmt.Printf("Got sigterm shutting down\n")
srv.Shutdown(context.Background())
os.Exit(1)
}()
srv.ListenAndServe()
}
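// main runs as the deployer/monitor ("deploy") or as the dummy app ("dummy"),
// depending on the first argument.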
func main() {
if len(os.Args) < 2 {
fmt.Println("usage: k8s-qos [deploy|dummy]")
return
}
ddSock := fmt.Sprintf("%s:8125", os.Getenv("HOST_IP"))
ddc, err := statsd.New(ddSock)
if err != nil {
fmt.Printf("failed to create statsd client: %s\n", err)
return
}
if os.Args[1] == "deploy" {
if len(os.Args) < 3 {
fmt.Println("usage: k8s-qos dummy [address]")
return
}
pingAddress := os.Args[2]
go WebServer(
fmt.Sprintf(
"Hey this is the deployer for qos, this app pings %s to make sure it works",
pingAddress),
ddc)
go ApiCheck(pingAddress, ddc)
DummyCheck(pingAddress, ddc)
return
}
if os.Args[1] == "dummy" {
if len(os.Args) < 3 {
fmt.Println("usage: k8s-qos dummy [response-string]")
return
}
WebServer(os.Args[2], ddc)
return
}
fmt.Println("no usage specified")
return
}
Setup with Kops Version 1.9.0 on AWS
kops cluster config:
apiVersion: kops/v1alpha2
kind: Cluster
metadata:
name: test-k8s.example.com
spec:
additionalPolicies:
node: |
[
{
"Effect": "Allow",
"Action": ["sts:AssumeRole"],
"Resource": ["*"]
}
]
api:
loadBalancer:
type: Internal
authorization:
rbac: {}
channel: stable
cloudProvider: aws
configBase: s3://test-k8s/test-k8s.example.com
etcdClusters:
- etcdMembers:
- instanceGroup: master-us-west-2a
name: a
- instanceGroup: master-us-west-2b
name: b
- instanceGroup: master-us-west-2c
name: c
name: main
- etcdMembers:
- instanceGroup: master-us-west-2a
name: a
- instanceGroup: master-us-west-2b
name: b
- instanceGroup: master-us-west-2c
name: c
name: events
iam:
allowContainerRegistry: true
legacy: false
kubernetesApiAccess:
- 0.0.0.0/0
kubernetesVersion: 1.9.6
masterInternalName: api.internal.test-k8s.example.com
masterPublicName: api.test-k8s.example.com
networkCIDR: XX.XX.0.0/16
networkID: vpc-XXXXXXX
networking:
weave:
mtu: 8912
nonMasqueradeCIDR: XX.XX.0.0/10
sshAccess:
- XX.XX.XX.XX/32
subnets:
- cidr: XX.XX.XX.XX/24
id: subnet-XXXXXXX
name: us-west-2a
type: Private
zone: us-west-2a
- cidr: XX.XX.XX.XX/24
id: subnet-XXXXXXX
name: us-west-2b
type: Private
zone: us-west-2b
- cidr: XX.XX.XX.XX/24
id: subnet-XXXXXXX
name: us-west-2c
type: Private
zone: us-west-2c
- cidr: XX.XX.XX.XX/24
id: subnet-XXXXXXX
name: utility-us-west-2a
type: Utility
zone: us-west-2a
- cidr: XX.XX.XX.XX/24
id: subnet-XXXXXXX
name: utility-us-west-2b
type: Utility
zone: us-west-2b
- cidr: XX.XX.XX.XX/24
id: subnet-XXXXXXX
name: utility-us-west-2c
type: Utility
zone: us-west-2c
topology:
dns:
type: Private
masters: private
nodes: private
kops node config:
apiVersion: kops/v1alpha2
kind: InstanceGroup
metadata:
labels:
kops.k8s.io/cluster: test-k8s.example.com
name: nodes
spec:
image: XXXXXXXX/normal-kops-image-but-with-portmap-cni
machineType: t2.medium
maxSize: 3
minSize: 3
nodeLabels:
kops.k8s.io/instancegroup: nodes
role: Node
subnets:
- us-west-2a
- us-west-2b
- us-west-2c
"dummy" app config aka the app being redeployed:
apiVersion: apps/v1
kind: Deployment
metadata:
name: k8s-qos-dummy
spec:
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 1
maxSurge: 1
minReadySeconds: 1
replicas: 3
selector:
matchLabels:
app: k8s-qos-dummy
template:
metadata:
name: k8s-qos-dummy
labels:
app: k8s-qos-dummy
spec:
containers:
- name: k8s-qos-dummy
image: XXXXXX
command: ["k8s-qos"]
args: [ "dummy", "1" ]
env:
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
ports:
- containerPort: 7070
livenessProbe:
httpGet:
path: /
port: 7070
initialDelaySeconds: 20
periodSeconds: 2
readinessProbe:
httpGet:
path: /
port: 7070
initialDelaySeconds: 5
periodSeconds: 5
successThreshold: 1
lifecycle:
preStop:
exec:
command: ["/bin/sleep", "61"]
resources:
limits:
memory: "200Mi"
cpu: ".25"
requests:
cpu: ".25"
memory: "200Mi"
---
apiVersion: v1
kind: Service
metadata:
name: k8s-qos-dummy
annotations:
service.beta.kubernetes.io/aws-load-balancer-backend-protocol: http
service.beta.kubernetes.io/aws-load-balancer-ssl-cert: XXXXXX
service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443"
service.beta.kubernetes.io/aws-load-balancer-internal: 0.0.0.0/0
service.beta.kubernetes.io/aws-load-balancer-extra-security-groups: "sg-XXXXX"
spec:
ports:
- port: 80
name: http
targetPort: 7070
- port: 443
name: https
targetPort: 7070
selector:
app: k8s-qos-dummy
type: LoadBalancer
loadBalancerSourceRanges:
- 127.0.0.0/32
---
#when testing with ingress
apiVersion: extensions/v1beta1
kind: Ingress
metadata:
name: k8s-qos-dummy-ingress
spec:
rules:
- host: k8s-qos-dummy.example.com
http:
paths:
- backend:
serviceName: k8s-qos-dummy
servicePort: 443
redeployer/monitor app config:
apiVersion: v1
kind: ServiceAccount
metadata:
name: k8s-qos-role
---
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
namespace: default
name: k8s-qos-role
rules:
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "watch", "list", "update"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: k8s-qos-role
subjects:
- kind: ServiceAccount
namespace: default
name: k8s-qos-role
roleRef:
kind: Role
name: k8s-qos-role
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: k8s-qos
spec:
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 0
maxSurge: 1
minReadySeconds: 5
replicas: 1
selector:
matchLabels:
app: k8s-qos
template:
metadata:
name: k8s-qos
labels:
app: k8s-qos
spec:
serviceAccountName: k8s-qos-role
containers:
- name: k8s-qos
image: XXXXXX
command: ["k8s-qos"]
args: [ "deploy", "https://k8s-qos-dummy.example.com/"]
env:
- name: HOST_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
ports:
- containerPort: 7070
livenessProbe:
httpGet:
path: /
port: 7070
initialDelaySeconds: 20
periodSeconds: 2
readinessProbe:
httpGet:
path: /
port: 7070
initialDelaySeconds: 0
periodSeconds: 2
resources:
limits:
memory: "400Mi"
cpu: ".5"
requests:
cpu: ".25"
memory: "200Mi"
Kubernetes does support connection draining - it is called graceful termination.
In this Stack Overflow question you will find a comprehensive answer on what it is and how it works. To be clear, this is the desired behavior, as described in this GitHub issue: after a pod is deleted, Kubernetes waits "grace-period" seconds before killing it. The pod only needs to catch SIGTERM and start failing its readiness probes; at that point the load balancer should stop sending traffic to it. If the pod is not removed from the load balancer in time, then when it dies it kills all of its current connections.

I think in your case you would have to look for a solution inside the application, or try some external tools - if I remember correctly, Istio has features that would help, but I am not experienced enough with it to point you directly.
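To make the "solution inside the application" idea concrete, here is a minimal sketch (not the asker's code) of the usual in-app pattern: on SIGTERM the server starts failing a dedicated readiness endpoint, keeps serving while that failure propagates to the Service/ELB, and only then calls Shutdown so in-flight requests can finish. The /healthz path, the 20-second propagation delay and the 30-second shutdown timeout are assumptions you would tune to your probe periods and ELB settings.

package main

import (
    "context"
    "fmt"
    "net/http"
    "os"
    "os/signal"
    "sync/atomic"
    "syscall"
    "time"
)

func main() {
    // shuttingDown flips to 1 once SIGTERM arrives so the readiness probe starts failing.
    var shuttingDown int32

    mux := http.NewServeMux()
    mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
        fmt.Fprint(w, "ok")
    })
    // Hypothetical readiness endpoint; the readinessProbe would point here instead of "/".
    mux.HandleFunc("/healthz", func(w http.ResponseWriter, r *http.Request) {
        if atomic.LoadInt32(&shuttingDown) == 1 {
            http.Error(w, "shutting down", http.StatusServiceUnavailable)
            return
        }
        w.WriteHeader(http.StatusOK)
    })

    srv := &http.Server{
        Addr:        ":7070",
        Handler:     mux,
        IdleTimeout: 61 * time.Second, // keep above the ELB's default 60s idle timeout
    }

    c := make(chan os.Signal, 1)
    signal.Notify(c, syscall.SIGTERM, os.Interrupt)
    go func() {
        <-c
        atomic.StoreInt32(&shuttingDown, 1)
        // Keep serving while the readiness failure propagates to the Service/ELB.
        // The 20s figure is an assumption; tune it to your probe period and ELB settings.
        time.Sleep(20 * time.Second)
        ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
        defer cancel()
        srv.Shutdown(ctx) // stop accepting new connections, wait for in-flight requests
    }()

    if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
        fmt.Println("server error:", err)
    }
}

With something like this you would point the readinessProbe at /healthz instead of /, and keep terminationGracePeriodSeconds comfortably longer than the delay plus the shutdown timeout; the preStop sleep in the dummy deployment above tries to achieve a similar effect from the pod spec side.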