Create reflectors asynchronously

Reflectors are created and run within the same function, asynchronously from the main thread.
Creating reflectors may require calls to the kubernetes api, which can return errors.
API errors are not handled in the main thread, but are handled asynchronously by retries.
This commit is contained in:
Roberto Bruggemann
2018-02-02 15:49:49 +00:00
parent 35d97bc2bd
commit 37771a0607

View File

@@ -134,20 +134,17 @@ func NewClient(config ClientConfig) (Client, error) {
client: c,
}
result.cronJobStore, err = result.setupCronjobStore()
if err != nil {
return nil, err
}
result.podStore = NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.runReflectorUntil("pods", result.podStore)
podStore := NewEventStore(result.triggerPodWatches, cache.MetaNamespaceKeyFunc)
result.podStore = result.setupStore(c.CoreV1().RESTClient(), "pods", &apiv1.Pod{}, podStore)
result.serviceStore = result.setupStore(c.CoreV1().RESTClient(), "services", &apiv1.Service{}, nil)
result.nodeStore = result.setupStore(c.CoreV1().RESTClient(), "nodes", &apiv1.Node{}, nil)
result.namespaceStore = result.setupStore(c.CoreV1().RESTClient(), "namespaces", &apiv1.Namespace{}, nil)
result.deploymentStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "deployments", &apiextensionsv1beta1.Deployment{}, nil)
result.daemonSetStore = result.setupStore(c.ExtensionsV1beta1().RESTClient(), "daemonsets", &apiextensionsv1beta1.DaemonSet{}, nil)
result.jobStore = result.setupStore(c.BatchV1().RESTClient(), "jobs", &apibatchv1.Job{}, nil)
result.statefulSetStore = result.setupStore(c.AppsV1beta1().RESTClient(), "statefulsets", &apiappsv1beta1.StatefulSet{}, nil)
result.serviceStore = result.setupStore("services")
result.nodeStore = result.setupStore("nodes")
result.namespaceStore = result.setupStore("namespaces")
result.deploymentStore = result.setupStore("deployments")
result.daemonSetStore = result.setupStore("daemonsets")
result.jobStore = result.setupStore("jobs")
result.statefulSetStore = result.setupStore("statefulsets")
result.cronJobStore = result.setupStore("cronjobs")
return result, nil
}
@@ -170,39 +167,56 @@ func (c *client) isResourceSupported(groupVersion schema.GroupVersion, resource
return false, nil
}
func (c *client) setupStore(kclient rest.Interface, resource string, itemType interface{}, nonDefaultStore cache.Store) cache.Store {
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
store := nonDefaultStore
if store == nil {
store = cache.NewStore(cache.MetaNamespaceKeyFunc)
}
c.runReflectorUntil(cache.NewReflector(lw, itemType, store, c.resyncPeriod), kclient.APIVersion(), resource)
func (c *client) setupStore(resource string) cache.Store {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
c.runReflectorUntil(resource, store)
return store
}
func (c *client) setupCronjobStore() (cache.Store, error) {
const resource = "cronjobs"
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
if err != nil {
return nil, err
func (c *client) clientAndType(resource string) (rest.Interface, interface{}, error) {
switch resource {
case "pods":
return c.client.CoreV1().RESTClient(), &apiv1.Pod{}, nil
case "services":
return c.client.CoreV1().RESTClient(), &apiv1.Service{}, nil
case "nodes":
return c.client.CoreV1().RESTClient(), &apiv1.Node{}, nil
case "namespaces":
return c.client.CoreV1().RESTClient(), &apiv1.Namespace{}, nil
case "deployments":
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.Deployment{}, nil
case "daemonsets":
return c.client.ExtensionsV1beta1().RESTClient(), &apiextensionsv1beta1.DaemonSet{}, nil
case "jobs":
return c.client.BatchV1().RESTClient(), &apibatchv1.Job{}, nil
case "statefulsets":
return c.client.AppsV1beta1().RESTClient(), &apiappsv1beta1.StatefulSet{}, nil
case "cronjobs":
ok, err := c.isResourceSupported(c.client.BatchV1beta1().RESTClient().APIVersion(), resource)
if err != nil {
return nil, nil, err
}
if ok {
// kubernetes >= 1.8
return c.client.BatchV1beta1().RESTClient(), &apibatchv1beta1.CronJob{}, nil
}
// kubernetes < 1.8
return c.client.BatchV2alpha1().RESTClient(), &apibatchv2alpha1.CronJob{}, nil
}
if ok {
// kubernetes >= 1.8
return c.setupStore(c.client.BatchV1beta1().RESTClient(), resource, &apibatchv1beta1.CronJob{}, nil), nil
}
// kubernetes < 1.8
return c.setupStore(c.client.BatchV2alpha1().RESTClient(), resource, &apibatchv2alpha1.CronJob{}, nil), nil
return nil, nil, fmt.Errorf("Invalid resource: %v", resource)
}
// runReflectorUntil runs cache.Reflector#ListAndWatch in an endless loop, after checking that the resource is supported by kubernetes.
// Errors are logged and retried with exponential backoff.
func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.GroupVersion, resource string) {
func (c *client) runReflectorUntil(resource string, store cache.Store) {
var r *cache.Reflector
listAndWatch := func() (bool, error) {
select {
case <-c.quit:
return true, nil
default:
ok, err := c.isResourceSupported(groupVersion, resource)
if r == nil {
kclient, itemType, err := c.clientAndType(resource)
if err != nil {
return false, err
}
ok, err := c.isResourceSupported(kclient.APIVersion(), resource)
if err != nil {
return false, err
}
@@ -210,7 +224,15 @@ func (c *client) runReflectorUntil(r *cache.Reflector, groupVersion schema.Group
log.Infof("%v are not supported by this Kubernetes version", resource)
return true, nil
}
err = r.ListAndWatch(c.quit)
lw := cache.NewListWatchFromClient(kclient, resource, metav1.NamespaceAll, fields.Everything())
r = cache.NewReflector(lw, itemType, store, c.resyncPeriod)
}
select {
case <-c.quit:
return true, nil
default:
err := r.ListAndWatch(c.quit)
return false, err
}
}