diff --git a/internal/pkg/handler/pause_deployment.go b/internal/pkg/handler/pause_deployment.go new file mode 100644 index 0000000..28d1b9e --- /dev/null +++ b/internal/pkg/handler/pause_deployment.go @@ -0,0 +1,242 @@ +package handler + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/sirupsen/logrus" + "github.com/stakater/Reloader/internal/pkg/options" + "github.com/stakater/Reloader/pkg/kube" + app "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + patchtypes "k8s.io/apimachinery/pkg/types" +) + +// Keeps track of currently active timers +var activeTimers = make(map[string]*time.Timer) + +// Returns unique key for the activeTimers map +func getTimerKey(namespace, deploymentName string) string { + return fmt.Sprintf("%s/%s", namespace, deploymentName) +} + +// Checks if a deployment is currently paused +func IsPaused(deployment *app.Deployment) bool { + return deployment.Spec.Paused +} + +// Deployment paused by reloader ? +func IsPausedByReloader(deployment *app.Deployment) bool { + if IsPaused(deployment) { + pausedAtAnnotationValue := deployment.Annotations[options.PauseDeploymentTimeAnnotation] + return pausedAtAnnotationValue != "" + } + return false +} + +// Returns the time, the deployment was paused by reloader, nil otherwise +func GetPauseStartTime(deployment *app.Deployment) (*time.Time, error) { + if !IsPausedByReloader(deployment) { + return nil, nil + } + + pausedAtStr := deployment.Annotations[options.PauseDeploymentTimeAnnotation] + parsedTime, err := time.Parse(time.RFC3339, pausedAtStr) + if err != nil { + return nil, err + } + + return &parsedTime, nil +} + +// ParsePauseDuration parses the pause interval value and returns a time.Duration +func ParsePauseDuration(pauseIntervalValue string) (time.Duration, error) { + pauseDuration, err := time.ParseDuration(pauseIntervalValue) + if err != nil { + logrus.Warnf("Failed to parse pause interval value '%s': %v", pauseIntervalValue, err) + return 0, err + } + return pauseDuration, nil +} + +// Pauses a deployment for a specified duration and creates a timer to resume it +// after the specified duration +func PauseDeployment(deployment *app.Deployment, clients kube.Clients, namespace, pauseIntervalValue string) (*app.Deployment, error) { + deploymentName := deployment.Name + pauseDuration, err := ParsePauseDuration(pauseIntervalValue) + + if err != nil { + return nil, err + } + + if !IsPaused(deployment) { + logrus.Infof("Pausing Deployment '%s' in namespace '%s' for %s", deploymentName, namespace, pauseDuration) + + deploymentFuncs := GetDeploymentRollingUpgradeFuncs() + + pausePatch, err := CreatePausePatch() + if err != nil { + logrus.Errorf("Failed to create pause patch for deployment '%s': %v", deploymentName, err) + return deployment, err + } + + err = deploymentFuncs.PatchFunc(clients, namespace, deployment, patchtypes.StrategicMergePatchType, pausePatch) + + if err != nil { + logrus.Errorf("Failed to patch deployment '%s' in namespace '%s': %v", deploymentName, namespace, err) + return deployment, err + } + + updatedDeployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{}) + + CreateResumeTimer(deployment, clients, namespace, pauseDuration) + return updatedDeployment, err + } + + if !IsPausedByReloader(deployment) { + logrus.Infof("Deployment '%s' in namespace '%s' already paused", deploymentName, namespace) + return deployment, nil + } + + // Deployment has already been paused by reloader, check for timer + logrus.Debugf("Deployment '%s' in namespace '%s' is already paused by reloader", deploymentName, namespace) + + timerKey := getTimerKey(namespace, deploymentName) + _, timerExists := activeTimers[timerKey] + + if !timerExists { + logrus.Warnf("Timer does not exist for already paused deployment '%s' in namespace '%s', creating new one", + deploymentName, namespace) + HandleMissingTimer(deployment, pauseDuration, clients, namespace) + } + return deployment, nil +} + +// Handles the case where missing timers for deployments that have been paused by reloader. +// Could occur after new leader election or reloader restart +func HandleMissingTimer(deployment *app.Deployment, pauseDuration time.Duration, clients kube.Clients, namespace string) { + deploymentName := deployment.Name + pauseStartTime, err := GetPauseStartTime(deployment) + if err != nil { + logrus.Errorf("Error parsing pause start time for deployment '%s' in namespace '%s': %v. Resuming deployment immediately", + deploymentName, namespace, err) + ResumeDeployment(deployment, namespace, clients) + return + } + + if pauseStartTime == nil { + return + } + + elapsedPauseTime := time.Since(*pauseStartTime) + remainingPauseTime := pauseDuration - elapsedPauseTime + + if remainingPauseTime <= 0 { + logrus.Infof("Pause period for deployment '%s' in namespace '%s' has expired. Resuming immediately", + deploymentName, namespace) + ResumeDeployment(deployment, namespace, clients) + return + } + + logrus.Infof("Creating missing timer for already paused deployment '%s' in namespace '%s' with remaining time %s", + deploymentName, namespace, remainingPauseTime) + CreateResumeTimer(deployment, clients, namespace, remainingPauseTime) +} + +// CreateResumeTimer creates a timer to resume the deployment after the specified duration +func CreateResumeTimer(deployment *app.Deployment, clients kube.Clients, namespace string, pauseDuration time.Duration) { + deploymentName := deployment.Name + timerKey := getTimerKey(namespace, deployment.Name) + + // Check if there's an existing timer for this deployment + if _, exists := activeTimers[timerKey]; exists { + logrus.Debugf("Timer already exists for deployment '%s' in namespace '%s', Skipping creation", + deploymentName, namespace) + return + } + + // Create and store the new timer + timer := time.AfterFunc(pauseDuration, func() { + ResumeDeployment(deployment, namespace, clients) + }) + + // Add the new timer to the map + activeTimers[timerKey] = timer + + logrus.Debugf("Created pause timer for deployment '%s' in namespace '%s' with duration %s", + deploymentName, namespace, pauseDuration) +} + +// ResumeDeployment resumes a deployment that has been paused by reloader +func ResumeDeployment(deployment *app.Deployment, namespace string, clients kube.Clients) { + deploymentName := deployment.Name + + currentDeployment, err := clients.KubernetesClient.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentName, metav1.GetOptions{}) + + if err != nil { + logrus.Errorf("Failed to get deployment '%s' in namespace '%s': %v", deploymentName, namespace, err) + return + } + + if !IsPausedByReloader(currentDeployment) { + logrus.Infof("Deployment '%s' in namespace '%s' not paused by Reloader. Skipping resume", deploymentName, namespace) + return + } + + deploymentFuncs := GetDeploymentRollingUpgradeFuncs() + + resumePatch, err := CreateResumePatch() + if err != nil { + logrus.Errorf("Failed to create resume patch for deployment '%s': %v", deploymentName, err) + return + } + + // Remove the timer + timerKey := getTimerKey(namespace, deploymentName) + if timer, exists := activeTimers[timerKey]; exists { + timer.Stop() + delete(activeTimers, timerKey) + logrus.Debugf("Removed pause timer for deployment '%s' in namespace '%s'", deploymentName, namespace) + } + + err = deploymentFuncs.PatchFunc(clients, namespace, currentDeployment, patchtypes.StrategicMergePatchType, resumePatch) + + if err != nil { + logrus.Errorf("Failed to resume deployment '%s' in namespace '%s': %v", deploymentName, namespace, err) + return + } + + logrus.Infof("Successfully resumed deployment '%s' in namespace '%s'", deploymentName, namespace) +} + +func CreatePausePatch() ([]byte, error) { + patchData := map[string]interface{}{ + "spec": map[string]interface{}{ + "paused": true, + }, + "metadata": map[string]interface{}{ + "annotations": map[string]string{ + options.PauseDeploymentTimeAnnotation: time.Now().Format(time.RFC3339), + }, + }, + } + + return json.Marshal(patchData) +} + +func CreateResumePatch() ([]byte, error) { + patchData := map[string]interface{}{ + "spec": map[string]interface{}{ + "paused": false, + }, + "metadata": map[string]interface{}{ + "annotations": map[string]interface{}{ + options.PauseDeploymentTimeAnnotation: nil, + }, + }, + } + + return json.Marshal(patchData) +} diff --git a/internal/pkg/handler/pause_deployment_test.go b/internal/pkg/handler/pause_deployment_test.go new file mode 100644 index 0000000..076b0b2 --- /dev/null +++ b/internal/pkg/handler/pause_deployment_test.go @@ -0,0 +1,392 @@ +package handler + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stakater/Reloader/internal/pkg/options" + "github.com/stakater/Reloader/pkg/kube" + "github.com/stretchr/testify/assert" + app "k8s.io/api/apps/v1" + appsv1 "k8s.io/api/apps/v1" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + testclient "k8s.io/client-go/kubernetes/fake" +) + +func TestIsPaused(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + paused bool + }{ + { + name: "paused deployment", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + paused: true, + }, + { + name: "unpaused deployment", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + paused: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + result := IsPaused(test.deployment) + assert.Equal(t, test.paused, result) + }) + } +} + +func TestIsPausedByReloader(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + pausedByReloader bool + }{ + { + name: "paused by reloader", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: time.Now().Format(time.RFC3339), + }, + }, + }, + pausedByReloader: true, + }, + { + name: "not paused by reloader", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{}, + }, + }, + pausedByReloader: false, + }, + { + name: "not paused", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pausedByReloader: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + pausedByReloader := IsPausedByReloader(test.deployment) + assert.Equal(t, test.pausedByReloader, pausedByReloader) + }) + } +} + +func TestGetPauseStartTime(t *testing.T) { + now := time.Now() + nowStr := now.Format(time.RFC3339) + + tests := []struct { + name string + deployment *appsv1.Deployment + pausedByReloader bool + expectedStartTime time.Time + }{ + { + name: "valid pause time", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + ObjectMeta: metav1.ObjectMeta{ + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: nowStr, + }, + }, + }, + pausedByReloader: true, + expectedStartTime: now, + }, + { + name: "not paused by reloader", + deployment: &appsv1.Deployment{ + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + pausedByReloader: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actualStartTime, err := GetPauseStartTime(test.deployment) + + assert.NoError(t, err) + + if !test.pausedByReloader { + assert.Nil(t, actualStartTime) + } else { + assert.NotNil(t, actualStartTime) + assert.WithinDuration(t, test.expectedStartTime, *actualStartTime, time.Second) + } + }) + } +} + +func TestParsePauseDuration(t *testing.T) { + tests := []struct { + name string + pauseIntervalValue string + expectedDuration time.Duration + invalidDuration bool + }{ + { + name: "valid duration", + pauseIntervalValue: "10s", + expectedDuration: 10 * time.Second, + invalidDuration: false, + }, + { + name: "valid minute duration", + pauseIntervalValue: "2m", + expectedDuration: 2 * time.Minute, + invalidDuration: false, + }, + { + name: "invalid duration", + pauseIntervalValue: "invalid", + expectedDuration: 0, + invalidDuration: true, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + actualDuration, err := ParsePauseDuration(test.pauseIntervalValue) + + if test.invalidDuration { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.Equal(t, test.expectedDuration, actualDuration) + } + }) + } +} + +func TestHandleMissingTimerSimple(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + shouldBePaused bool // Should be unpaused after HandleMissingTimer ? + }{ + { + name: "deployment paused by reloader, pause period has expired and no timer", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-1", + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: time.Now().Add(-6 * time.Minute).Format(time.RFC3339), + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + shouldBePaused: false, + }, + { + name: "deployment paused by reloader, pause period expires in the future and no timer", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-2", + Annotations: map[string]string{ + options.PauseDeploymentTimeAnnotation: time.Now().Add(1 * time.Minute).Format(time.RFC3339), + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + shouldBePaused: true, + }, + } + + for _, test := range tests { + // Clean up any timers at the end of the test + defer func() { + for key, timer := range activeTimers { + timer.Stop() + delete(activeTimers, key) + } + }() + + t.Run(test.name, func(t *testing.T) { + fakeClient := testclient.NewSimpleClientset() + clients := kube.Clients{ + KubernetesClient: fakeClient, + } + + _, err := fakeClient.AppsV1().Deployments("default").Create( + context.TODO(), + test.deployment, + metav1.CreateOptions{}) + assert.NoError(t, err, "Expected no error when creating deployment") + + pauseDuration, _ := ParsePauseDuration(test.deployment.Annotations[options.PauseDeploymentAnnotation]) + HandleMissingTimer(test.deployment, pauseDuration, clients, "default") + + updatedDeployment, _ := fakeClient.AppsV1().Deployments("default").Get(context.TODO(), test.deployment.Name, metav1.GetOptions{}) + + assert.Equal(t, test.shouldBePaused, updatedDeployment.Spec.Paused, + "Deployment should have correct paused state after timer expiration") + + if test.shouldBePaused { + pausedAtAnnotationValue := updatedDeployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.NotEmpty(t, pausedAtAnnotationValue, + "Pause annotation should be present and contain a value when deployment is paused") + } + }) + } +} + +func TestPauseDeployment(t *testing.T) { + tests := []struct { + name string + deployment *appsv1.Deployment + expectedError bool + expectedPaused bool + expectedAnnotation bool // Should have pause time annotation + pauseInterval string + }{ + { + name: "deployment without pause annotation", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + Annotations: map[string]string{}, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + expectedError: true, + expectedPaused: false, + expectedAnnotation: false, + pauseInterval: "", + }, + { + name: "deployment already paused but not by reloader", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment", + Annotations: map[string]string{ + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: true, + }, + }, + expectedError: false, + expectedPaused: true, + expectedAnnotation: false, + pauseInterval: "5m", + }, + { + name: "deployment unpaused that needs to be paused by reloader", + deployment: &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-deployment-3", + Annotations: map[string]string{ + options.PauseDeploymentAnnotation: "5m", + }, + }, + Spec: appsv1.DeploymentSpec{ + Paused: false, + }, + }, + expectedError: false, + expectedPaused: true, + expectedAnnotation: true, + pauseInterval: "5m", + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + fakeClient := testclient.NewSimpleClientset() + clients := kube.Clients{ + KubernetesClient: fakeClient, + } + + _, err := fakeClient.AppsV1().Deployments("default").Create( + context.TODO(), + test.deployment, + metav1.CreateOptions{}) + assert.NoError(t, err, "Expected no error when creating deployment") + + updatedDeployment, err := PauseDeployment(test.deployment, clients, "default", test.pauseInterval) + if test.expectedError { + assert.Error(t, err, "Expected an error pausing the deployment") + return + } else { + assert.NoError(t, err, "Expected no error pausing the deployment") + } + + assert.Equal(t, test.expectedPaused, updatedDeployment.Spec.Paused, + "Deployment should have correct paused state after pause") + + if test.expectedAnnotation { + pausedAtAnnotationValue := updatedDeployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.NotEmpty(t, pausedAtAnnotationValue, + "Pause annotation should be present and contain a value when deployment is paused") + } else { + pausedAtAnnotationValue := updatedDeployment.Annotations[options.PauseDeploymentTimeAnnotation] + assert.Empty(t, pausedAtAnnotationValue, + "Pause annotation should not be present when deployment has not been paused by reloader") + } + }) + } +} + +// Simple helper function for test cases +func FindDeploymentByName(deployments []runtime.Object, deploymentName string) (*app.Deployment, error) { + for _, deployment := range deployments { + accessor, err := meta.Accessor(deployment) + if err != nil { + return nil, fmt.Errorf("error getting accessor for item: %v", err) + } + if accessor.GetName() == deploymentName { + deploymentObj, ok := deployment.(*app.Deployment) + if !ok { + return nil, fmt.Errorf("failed to cast to Deployment") + } + return deploymentObj, nil + } + } + return nil, fmt.Errorf("deployment '%s' not found", deploymentName) +} diff --git a/internal/pkg/handler/upgrade.go b/internal/pkg/handler/upgrade.go index ecb4aae..67ef778 100644 --- a/internal/pkg/handler/upgrade.go +++ b/internal/pkg/handler/upgrade.go @@ -21,6 +21,7 @@ import ( "github.com/stakater/Reloader/internal/pkg/options" "github.com/stakater/Reloader/internal/pkg/util" "github.com/stakater/Reloader/pkg/kube" + app "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -277,6 +278,7 @@ func upgradeResource(clients kube.Clients, config util.Config, upgradeFuncs call typedAutoAnnotationEnabledValue, foundTypedAuto := annotations[config.TypedAutoAnnotation] excludeConfigmapAnnotationValue, foundExcludeConfigmap := annotations[options.ConfigmapExcludeReloaderAnnotation] excludeSecretAnnotationValue, foundExcludeSecret := annotations[options.SecretExcludeReloaderAnnotation] + pauseInterval, foundPauseInterval := annotations[options.PauseDeploymentAnnotation] if !found && !foundAuto && !foundTypedAuto && !foundSearchAnn { annotations = upgradeFuncs.PodAnnotationsFunc(resource) @@ -331,6 +333,18 @@ func upgradeResource(clients kube.Clients, config util.Config, upgradeFuncs call } } if strategyResult.Result == constants.Updated { + if foundPauseInterval { + deployment, ok := resource.(*app.Deployment) + if !ok { + logrus.Warnf("Annotation '%s' only applicable for deployments", options.PauseDeploymentAnnotation) + } else { + _, err = PauseDeployment(deployment, clients, config.Namespace, pauseInterval) + if err != nil { + logrus.Errorf("Failed to pause deployment '%s' in namespace '%s': %v", resourceName, config.Namespace, err) + return err + } + } + } var err error if upgradeFuncs.SupportsPatch && strategyResult.Patch != nil { err = upgradeFuncs.PatchFunc(clients, config.Namespace, resource, strategyResult.Patch.Type, strategyResult.Patch.Bytes) diff --git a/internal/pkg/handler/upgrade_test.go b/internal/pkg/handler/upgrade_test.go index 5c77cae..61d8aeb 100644 --- a/internal/pkg/handler/upgrade_test.go +++ b/internal/pkg/handler/upgrade_test.go @@ -54,6 +54,7 @@ var ( arsConfigmapWithExcludeConfigMapAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5) arsConfigmapWithIgnoreAnnotation = "testconfigmapWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) arsSecretWithIgnoreAnnotation = "testsecretWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) + arsConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5) ersNamespace = "test-handler-" + testutil.RandSeq(5) ersConfigmapName = "testconfigmap-handler-" + testutil.RandSeq(5) @@ -79,6 +80,7 @@ var ( ersConfigmapWithConfigMapExcludeAnnotation = "testconfigmapwithconfigmapexcludeannotationdeployment-handler-" + testutil.RandSeq(5) ersConfigmapWithIgnoreAnnotation = "testconfigmapWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) ersSecretWithIgnoreAnnotation = "testsecretWithIgnoreAnnotation-handler-" + testutil.RandSeq(5) + ersConfigmapWithPausedDeployment = "testconfigmapWithPausedDeployment-handler-" + testutil.RandSeq(5) ) func TestMain(m *testing.M) { @@ -207,6 +209,12 @@ func setupArs() { logrus.Errorf("Error in configmap creation: %v", err) } + // Creating configmap for testing pausing deployments + _, err = testutil.CreateConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment, "www.google.com") + if err != nil { + logrus.Errorf("Error in configmap creation: %v", err) + } + // Creating secret used with secret auto annotation _, err = testutil.CreateSecret(clients.KubernetesClient, arsNamespace, arsSecretWithExcludeSecretAnnotation, data) if err != nil { @@ -457,6 +465,12 @@ func setupArs() { if err != nil { logrus.Errorf("Error in Deployment with both annotations: %v", err) } + + // Creating Deployment with pause annotation + _, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, arsConfigmapWithPausedDeployment, arsNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false) + if err != nil { + logrus.Errorf("Error in Deployment with configmap creation: %v", err) + } } func teardownArs() { @@ -658,6 +672,12 @@ func teardownArs() { logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError) } + // Deleting Deployment with pasuse annotation + deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment) + if deploymentError != nil { + logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError) + } + // Deleting Configmap err := testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapName) if err != nil { @@ -771,6 +791,12 @@ func teardownArs() { logrus.Errorf("Error while deleting the configmap used with configmap auto annotations: %v", err) } + // Deleting configmap for testing pausing deployments + err = testutil.DeleteConfigMap(clients.KubernetesClient, arsNamespace, arsConfigmapWithPausedDeployment) + if err != nil { + logrus.Errorf("Error while deleting the configmap: %v", err) + } + // Deleting namespace testutil.DeleteNamespace(arsNamespace, clients.KubernetesClient) @@ -830,6 +856,12 @@ func setupErs() { logrus.Errorf("Error in configmap creation: %v", err) } + // Creating configmap for testing pausing deployments + _, err = testutil.CreateConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment, "www.google.com") + if err != nil { + logrus.Errorf("Error in configmap creation: %v", err) + } + // Creating secret _, err = testutil.CreateSecret(clients.KubernetesClient, ersNamespace, ersSecretWithInitEnv, data) if err != nil { @@ -1034,6 +1066,12 @@ func setupErs() { logrus.Errorf("Error in Deployment with configmap and with configmap exclude annotation: %v", err) } + // Creating Deployment with pause annotation + _, err = testutil.CreateDeploymentWithAnnotations(clients.KubernetesClient, ersConfigmapWithPausedDeployment, ersNamespace, map[string]string{options.PauseDeploymentAnnotation: "10s"}, false) + if err != nil { + logrus.Errorf("Error in Deployment with configmap creation: %v", err) + } + // Creating DaemonSet with configmap _, err = testutil.CreateDaemonSet(clients.KubernetesClient, ersConfigmapName, ersNamespace, true) if err != nil { @@ -1318,6 +1356,12 @@ func teardownErs() { logrus.Errorf("Error while deleting statefulSet with secret as env var source %v", statefulSetError) } + // Deleting Deployment for testing pausing deployments + deploymentError = testutil.DeleteDeployment(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment) + if deploymentError != nil { + logrus.Errorf("Error while deleting deployment with configmap %v", deploymentError) + } + // Deleting Configmap err := testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapName) if err != nil { @@ -1431,6 +1475,12 @@ func teardownErs() { logrus.Errorf("Error while deleting the configmap used with configmap exclude annotation: %v", err) } + // Deleting ConfigMap for testins pausing deployments + err = testutil.DeleteConfigMap(clients.KubernetesClient, ersNamespace, ersConfigmapWithPausedDeployment) + if err != nil { + logrus.Errorf("Error while deleting the configmap: %v", err) + } + // Deleting namespace testutil.DeleteNamespace(ersNamespace, clients.KubernetesClient) @@ -4053,3 +4103,85 @@ func TestFailedRollingUpgradeUsingErs(t *testing.T) { t.Errorf("Counter by namespace was not increased") } } + +func TestPausingDeploymentUsingErs(t *testing.T) { + options.ReloadStrategy = constants.EnvVarsReloadStrategy + testPausingDeployment(t, options.ReloadStrategy, ersConfigmapWithPausedDeployment, ersNamespace) +} + +func TestPausingDeploymentUsingArs(t *testing.T) { + options.ReloadStrategy = constants.AnnotationsReloadStrategy + testPausingDeployment(t, options.ReloadStrategy, arsConfigmapWithPausedDeployment, arsNamespace) +} + +func testPausingDeployment(t *testing.T, reloadStrategy string, testName string, namespace string) { + options.ReloadStrategy = reloadStrategy + envVarPostfix := constants.ConfigmapEnvVarPostfix + + shaData := testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause.stakater.com") + config := getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation) + deploymentFuncs := GetDeploymentRollingUpgradeFuncs() + collectors := getCollectors() + + _ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy) + + if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 1 { + t.Errorf("Counter was not increased") + } + + if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 1 { + t.Errorf("Counter by namespace was not increased") + } + + logrus.Infof("Verifying deployment has been paused") + items := deploymentFuncs.ItemsFunc(clients, config.Namespace) + deploymentPaused, err := isDeploymentPaused(items, testName) + if err != nil { + t.Errorf("%s", err.Error()) + } + if !deploymentPaused { + t.Errorf("Deployment has not been paused") + } + + shaData = testutil.ConvertResourceToSHA(testutil.ConfigmapResourceType, namespace, testName, "pause-changed.stakater.com") + config = getConfigWithAnnotations(envVarPostfix, testName, shaData, options.ConfigmapUpdateOnChangeAnnotation, options.ConfigmapReloaderAutoAnnotation) + + _ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy) + + if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 2 { + t.Errorf("Counter was not increased") + } + + if promtestutil.ToFloat64(collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": namespace})) != 2 { + t.Errorf("Counter by namespace was not increased") + } + + logrus.Infof("Verifying deployment is still paused") + items = deploymentFuncs.ItemsFunc(clients, config.Namespace) + deploymentPaused, err = isDeploymentPaused(items, testName) + if err != nil { + t.Errorf("%s", err.Error()) + } + if !deploymentPaused { + t.Errorf("Deployment should still be paused") + } + + logrus.Infof("Verifying deployment has been resumed after pause interval") + time.Sleep(11 * time.Second) + items = deploymentFuncs.ItemsFunc(clients, config.Namespace) + deploymentPaused, err = isDeploymentPaused(items, testName) + if err != nil { + t.Errorf("%s", err.Error()) + } + if deploymentPaused { + t.Errorf("Deployment should have been resumed after pause interval") + } +} + +func isDeploymentPaused(deployments []runtime.Object, deploymentName string) (bool, error) { + deployment, err := FindDeploymentByName(deployments, deploymentName) + if err != nil { + return false, err + } + return IsPaused(deployment), nil +} diff --git a/internal/pkg/options/flags.go b/internal/pkg/options/flags.go index 7c0e14e..bfa63fc 100644 --- a/internal/pkg/options/flags.go +++ b/internal/pkg/options/flags.go @@ -40,6 +40,12 @@ var ( SearchMatchAnnotation = "reloader.stakater.com/match" // RolloutStrategyAnnotation is an annotation to define rollout update strategy RolloutStrategyAnnotation = "reloader.stakater.com/rollout-strategy" + // PauseDeploymentAnnotation is an annotation to define the time period to pause a deployment after + // a configmap/secret change has been detected. Valid values are described here: https://pkg.go.dev/time#ParseDuration + // only positive values are allowed + PauseDeploymentAnnotation = "deployment.reloader.stakater.com/pause-period" + // Annotation set by reloader to indicate that the deployment has been paused + PauseDeploymentTimeAnnotation = "deployment.reloader.stakater.com/paused-at" // LogFormat is the log format to use (json, or empty string for default) LogFormat = "" // LogLevel is the log level to use (trace, debug, info, warning, error, fatal and panic) diff --git a/internal/pkg/testutil/kube.go b/internal/pkg/testutil/kube.go index f2d3bb4..7a1aae3 100644 --- a/internal/pkg/testutil/kube.go +++ b/internal/pkg/testutil/kube.go @@ -794,6 +794,26 @@ func CreateDeployment(client kubernetes.Interface, deploymentName string, namesp return deployment, err } +// CreateDeployment creates a deployment in given namespace and returns the Deployment +func CreateDeploymentWithAnnotations(client kubernetes.Interface, deploymentName string, namespace string, additionalAnnotations map[string]string, volumeMount bool) (*appsv1.Deployment, error) { + logrus.Infof("Creating Deployment") + deploymentClient := client.AppsV1().Deployments(namespace) + var deploymentObj *appsv1.Deployment + if volumeMount { + deploymentObj = GetDeployment(namespace, deploymentName) + } else { + deploymentObj = GetDeploymentWithEnvVars(namespace, deploymentName) + } + + for annotationKey, annotationValue := range additionalAnnotations { + deploymentObj.Annotations[annotationKey] = annotationValue + } + + deployment, err := deploymentClient.Create(context.TODO(), deploymentObj, metav1.CreateOptions{}) + time.Sleep(3 * time.Second) + return deployment, err +} + // CreateDeploymentConfig creates a deploymentConfig in given namespace and returns the DeploymentConfig func CreateDeploymentConfig(client appsclient.Interface, deploymentName string, namespace string, volumeMount bool) (*openshiftv1.DeploymentConfig, error) { logrus.Infof("Creating DeploymentConfig")