I use a watcherList, which is supported by the official golang kubernetes lib, to get notifications about created, updated and removed services inside a kubernetes namespace. Here the snippet.
func (kc *KubernetesCollector) streamEvents(ctx context.Context) {
kc.debugChannel <- fmt.Sprintf("Start streaming events from kubernetes API")
watchList := cache.NewListWatchFromClient(kc.k8sClient.RESTClient(), "services", kc.k8sNamespace, fields.Everything())
notificationCallbackToAddService := func(svc interface{}) {
service := svc.(*v1.Service)
kc.serviceNotificationChannel <- &serviceNotification{service, "add"}
}
notificationCallbackToDeleteService := func(svc interface{}) {
service := svc.(*v1.Service)
kc.serviceNotificationChannel <- &serviceNotification{service, "remove"}
}
callbacks := cache.ResourceEventHandlerFuncs{
AddFunc: notificationCallbackToAddService,
DeleteFunc: notificationCallbackToDeleteService,
}
_, controller := cache.NewInformer(watchList, &v1.Service{}, time.Second*0, callbacks)
go controller.Run(ctx.Done())
}
In my test I declare the kc.k8sClient
over the public api address, which is defined in k8sAPI
variable. Additionally I set the bearer token to authenticate against the cluster and skip to verify the insecure ssl certificate.
func TestK8sWatchList(t *testing.T) {
require := require.New(t)
...
k8sConfig, err := clientcmd.BuildConfigFromFlags(k8sAPI, "")
require.NoError(err)
k8sConfig.BearerToken = "<bearerToken>"
k8sConfig.Transport = &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: true,
},
}
k8sClient, err := kubernetes.NewForConfig(k8sConfig)
k8sCollector := NewK8sCollector(k8sClient, k8sNamespace)
...
}
When I execute the test, I receive the following error messages:
go test -v -timeout 500s <replaced>/t1k/pkg/collector -run TestK8sWatchList
=== RUN TestK8sWatchList
11.02.2020 16:55:55 DEBUG: Start streaming events from kubernetes API
E0211 16:55:51.706530 121803 reflector.go:153] pkg/mod/k8s.io/client-go@v0.0.0-20200106225816-7985654fe8ee/tools/cache/reflector.go:105: Failed to list *v1.Service: forbidden: User "system:serviceaccount:t1k:t1k-test-serviceaccount" cannot get path "/namespaces/t1k/services"
E0211 16:55:52.707520 121803 reflector.go:153] pkg/mod/k8s.io/client-go@v0.0.0-20200106225816-7985654fe8ee/tools/cache/reflector.go:105: Failed to list *v1.Service: forbidden: User "system:serviceaccount:t1k:t1k-test-serviceaccount" cannot get path "/namespaces/t1k/services"
E0211 16:55:53.705539 121803 reflector.go:153] pkg/mod/k8s.io/client-go@v0.0.0-20200106225816-7985654fe8ee/tools/cache/reflector.go:105: Failed to list *v1.Service: forbidden: User "system:serviceaccount:t1k:t1k-test-serviceaccount" cannot get path "/namespaces/t1k/services"
I don't understand why I get the error message, because the service account "t1k-test-serviceaccount" has in my opinion all required permissions. Now the defined service account, role and rolebinding for the test user.
apiVersion: v1
kind: ServiceAccount
metadata:
namespace: t1k
name: t1k-test-serviceaccount
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: t1k
name: t1k-test-role
rules:
- apiGroups: [""] # "" indicates the core API group
resources: ["*"]
verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
namespace: t1k
name: t1k-test-rolebinding
subjects:
- name: t1k-test-serviceaccount
kind: ServiceAccount
apiGroup: ""
roleRef:
name: t1k-test-role
kind: Role
apiGroup: rbac.authorization.k8s.io
Additional informations:
I found the solution. The k8sClientSet
attribut of the KubernetesCollector
struct was a pointer. The reflection function of the package pkg/mod/k8s.io/client-go@v0.0.0-20200106225816-7985654fe8ee
can not handle pointer objects.
type KubernetesCollector struct {
...
k8sClient *kubernetes.ClientSet
namespace string
...
}
I replaced the k8sClient with the CoreV1Interface
from k8s.io/client-go/kubernetes/typed/core/v1
. Therefore I changed the call for the ListWatch.
type KubernetesCollector struct {
....
iface corev1.CoreV1Interface
namespace string
....
}
func (kc *KubernetesCollector) start(ctx context.Context) {
watchList := cache.NewListWatchFromClient(kc.iface.RESTClient(), "services", kc.namespace, fields.Everything())
....
}
You can check permission of the service account using below command:
kubectl auth can-i list services --namespace t1k --as=system:serviceaccount:t1k:t1k-test-serviceaccount
You don't need to set the token manually...you can use InClusterConfig
as in this example.Client-go uses the Service Account token mounted inside the Pod at the /var/run/secrets/kubernetes.io/serviceaccount path when the rest.InClusterConfig() is used.