mirror of
https://github.com/stakater/Reloader.git
synced 2026-05-17 06:06:39 +00:00
Merge master into feat/e2e-test
This commit is contained in:
@@ -2,9 +2,11 @@ package controller
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
"github.com/stakater/Reloader/internal/pkg/constants"
|
||||
"github.com/stakater/Reloader/internal/pkg/handler"
|
||||
"github.com/stakater/Reloader/internal/pkg/metrics"
|
||||
"github.com/stakater/Reloader/internal/pkg/options"
|
||||
@@ -21,7 +23,7 @@ import (
|
||||
"k8s.io/client-go/tools/record"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/kubectl/pkg/scheme"
|
||||
"k8s.io/utils/strings/slices"
|
||||
csiv1 "sigs.k8s.io/secrets-store-csi-driver/apis/v1"
|
||||
)
|
||||
|
||||
// Controller for checking events
|
||||
@@ -79,7 +81,12 @@ func NewController(
|
||||
}
|
||||
}
|
||||
|
||||
listWatcher := cache.NewFilteredListWatchFromClient(client.CoreV1().RESTClient(), resource, namespace, optionsModifier)
|
||||
getterRESTClient, err := getClientForResource(resource, client)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to initialize REST client for %s: %w", resource, err)
|
||||
}
|
||||
|
||||
listWatcher := cache.NewFilteredListWatchFromClient(getterRESTClient, resource, namespace, optionsModifier)
|
||||
|
||||
_, informer := cache.NewInformerWithOptions(cache.InformerOptions{
|
||||
ListerWatcher: listWatcher,
|
||||
@@ -103,30 +110,38 @@ func NewController(
|
||||
|
||||
// Add function to add a new object to the queue in case of creating a resource
|
||||
func (c *Controller) Add(obj interface{}) {
|
||||
c.collectors.RecordEventReceived("add", c.resource)
|
||||
|
||||
switch object := obj.(type) {
|
||||
case *v1.Namespace:
|
||||
c.addSelectedNamespaceToCache(*object)
|
||||
return
|
||||
case *csiv1.SecretProviderClassPodStatus:
|
||||
return
|
||||
}
|
||||
|
||||
if options.ReloadOnCreate == "true" {
|
||||
if !c.resourceInIgnoredNamespace(obj) && c.resourceInSelectedNamespaces(obj) && secretControllerInitialized && configmapControllerInitialized {
|
||||
c.queue.Add(handler.ResourceCreatedHandler{
|
||||
Resource: obj,
|
||||
Collectors: c.collectors,
|
||||
Recorder: c.recorder,
|
||||
c.enqueue(handler.ResourceCreatedHandler{
|
||||
Resource: obj,
|
||||
Collectors: c.collectors,
|
||||
Recorder: c.recorder,
|
||||
EnqueueTime: time.Now(),
|
||||
})
|
||||
} else {
|
||||
c.collectors.RecordSkipped("ignored_or_not_selected")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Controller) resourceInIgnoredNamespace(raw interface{}) bool {
|
||||
switch object := raw.(type) {
|
||||
switch obj := raw.(type) {
|
||||
case *v1.ConfigMap:
|
||||
return c.ignoredNamespaces.Contains(object.Namespace)
|
||||
return c.ignoredNamespaces.Contains(obj.Namespace)
|
||||
case *v1.Secret:
|
||||
return c.ignoredNamespaces.Contains(object.Namespace)
|
||||
return c.ignoredNamespaces.Contains(obj.Namespace)
|
||||
case *csiv1.SecretProviderClassPodStatus:
|
||||
return c.ignoredNamespaces.Contains(obj.Namespace)
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -145,6 +160,10 @@ func (c *Controller) resourceInSelectedNamespaces(raw interface{}) bool {
|
||||
if slices.Contains(selectedNamespacesCache, object.GetNamespace()) {
|
||||
return true
|
||||
}
|
||||
case *csiv1.SecretProviderClassPodStatus:
|
||||
if slices.Contains(selectedNamespacesCache, object.GetNamespace()) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -166,31 +185,44 @@ func (c *Controller) removeSelectedNamespaceFromCache(namespace v1.Namespace) {
|
||||
|
||||
// Update function to add an old object and a new object to the queue in case of updating a resource
|
||||
func (c *Controller) Update(old interface{}, new interface{}) {
|
||||
c.collectors.RecordEventReceived("update", c.resource)
|
||||
|
||||
switch new.(type) {
|
||||
case *v1.Namespace:
|
||||
return
|
||||
}
|
||||
|
||||
if !c.resourceInIgnoredNamespace(new) && c.resourceInSelectedNamespaces(new) {
|
||||
c.queue.Add(handler.ResourceUpdatedHandler{
|
||||
c.enqueue(handler.ResourceUpdatedHandler{
|
||||
Resource: new,
|
||||
OldResource: old,
|
||||
Collectors: c.collectors,
|
||||
Recorder: c.recorder,
|
||||
EnqueueTime: time.Now(),
|
||||
})
|
||||
} else {
|
||||
c.collectors.RecordSkipped("ignored_or_not_selected")
|
||||
}
|
||||
}
|
||||
|
||||
// Delete function to add an object to the queue in case of deleting a resource
|
||||
func (c *Controller) Delete(old interface{}) {
|
||||
c.collectors.RecordEventReceived("delete", c.resource)
|
||||
|
||||
if _, ok := old.(*csiv1.SecretProviderClassPodStatus); ok {
|
||||
return
|
||||
}
|
||||
|
||||
if options.ReloadOnDelete == "true" {
|
||||
if !c.resourceInIgnoredNamespace(old) && c.resourceInSelectedNamespaces(old) && secretControllerInitialized && configmapControllerInitialized {
|
||||
c.queue.Add(handler.ResourceDeleteHandler{
|
||||
Resource: old,
|
||||
Collectors: c.collectors,
|
||||
Recorder: c.recorder,
|
||||
c.enqueue(handler.ResourceDeleteHandler{
|
||||
Resource: old,
|
||||
Collectors: c.collectors,
|
||||
Recorder: c.recorder,
|
||||
EnqueueTime: time.Now(),
|
||||
})
|
||||
} else {
|
||||
c.collectors.RecordSkipped("ignored_or_not_selected")
|
||||
}
|
||||
}
|
||||
|
||||
@@ -201,6 +233,13 @@ func (c *Controller) Delete(old interface{}) {
|
||||
}
|
||||
}
|
||||
|
||||
// enqueue adds an item to the queue and records metrics
|
||||
func (c *Controller) enqueue(item interface{}) {
|
||||
c.queue.Add(item)
|
||||
c.collectors.RecordQueueAdd()
|
||||
c.collectors.SetQueueDepth(c.queue.Len())
|
||||
}
|
||||
|
||||
// Run function for controller which handles the queue
|
||||
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
|
||||
defer runtime.HandleCrash()
|
||||
@@ -242,13 +281,34 @@ func (c *Controller) processNextItem() bool {
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
|
||||
c.collectors.SetQueueDepth(c.queue.Len())
|
||||
|
||||
// Tell the queue that we are done with processing this key. This unblocks the key for other workers
|
||||
// This allows safe parallel processing because two events with the same key are never processed in
|
||||
// parallel.
|
||||
defer c.queue.Done(resourceHandler)
|
||||
|
||||
// Record queue latency if the handler supports it
|
||||
if h, ok := resourceHandler.(handler.TimedHandler); ok {
|
||||
queueLatency := time.Since(h.GetEnqueueTime())
|
||||
c.collectors.RecordQueueLatency(queueLatency)
|
||||
}
|
||||
|
||||
// Track reconcile/handler duration
|
||||
startTime := time.Now()
|
||||
|
||||
// Invoke the method containing the business logic
|
||||
err := resourceHandler.(handler.ResourceHandler).Handle()
|
||||
|
||||
duration := time.Since(startTime)
|
||||
|
||||
if err != nil {
|
||||
c.collectors.RecordReconcile("error", duration)
|
||||
} else {
|
||||
c.collectors.RecordReconcile("success", duration)
|
||||
}
|
||||
|
||||
// Handle the error if something went wrong during the execution of the business logic
|
||||
c.handleErr(err, resourceHandler)
|
||||
return true
|
||||
@@ -261,16 +321,26 @@ func (c *Controller) handleErr(err error, key interface{}) {
|
||||
// This ensures that future processing of updates for this key is not delayed because of
|
||||
// an outdated error history.
|
||||
c.queue.Forget(key)
|
||||
|
||||
// Record successful event processing
|
||||
c.collectors.RecordEventProcessed("unknown", c.resource, "success")
|
||||
return
|
||||
}
|
||||
|
||||
// Record error
|
||||
c.collectors.RecordError("handler_error")
|
||||
|
||||
// This controller retries 5 times if something goes wrong. After that, it stops trying.
|
||||
if c.queue.NumRequeues(key) < 5 {
|
||||
logrus.Errorf("Error syncing events: %v", err)
|
||||
|
||||
// Record retry
|
||||
c.collectors.RecordRetry()
|
||||
|
||||
// Re-enqueue the key rate limited. Based on the rate limiter on the
|
||||
// queue and the re-enqueue history, the key will be processed later again.
|
||||
c.queue.AddRateLimited(key)
|
||||
c.collectors.SetQueueDepth(c.queue.Len())
|
||||
return
|
||||
}
|
||||
|
||||
@@ -279,4 +349,17 @@ func (c *Controller) handleErr(err error, key interface{}) {
|
||||
runtime.HandleError(err)
|
||||
logrus.Errorf("Dropping key out of the queue: %v", err)
|
||||
logrus.Debugf("Dropping the key %q out of the queue: %v", key, err)
|
||||
|
||||
c.collectors.RecordEventProcessed("unknown", c.resource, "dropped")
|
||||
}
|
||||
|
||||
func getClientForResource(resource string, coreClient kubernetes.Interface) (cache.Getter, error) {
|
||||
if resource == constants.SecretProviderClassController {
|
||||
csiClient, err := kube.GetCSIClient()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get CSI client: %w", err)
|
||||
}
|
||||
return csiClient.SecretsstoreV1().RESTClient(), nil
|
||||
}
|
||||
return coreClient.CoreV1().RESTClient(), nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user