diff --git a/probe/kubernetes/client.go b/probe/kubernetes/client.go index 4f2af1d01..0545f64ac 100644 --- a/probe/kubernetes/client.go +++ b/probe/kubernetes/client.go @@ -134,20 +134,17 @@ func NewClient(config ClientConfig) (Client, error) { client: c, } - result.cronJobStore, err = result.setupCronjobStore() - if err != nil { - return nil, err - } + result.podStore = NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc) + result.runReflectorUntil("pods", result.podStore) - podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc) - result.podStore = result.setupStore(c.CoreV1().RESTClient(), "pods", &apiv1.Pod{}, podStore) - result.serviceStore = result.setupStore(c.CoreV1().RESTClient(), "services", &apiv1.Service{}, nil) - result.nodeStore = result.setupStore(c.CoreV1().RESTClient(), "nodes", &apiv1.Node{}, nil) - result.namespaceStore = result.setupStore(c.CoreV1().RESTClient(), "namespaces", &apiv1.Namespace{}, nil) - result.deploymentStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil) - result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil) - result.jobStore = result.setupStore(c.BatchV1().RESTClient(), "jobs", &apibatchv1.Job{}, nil) - result.statefulSetStore = result.setupStore(c.AppsV1beta1().RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil) + result.serviceStore = result.setupStore("services") + result.nodeStore = result.setupStore("nodes") + result.namespaceStore = result.setupStore("namespaces") + result.deploymentStore = result.setupStore("deployments") + result.daemonSetStore = result.setupStore("daemonsets") + result.jobStore = result.setupStore("jobs") + result.statefulSetStore = result.setupStore("statefulsets") + result.cronJobStore = result.setupStore("cronjobs") return result, nil } @@ -170,39 +167,56 @@ func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource return false, nil } -func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store { - lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything()) - store := nonDefaultStore - if store == nil { - store = cache.NewStore(cache.MetaNamespaceKeyFunc) - } - c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource) +func (c *client) setupStore(resource string) cache.Store { + store := cache.NewStore(cache.MetaNamespaceKeyFunc) + c.runReflectorUntil(resource, store) return store } -func (c *client) setupCronjobStore() (cache.Store, error) { - const resource = "cronjobs" - ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource) - if err != nil { - return nil, err +func (c *client) clientAndType(resource string) (rest.Interface, interface{}, error) { + switch resource { + case "pods": + return c.client.CoreV1().RESTClient(), &apiv1.Pod{}, nil + case "services": + return c.client.CoreV1().RESTClient(), &apiv1.Service{}, nil + case "nodes": + return c.client.CoreV1().RESTClient(), &apiv1.Node{}, nil + case "namespaces": + return c.client.CoreV1().RESTClient(), &apiv1.Namespace{}, nil + case "deployments": + return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.Deployment{}, nil + case "daemonsets": + return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.DaemonSet{}, nil + case "jobs": + return c.client.BatchV1().RESTClient(), &apibatchv1.Job{}, nil + case "statefulsets": + return c.client.AppsV1beta1().RESTClient(), &apiappsv1beta1.StatefulSet{}, nil + case "cronjobs": + ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource) + if err != nil { + return nil, nil, err + } + if ok { + // kubernetes >= 1.8 + return c.client.BatchV1beta1().RESTClient(), &apibatchv1beta1.CronJob{}, nil + } + // kubernetes < 1.8 + return c.client.BatchV2alpha1().RESTClient(), &apibatchv2alpha1.CronJob{}, nil } - if ok { - // kubernetes >= 1.8 - return c.setupStore(c.client.BatchV1beta1().RESTClient(), resource, &apibatchv1beta1.CronJob{}, nil), nil - } - // kubernetes < 1.8 - return c.setupStore(c.client.BatchV2alpha1().RESTClient(), resource, &apibatchv2alpha1.CronJob{}, nil), nil + return nil, nil, fmt.Errorf("Invalid resource: %v", resource) } // runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes. // Errors are logged and retried with exponential backoff. -func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) { +func (c *client) runReflectorUntil(resource string, store cache.Store) { + var r *cache.Reflector listAndWatch := func() (bool, error) { - select { - case <-c.quit: - return true, nil - default: - ok, err := c.isResourceSupported(groupVersion, resource) + if r == nil { + kclient, itemType, err := c.clientAndType(resource) + if err != nil { + return false, err + } + ok, err := c.isResourceSupported(kclient.APIVersion(), resource) if err != nil { return false, err } @@ -210,7 +224,15 @@ func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.Group log.Infof("%v are not supported by this Kubernetes version", resource) return true, nil } - err = r.ListAndWatch(c.quit) + lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything()) + r = cache.NewReflector(lw, itemType, store, c.resyncPeriod) + } + + select { + case <-c.quit: + return true, nil + default: + err := r.ListAndWatch(c.quit) return false, err } }