Check if k8s resources are supported in runReflectorUntil

`isResourceSupported` checks whether a kubernetes resource is supported by the api server.
This ensures that, if the probe is unable to communicate with the API server, the call is retried until a definitive true/false response is obtained.

If `isResourceSupported` returns false, `ListAndWatch` is not called and `runReflectorUntil` just exits.
This commit is contained in:
Roberto Bruggemann
2018-01-19 16:44:19 +00:00
parent ea0b5499d9
commit 2f9e2fc9ce

View File

@@ -9,8 +9,10 @@ import (
"github.com/weaveworks/common/backoff"
log "github.com/Sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
apiv1 "k8s.io/client-go/pkg/api/v1"
@@ -61,23 +63,6 @@ type client struct {
podWatches []func(Event, Pod)
}
// runReflectorUntil keeps cache.Reflector#ListAndWatch running until stopCh
// is closed. Failures are logged and retried with exponential backoff, capped
// at five minutes between attempts.
func runReflectorUntil(r *cache.Reflector, stopCh <-chan struct{}, msg string) {
	attempt := func() (bool, error) {
		// Bail out immediately if we have been asked to stop; otherwise
		// fall through and run one ListAndWatch cycle.
		select {
		case <-stopCh:
			return true, nil
		default:
		}
		return false, r.ListAndWatch(stopCh)
	}
	retrier := backoff.New(attempt, fmt.Sprintf("Kubernetes reflector (%s)", msg))
	retrier.SetMaxBackoff(5 * time.Minute)
	go retrier.Start()
}
// ClientConfig establishes the configuration for the kubernetes client
type ClientConfig struct {
Interval time.Duration
@@ -134,7 +119,6 @@ func NewClient(config ClientConfig) (Client, error) {
if err != nil {
return nil, err
}
}
log.Infof("kubernetes: targeting api server %s", restConfig.Host)
@@ -151,46 +135,71 @@ func NewClient(config ClientConfig) (Client, error) {
podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = result.setupStore(c.CoreV1Client.RESTClient(), "pods", &apiv1.Pod{}, podStore)
result.serviceStore = result.setupStore(c.CoreV1Client.RESTClient(), "services", &apiv1.Service{}, nil)
result.nodeStore = result.setupStore(c.CoreV1Client.RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1Client.RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
// We list deployments here to check if this version of kubernetes is >= 1.2.
// We would use NegotiateVersion, but Kubernetes 1.1 "supports"
// extensions/v1beta1, but not deployments or daemonsets.
if _, err := c.Extensions().Deployments(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("Deployments and DaemonSets are not supported by this Kubernetes version: %v", err)
} else {
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
}
// CronJobs and StatefulSets were introduced later. Easiest to use the same technique.
if _, err := c.BatchV2alpha1().CronJobs(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("CronJobs are not supported by this Kubernetes version: %v", err)
} else {
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)
}
if _, err := c.Apps().StatefulSets(metav1.NamespaceAll).List(metav1.ListOptions{}); err != nil {
log.Infof("StatefulSets are not supported by this Kubernetes version: %v", err)
} else {
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
}
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1Client.RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
result.jobStore = result.setupStore(c.BatchV1Client.RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.cronJobStore = result.setupStore(c.BatchV2alpha1Client.RESTClient(), "cronjobs", &apibatchv2alpha1.CronJob{}, nil)
result.statefulSetStore = result.setupStore(c.AppsV1beta1Client.RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
return result, nil
}
func (c *client) setupStore(kclient cache.Getter, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
// isResourceSupported reports whether the API server advertises the named
// resource under the given group/version. A NotFound discovery error means
// the group/version itself is absent, which is reported as "not supported"
// rather than as an error; any other discovery error (e.g. the API server
// being unreachable) is returned so the caller can retry.
func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource string) (bool, error) {
	list, err := c.client.Discovery().ServerResourcesForGroupVersion(groupVersion.String())
	switch {
	case apierrors.IsNotFound(err):
		return false, nil
	case err != nil:
		return false, err
	}
	for _, apiResource := range list.APIResources {
		if apiResource.Name == resource {
			return true, nil
		}
	}
	return false, nil
}
// setupStore wires a ListWatch for the given resource into a cache store —
// the caller-supplied one, or a fresh default store when nonDefaultStore is
// nil — and starts a reflector that keeps it in sync with the API server.
// It returns the store the reflector populates.
func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
	store := nonDefaultStore
	if store == nil {
		store = cache.NewStore(cache.MetaNamespaceKeyFunc)
	}
	lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
	c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource)
	return store
}
// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop,
// first verifying that the resource is supported by this Kubernetes version.
// Transient errors (including discovery failures) are logged and retried
// with exponential backoff, capped at five minutes; an unsupported resource
// is logged once and the loop exits.
func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) {
	step := func() (bool, error) {
		// Stop as soon as the client is shutting down.
		select {
		case <-c.quit:
			return true, nil
		default:
		}
		supported, err := c.isResourceSupported(groupVersion, resource)
		switch {
		case err != nil:
			// Could not reach the API server (or similar); retry with backoff.
			return false, err
		case !supported:
			log.Infof("%v are not supported by this Kubernetes version", resource)
			return true, nil
		}
		return false, r.ListAndWatch(c.quit)
	}
	bo := backoff.New(step, fmt.Sprintf("Kubernetes reflector (%s)", resource))
	bo.SetMaxBackoff(5 * time.Minute)
	go bo.Start()
}
func (c *client) WatchPods(f func(Event, Pod)) {
c.podWatchesMutex.Lock()
defer c.podWatchesMutex.Unlock()