mirror of
https://github.com/weaveworks/scope.git
synced 2026-03-04 02:30:45 +00:00
Merge pull request #3050 from weaveworks/async-reflectors
Create reflectors asynchronously
This commit is contained in:
@@ -134,20 +134,17 @@ func NewClient(config ClientConfig) (Client, error) {
|
||||
client: c,
|
||||
}
|
||||
|
||||
result.cronJobStore, err = result.setupCronjobStore()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
result.podStore = NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
|
||||
result.runReflectorUntil("pods", result.podStore)
|
||||
|
||||
podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
|
||||
result.podStore = result.setupStore(c.CoreV1().RESTClient(), "pods", &apiv1.Pod{}, podStore)
|
||||
result.serviceStore = result.setupStore(c.CoreV1().RESTClient(), "services", &apiv1.Service{}, nil)
|
||||
result.nodeStore = result.setupStore(c.CoreV1().RESTClient(), "nodes", &apiv1.Node{}, nil)
|
||||
result.namespaceStore = result.setupStore(c.CoreV1().RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
|
||||
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
|
||||
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
|
||||
result.jobStore = result.setupStore(c.BatchV1().RESTClient(), "jobs", &apibatchv1.Job{}, nil)
|
||||
result.statefulSetStore = result.setupStore(c.AppsV1beta1().RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
|
||||
result.serviceStore = result.setupStore("services")
|
||||
result.nodeStore = result.setupStore("nodes")
|
||||
result.namespaceStore = result.setupStore("namespaces")
|
||||
result.deploymentStore = result.setupStore("deployments")
|
||||
result.daemonSetStore = result.setupStore("daemonsets")
|
||||
result.jobStore = result.setupStore("jobs")
|
||||
result.statefulSetStore = result.setupStore("statefulsets")
|
||||
result.cronJobStore = result.setupStore("cronjobs")
|
||||
|
||||
return result, nil
|
||||
}
|
||||
@@ -170,39 +167,56 @@ func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource
|
||||
return false, nil
|
||||
}
|
||||
|
||||
func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
|
||||
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
|
||||
store := nonDefaultStore
|
||||
if store == nil {
|
||||
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
}
|
||||
c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource)
|
||||
func (c *client) setupStore(resource string) cache.Store {
|
||||
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
|
||||
c.runReflectorUntil(resource, store)
|
||||
return store
|
||||
}
|
||||
|
||||
func (c *client) setupCronjobStore() (cache.Store, error) {
|
||||
const resource = "cronjobs"
|
||||
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
func (c *client) clientAndType(resource string) (rest.Interface, interface{}, error) {
|
||||
switch resource {
|
||||
case "pods":
|
||||
return c.client.CoreV1().RESTClient(), &apiv1.Pod{}, nil
|
||||
case "services":
|
||||
return c.client.CoreV1().RESTClient(), &apiv1.Service{}, nil
|
||||
case "nodes":
|
||||
return c.client.CoreV1().RESTClient(), &apiv1.Node{}, nil
|
||||
case "namespaces":
|
||||
return c.client.CoreV1().RESTClient(), &apiv1.Namespace{}, nil
|
||||
case "deployments":
|
||||
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.Deployment{}, nil
|
||||
case "daemonsets":
|
||||
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.DaemonSet{}, nil
|
||||
case "jobs":
|
||||
return c.client.BatchV1().RESTClient(), &apibatchv1.Job{}, nil
|
||||
case "statefulsets":
|
||||
return c.client.AppsV1beta1().RESTClient(), &apiappsv1beta1.StatefulSet{}, nil
|
||||
case "cronjobs":
|
||||
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if ok {
|
||||
// kubernetes >= 1.8
|
||||
return c.client.BatchV1beta1().RESTClient(), &apibatchv1beta1.CronJob{}, nil
|
||||
}
|
||||
// kubernetes < 1.8
|
||||
return c.client.BatchV2alpha1().RESTClient(), &apibatchv2alpha1.CronJob{}, nil
|
||||
}
|
||||
if ok {
|
||||
// kubernetes >= 1.8
|
||||
return c.setupStore(c.client.BatchV1beta1().RESTClient(), resource, &apibatchv1beta1.CronJob{}, nil), nil
|
||||
}
|
||||
// kubernetes < 1.8
|
||||
return c.setupStore(c.client.BatchV2alpha1().RESTClient(), resource, &apibatchv2alpha1.CronJob{}, nil), nil
|
||||
return nil, nil, fmt.Errorf("Invalid resource: %v", resource)
|
||||
}
|
||||
|
||||
// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes.
|
||||
// Errors are logged and retried with exponential backoff.
|
||||
func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) {
|
||||
func (c *client) runReflectorUntil(resource string, store cache.Store) {
|
||||
var r *cache.Reflector
|
||||
listAndWatch := func() (bool, error) {
|
||||
select {
|
||||
case <-c.quit:
|
||||
return true, nil
|
||||
default:
|
||||
ok, err := c.isResourceSupported(groupVersion, resource)
|
||||
if r == nil {
|
||||
kclient, itemType, err := c.clientAndType(resource)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
ok, err := c.isResourceSupported(kclient.APIVersion(), resource)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
@@ -210,7 +224,15 @@ func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.Group
|
||||
log.Infof("%v are not supported by this Kubernetes version", resource)
|
||||
return true, nil
|
||||
}
|
||||
err = r.ListAndWatch(c.quit)
|
||||
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
|
||||
r = cache.NewReflector(lw, itemType, store, c.resyncPeriod)
|
||||
}
|
||||
|
||||
select {
|
||||
case <-c.quit:
|
||||
return true, nil
|
||||
default:
|
||||
err := r.ListAndWatch(c.quit)
|
||||
return false, err
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user