Refactor code and fix leader election tests

Signed-off-by: faizanahmad055 <faizan.ahmad55@outlook.com>
This commit is contained in:
faizanahmad055
2026-05-11 20:48:54 +02:00
parent 199587dd42
commit 96ac8d1daf
6 changed files with 175 additions and 70 deletions

View File

@@ -192,7 +192,7 @@ func startReloader(cmd *cobra.Command, args []string) {
lock := leadership.GetNewLock(clientset.CoordinationV1(), constants.LockName, podName, podNamespace)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers)
leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers)
}
common.PublishMetaInfoConfigmap(clientset)

View File

@@ -3,6 +3,7 @@ package controller
import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"
@@ -247,23 +248,36 @@ func (c *Controller) enqueue(item interface{}) {
func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
defer runtime.HandleCrash()
// Let the workers stop when we are done
defer c.queue.ShutDown()
var wg sync.WaitGroup
go c.informer.Run(stopCh)
wg.Add(1)
go func() {
defer wg.Done()
c.informer.Run(stopCh)
}()
// Wait for all involved caches to be synced, before processing items from the queue is started
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
c.queue.ShutDown()
wg.Wait()
return
}
for i := 0; i < threadiness; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
wg.Add(1)
go func() {
defer wg.Done()
wait.Until(c.runWorker, time.Second, stopCh)
}()
}
<-stopCh
logrus.Infof("Stopping Controller")
logrus.Infof("Stopping Controller for %s", c.resource)
c.queue.ShutDown() // unblock workers so they drain and exit
logrus.Infof("Queue shut down for %s, waiting for goroutines", c.resource)
wg.Wait() // block until informer and all workers have exited
logrus.Infof("All goroutines exited for %s", c.resource)
}
func (c *Controller) runWorker() {

View File

@@ -35,50 +35,71 @@ func GetNewLock(client coordinationv1.CoordinationV1Interface, lockName, podname
}
}
// runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown.
func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) {
// Construct channels for the controllers to use
var stopChannels []chan struct{}
for i := 0; i < len(controllers); i++ {
stop := make(chan struct{})
stopChannels = append(stopChannels, stop)
}
// RunLeaderElection runs leadership election in a background goroutine and
// returns a channel that is closed once the goroutine has fully exited
// (i.e., OnStoppedLeading has run and all controller goroutines have returned).
func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) <-chan struct{} {
stopped := make(chan struct{})
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(c context.Context) {
logrus.Info("became leader, starting controllers")
runControllers(controllers, stopChannels)
},
OnStoppedLeading: func() {
logrus.Info("no longer leader, shutting down")
stopControllers(stopChannels)
cancel()
m.Lock()
defer m.Unlock()
healthy = false
},
OnNewLeader: func(current_id string) {
if current_id == id {
logrus.Info("still the leader!")
return
}
logrus.Infof("new leader is %s", current_id)
},
},
})
}
go func() {
defer close(stopped)
func runControllers(controllers []*controller.Controller, stopChannels []chan struct{}) {
for i, c := range controllers {
var stopChannels []chan struct{}
for range controllers {
stopChannels = append(stopChannels, make(chan struct{}))
}
go c.Run(1, stopChannels[i])
}
// controllerWg tracks the controller.Run goroutines so that
// OnStoppedLeading can wait for them to fully exit before returning.
var controllerWg sync.WaitGroup
leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 15 * time.Second,
RenewDeadline: 10 * time.Second,
RetryPeriod: 2 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(c context.Context) {
m.Lock()
healthy = true
m.Unlock()
logrus.Info("became leader, starting controllers")
for i, ctrl := range controllers {
controllerWg.Add(1)
go func(ctrl *controller.Controller, stopCh chan struct{}) {
defer controllerWg.Done()
ctrl.Run(1, stopCh)
}(ctrl, stopChannels[i])
}
},
OnStoppedLeading: func() {
logrus.Info("no longer leader, shutting down")
stopControllers(stopChannels)
// Wait for all controller.Run goroutines to fully exit.
// controller.Run blocks until its informer and workers exit,
// so this guarantees no controller goroutine is still running
// when OnStoppedLeading returns.
logrus.Info("waiting for all controller goroutines to exit")
controllerWg.Wait()
logrus.Info("all controller goroutines exited")
cancel()
m.Lock()
defer m.Unlock()
healthy = false
},
OnNewLeader: func(current_id string) {
if current_id == id {
logrus.Info("still the leader!")
return
}
logrus.Infof("new leader is %s", current_id)
},
},
})
}()
return stopped
}
func stopControllers(stopChannels []chan struct{}) {

View File

@@ -10,6 +10,7 @@ import (
"time"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/stakater/Reloader/internal/pkg/constants"
"github.com/stakater/Reloader/internal/pkg/controller"
@@ -71,13 +72,18 @@ func TestHealthz(t *testing.T) {
// TestRunLeaderElection validates that the liveness endpoint serves 500 when
// leadership election fails
func TestRunLeaderElection(t *testing.T) {
// Reset shared state left by TestHealthz
m.Lock()
healthy = true
m.Unlock()
ctx, cancel := context.WithCancel(context.TODO())
lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), constants.LockName, testutil.Pod, testutil.Namespace)
go RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{})
stopped := RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{})
// Liveness probe should be serving OK
// Before leadership is acquired the probe still reads the current healthy value (true)
request, err := http.NewRequest(http.MethodGet, "/live", nil)
if err != nil {
t.Fatalf(("failed to create request"))
@@ -87,7 +93,7 @@ func TestRunLeaderElection(t *testing.T) {
healthz(response, request)
got := response.Code
want := 500
want := 200
if got != want {
t.Fatalf("got: %d, want: %d", got, want)
@@ -96,6 +102,7 @@ func TestRunLeaderElection(t *testing.T) {
// Cancel the leader election context, so leadership is released and
// live endpoint serves 500
cancel()
<-stopped
request, err = http.NewRequest(http.MethodGet, "/live", nil)
if err != nil {
@@ -120,6 +127,16 @@ func TestRunLeaderElectionWithControllers(t *testing.T) {
t.Logf("Creating controller")
var controllers []*controller.Controller
for k := range kube.ResourceMap {
// Skip namespace controller when there is no namespace label selector
// (mirrors production behavior in startReloader).
if k == "namespaces" {
continue
}
// Skip CSI controller when CSI is not installed
// (mirrors production behavior in startReloader).
if k == constants.SecretProviderClassController {
continue
}
c, err := controller.NewController(testutil.Clients.KubernetesClient, k, testutil.Namespace, []string{}, "", "", metrics.NewCollectors())
if err != nil {
logrus.Fatalf("%s", err)
@@ -134,7 +151,7 @@ func TestRunLeaderElectionWithControllers(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
// Start running leadership election, this also starts the controllers
go RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers)
stopped := RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers)
time.Sleep(3 * time.Second)
// Create some stuff and do a thing
@@ -173,16 +190,48 @@ func TestRunLeaderElectionWithControllers(t *testing.T) {
}
time.Sleep(testutil.SleepDuration)
// Add reloader.stakater.com/ignore: "true" to the configmap BEFORE cancelling
// leadership. This prevents any Reloader instance running in the cluster
// (including ones external to this test) from processing the second configmap
// update below, making the assertion reliable in shared cluster environments.
// The ignore annotation is on the configmap itself: ShouldReload checks
// config.ResourceAnnotations (= configmap annotations) for this annotation.
// Note: only the annotation is changed here — the data SHA is unchanged so
// the still-running controllers will see no diff and skip the rolling upgrade.
cm, getCMErr := testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Get(
context.TODO(), configmapName, metav1.GetOptions{})
if getCMErr != nil {
t.Fatalf("Failed to get configmap to add ignore annotation: %v", getCMErr)
}
if cm.Annotations == nil {
cm.Annotations = make(map[string]string)
}
cm.Annotations[options.IgnoreResourceAnnotation] = "true"
if _, err = testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Update(
context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to add ignore annotation to configmap: %v", err)
}
// Cancel the leader election context, so leadership is released
logrus.Info("shutting down controller from test")
cancel()
time.Sleep(5 * time.Second)
<-stopped // wait until OnStoppedLeading has run and all controller goroutines have exited
// Updating configmap again
updateErr = testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com/new")
if updateErr != nil {
t.Fatalf("Configmap was not updated")
// Update the configmap data for the second time using a Get+modify+Update
// pattern so that the ignore annotation added above is preserved.
// Any Reloader (including external ones) will see ignore=true and skip the update.
cm, err = testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Get(
context.TODO(), configmapName, metav1.GetOptions{})
if err != nil {
t.Fatalf("Failed to get configmap for second update: %v", err)
}
cm.Data["test.url"] = "www.stakater.com/new"
// ignore annotation is still present from the update above
if _, err = testutil.Clients.KubernetesClient.CoreV1().ConfigMaps(testutil.Namespace).Update(
context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
t.Fatalf("Failed to update configmap: %v", err)
}
time.Sleep(3 * time.Second)
// Verifying that the deployment was not updated as leadership has been lost
logrus.Infof("Verifying pod envvars has not been updated")

View File

@@ -194,12 +194,22 @@ func GetResourceLabelSelector(slice []string) (string, error) {
// ShouldReload checks if a resource should be reloaded based on its annotations and the provided options.
func ShouldReload(config Config, resourceType string, annotations Map, podAnnotations Map, reloaderOpts *ReloaderOptions) ReloadCheckResult {
// Check if this workload type should be ignored
// Check if this workload type should be ignored.
// Use reloaderOpts.WorkloadTypesToIgnore directly instead of re-reading the
// global via util.GetIgnoredWorkloadTypesList(), so that invalid entries simply
// skip the ignore check (allowing reload) rather than silently blocking it.
if len(reloaderOpts.WorkloadTypesToIgnore) > 0 {
ignoredWorkloadTypes, err := util.GetIgnoredWorkloadTypesList()
if err != nil {
logrus.Errorf("Failed to parse ignored workload types: %v", err)
} else {
validIgnored := util.List{}
valid := true
for _, v := range reloaderOpts.WorkloadTypesToIgnore {
if v != "jobs" && v != "cronjobs" {
logrus.Errorf("Failed to parse ignored workload types: 'ignored-workload-types' accepts 'jobs', 'cronjobs', or both, not '%s'", v)
valid = false
break
}
validIgnored = append(validIgnored, v)
}
if valid {
// Map Kubernetes resource types to CLI-friendly names for comparison
var resourceToCheck string
switch resourceType {
@@ -208,14 +218,10 @@ func ShouldReload(config Config, resourceType string, annotations Map, podAnnota
case "CronJob":
resourceToCheck = "cronjobs"
default:
resourceToCheck = resourceType // For other types, use as-is
resourceToCheck = resourceType
}
// Check if current resource type should be ignored
if ignoredWorkloadTypes.Contains(resourceToCheck) {
return ReloadCheckResult{
ShouldReload: false,
}
if validIgnored.Contains(resourceToCheck) {
return ReloadCheckResult{ShouldReload: false}
}
}
}

View File

@@ -26,9 +26,14 @@ func TestGetImageRepository(t *testing.T) {
expected: "ghcr.io/stakater/reloader",
},
{
name: "image with digest (not fully supported)",
name: "image with digest",
image: "nginx@sha256:abc123",
expected: "nginx@sha256",
expected: "nginx",
},
{
name: "full image with digest",
image: "ghcr.io/stakater/reloader@sha256:deadbeef",
expected: "ghcr.io/stakater/reloader",
},
{
name: "simple image name",
@@ -88,6 +93,16 @@ func TestGetImageTag(t *testing.T) {
image: "myimage:sha-abc123",
expected: "sha-abc123",
},
{
name: "image with digest",
image: "nginx@sha256:abc123",
expected: "sha256:abc123",
},
{
name: "full image with digest",
image: "ghcr.io/stakater/reloader@sha256:deadbeef",
expected: "sha256:deadbeef",
},
}
for _, tt := range tests {