From 96ac8d1daf0f1701502738bf9b13f42403b591dd Mon Sep 17 00:00:00 2001
From: faizanahmad055
Date: Mon, 11 May 2026 20:48:54 +0200
Subject: [PATCH] Shut down controllers cleanly on leadership loss and fix
 leader election tests

controller.Run now tracks its informer and worker goroutines with a
WaitGroup and blocks on shutdown until all of them have exited.
RunLeaderElection runs the election in a background goroutine and
returns a channel that is closed once OnStoppedLeading has run and all
controller goroutines have stopped; the tests wait on this channel
instead of sleeping. ShouldReload validates ignored workload types
inline, and the helm image parsing tests now cover digest references.

Signed-off-by: faizanahmad055
---
 internal/pkg/cmd/reloader.go               |   2 +-
 internal/pkg/controller/controller.go      |  25 ++++-
 internal/pkg/leadership/leadership.go      | 104 +++++++++++++--------
 internal/pkg/leadership/leadership_test.go |  69 ++++++++++++--
 pkg/common/common.go                       |  30 +++---
 test/e2e/utils/helm_test.go                |  19 +++-
 6 files changed, 179 insertions(+), 70 deletions(-)

diff --git a/internal/pkg/cmd/reloader.go b/internal/pkg/cmd/reloader.go
index 771e2df..00463fa 100644
--- a/internal/pkg/cmd/reloader.go
+++ b/internal/pkg/cmd/reloader.go
@@ -192,7 +192,7 @@ func startReloader(cmd *cobra.Command, args []string) {
 		lock := leadership.GetNewLock(clientset.CoordinationV1(), constants.LockName, podName, podNamespace)
 		ctx, cancel := context.WithCancel(context.Background())
 		defer cancel()
-		go leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers)
+		leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers)
 	}

 	common.PublishMetaInfoConfigmap(clientset)
diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go
index 0db990f..7284504 100644
--- a/internal/pkg/controller/controller.go
+++ b/internal/pkg/controller/controller.go
@@ -3,6 +3,7 @@ package controller
 import (
 	"fmt"
 	"slices"
+	"sync"
 	"sync/atomic"
 	"time"

@@ -247,23 +248,37 @@ func (c *Controller) enqueue(item interface{}) {
 func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
 	defer runtime.HandleCrash()

-	// Let the workers stop when we are done
-	defer c.queue.ShutDown()
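+	// wg tracks the informer and worker goroutines so Run can block until they exit.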
+	var wg sync.WaitGroup

-	go c.informer.Run(stopCh)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		c.informer.Run(stopCh)
+	}()

 	// Wait for all involved caches to be synced, before processing items from the queue is started
 	if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
 		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		c.queue.ShutDown()
+		wg.Wait()
 		return
 	}

 	for i := 0; i < threadiness; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+			wait.Until(c.runWorker, time.Second, stopCh)
+		}()
 	}

 	<-stopCh
-	logrus.Infof("Stopping Controller")
+	logrus.Infof("Stopping Controller for %s", c.resource)
+	c.queue.ShutDown() // unblock workers so they drain and exit
+	logrus.Infof("Queue shut down for %s, waiting for goroutines", c.resource)
+	wg.Wait() // block until informer and all workers have exited
+	logrus.Infof("All goroutines exited for %s", c.resource)
 }

 func (c *Controller) runWorker() {
diff --git a/internal/pkg/leadership/leadership.go b/internal/pkg/leadership/leadership.go
index f98f299..a170ad6 100644
--- a/internal/pkg/leadership/leadership.go
+++ b/internal/pkg/leadership/leadership.go
@@ -35,50 +35,72 @@ func GetNewLock(client coordinationv1.CoordinationV1Interface, lockName, podname
 	}
 }

-// runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown.
-func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) {
-	// Construct channels for the controllers to use
-	var stopChannels []chan struct{}
-	for i := 0; i < len(controllers); i++ {
-		stop := make(chan struct{})
-		stopChannels = append(stopChannels, stop)
-	}
+// RunLeaderElection runs leadership election in a background goroutine and
+// returns a channel that is closed once the goroutine has fully exited
+// (i.e., OnStoppedLeading has run and all controller goroutines have returned).
+func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) <-chan struct{} {
+	stopped := make(chan struct{})

-	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
-		Lock:            lock,
-		ReleaseOnCancel: true,
-		LeaseDuration:   15 * time.Second,
-		RenewDeadline:   10 * time.Second,
-		RetryPeriod:     2 * time.Second,
-		Callbacks: leaderelection.LeaderCallbacks{
-			OnStartedLeading: func(c context.Context) {
-				logrus.Info("became leader, starting controllers")
-				runControllers(controllers, stopChannels)
-			},
-			OnStoppedLeading: func() {
-				logrus.Info("no longer leader, shutting down")
-				stopControllers(stopChannels)
-				cancel()
-				m.Lock()
-				defer m.Unlock()
-				healthy = false
-			},
-			OnNewLeader: func(current_id string) {
-				if current_id == id {
-					logrus.Info("still the leader!")
-					return
-				}
-				logrus.Infof("new leader is %s", current_id)
-			},
-		},
-	})
-}
+	go func() {
+		defer close(stopped)

-func runControllers(controllers []*controller.Controller, stopChannels []chan struct{}) {
-	for i, c := range controllers {
+		var stopChannels []chan struct{}
+		for range controllers {
+			stopChannels = append(stopChannels, make(chan struct{}))
+		}

-		go c.Run(1, stopChannels[i])
-	}
+		// controllerWg tracks the controller.Run goroutines so that
+		// OnStoppedLeading can wait for them to fully exit before returning.
+		var controllerWg sync.WaitGroup
+
+		leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+			Lock:            lock,
+			ReleaseOnCancel: true,
+			LeaseDuration:   15 * time.Second,
+			RenewDeadline:   10 * time.Second,
+			RetryPeriod:     2 * time.Second,
+			Callbacks: leaderelection.LeaderCallbacks{
+				OnStartedLeading: func(c context.Context) {
+					m.Lock()
+					healthy = true
+					m.Unlock()
+					logrus.Info("became leader, starting controllers")
+					for i, ctrl := range controllers {
+						controllerWg.Add(1)
+						go func(ctrl *controller.Controller, stopCh chan struct{}) {
+							defer controllerWg.Done()
+							ctrl.Run(1, stopCh)
+						}(ctrl, stopChannels[i])
+					}
+				},
+				OnStoppedLeading: func() {
+					logrus.Info("no longer leader, shutting down")
+					stopControllers(stopChannels)
+					// Wait for all controller.Run goroutines to fully exit.
+					// controller.Run blocks until its informer and workers exit,
+					// so this guarantees no controller goroutine is still running
+					// when OnStoppedLeading returns.
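+					// (controller.Run unblocks its workers via queue.ShutDown so its wg.Wait can return.)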
+ logrus.Info("waiting for all controller goroutines to exit") + controllerWg.Wait() + logrus.Info("all controller goroutines exited") + cancel() + m.Lock() + defer m.Unlock() + healthy = false + }, + OnNewLeader: func(current_id string) { + if current_id == id { + logrus.Info("still the leader!") + return + } + logrus.Infof("new leader is %s", current_id) + }, + }, + }) + }() + + return stopped } func stopControllers(stopChannels []chan struct{}) { diff --git a/internal/pkg/leadership/leadership_test.go b/internal/pkg/leadership/leadership_test.go index 5b5e26f..cc372ca 100644 --- a/internal/pkg/leadership/leadership_test.go +++ b/internal/pkg/leadership/leadership_test.go @@ -10,6 +10,7 @@ import ( "time" "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "github.com/stakater/Reloader/internal/pkg/constants" "github.com/stakater/Reloader/internal/pkg/controller" @@ -71,13 +72,18 @@ func TestHealthz(t *testing.T) { // TestRunLeaderElection validates that the liveness endpoint serves 500 when // leadership election fails func TestRunLeaderElection(t *testing.T) { + // Reset shared state left by TestHealthz + m.Lock() + healthy = true + m.Unlock() + ctx, cancel := context.WithCancel(context.TODO()) lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), constants.LockName, testutil.Pod, testutil.Namespace) - go RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{}) + stopped := RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{}) - // Liveness probe should be serving OK + // Before leadership is acquired the probe still reads the current healthy value (true) request, err := http.NewRequest(http.MethodGet, "/live", nil) if err != nil { t.Fatalf(("failed to create request")) @@ -87,7 +93,7 @@ func TestRunLeaderElection(t *testing.T) { healthz(response, request) got := response.Code - want := 500 + want := 200 if got != want { t.Fatalf("got: %d, want: %d", got, want) @@ -96,6 +102,7 @@ func TestRunLeaderElection(t *testing.T) { // Cancel the leader election context, so leadership is released and // live endpoint serves 500 cancel() + <-stopped request, err = http.NewRequest(http.MethodGet, "/live", nil) if err != nil { @@ -120,6 +127,16 @@ func TestRunLeaderElectionWithControllers(t *testing.T) { t.Logf("Creating controller") var controllers []*controller.Controller for k := range kube.ResourceMap { + // Skip namespace controller when there is no namespace label selector + // (mirrors production behavior in startReloader). + if k == "namespaces" { + continue + } + // Skip CSI controller when CSI is not installed + // (mirrors production behavior in startReloader). 
+		if k == constants.SecretProviderClassController {
+			continue
+		}
 		c, err := controller.NewController(testutil.Clients.KubernetesClient, k, testutil.Namespace, []string{}, "", "", metrics.NewCollectors())
 		if err != nil {
 			logrus.Fatalf("%s", err)
 		}
@@ -134,7 +152,7 @@
 	ctx, cancel := context.WithCancel(context.TODO())

 	// Start running leadership election, this also starts the controllers
-	go RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers)
+	stopped := RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers)
 	time.Sleep(3 * time.Second)

 	// Create some stuff and do a thing
@@ -173,16 +191,49 @@
 	}
 	time.Sleep(testutil.SleepDuration)

+	// Add reloader.stakater.com/ignore: "true" to the configmap BEFORE cancelling
+	// leadership. This prevents any Reloader instance running in the cluster
+	// (including ones external to this test) from processing the second configmap
+	// update below, making the assertion reliable in shared cluster environments.
+	// The ignore annotation is on the configmap itself: ShouldReload checks
+	// config.ResourceAnnotations (= configmap annotations) for this annotation.
+	// Note: only the annotation is changed here; the data SHA is unchanged, so
+	// the still-running controllers will see no diff and skip the rolling upgrade.
+	cm, getCMErr := testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Get(
+		context.TODO(), configmapName, metav1.GetOptions{})
+	if getCMErr != nil {
+		t.Fatalf("Failed to get configmap to add ignore annotation: %v", getCMErr)
+	}
+	if cm.Annotations == nil {
+		cm.Annotations = make(map[string]string)
+	}
+	cm.Annotations[options.IgnoreResourceAnnotation] = "true"
+	if _, err = testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Update(
+		context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("Failed to add ignore annotation to configmap: %v", err)
+	}
+
 	// Cancel the leader election context, so leadership is released
 	logrus.Info("shutting down controller from test")
 	cancel()
-	time.Sleep(5 * time.Second)
+	<-stopped // wait until OnStoppedLeading has run and all controller goroutines have exited

-	// Updating configmap again
-	updateErr = testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com/new")
-	if updateErr != nil {
-		t.Fatalf("Configmap was not updated")
+	// Update the configmap data for the second time using a Get+modify+Update
+	// pattern so that the ignore annotation added above is preserved.
+	// Any Reloader (including external ones) will see ignore=true and skip the update.
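+	// (The testutil.UpdateConfigMap helper used previously builds a fresh object and could drop that annotation.)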
+	cm, err = testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Get(
+		context.TODO(), configmapName, metav1.GetOptions{})
+	if err != nil {
+		t.Fatalf("Failed to get configmap for second update: %v", err)
 	}
+	cm.Data["test.url"] = "www.stakater.com/new"
+	// ignore annotation is still present from the update above
+	if _, err = testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Update(
+		context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
+		t.Fatalf("Failed to update configmap: %v", err)
+	}
+	time.Sleep(3 * time.Second)

 	// Verifying that the deployment was not updated as leadership has been lost
 	logrus.Infof("Verifying pod envvars has not been updated")
diff --git a/pkg/common/common.go b/pkg/common/common.go
index bebfaa9..de37ad8 100644
--- a/pkg/common/common.go
+++ b/pkg/common/common.go
@@ -194,12 +194,22 @@ func GetResourceLabelSelector(slice []string) (string, error) {

 // ShouldReload checks if a resource should be reloaded based on its annotations and the provided options.
 func ShouldReload(config Config, resourceType string, annotations Map, podAnnotations Map, reloaderOpts *ReloaderOptions) ReloadCheckResult {
-	// Check if this workload type should be ignored
+	// Check if this workload type should be ignored.
+	// Use reloaderOpts.WorkloadTypesToIgnore directly instead of re-reading the
+	// global via util.GetIgnoredWorkloadTypesList(), so that invalid entries simply
+	// skip the ignore check (allowing reload) rather than silently blocking it.
 	if len(reloaderOpts.WorkloadTypesToIgnore) > 0 {
-		ignoredWorkloadTypes, err := util.GetIgnoredWorkloadTypesList()
-		if err != nil {
-			logrus.Errorf("Failed to parse ignored workload types: %v", err)
-		} else {
+		validIgnored := util.List{}
+		valid := true
+		for _, v := range reloaderOpts.WorkloadTypesToIgnore {
+			if v != "jobs" && v != "cronjobs" {
+				logrus.Errorf("Failed to parse ignored workload types: 'ignored-workload-types' accepts 'jobs', 'cronjobs', or both, not '%s'", v)
+				valid = false
+				break
+			}
+			validIgnored = append(validIgnored, v)
+		}
+		if valid {
 			// Map Kubernetes resource types to CLI-friendly names for comparison
 			var resourceToCheck string
 			switch resourceType {
@@ -208,14 +218,10 @@ func ShouldReload(config Config, resourceType string, annotations Map, podAnnota
 			case "CronJob":
 				resourceToCheck = "cronjobs"
 			default:
-				resourceToCheck = resourceType // For other types, use as-is
+				resourceToCheck = resourceType
 			}
-
-			// Check if current resource type should be ignored
-			if ignoredWorkloadTypes.Contains(resourceToCheck) {
-				return ReloadCheckResult{
-					ShouldReload: false,
-				}
+			if validIgnored.Contains(resourceToCheck) {
+				return ReloadCheckResult{ShouldReload: false}
 			}
 		}
 	}
diff --git a/test/e2e/utils/helm_test.go b/test/e2e/utils/helm_test.go
index 63a3e3f..2e334eb 100644
--- a/test/e2e/utils/helm_test.go
+++ b/test/e2e/utils/helm_test.go
@@ -26,9 +26,14 @@ func TestGetImageRepository(t *testing.T) {
 			expected: "ghcr.io/stakater/reloader",
 		},
 		{
-			name:     "image with digest (not fully supported)",
+			name:     "image with digest",
 			image:    "nginx@sha256:abc123",
-			expected: "nginx@sha256",
+			expected: "nginx",
+		},
+		{
+			name:     "full image with digest",
+			image:    "ghcr.io/stakater/reloader@sha256:deadbeef",
+			expected: "ghcr.io/stakater/reloader",
 		},
 		{
 			name:     "simple image name",
@@ -88,6 +93,16 @@ func TestGetImageTag(t *testing.T) {
 			image:    "myimage:sha-abc123",
 			expected: "sha-abc123",
 		},
+		{
+			name:     "image with digest",
+			image:    "nginx@sha256:abc123",
+			expected: "sha256:abc123",
"sha256:abc123", + }, + { + name: "full image with digest", + image: "ghcr.io/stakater/reloader@sha256:deadbeef", + expected: "sha256:deadbeef", + }, } for _, tt := range tests {