Extracted some functions to public package to reuse them in gateway (#966)

* separate methods

* basic refactoring

* moved common code to util package to use it in gateway

* common check for argo rollouts

* made code compilable with latest changes on master

* Moved options to separate package and created CommandLineOptions instance that will be in sync with options values.

* reverted extra changes

* initialize CommandLineOptions with default options in module init

* wait for paused at annotation before checking deployment paused

* moved things around to fix things

* reverted unnecessary changes

* reverted rolling_upgrade changes

* reverted extra change
This commit is contained in:
Muhammad Safwan Karim
2025-07-29 11:50:02 +05:00
committed by GitHub
parent 9039956c32
commit 49409dce54
9 changed files with 486 additions and 400 deletions

View File

@@ -19,6 +19,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/metrics"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/common"
"github.com/stakater/Reloader/pkg/kube"
)
@@ -101,6 +102,7 @@ func getHAEnvs() (string, string) {
}
func startReloader(cmd *cobra.Command, args []string) {
common.GetCommandLineOptions()
err := configureLogging(options.LogFormat, options.LogLevel)
if err != nil {
logrus.Warn(err)
@@ -188,7 +190,7 @@ func startReloader(cmd *cobra.Command, args []string) {
go leadership.RunLeaderElection(lock, ctx, cancel, podName, controllers)
}
util.PublishMetaInfoConfigmap(clientset)
common.PublishMetaInfoConfigmap(clientset)
leadership.SetupLivenessEndpoint()
logrus.Fatal(http.ListenAndServe(constants.DefaultHttpListenAddr, nil))

View File

@@ -7,9 +7,6 @@ import (
"fmt"
"io"
"os"
"regexp"
"strconv"
"strings"
"github.com/parnurzeal/gorequest"
"github.com/prometheus/client_golang/prometheus"
@@ -20,6 +17,7 @@ import (
"github.com/stakater/Reloader/internal/pkg/metrics"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/internal/pkg/util"
"github.com/stakater/Reloader/pkg/common"
"github.com/stakater/Reloader/pkg/kube"
app "k8s.io/api/apps/v1"
v1 "k8s.io/api/core/v1"
@@ -264,143 +262,76 @@ func upgradeResource(clients kube.Clients, config util.Config, upgradeFuncs call
return err
}
}
annotations := upgradeFuncs.AnnotationsFunc(resource)
podAnnotations := upgradeFuncs.PodAnnotationsFunc(resource)
result := common.ShouldReload(config, upgradeFuncs.ResourceType, annotations, podAnnotations, common.GetCommandLineOptions())
ignoreResourceAnnotatonValue := config.ResourceAnnotations[options.IgnoreResourceAnnotation]
if ignoreResourceAnnotatonValue == "true" {
if !result.ShouldReload {
logrus.Debugf("No changes detected in '%s' of type '%s' in namespace '%s'", config.ResourceName, config.Type, config.Namespace)
return nil
}
strategyResult := strategy(upgradeFuncs, resource, config, result.AutoReload)
if strategyResult.Result != constants.Updated {
return nil
}
// find correct annotation and update the resource
annotations := upgradeFuncs.AnnotationsFunc(resource)
annotationValue, found := annotations[config.Annotation]
searchAnnotationValue, foundSearchAnn := annotations[options.AutoSearchAnnotation]
reloaderEnabledValue, foundAuto := annotations[options.ReloaderAutoAnnotation]
typedAutoAnnotationEnabledValue, foundTypedAuto := annotations[config.TypedAutoAnnotation]
excludeConfigmapAnnotationValue, foundExcludeConfigmap := annotations[options.ConfigmapExcludeReloaderAnnotation]
excludeSecretAnnotationValue, foundExcludeSecret := annotations[options.SecretExcludeReloaderAnnotation]
pauseInterval, foundPauseInterval := annotations[options.PauseDeploymentAnnotation]
if !found && !foundAuto && !foundTypedAuto && !foundSearchAnn {
annotations = upgradeFuncs.PodAnnotationsFunc(resource)
annotationValue = annotations[config.Annotation]
searchAnnotationValue = annotations[options.AutoSearchAnnotation]
reloaderEnabledValue = annotations[options.ReloaderAutoAnnotation]
typedAutoAnnotationEnabledValue = annotations[config.TypedAutoAnnotation]
}
isResourceExcluded := false
switch config.Type {
case constants.ConfigmapEnvVarPostfix:
if foundExcludeConfigmap {
isResourceExcluded = checkIfResourceIsExcluded(config.ResourceName, excludeConfigmapAnnotationValue)
}
case constants.SecretEnvVarPostfix:
if foundExcludeSecret {
isResourceExcluded = checkIfResourceIsExcluded(config.ResourceName, excludeSecretAnnotationValue)
}
}
if isResourceExcluded {
return nil
}
strategyResult := InvokeStrategyResult{constants.NotUpdated, nil}
reloaderEnabled, _ := strconv.ParseBool(reloaderEnabledValue)
typedAutoAnnotationEnabled, _ := strconv.ParseBool(typedAutoAnnotationEnabledValue)
if reloaderEnabled || typedAutoAnnotationEnabled || reloaderEnabledValue == "" && typedAutoAnnotationEnabledValue == "" && options.AutoReloadAll {
strategyResult = strategy(upgradeFuncs, resource, config, true)
}
if strategyResult.Result != constants.Updated && annotationValue != "" {
values := strings.Split(annotationValue, ",")
for _, value := range values {
value = strings.TrimSpace(value)
re := regexp.MustCompile("^" + value + "$")
if re.Match([]byte(config.ResourceName)) {
strategyResult = strategy(upgradeFuncs, resource, config, false)
if strategyResult.Result == constants.Updated {
break
}
}
}
}
if strategyResult.Result != constants.Updated && searchAnnotationValue == "true" {
matchAnnotationValue := config.ResourceAnnotations[options.SearchMatchAnnotation]
if matchAnnotationValue == "true" {
strategyResult = strategy(upgradeFuncs, resource, config, true)
}
}
if strategyResult.Result == constants.Updated {
if foundPauseInterval {
deployment, ok := resource.(*app.Deployment)
if !ok {
logrus.Warnf("Annotation '%s' only applicable for deployments", options.PauseDeploymentAnnotation)
} else {
_, err = PauseDeployment(deployment, clients, config.Namespace, pauseInterval)
if err != nil {
logrus.Errorf("Failed to pause deployment '%s' in namespace '%s': %v", resourceName, config.Namespace, err)
return err
}
}
}
var err error
if upgradeFuncs.SupportsPatch && strategyResult.Patch != nil {
err = upgradeFuncs.PatchFunc(clients, config.Namespace, resource, strategyResult.Patch.Type, strategyResult.Patch.Bytes)
if foundPauseInterval {
deployment, ok := resource.(*app.Deployment)
if !ok {
logrus.Warnf("Annotation '%s' only applicable for deployments", options.PauseDeploymentAnnotation)
} else {
err = upgradeFuncs.UpdateFunc(clients, config.Namespace, resource)
_, err = PauseDeployment(deployment, clients, config.Namespace, pauseInterval)
if err != nil {
logrus.Errorf("Failed to pause deployment '%s' in namespace '%s': %v", resourceName, config.Namespace, err)
return err
}
}
}
if err != nil {
message := fmt.Sprintf("Update for '%s' of type '%s' in namespace '%s' failed with error %v", resourceName, upgradeFuncs.ResourceType, config.Namespace, err)
logrus.Errorf("Update for '%s' of type '%s' in namespace '%s' failed with error %v", resourceName, upgradeFuncs.ResourceType, config.Namespace, err)
if upgradeFuncs.SupportsPatch && strategyResult.Patch != nil {
err = upgradeFuncs.PatchFunc(clients, config.Namespace, resource, strategyResult.Patch.Type, strategyResult.Patch.Bytes)
} else {
err = upgradeFuncs.UpdateFunc(clients, config.Namespace, resource)
}
collectors.Reloaded.With(prometheus.Labels{"success": "false"}).Inc()
collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "false", "namespace": config.Namespace}).Inc()
if recorder != nil {
recorder.Event(resource, v1.EventTypeWarning, "ReloadFail", message)
}
return err
} else {
message := fmt.Sprintf("Changes detected in '%s' of type '%s' in namespace '%s'", config.ResourceName, config.Type, config.Namespace)
message += fmt.Sprintf(", Updated '%s' of type '%s' in namespace '%s'", resourceName, upgradeFuncs.ResourceType, config.Namespace)
if err != nil {
message := fmt.Sprintf("Update for '%s' of type '%s' in namespace '%s' failed with error %v", resourceName, upgradeFuncs.ResourceType, config.Namespace, err)
logrus.Errorf("Update for '%s' of type '%s' in namespace '%s' failed with error %v", resourceName, upgradeFuncs.ResourceType, config.Namespace, err)
logrus.Infof("Changes detected in '%s' of type '%s' in namespace '%s'; updated '%s' of type '%s' in namespace '%s'", config.ResourceName, config.Type, config.Namespace, resourceName, upgradeFuncs.ResourceType, config.Namespace)
collectors.Reloaded.With(prometheus.Labels{"success": "false"}).Inc()
collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "false", "namespace": config.Namespace}).Inc()
if recorder != nil {
recorder.Event(resource, v1.EventTypeWarning, "ReloadFail", message)
}
return err
} else {
message := fmt.Sprintf("Changes detected in '%s' of type '%s' in namespace '%s'", config.ResourceName, config.Type, config.Namespace)
message += fmt.Sprintf(", Updated '%s' of type '%s' in namespace '%s'", resourceName, upgradeFuncs.ResourceType, config.Namespace)
collectors.Reloaded.With(prometheus.Labels{"success": "true"}).Inc()
collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": config.Namespace}).Inc()
alert_on_reload, ok := os.LookupEnv("ALERT_ON_RELOAD")
if recorder != nil {
recorder.Event(resource, v1.EventTypeNormal, "Reloaded", message)
}
if ok && alert_on_reload == "true" {
msg := fmt.Sprintf(
"Reloader detected changes in *%s* of type *%s* in namespace *%s*. Hence reloaded *%s* of type *%s* in namespace *%s*",
config.ResourceName, config.Type, config.Namespace, resourceName, upgradeFuncs.ResourceType, config.Namespace)
alert.SendWebhookAlert(msg)
}
logrus.Infof("Changes detected in '%s' of type '%s' in namespace '%s'; updated '%s' of type '%s' in namespace '%s'", config.ResourceName, config.Type, config.Namespace, resourceName, upgradeFuncs.ResourceType, config.Namespace)
collectors.Reloaded.With(prometheus.Labels{"success": "true"}).Inc()
collectors.ReloadedByNamespace.With(prometheus.Labels{"success": "true", "namespace": config.Namespace}).Inc()
alert_on_reload, ok := os.LookupEnv("ALERT_ON_RELOAD")
if recorder != nil {
recorder.Event(resource, v1.EventTypeNormal, "Reloaded", message)
}
if ok && alert_on_reload == "true" {
msg := fmt.Sprintf(
"Reloader detected changes in *%s* of type *%s* in namespace *%s*. Hence reloaded *%s* of type *%s* in namespace *%s*",
config.ResourceName, config.Type, config.Namespace, resourceName, upgradeFuncs.ResourceType, config.Namespace)
alert.SendWebhookAlert(msg)
}
}
return nil
}
func checkIfResourceIsExcluded(resourceName, excludedResources string) bool {
if excludedResources == "" {
return false
}
excludedResourcesList := strings.Split(excludedResources, ",")
for _, excludedResource := range excludedResourcesList {
if strings.TrimSpace(excludedResource) == resourceName {
return true
}
}
return false
}
func getVolumeMountName(volumes []v1.Volume, mountType string, volumeName string) string {
for i := range volumes {
if mountType == constants.ConfigmapEnvVarPostfix {

View File

@@ -4125,6 +4125,13 @@ func testPausingDeployment(t *testing.T, reloadStrategy string, testName string,
_ = PerformAction(clients, config, deploymentFuncs, collectors, nil, invokeReloadStrategy)
// Wait for deployment to have paused-at annotation
logrus.Infof("Waiting for deployment %s to have paused-at annotation", testName)
err := waitForDeploymentPausedAtAnnotation(clients, deploymentFuncs, config.Namespace, testName, 30*time.Second)
if err != nil {
t.Errorf("Failed to wait for deployment paused-at annotation: %v", err)
}
if promtestutil.ToFloat64(collectors.Reloaded.With(labelSucceeded)) != 1 {
t.Errorf("Counter was not increased")
}
@@ -4185,3 +4192,25 @@ func isDeploymentPaused(deployments []runtime.Object, deploymentName string) (bo
}
return IsPaused(deployment), nil
}
// waitForDeploymentPausedAtAnnotation waits for a deployment to have the pause-period annotation
func waitForDeploymentPausedAtAnnotation(clients kube.Clients, deploymentFuncs callbacks.RollingUpgradeFuncs, namespace, deploymentName string, timeout time.Duration) error {
start := time.Now()
for time.Since(start) < timeout {
items := deploymentFuncs.ItemsFunc(clients, namespace)
deployment, err := FindDeploymentByName(items, deploymentName)
if err == nil {
annotations := deployment.GetAnnotations()
if annotations != nil {
if _, exists := annotations[options.PauseDeploymentTimeAnnotation]; exists {
return nil
}
}
}
time.Sleep(100 * time.Millisecond)
}
return fmt.Errorf("timeout waiting for deployment %s to have pause-period annotation", deploymentName)
}

View File

@@ -6,7 +6,7 @@ import (
v1 "k8s.io/api/core/v1"
)
//Config contains rolling upgrade configuration parameters
// Config contains rolling upgrade configuration parameters
type Config struct {
Namespace string
ResourceName string

View File

@@ -2,11 +2,9 @@ package util
import (
"bytes"
"context"
"encoding/base64"
"errors"
"fmt"
"os"
"sort"
"strings"
@@ -15,11 +13,8 @@ import (
"github.com/stakater/Reloader/internal/pkg/constants"
"github.com/stakater/Reloader/internal/pkg/crypto"
"github.com/stakater/Reloader/internal/pkg/options"
"github.com/stakater/Reloader/pkg/metainfo"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
)
// ConvertToEnvVarName converts the given text into a usable env var
@@ -64,43 +59,8 @@ func GetSHAfromSecret(data map[string][]byte) string {
return crypto.GenerateSHA(strings.Join(values, ";"))
}
func PublishMetaInfoConfigmap(clientset kubernetes.Interface) {
namespace := os.Getenv("RELOADER_NAMESPACE")
if namespace == "" {
logrus.Warn("RELOADER_NAMESPACE is not set, skipping meta info configmap creation")
return
}
metaInfo := &metainfo.MetaInfo{
BuildInfo: *metainfo.NewBuildInfo(),
ReloaderOptions: *metainfo.GetReloaderOptions(),
DeploymentInfo: metav1.ObjectMeta{
Name: os.Getenv("RELOADER_DEPLOYMENT_NAME"),
Namespace: namespace,
},
}
configMap := metaInfo.ToConfigMap()
if _, err := clientset.CoreV1().ConfigMaps(namespace).Get(context.Background(), configMap.Name, metav1.GetOptions{}); err == nil {
logrus.Info("Meta info configmap already exists, updating it")
_, err = clientset.CoreV1().ConfigMaps(namespace).Update(context.Background(), configMap, metav1.UpdateOptions{})
if err != nil {
logrus.Warn("Failed to update existing meta info configmap: ", err)
}
return
}
_, err := clientset.CoreV1().ConfigMaps(namespace).Create(context.Background(), configMap, metav1.CreateOptions{})
if err != nil {
logrus.Warn("Failed to create meta info configmap: ", err)
}
}
type List []string
type Map map[string]string
func (l *List) Contains(s string) bool {
for _, v := range *l {
if v == s {