From 11ae057b0a80ef685ae2940656abb0267c9858ea Mon Sep 17 00:00:00 2001 From: Alex Vest Date: Fri, 16 Sep 2022 12:06:04 +0100 Subject: [PATCH] Add tests for leadership election Pull liveness into leadership to ease testing, logically the liveness probe is directly affected by leadership so it makes sense here. Moved some of the components of the controller tests into the testutil package for reuse in my own tests. --- internal/pkg/cmd/reloader.go | 22 +-- internal/pkg/handler/create.go | 1 + internal/pkg/handler/update.go | 1 + internal/pkg/leadership/leadership.go | 36 +++- internal/pkg/leadership/leadership_test.go | 213 +++++++++++++++++++++ internal/pkg/testutil/kube.go | 14 ++ 6 files changed, 263 insertions(+), 24 deletions(-) create mode 100644 internal/pkg/leadership/leadership_test.go diff --git a/internal/pkg/cmd/reloader.go b/internal/pkg/cmd/reloader.go index 18f0999..9810583 100644 --- a/internal/pkg/cmd/reloader.go +++ b/internal/pkg/cmd/reloader.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net/http" "os" "strings" @@ -21,11 +20,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var ( - // Used for liveness probe - healthy bool = true -) - // NewReloaderCommand starts the reloader controller func NewReloaderCommand() *cobra.Command { cmd := &cobra.Command{ @@ -167,14 +161,13 @@ func startReloader(cmd *cobra.Command, args []string) { // Run leadership election if options.EnableHA { podName, podNamespace := getHAEnvs() - lock := leadership.GetNewLock(clientset, constants.LockName, podName, podNamespace) + lock := leadership.GetNewLock(clientset.CoordinationV1(), constants.LockName, podName, podNamespace) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers, healthy) + leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers) } - http.HandleFunc("/live", healthz) - logrus.Fatal(http.ListenAndServe(":8080", nil)) + 
logrus.Fatal(leadership.Healthz()) } func getIgnoredNamespacesList(cmd *cobra.Command) (util.List, error) { @@ -209,12 +202,3 @@ func getIgnoredResourcesList(cmd *cobra.Command) (util.List, error) { return ignoredResourcesList, nil } - -func healthz(w http.ResponseWriter, req *http.Request) { - if healthy { - w.WriteHeader(http.StatusOK) - return - } - - w.WriteHeader(http.StatusInternalServerError) -} diff --git a/internal/pkg/handler/create.go b/internal/pkg/handler/create.go index e6cc41a..f6364c5 100644 --- a/internal/pkg/handler/create.go +++ b/internal/pkg/handler/create.go @@ -19,6 +19,7 @@ func (r ResourceCreatedHandler) Handle() error { logrus.Errorf("Resource creation handler received nil resource") } else { config, _ := r.GetConfig() + // process resource based on its type return doRollingUpgrade(config, r.Collectors) } return nil diff --git a/internal/pkg/handler/update.go b/internal/pkg/handler/update.go index 91f320a..4854151 100644 --- a/internal/pkg/handler/update.go +++ b/internal/pkg/handler/update.go @@ -21,6 +21,7 @@ func (r ResourceUpdatedHandler) Handle() error { } else { config, oldSHAData := r.GetConfig() if config.SHAValue != oldSHAData { + // process resource based on its type return doRollingUpgrade(config, r.Collectors) } } diff --git a/internal/pkg/leadership/leadership.go b/internal/pkg/leadership/leadership.go index cdc3766..0e8429b 100644 --- a/internal/pkg/leadership/leadership.go +++ b/internal/pkg/leadership/leadership.go @@ -2,23 +2,32 @@ package leadership import ( "context" + "net/http" "time" "github.com/sirupsen/logrus" "github.com/stakater/Reloader/internal/pkg/controller" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + + coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" ) -func GetNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace string) *resourcelock.LeaseLock { 
+const healthPort string = ":9091" + +var ( + // Used for liveness probe + healthy bool = true +) + +func GetNewLock(client coordinationv1.CoordinationV1Interface, lockName, podname, namespace string) *resourcelock.LeaseLock { return &resourcelock.LeaseLock{ LeaseMeta: v1.ObjectMeta{ Name: lockName, Namespace: namespace, }, - Client: clientset.CoordinationV1(), + Client: client, LockConfig: resourcelock.ResourceLockConfig{ Identity: podname, }, @@ -26,7 +35,7 @@ func GetNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace st } // runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown. -func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller, health bool) { +func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) { // Construct channels for the controllers to use var stopChannels []chan struct{} for i := 0; i < len(controllers); i++ { @@ -49,7 +58,7 @@ func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel logrus.Info("no longer leader, shutting down") stopControllers(stopChannels) cancel() - health = false + healthy = false }, OnNewLeader: func(current_id string) { if current_id == id { @@ -74,3 +83,20 @@ func stopControllers(stopChannels []chan struct{}) { close(c) } } + +// Healthz serves the liveness probe endpoint. If leadership election is +// enabled and a replica stops leading the liveness probe will fail and the +// kubelet will restart the container. 
+func Healthz() error { + http.HandleFunc("/live", healthz) + return http.ListenAndServe(healthPort, nil) +} + +func healthz(w http.ResponseWriter, req *http.Request) { + if healthy { + w.Write([]byte("alive")) + return + } + + w.WriteHeader(http.StatusInternalServerError) +} diff --git a/internal/pkg/leadership/leadership_test.go b/internal/pkg/leadership/leadership_test.go new file mode 100644 index 0000000..2d64d0c --- /dev/null +++ b/internal/pkg/leadership/leadership_test.go @@ -0,0 +1,213 @@ +package leadership + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stakater/Reloader/internal/pkg/constants" + "github.com/stakater/Reloader/internal/pkg/controller" + "github.com/stakater/Reloader/internal/pkg/handler" + "github.com/stakater/Reloader/internal/pkg/metrics" + "github.com/stakater/Reloader/internal/pkg/options" + "github.com/stakater/Reloader/internal/pkg/testutil" + "github.com/stakater/Reloader/internal/pkg/util" + "github.com/stakater/Reloader/pkg/kube" +) + +func TestMain(m *testing.M) { + + testutil.CreateNamespace(testutil.Namespace, testutil.Clients.KubernetesClient) + + logrus.Infof("Running Testcases") + retCode := m.Run() + + testutil.DeleteNamespace(testutil.Namespace, testutil.Clients.KubernetesClient) + + os.Exit(retCode) +} + +func TestHealthz(t *testing.T) { + request, err := http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response := httptest.NewRecorder() + + healthz(response, request) + got := response.Code + want := 200 + + if got != want { + t.Fatalf("got: %q, want: %q", got, want) + } + + // Have the liveness probe serve a 500 + healthy = false + + request, err = http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response = httptest.NewRecorder() + + healthz(response, request) + got = response.Code + want = 500 + + if 
got != want { + t.Fatalf("got: %q, want: %q", got, want) + } +} + +// TestRunLeaderElection validates that the liveness endpoint serves 500 when +// leadership election fails +func TestRunLeaderElection(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + + lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), constants.LockName, testutil.Pod, testutil.Namespace) + + go RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{}) + + // Liveness probe should be serving OK + request, err := http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response := httptest.NewRecorder() + + healthz(response, request) + got := response.Code + want := 500 + + if got != want { + t.Fatalf("got: %q, want: %q", got, want) + } + + // Cancel the leader election context, so leadership is released and + // live endpoint serves 500 + cancel() + + request, err = http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response = httptest.NewRecorder() + + healthz(response, request) + got = response.Code + want = 500 + + if got != want { + t.Fatalf("got: %q, want: %q", got, want) + } +} + +// TestRunLeaderElectionWithControllers tests that leadership election works +// with real controllers and that on context cancellation the controllers stop +// running. 
+func TestRunLeaderElectionWithControllers(t *testing.T) { + t.Logf("Creating controller") + var controllers []*controller.Controller + for k := range kube.ResourceMap { + c, err := controller.NewController(testutil.Clients.KubernetesClient, k, testutil.Namespace, []string{}, metrics.NewCollectors()) + if err != nil { + logrus.Fatalf("%s", err) + } + + controllers = append(controllers, c) + } + time.Sleep(3 * time.Second) + + lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), fmt.Sprintf("%s-%d", constants.LockName, 1), testutil.Pod, testutil.Namespace) + + ctx, cancel := context.WithCancel(context.TODO()) + + // Start running leadership election, this also starts the controllers + go RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers) + time.Sleep(3 * time.Second) + + // Creating configmap + configmapName := testutil.ConfigmapNamePrefix + "-update-" + testutil.RandSeq(5) + configmapClient, err := testutil.CreateConfigMap(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName, "www.google.com") + if err != nil { + t.Fatalf("Error while creating the configmap %v", err) + } + + // Creating deployment + _, err = testutil.CreateDeployment(testutil.Clients.KubernetesClient, configmapName, testutil.Namespace, true) + if err != nil { + t.Fatalf("Error in deployment creation: %v", err) + } + + // Updating configmap for first time + updateErr := testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com") + if updateErr != nil { + t.Fatalf("Configmap was not updated") + } + time.Sleep(3 * time.Second) + + // Verifying deployment update + logrus.Infof("Verifying pod envvars has been created") + shaData := testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, testutil.Namespace, configmapName, "www.stakater.com") + config := util.Config{ + Namespace: testutil.Namespace, + ResourceName: configmapName, + SHAValue: shaData, + Annotation: 
options.ConfigmapUpdateOnChangeAnnotation, + } + deploymentFuncs := handler.GetDeploymentRollingUpgradeFuncs() + updated := testutil.VerifyResourceEnvVarUpdate(testutil.Clients, config, constants.ConfigmapEnvVarPostfix, deploymentFuncs) + if !updated { + t.Fatalf("Deployment was not updated") + } + time.Sleep(testutil.SleepDuration) + + // Cancel the leader election context, so leadership is released + logrus.Info("shutting down controller from test") + cancel() + time.Sleep(5 * time.Second) + + // Updating configmap again + updateErr = testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com/new") + if updateErr != nil { + t.Fatalf("Configmap was not updated") + } + + // Verifying that the deployment was not updated as leadership has been lost + logrus.Infof("Verifying pod envvars has not been updated") + shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, testutil.Namespace, configmapName, "www.stakater.com/new") + config = util.Config{ + Namespace: testutil.Namespace, + ResourceName: configmapName, + SHAValue: shaData, + Annotation: options.ConfigmapUpdateOnChangeAnnotation, + } + deploymentFuncs = handler.GetDeploymentRollingUpgradeFuncs() + updated = testutil.VerifyResourceEnvVarUpdate(testutil.Clients, config, constants.ConfigmapEnvVarPostfix, deploymentFuncs) + if updated { + t.Fatalf("Deployment was updated") + } + + // Deleting deployment + err = testutil.DeleteDeployment(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName) + if err != nil { + logrus.Errorf("Error while deleting the deployment %v", err) + } + + // Deleting configmap + err = testutil.DeleteConfigMap(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName) + if err != nil { + logrus.Errorf("Error while deleting the configmap %v", err) + } + time.Sleep(testutil.SleepDuration) +} diff --git a/internal/pkg/testutil/kube.go b/internal/pkg/testutil/kube.go index 7397aec..35c969f 100644 --- 
a/internal/pkg/testutil/kube.go +++ b/internal/pkg/testutil/kube.go @@ -16,6 +16,7 @@ import ( "github.com/stakater/Reloader/internal/pkg/callbacks" "github.com/stakater/Reloader/internal/pkg/constants" "github.com/stakater/Reloader/internal/pkg/crypto" + "github.com/stakater/Reloader/internal/pkg/metrics" "github.com/stakater/Reloader/internal/pkg/options" "github.com/stakater/Reloader/internal/pkg/util" "github.com/stakater/Reloader/pkg/kube" @@ -34,6 +35,19 @@ var ( SecretResourceType = "secrets" ) +var ( + Clients = kube.GetClients() + Pod = "test-reloader-" + RandSeq(5) + Namespace = "test-reloader-" + RandSeq(5) + ConfigmapNamePrefix = "testconfigmap-reloader" + SecretNamePrefix = "testsecret-reloader" + Data = "dGVzdFNlY3JldEVuY29kaW5nRm9yUmVsb2FkZXI=" + NewData = "dGVzdE5ld1NlY3JldEVuY29kaW5nRm9yUmVsb2FkZXI=" + UpdatedData = "dGVzdFVwZGF0ZWRTZWNyZXRFbmNvZGluZ0ZvclJlbG9hZGVy" + Collectors = metrics.NewCollectors() + SleepDuration = 3 * time.Second +) + // CreateNamespace creates namespace for testing func CreateNamespace(namespace string, client kubernetes.Interface) { _, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})