diff --git a/internal/pkg/cmd/reloader.go b/internal/pkg/cmd/reloader.go index 15c8d3c..963070c 100644 --- a/internal/pkg/cmd/reloader.go +++ b/internal/pkg/cmd/reloader.go @@ -148,13 +148,12 @@ func startReloader(cmd *cobra.Command, args []string) { logrus.Fatalf("%s", err) } - // If HA is enabled then we need to run leadership election - if options.EnableHA { - c.SetLeader(false) - } - controllers = append(controllers, c) + // If HA is enabled we only run the controller when + if options.EnableHA { + continue + } // Now let's start the controller stop := make(chan struct{}) defer close(stop) @@ -164,11 +163,17 @@ func startReloader(cmd *cobra.Command, args []string) { // Run the leadership election if options.EnableHA { + var stopChannels []chan struct{} + for i := 0; i < len(controllers); i++ { + stop := make(chan struct{}) + stopChannels = append(stopChannels, stop) + } podName, podNamespace := getHAEnvs() lock := getNewLock(clientset, constants.LockName, podName, podNamespace) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - runLeaderElection(lock, ctx, podName, controllers) + runLeaderElection(lock, ctx, cancel, podName, controllers, stopChannels) + return } select {} @@ -187,22 +192,23 @@ func getNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace st } } -func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, id string, controllers []*controller.Controller) { +// runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown. +func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller, stopChannels []chan struct{}) { leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{ Lock: lock, ReleaseOnCancel: true, - // TODO Validate that keys persist in the cache for at least one leadership election cycle - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, + LeaseDuration: 15 * time.Second, + RenewDeadline: 10 * time.Second, + RetryPeriod: 2 * time.Second, Callbacks: leaderelection.LeaderCallbacks{ OnStartedLeading: func(c context.Context) { - setLeader(controllers, true) - logrus.Info("became leader") + logrus.Info("became leader, starting controllers") + runControllers(controllers, stopChannels) }, OnStoppedLeading: func() { - setLeader(controllers, false) - logrus.Info("no longer leader") + logrus.Info("no longer leader, shutting down") + stopControllers(stopChannels) + cancel() }, OnNewLeader: func(current_id string) { if current_id == id { @@ -215,10 +221,16 @@ func runLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, id str }) } -func setLeader(controllers []*controller.Controller, isLeader bool) { - for _, c := range controllers { +func runControllers(controllers []*controller.Controller, stopChannels []chan struct{}) { + for i, c := range controllers { c := c - c.SetLeader(isLeader) + go c.Run(1, stopChannels[i]) + } +} + +func stopControllers(stopChannels []chan struct{}) { + for _, c := range stopChannels { + close(c) } } diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go index 29b8c08..1e75bb3 100644 --- a/internal/pkg/controller/controller.go +++ b/internal/pkg/controller/controller.go @@ -30,7 +30,6 @@ type Controller struct { namespace string ignoredNamespaces util.List collectors metrics.Collectors - isLeader bool } // controllerInitialized flag determines whether controlled is being initialized @@ -58,7 +57,7 @@ func NewController( c.informer = informer c.queue = queue c.collectors = collectors - c.isLeader = true + logrus.Infof("created controller for: %s", resource) return &c, nil } @@ -143,7 +142,7 @@ func (c *Controller) processNextItem() bool { defer c.queue.Done(resourceHandler) // Invoke the method containing the business logic - err := resourceHandler.(handler.ResourceHandler).Handle(c.isLeader) + err := resourceHandler.(handler.ResourceHandler).Handle() // Handle the error if something went wrong during the execution of the business logic c.handleErr(err, resourceHandler) return true @@ -174,8 +173,3 @@ func (c *Controller) handleErr(err error, key interface{}) { runtime.HandleError(err) logrus.Infof("Dropping the key %q out of the queue: %v", key, err) } - -func (c *Controller) SetLeader(isLeader bool) { - c.isLeader = isLeader - logrus.Info("controller active") -} diff --git a/internal/pkg/handler/create.go b/internal/pkg/handler/create.go index da3c75e..e6cc41a 100644 --- a/internal/pkg/handler/create.go +++ b/internal/pkg/handler/create.go @@ -1,8 +1,6 @@ package handler import ( - "fmt" - "github.com/sirupsen/logrus" "github.com/stakater/Reloader/internal/pkg/metrics" "github.com/stakater/Reloader/internal/pkg/util" @@ -16,16 +14,12 @@ type ResourceCreatedHandler struct { } // Handle processes the newly created resource -func (r ResourceCreatedHandler) Handle(isLeader bool) error { +func (r ResourceCreatedHandler) Handle() error { if r.Resource == nil { logrus.Errorf("Resource creation handler received nil resource") } else { config, _ := r.GetConfig() - // process resource based on its type - if isLeader { - return doRollingUpgrade(config, r.Collectors) - } - return fmt.Errorf("instance is not leader, will not perform rolling upgrade on %s %s/%s", config.Type, config.ResourceName, config.Namespace) + return doRollingUpgrade(config, r.Collectors) } return nil } diff --git a/internal/pkg/handler/handler.go b/internal/pkg/handler/handler.go index 4dbf10e..634e080 100644 --- a/internal/pkg/handler/handler.go +++ b/internal/pkg/handler/handler.go @@ -6,6 +6,6 @@ import ( // ResourceHandler handles the creation and update of resources type ResourceHandler interface { - Handle(isLeader bool) error + Handle() error GetConfig() (util.Config, string) } diff --git a/internal/pkg/handler/update.go b/internal/pkg/handler/update.go index 77da76e..91f320a 100644 --- a/internal/pkg/handler/update.go +++ b/internal/pkg/handler/update.go @@ -1,8 +1,6 @@ package handler import ( - "fmt" - "github.com/sirupsen/logrus" "github.com/stakater/Reloader/internal/pkg/metrics" "github.com/stakater/Reloader/internal/pkg/util" @@ -17,17 +15,13 @@ type ResourceUpdatedHandler struct { } // Handle processes the updated resource -func (r ResourceUpdatedHandler) Handle(isLeader bool) error { +func (r ResourceUpdatedHandler) Handle() error { if r.Resource == nil || r.OldResource == nil { logrus.Errorf("Resource update handler received nil resource") } else { config, oldSHAData := r.GetConfig() if config.SHAValue != oldSHAData { - // process resource based on its type - if isLeader { - return doRollingUpgrade(config, r.Collectors) - } - return fmt.Errorf("instance is not leader, will not perform rolling upgrade on %s %s/%s", config.Type, config.ResourceName, config.Namespace) + return doRollingUpgrade(config, r.Collectors) } } return nil