mirror of
https://github.com/stakater/Reloader.git
synced 2026-05-16 21:56:55 +00:00
Fix PR issues
Signed-off-by: faizanahmad055 <faizan.ahmad55@outlook.com>
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
version: "2"
|
||||
|
||||
run:
|
||||
go: "1.25"
|
||||
go: "1.26"
|
||||
timeout: 5m
|
||||
allow-parallel-runners: true
|
||||
|
||||
|
||||
2
Makefile
2
Makefile
@@ -141,7 +141,7 @@ manifest:
|
||||
docker manifest annotate --arch $(ARCH) $(REPOSITORY_GENERIC) $(REPOSITORY_ARCH)
|
||||
|
||||
test:
|
||||
"$(GOCMD)" test -timeout 1800s -v -short -count=1 ./internal/... ./test/e2e/utils/...
|
||||
"$(GOCMD)" test -timeout 1800s -v -count=1 ./internal/... ./pkg/... ./test/e2e/utils/...
|
||||
|
||||
##@ E2E Tests
|
||||
|
||||
|
||||
@@ -3,6 +3,7 @@ package controller
|
||||
import (
|
||||
"fmt"
|
||||
"slices"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/sirupsen/logrus"
|
||||
@@ -41,17 +42,19 @@ type Controller struct {
|
||||
resourceSelector string
|
||||
}
|
||||
|
||||
// controllerInitialized flag determines whether controlled is being initialized
|
||||
var secretControllerInitialized = false
|
||||
var configmapControllerInitialized = false
|
||||
// controllerInitialized flags guard against processing Add/Delete events before
|
||||
// the worker goroutines have started. Written by runWorker (in a goroutine) and
|
||||
// read by the informer event handlers, so they must be atomic.
|
||||
var secretControllerInitialized atomic.Bool
|
||||
var configmapControllerInitialized atomic.Bool
|
||||
var selectedNamespacesCache []string
|
||||
|
||||
// NewController for initializing a Controller
|
||||
func NewController(client kubernetes.Interface, resource string, namespace string, ignoredNamespaces []string, namespaceLabelSelector string, resourceLabelSelector string, collectors metrics.Collectors) (*Controller,
|
||||
error) {
|
||||
if options.SyncAfterRestart {
|
||||
secretControllerInitialized = true
|
||||
configmapControllerInitialized = true
|
||||
secretControllerInitialized.Store(true)
|
||||
configmapControllerInitialized.Store(true)
|
||||
}
|
||||
|
||||
c := Controller{
|
||||
@@ -121,7 +124,7 @@ func (c *Controller) Add(obj interface{}) {
|
||||
}
|
||||
|
||||
if options.ReloadOnCreate == "true" {
|
||||
if !c.resourceInIgnoredNamespace(obj) && c.resourceInSelectedNamespaces(obj) && secretControllerInitialized && configmapControllerInitialized {
|
||||
if !c.resourceInIgnoredNamespace(obj) && c.resourceInSelectedNamespaces(obj) && secretControllerInitialized.Load() && configmapControllerInitialized.Load() {
|
||||
c.enqueue(handler.ResourceCreatedHandler{
|
||||
Resource: obj,
|
||||
Collectors: c.collectors,
|
||||
@@ -214,7 +217,7 @@ func (c *Controller) Delete(old interface{}) {
|
||||
}
|
||||
|
||||
if options.ReloadOnDelete == "true" {
|
||||
if !c.resourceInIgnoredNamespace(old) && c.resourceInSelectedNamespaces(old) && secretControllerInitialized && configmapControllerInitialized {
|
||||
if !c.resourceInIgnoredNamespace(old) && c.resourceInSelectedNamespaces(old) && secretControllerInitialized.Load() && configmapControllerInitialized.Load() {
|
||||
c.enqueue(handler.ResourceDeleteHandler{
|
||||
Resource: old,
|
||||
Collectors: c.collectors,
|
||||
@@ -266,9 +269,9 @@ func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
|
||||
func (c *Controller) runWorker() {
|
||||
// At this point the controller is fully initialized and we can start processing the resources
|
||||
if c.resource == string(v1.ResourceSecrets) {
|
||||
secretControllerInitialized = true
|
||||
secretControllerInitialized.Store(true)
|
||||
} else if c.resource == string(v1.ResourceConfigMaps) {
|
||||
configmapControllerInitialized = true
|
||||
configmapControllerInitialized.Store(true)
|
||||
}
|
||||
|
||||
for c.processNextItem() {
|
||||
|
||||
@@ -43,8 +43,8 @@ func (m *mockResourceHandler) GetEnqueueTime() time.Time {
|
||||
|
||||
// resetGlobalState resets global variables between tests
|
||||
func resetGlobalState() {
|
||||
secretControllerInitialized = false
|
||||
configmapControllerInitialized = false
|
||||
secretControllerInitialized.Store(false)
|
||||
configmapControllerInitialized.Store(false)
|
||||
selectedNamespacesCache = []string{}
|
||||
}
|
||||
|
||||
@@ -386,8 +386,8 @@ func TestAddHandler(t *testing.T) {
|
||||
tt.name, func(t *testing.T) {
|
||||
resetGlobalState()
|
||||
options.ReloadOnCreate = tt.reloadOnCreate
|
||||
secretControllerInitialized = tt.controllersInit
|
||||
configmapControllerInitialized = tt.controllersInit
|
||||
secretControllerInitialized.Store(tt.controllersInit)
|
||||
configmapControllerInitialized.Store(tt.controllersInit)
|
||||
|
||||
c := newTestController(tt.ignoredNamespaces, "")
|
||||
c.Add(tt.resource)
|
||||
@@ -601,8 +601,8 @@ func TestDeleteHandler(t *testing.T) {
|
||||
tt.name, func(t *testing.T) {
|
||||
resetGlobalState()
|
||||
options.ReloadOnDelete = tt.reloadOnDelete
|
||||
secretControllerInitialized = tt.controllersInit
|
||||
configmapControllerInitialized = tt.controllersInit
|
||||
secretControllerInitialized.Store(tt.controllersInit)
|
||||
configmapControllerInitialized.Store(tt.controllersInit)
|
||||
|
||||
c := newTestController(tt.ignoredNamespaces, "")
|
||||
c.Delete(tt.resource)
|
||||
@@ -685,8 +685,8 @@ func TestDeleteHandlerWithNamespaceEvent(t *testing.T) {
|
||||
|
||||
c := newTestController([]string{}, "env=prod")
|
||||
options.ReloadOnDelete = "true"
|
||||
secretControllerInitialized = true
|
||||
configmapControllerInitialized = true
|
||||
secretControllerInitialized.Store(true)
|
||||
configmapControllerInitialized.Store(true)
|
||||
|
||||
ns := &v1.Namespace{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "ns-to-delete"},
|
||||
|
||||
@@ -131,6 +131,33 @@ func AddCSIVolume(spec *corev1.PodSpec, containerIdx int, spcName string) {
|
||||
}
|
||||
}
|
||||
|
||||
// AddCSIInitContainer adds an init container that mounts a CSI SecretProviderClass volume.
|
||||
// This is distinct from AddCSIVolume which mounts into a regular container.
|
||||
func AddCSIInitContainer(spec *corev1.PodSpec, spcName string) {
|
||||
volumeName := "csi-" + spcName
|
||||
mountPath := "/mnt/secrets-store/" + spcName
|
||||
spec.Volumes = append(spec.Volumes, corev1.Volume{
|
||||
Name: volumeName,
|
||||
VolumeSource: corev1.VolumeSource{
|
||||
CSI: &corev1.CSIVolumeSource{
|
||||
Driver: CSIDriverName,
|
||||
ReadOnly: ptr.To(true),
|
||||
VolumeAttributes: map[string]string{
|
||||
"secretProviderClass": spcName,
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
spec.InitContainers = append(spec.InitContainers, corev1.Container{
|
||||
Name: "init-csi",
|
||||
Image: DefaultImage,
|
||||
Command: []string{"sh", "-c", "echo init done"},
|
||||
VolumeMounts: []corev1.VolumeMount{
|
||||
{Name: volumeName, MountPath: mountPath, ReadOnly: true},
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
// AddInitContainer adds init container with optional envFrom references.
|
||||
func AddInitContainer(spec *corev1.PodSpec, cmName, secretName string) {
|
||||
init := corev1.Container{
|
||||
@@ -253,7 +280,7 @@ func ApplyWorkloadConfig(template *corev1.PodTemplateSpec, cfg WorkloadConfig) {
|
||||
AddInitContainerWithVolumes(spec, cfg.ConfigMapName, cfg.SecretName)
|
||||
}
|
||||
if cfg.UseInitContainerCSI && cfg.SPCName != "" {
|
||||
AddCSIVolume(spec, 0, cfg.SPCName)
|
||||
AddCSIInitContainer(spec, cfg.SPCName)
|
||||
}
|
||||
if cfg.MultipleContainers > 1 {
|
||||
for i := 1; i < cfg.MultipleContainers; i++ {
|
||||
|
||||
@@ -21,10 +21,6 @@ func Run(cmd *exec.Cmd) (string, error) {
|
||||
}
|
||||
cmd.Dir = dir
|
||||
|
||||
if err := os.Chdir(cmd.Dir); err != nil {
|
||||
_, _ = fmt.Fprintf(GinkgoWriter, "chdir dir: %q\n", err)
|
||||
}
|
||||
|
||||
cmd.Env = append(os.Environ(), "GO111MODULE=on")
|
||||
command := strings.Join(cmd.Args, " ")
|
||||
_, _ = fmt.Fprintf(GinkgoWriter, "running: %q\n", command)
|
||||
|
||||
@@ -3,8 +3,10 @@ package utils
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
. "github.com/onsi/ginkgo/v2" //nolint:revive,staticcheck
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/fields"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
@@ -47,12 +49,19 @@ type Condition[T any] func(T) bool
|
||||
// WatchUntil watches a resource until the condition is met or timeout occurs.
|
||||
// It handles watch reconnection automatically on errors.
|
||||
// If name is empty, it watches all resources and returns the first matching one.
|
||||
//
|
||||
// ResourceVersion "0" is used so the API server sends the current state as an
|
||||
// initial ADDED event before streaming live updates, preventing the TOCTOU window
|
||||
// where a reload that completes before WatchUntil is called would be missed.
|
||||
func WatchUntil[T runtime.Object](ctx context.Context, watchFunc WatchFunc, name string, condition Condition[T], timeout time.Duration) (T, error) {
|
||||
var zero T
|
||||
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||||
defer cancel()
|
||||
|
||||
opts := metav1.ListOptions{Watch: true}
|
||||
opts := metav1.ListOptions{
|
||||
Watch: true,
|
||||
ResourceVersion: "0", // receive current state as initial ADDED event
|
||||
}
|
||||
if name != "" {
|
||||
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", name).String()
|
||||
}
|
||||
@@ -87,6 +96,8 @@ func watchOnce[T runtime.Object](
|
||||
|
||||
watcher, err := watchFunc(ctx, opts)
|
||||
if err != nil {
|
||||
// Log and signal retry; transient API errors are expected during CI.
|
||||
_, _ = fmt.Fprintf(GinkgoWriter, "watch: failed to start watch: %v — retrying\n", err)
|
||||
return zero, false, nil
|
||||
}
|
||||
defer watcher.Stop()
|
||||
@@ -112,7 +123,8 @@ func watchOnce[T runtime.Object](
|
||||
case watch.Deleted:
|
||||
continue
|
||||
case watch.Error:
|
||||
return zero, false, ErrWatchError
|
||||
_, _ = fmt.Fprintf(GinkgoWriter, "watch: received error event: %v — retrying\n", event.Object)
|
||||
return zero, false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -129,8 +141,9 @@ func WatchUntilDeleted(
|
||||
defer cancel()
|
||||
|
||||
opts := metav1.ListOptions{
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
|
||||
Watch: true,
|
||||
FieldSelector: fields.OneTermEqualSelector("metadata.name", name).String(),
|
||||
Watch: true,
|
||||
ResourceVersion: "0",
|
||||
}
|
||||
|
||||
for {
|
||||
@@ -159,6 +172,7 @@ func watchDeleteOnce(
|
||||
) (bool, error) {
|
||||
watcher, err := watchFunc(ctx, opts)
|
||||
if err != nil {
|
||||
_, _ = fmt.Fprintf(GinkgoWriter, "watch: failed to start delete watch: %v — retrying\n", err)
|
||||
return false, nil
|
||||
}
|
||||
defer watcher.Stop()
|
||||
@@ -175,7 +189,8 @@ func watchDeleteOnce(
|
||||
return true, nil
|
||||
}
|
||||
if event.Type == watch.Error {
|
||||
return false, ErrWatchError
|
||||
_, _ = fmt.Fprintf(GinkgoWriter, "watch: received error event during delete watch: %v — retrying\n", event.Object)
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user