Merge branch 'master' into leadership-election

This commit is contained in:
Alex Vest
2022-10-06 11:17:35 +01:00
18 changed files with 504 additions and 310 deletions

View File

@@ -4,21 +4,22 @@ import (
"fmt"
"time"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/sirupsen/logrus"
"github.com/stakater/Reloader/internal/pkg/handler"
"github.com/stakater/Reloader/internal/pkg/metrics"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/kube"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
v1 "k8s.io/api/core/v1"
"k8s.io/kubectl/pkg/scheme"
)
// Controller for checking events
@@ -30,6 +31,7 @@ type Controller struct {
namespace string
ignoredNamespaces util.List
collectors metrics.Collectors
recorder record.EventRecorder
}
// controllerInitialized flag determines whether controlled is being initialized
@@ -44,6 +46,11 @@ func NewController(
namespace: namespace,
ignoredNamespaces: ignoredNamespaces,
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
Interface: client.CoreV1().Events(""),
})
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: fmt.Sprintf("reloader-%s", resource)})
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
listWatcher := cache.NewListWatchFromClient(client.CoreV1().RESTClient(), resource, namespace, fields.Everything())
@@ -57,6 +64,8 @@ func NewController(
c.informer = informer
c.queue = queue
c.collectors = collectors
c.recorder = recorder
logrus.Infof("created controller for: %s", resource)
return &c, nil
}
@@ -68,6 +77,7 @@ func (c *Controller) Add(obj interface{}) {
c.queue.Add(handler.ResourceCreatedHandler{
Resource: obj,
Collectors: c.collectors,
Recorder: c.recorder,
})
}
}
@@ -90,6 +100,7 @@ func (c *Controller) Update(old interface{}, new interface{}) {
Resource: new,
OldResource: old,
Collectors: c.collectors,
Recorder: c.recorder,
})
}
}
@@ -99,7 +110,7 @@ func (c *Controller) Delete(old interface{}) {
// Todo: Any future delete event can be handled here
}
//Run function for controller which handles the queue
// Run function for controller which handles the queue
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()