diff --git a/internal/pkg/cmd/reloader.go b/internal/pkg/cmd/reloader.go index 18f0999..9810583 100644 --- a/internal/pkg/cmd/reloader.go +++ b/internal/pkg/cmd/reloader.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "net/http" "os" "strings" @@ -21,11 +20,6 @@ import ( v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) -var ( - // Used for liveness probe - healthy bool = true -) - // NewReloaderCommand starts the reloader controller func NewReloaderCommand() *cobra.Command { cmd := &cobra.Command{ @@ -167,14 +161,13 @@ func startReloader(cmd *cobra.Command, args []string) { // Run leadership election if options.EnableHA { podName, podNamespace := getHAEnvs() - lock := leadership.GetNewLock(clientset, constants.LockName, podName, podNamespace) + lock := leadership.GetNewLock(clientset.CoordinationV1(), constants.LockName, podName, podNamespace) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers, healthy) + leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers) } - http.HandleFunc("/live", healthz) - logrus.Fatal(http.ListenAndServe(":8080", nil)) + logrus.Fatal(leadership.Healthz()) } func getIgnoredNamespacesList(cmd *cobra.Command) (util.List, error) { @@ -209,12 +202,3 @@ func getIgnoredResourcesList(cmd *cobra.Command) (util.List, error) { return ignoredResourcesList, nil } - -func healthz(w http.ResponseWriter, req *http.Request) { - if healthy { - w.WriteHeader(http.StatusOK) - return - } - - w.WriteHeader(http.StatusInternalServerError) -} diff --git a/internal/pkg/handler/create.go b/internal/pkg/handler/create.go index e6cc41a..f6364c5 100644 --- a/internal/pkg/handler/create.go +++ b/internal/pkg/handler/create.go @@ -19,6 +19,7 @@ func (r ResourceCreatedHandler) Handle() error { logrus.Errorf("Resource creation handler received nil resource") } else { config, _ := r.GetConfig() + // process resource based on its type return doRollingUpgrade(config, 
r.Collectors) } return nil diff --git a/internal/pkg/handler/update.go b/internal/pkg/handler/update.go index 91f320a..4854151 100644 --- a/internal/pkg/handler/update.go +++ b/internal/pkg/handler/update.go @@ -21,6 +21,7 @@ func (r ResourceUpdatedHandler) Handle() error { } else { config, oldSHAData := r.GetConfig() if config.SHAValue != oldSHAData { + // process resource based on its type return doRollingUpgrade(config, r.Collectors) } } diff --git a/internal/pkg/leadership/leadership.go b/internal/pkg/leadership/leadership.go index cdc3766..0e8429b 100644 --- a/internal/pkg/leadership/leadership.go +++ b/internal/pkg/leadership/leadership.go @@ -2,23 +2,32 @@ package leadership import ( "context" + "net/http" "time" "github.com/sirupsen/logrus" "github.com/stakater/Reloader/internal/pkg/controller" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/leaderelection" "k8s.io/client-go/tools/leaderelection/resourcelock" + + coordinationv1 "k8s.io/client-go/kubernetes/typed/coordination/v1" ) -func GetNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace string) *resourcelock.LeaseLock { +const healthPort string = ":9091" + +var ( + // Used for liveness probe + healthy bool = true +) + +func GetNewLock(client coordinationv1.CoordinationV1Interface, lockName, podname, namespace string) *resourcelock.LeaseLock { return &resourcelock.LeaseLock{ LeaseMeta: v1.ObjectMeta{ Name: lockName, Namespace: namespace, }, - Client: clientset.CoordinationV1(), + Client: client, LockConfig: resourcelock.ResourceLockConfig{ Identity: podname, }, @@ -26,7 +35,7 @@ func GetNewLock(clientset *kubernetes.Clientset, lockName, podname, namespace st } // runLeaderElection runs leadership election. If an instance of the controller is the leader and stops leading it will shutdown. 
-func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller, health bool) { +func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel context.CancelFunc, id string, controllers []*controller.Controller) { // Construct channels for the controllers to use var stopChannels []chan struct{} for i := 0; i < len(controllers); i++ { @@ -49,7 +58,7 @@ func RunLeaderElection(lock *resourcelock.LeaseLock, ctx context.Context, cancel logrus.Info("no longer leader, shutting down") stopControllers(stopChannels) cancel() - health = false + healthy = false }, OnNewLeader: func(current_id string) { if current_id == id { @@ -74,3 +83,20 @@ func stopControllers(stopChannels []chan struct{}) { close(c) } } + +// Healthz serves the liveness probe endpoint. If leadership election is +// enabled and a replica stops leading the liveness probe will fail and the +// kubelet will restart the container. 
+func Healthz() error { + http.HandleFunc("/live", healthz) + return http.ListenAndServe(healthPort, nil) +} + +func healthz(w http.ResponseWriter, req *http.Request) { + if healthy { + w.Write([]byte("alive")) + return + } + + w.WriteHeader(http.StatusInternalServerError) +} diff --git a/internal/pkg/leadership/leadership_test.go b/internal/pkg/leadership/leadership_test.go new file mode 100644 index 0000000..2d64d0c --- /dev/null +++ b/internal/pkg/leadership/leadership_test.go @@ -0,0 +1,213 @@ +package leadership + +import ( + "context" + "fmt" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stakater/Reloader/internal/pkg/constants" + "github.com/stakater/Reloader/internal/pkg/controller" + "github.com/stakater/Reloader/internal/pkg/handler" + "github.com/stakater/Reloader/internal/pkg/metrics" + "github.com/stakater/Reloader/internal/pkg/options" + "github.com/stakater/Reloader/internal/pkg/testutil" + "github.com/stakater/Reloader/internal/pkg/util" + "github.com/stakater/Reloader/pkg/kube" +) + +func TestMain(m *testing.M) { + + testutil.CreateNamespace(testutil.Namespace, testutil.Clients.KubernetesClient) + + logrus.Infof("Running Testcases") + retCode := m.Run() + + testutil.DeleteNamespace(testutil.Namespace, testutil.Clients.KubernetesClient) + + os.Exit(retCode) +} + +func TestHealthz(t *testing.T) { + request, err := http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response := httptest.NewRecorder() + + healthz(response, request) + got := response.Code + want := 200 + + if got != want { + t.Fatalf("got: %q, want: %q", got, want) + } + + // Have the liveness probe serve a 500 + healthy = false + + request, err = http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response = httptest.NewRecorder() + + healthz(response, request) + got = response.Code + want = 500 + + if 
got != want { + t.Fatalf("got: %q, want: %q", got, want) + } +} + +// TestRunLeaderElection validates that the liveness endpoint serves 500 when +// leadership election fails +func TestRunLeaderElection(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + + lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), constants.LockName, testutil.Pod, testutil.Namespace) + + go RunLeaderElection(lock, ctx, cancel, testutil.Pod, []*controller.Controller{}) + + // healthy is still false at this point (TestHealthz above set it), so the probe serves 500 + request, err := http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response := httptest.NewRecorder() + + healthz(response, request) + got := response.Code + want := 500 + + if got != want { + t.Fatalf("got: %q, want: %q", got, want) + } + + // Cancel the leader election context, so leadership is released and + // live endpoint serves 500 + cancel() + + request, err = http.NewRequest(http.MethodGet, "/live", nil) + if err != nil { + t.Fatalf(("failed to create request")) + } + + response = httptest.NewRecorder() + + healthz(response, request) + got = response.Code + want = 500 + + if got != want { + t.Fatalf("got: %q, want: %q", got, want) + } +} + +// TestRunLeaderElectionWithControllers tests that leadership election works +// with real controllers and that on context cancellation the controllers stop +// running. 
+func TestRunLeaderElectionWithControllers(t *testing.T) { + t.Logf("Creating controller") + var controllers []*controller.Controller + for k := range kube.ResourceMap { + c, err := controller.NewController(testutil.Clients.KubernetesClient, k, testutil.Namespace, []string{}, metrics.NewCollectors()) + if err != nil { + logrus.Fatalf("%s", err) + } + + controllers = append(controllers, c) + } + time.Sleep(3 * time.Second) + + lock := GetNewLock(testutil.Clients.KubernetesClient.CoordinationV1(), fmt.Sprintf("%s-%d", constants.LockName, 1), testutil.Pod, testutil.Namespace) + + ctx, cancel := context.WithCancel(context.TODO()) + + // Start running leadership election, this also starts the controllers + go RunLeaderElection(lock, ctx, cancel, testutil.Pod, controllers) + time.Sleep(3 * time.Second) + + // Create some stuff and do a thing + configmapName := testutil.ConfigmapNamePrefix + "-update-" + testutil.RandSeq(5) + configmapClient, err := testutil.CreateConfigMap(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName, "www.google.com") + if err != nil { + t.Fatalf("Error while creating the configmap %v", err) + } + + // Creating deployment + _, err = testutil.CreateDeployment(testutil.Clients.KubernetesClient, configmapName, testutil.Namespace, true) + if err != nil { + t.Fatalf("Error in deployment creation: %v", err) + } + + // Updating configmap for first time + updateErr := testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com") + if updateErr != nil { + t.Fatalf("Configmap was not updated") + } + time.Sleep(3 * time.Second) + + // Verifying deployment update + logrus.Infof("Verifying pod envvars has been created") + shaData := testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, testutil.Namespace, configmapName, "www.stakater.com") + config := util.Config{ + Namespace: testutil.Namespace, + ResourceName: configmapName, + SHAValue: shaData, + Annotation: 
options.ConfigmapUpdateOnChangeAnnotation, + } + deploymentFuncs := handler.GetDeploymentRollingUpgradeFuncs() + updated := testutil.VerifyResourceEnvVarUpdate(testutil.Clients, config, constants.ConfigmapEnvVarPostfix, deploymentFuncs) + if !updated { + t.Fatalf("Deployment was not updated") + } + time.Sleep(testutil.SleepDuration) + + // Cancel the leader election context, so leadership is released + logrus.Info("shutting down controller from test") + cancel() + time.Sleep(5 * time.Second) + + // Updating configmap again + updateErr = testutil.UpdateConfigMap(configmapClient, testutil.Namespace, configmapName, "", "www.stakater.com/new") + if updateErr != nil { + t.Fatalf("Configmap was not updated") + } + + // Verifying that the deployment was not updated as leadership has been lost + logrus.Infof("Verifying pod envvars has not been updated") + shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, testutil.Namespace, configmapName, "www.stakater.com/new") + config = util.Config{ + Namespace: testutil.Namespace, + ResourceName: configmapName, + SHAValue: shaData, + Annotation: options.ConfigmapUpdateOnChangeAnnotation, + } + deploymentFuncs = handler.GetDeploymentRollingUpgradeFuncs() + updated = testutil.VerifyResourceEnvVarUpdate(testutil.Clients, config, constants.ConfigmapEnvVarPostfix, deploymentFuncs) + if updated { + t.Fatalf("Deployment was updated") + } + + // Deleting deployment + err = testutil.DeleteDeployment(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName) + if err != nil { + logrus.Errorf("Error while deleting the deployment %v", err) + } + + // Deleting configmap + err = testutil.DeleteConfigMap(testutil.Clients.KubernetesClient, testutil.Namespace, configmapName) + if err != nil { + logrus.Errorf("Error while deleting the configmap %v", err) + } + time.Sleep(testutil.SleepDuration) +} diff --git a/internal/pkg/testutil/kube.go b/internal/pkg/testutil/kube.go index 7397aec..35c969f 100644 --- 
a/internal/pkg/testutil/kube.go +++ b/internal/pkg/testutil/kube.go @@ -16,6 +16,7 @@ import ( "github.com/stakater/Reloader/internal/pkg/callbacks" "github.com/stakater/Reloader/internal/pkg/constants" "github.com/stakater/Reloader/internal/pkg/crypto" + "github.com/stakater/Reloader/internal/pkg/metrics" "github.com/stakater/Reloader/internal/pkg/options" "github.com/stakater/Reloader/internal/pkg/util" "github.com/stakater/Reloader/pkg/kube" @@ -34,6 +35,19 @@ var ( SecretResourceType = "secrets" ) +var ( + Clients = kube.GetClients() + Pod = "test-reloader-" + RandSeq(5) + Namespace = "test-reloader-" + RandSeq(5) + ConfigmapNamePrefix = "testconfigmap-reloader" + SecretNamePrefix = "testsecret-reloader" + Data = "dGVzdFNlY3JldEVuY29kaW5nRm9yUmVsb2FkZXI=" + NewData = "dGVzdE5ld1NlY3JldEVuY29kaW5nRm9yUmVsb2FkZXI=" + UpdatedData = "dGVzdFVwZGF0ZWRTZWNyZXRFbmNvZGluZ0ZvclJlbG9hZGVy" + Collectors = metrics.NewCollectors() + SleepDuration = 3 * time.Second +) + // CreateNamespace creates namespace for testing func CreateNamespace(namespace string, client kubernetes.Interface) { _, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})