From caebfd98f95b759b83333fb6ac24c5463170980e Mon Sep 17 00:00:00 2001 From: faizanahmad055 Date: Mon, 11 May 2026 21:59:32 +0200 Subject: [PATCH] Add e2e parallel Signed-off-by: faizanahmad055 --- Makefile | 10 +- internal/pkg/controller/controller.go | 88 +++++++++--- internal/pkg/controller/controller_test.go | 56 ++++---- test/e2e/README.md | 3 +- test/e2e/advanced/advanced_suite_test.go | 83 ++++++++---- .../e2e/annotations/annotations_suite_test.go | 116 +++++++++------- test/e2e/argo/argo_suite_test.go | 78 +++++++---- test/e2e/core/core_suite_test.go | 125 ++++++++++-------- test/e2e/csi/csi_suite_test.go | 92 ++++++++----- test/e2e/utils/testenv.go | 77 ++++++++++- 10 files changed, 485 insertions(+), 243 deletions(-) diff --git a/Makefile b/Makefile index 09a2577..17240b2 100644 --- a/Makefile +++ b/Makefile @@ -149,6 +149,10 @@ E2E_IMG ?= ghcr.io/stakater/reloader:test E2E_TIMEOUT ?= 45m KIND_CLUSTER ?= reloader-e2e CONTAINER_RUNTIME ?= $(shell command -v docker 2>/dev/null || command -v podman 2>/dev/null) +# Set SKIP_BUILD=true to skip the image build/load steps and use a pre-built image. +SKIP_BUILD ?= false +# Number of parallel Ginkgo workers. Set to 1 to run sequentially. +GINKGO_PROCS ?= 4 .PHONY: e2e-setup e2e-setup: ## One-time setup: create Kind cluster and install dependencies (Argo, CSI, Vault) @@ -161,7 +165,8 @@ e2e-setup: ## One-time setup: create Kind cluster and install dependencies (Argo ./scripts/e2e-cluster-setup.sh .PHONY: e2e -e2e: ## Run e2e tests (builds image, loads to Kind, runs tests in parallel) +e2e: ## Run e2e tests (build/load image unless SKIP_BUILD=true, then run tests in parallel) +ifneq ($(SKIP_BUILD),true) $(CONTAINER_RUNTIME) build -t $(E2E_IMG) -f Dockerfile . ifeq ($(notdir $(CONTAINER_RUNTIME)),podman) $(CONTAINER_RUNTIME) save $(E2E_IMG) -o /tmp/reloader-e2e.tar @@ -170,7 +175,8 @@ ifeq ($(notdir $(CONTAINER_RUNTIME)),podman) else kind load docker-image $(E2E_IMG) --name $(KIND_CLUSTER) endif - SKIP_BUILD=true RELOADER_IMAGE=$(E2E_IMG) "$(GOCMD)" tool ginkgo --keep-going -v --timeout=$(E2E_TIMEOUT) ./test/e2e/... +endif + RELOADER_IMAGE=$(E2E_IMG) "$(GOCMD)" tool ginkgo --keep-going -v --procs=$(GINKGO_PROCS) --timeout=$(E2E_TIMEOUT) ./test/e2e/... .PHONY: e2e-cleanup e2e-cleanup: ## Cleanup: remove test resources and delete Kind cluster diff --git a/internal/pkg/controller/controller.go b/internal/pkg/controller/controller.go index 7284504..4c2ced8 100644 --- a/internal/pkg/controller/controller.go +++ b/internal/pkg/controller/controller.go @@ -2,7 +2,6 @@ package controller import ( "fmt" - "slices" "sync" "sync/atomic" "time" @@ -48,11 +47,46 @@ type Controller struct { // read by the informer event handlers, so they must be atomic. var secretControllerInitialized atomic.Bool var configmapControllerInitialized atomic.Bool -var selectedNamespacesCache []string + +// selectedNamespacesCache holds an immutable snapshot of the set of namespace +// names that match the namespace label selector. Written exclusively by the +// namespace controller's informer goroutine; read concurrently by configmap/ +// secret controller informer goroutines. Using atomic.Value with an immutable +// map[string]struct{} snapshot avoids mutexes and prevents data races. +var selectedNamespacesCache atomic.Value // always stores map[string]struct{} + +// loadSelectedNamespaces returns the current namespace snapshot (never nil). +func loadSelectedNamespaces() map[string]struct{} { + if v := selectedNamespacesCache.Load(); v != nil { + return v.(map[string]struct{}) + } + return map[string]struct{}{} +} + +// storeSelectedNamespaces replaces the current snapshot with one built from ns. +// It is the only mutator of selectedNamespacesCache and is called only from +// the namespace controller's informer goroutine (or from tests for setup). +func storeSelectedNamespaces(ns []string) { + m := make(map[string]struct{}, len(ns)) + for _, n := range ns { + m[n] = struct{}{} + } + selectedNamespacesCache.Store(m) +} + +// loadSelectedNamespacesList returns the current namespace names as a slice. +// Intended for use in tests where slice-based assertions are more convenient. +func loadSelectedNamespacesList() []string { + m := loadSelectedNamespaces() + result := make([]string, 0, len(m)) + for k := range m { + result = append(result, k) + } + return result +} // NewController for initializing a Controller -func NewController(client kubernetes.Interface, resource string, namespace string, ignoredNamespaces []string, namespaceLabelSelector string, resourceLabelSelector string, collectors metrics.Collectors) (*Controller, - error) { +func NewController(client kubernetes.Interface, resource string, namespace string, ignoredNamespaces []string, namespaceLabelSelector string, resourceLabelSelector string, collectors metrics.Collectors) (*Controller, error) { if options.SyncAfterRestart { secretControllerInitialized.Store(true) configmapControllerInitialized.Store(true) @@ -155,36 +189,45 @@ func (c *Controller) resourceInSelectedNamespaces(raw interface{}) bool { return true } + namespaces := loadSelectedNamespaces() + var ns string switch object := raw.(type) { case *v1.ConfigMap: - if slices.Contains(selectedNamespacesCache, object.GetNamespace()) { - return true - } + ns = object.GetNamespace() case *v1.Secret: - if slices.Contains(selectedNamespacesCache, object.GetNamespace()) { - return true - } + ns = object.GetNamespace() case *csiv1.SecretProviderClassPodStatus: - if slices.Contains(selectedNamespacesCache, object.GetNamespace()) { - return true - } + ns = object.GetNamespace() + default: + return false } - return false + _, ok := namespaces[ns] + return ok } func (c *Controller) addSelectedNamespaceToCache(namespace v1.Namespace) { - selectedNamespacesCache = append(selectedNamespacesCache, namespace.GetName()) + old := loadSelectedNamespaces() + next := make(map[string]struct{}, len(old)+1) + for k := range old { + next[k] = struct{}{} + } + next[namespace.GetName()] = struct{}{} + selectedNamespacesCache.Store(next) logrus.Infof("added namespace to be watched: %s", namespace.GetName()) } func (c *Controller) removeSelectedNamespaceFromCache(namespace v1.Namespace) { - for i, v := range selectedNamespacesCache { - if v == namespace.GetName() { - selectedNamespacesCache = append(selectedNamespacesCache[:i], selectedNamespacesCache[i+1:]...) - logrus.Infof("removed namespace from watch: %s", namespace.GetName()) - return - } + old := loadSelectedNamespaces() + if _, ok := old[namespace.GetName()]; !ok { + return } + next := make(map[string]struct{}, len(old)) + for k := range old { + next[k] = struct{}{} + } + delete(next, namespace.GetName()) + selectedNamespacesCache.Store(next) + logrus.Infof("removed namespace from watch: %s", namespace.GetName()) } // Update function to add an old object and a new object to the queue in case of updating a resource @@ -319,6 +362,9 @@ func (c *Controller) processNextItem() bool { rh, ok := resourceHandler.(handler.ResourceHandler) if !ok { logrus.Errorf("Invalid resource handler type: %T", resourceHandler) + // Clear rate-limiter state so the item doesn't leak memory in the queue. + c.queue.Forget(resourceHandler) + c.collectors.RecordError("invalid_handler_type") return true } err := rh.Handle() diff --git a/internal/pkg/controller/controller_test.go b/internal/pkg/controller/controller_test.go index 342ab5d..c318c9f 100644 --- a/internal/pkg/controller/controller_test.go +++ b/internal/pkg/controller/controller_test.go @@ -45,7 +45,7 @@ func (m *mockResourceHandler) GetEnqueueTime() time.Time { func resetGlobalState() { secretControllerInitialized.Store(false) configmapControllerInitialized.Store(false) - selectedNamespacesCache = []string{} + storeSelectedNamespaces([]string{}) } // newTestController creates a controller for testing without starting informers @@ -223,12 +223,12 @@ func TestResourceInSelectedNamespaces(t *testing.T) { for _, tt := range tests { t.Run( tt.name, func(t *testing.T) { - resetGlobalState() - selectedNamespacesCache = tt.cachedNamespaces + resetGlobalState() + storeSelectedNamespaces(tt.cachedNamespaces) - c := newTestController([]string{}, tt.namespaceSelector) - result := c.resourceInSelectedNamespaces(tt.resource) - assert.Equal(t, tt.expected, result) + c := newTestController([]string{}, tt.namespaceSelector) + result := c.resourceInSelectedNamespaces(tt.resource) + assert.Equal(t, tt.expected, result) }, ) } @@ -244,17 +244,17 @@ func TestAddSelectedNamespaceToCache(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Name: "namespace-1"}, } c.addSelectedNamespaceToCache(ns1) - assert.Contains(t, selectedNamespacesCache, "namespace-1") - assert.Len(t, selectedNamespacesCache, 1) + assert.Contains(t, loadSelectedNamespaces(), "namespace-1") + assert.Len(t, loadSelectedNamespaces(), 1) // Add second namespace ns2 := v1.Namespace{ ObjectMeta: metav1.ObjectMeta{Name: "namespace-2"}, } c.addSelectedNamespaceToCache(ns2) - assert.Contains(t, selectedNamespacesCache, "namespace-1") - assert.Contains(t, selectedNamespacesCache, "namespace-2") - assert.Len(t, selectedNamespacesCache, 2) + assert.Contains(t, loadSelectedNamespaces(), "namespace-1") + assert.Contains(t, loadSelectedNamespaces(), "namespace-2") + assert.Len(t, loadSelectedNamespaces(), 2) } func TestRemoveSelectedNamespaceFromCache(t *testing.T) { @@ -293,16 +293,16 @@ func TestRemoveSelectedNamespaceFromCache(t *testing.T) { for _, tt := range tests { t.Run( tt.name, func(t *testing.T) { - resetGlobalState() - selectedNamespacesCache = tt.initialCache + resetGlobalState() + storeSelectedNamespaces(tt.initialCache) - c := newTestController([]string{}, "env=prod") - ns := v1.Namespace{ - ObjectMeta: metav1.ObjectMeta{Name: tt.namespaceToRemove}, - } - c.removeSelectedNamespaceFromCache(ns) + c := newTestController([]string{}, "env=prod") + ns := v1.Namespace{ + ObjectMeta: metav1.ObjectMeta{Name: tt.namespaceToRemove}, + } + c.removeSelectedNamespaceFromCache(ns) - assert.Equal(t, tt.expectedCache, selectedNamespacesCache) + assert.ElementsMatch(t, tt.expectedCache, loadSelectedNamespacesList()) }, ) } @@ -500,10 +500,10 @@ func TestUpdateHandler(t *testing.T) { for _, tt := range tests { t.Run( tt.name, func(t *testing.T) { - resetGlobalState() - if tt.cachedNamespaces != nil { - selectedNamespacesCache = tt.cachedNamespaces - } + resetGlobalState() + if tt.cachedNamespaces != nil { + storeSelectedNamespaces(tt.cachedNamespaces) + } c := newTestController(tt.ignoredNamespaces, tt.namespaceSelector) c.Update(tt.oldResource, tt.newResource) @@ -675,13 +675,13 @@ func TestAddHandlerWithNamespaceEvent(t *testing.T) { c.Add(ns) - assert.Contains(t, selectedNamespacesCache, "new-namespace") + assert.Contains(t, loadSelectedNamespaces(), "new-namespace") assert.Equal(t, 0, c.queue.Len(), "Namespace add should not queue anything") } func TestDeleteHandlerWithNamespaceEvent(t *testing.T) { resetGlobalState() - selectedNamespacesCache = []string{"ns-1", "ns-to-delete", "ns-2"} + storeSelectedNamespaces([]string{"ns-1", "ns-to-delete", "ns-2"}) c := newTestController([]string{}, "env=prod") options.ReloadOnDelete = "true" @@ -694,9 +694,9 @@ func TestDeleteHandlerWithNamespaceEvent(t *testing.T) { c.Delete(ns) - assert.NotContains(t, selectedNamespacesCache, "ns-to-delete") - assert.Contains(t, selectedNamespacesCache, "ns-1") - assert.Contains(t, selectedNamespacesCache, "ns-2") + assert.NotContains(t, loadSelectedNamespaces(), "ns-to-delete") + assert.Contains(t, loadSelectedNamespaces(), "ns-1") + assert.Contains(t, loadSelectedNamespaces(), "ns-2") assert.Equal(t, 0, c.queue.Len(), "Namespace delete should not queue anything") } diff --git a/test/e2e/README.md b/test/e2e/README.md index eae94ac..cfa989d 100644 --- a/test/e2e/README.md +++ b/test/e2e/README.md @@ -43,9 +43,10 @@ SKIP_BUILD=true RELOADER_IMAGE=ghcr.io/stakater/reloader:v1.2.0 make e2e | Variable | Default | Description | |----------|---------|-------------| | `RELOADER_IMAGE` | `ghcr.io/stakater/reloader:test` | Image to test | -| `SKIP_BUILD` | `false` | Skip image build | +| `SKIP_BUILD` | `false` | Skip the container image build and Kind load steps; requires `RELOADER_IMAGE` to point to an already-loaded image | | `KIND_CLUSTER` | `reloader-e2e` | Kind cluster name | | `E2E_TIMEOUT` | `45m` | Test timeout | +| `GINKGO_PROCS` | `4` | Number of parallel Ginkgo worker processes | ## Test Structure diff --git a/test/e2e/advanced/advanced_suite_test.go b/test/e2e/advanced/advanced_suite_test.go index a9ca674..84abcd7 100644 --- a/test/e2e/advanced/advanced_suite_test.go +++ b/test/e2e/advanced/advanced_suite_test.go @@ -2,6 +2,7 @@ package advanced import ( "context" + "encoding/json" "testing" . "github.com/onsi/ginkgo/v2" @@ -27,37 +28,63 @@ func TestAdvanced(t *testing.T) { RunSpecs(t, "Advanced E2E Suite") } -var _ = BeforeSuite(func() { - var err error - ctx = context.Background() +// SynchronizedBeforeSuite ensures only process 1 deploys Reloader. +// The namespace and release name are forwarded to all other processes so they +// share a single Reloader instance, avoiding resource exhaustion on Kind. +var _ = SynchronizedBeforeSuite( + // Process 1 only: create namespace, deploy Reloader. + func() []byte { + setupEnv, err := utils.SetupTestEnvironment(context.Background(), "reloader-advanced") + Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") - testEnv, err = utils.SetupTestEnvironment(ctx, "reloader-advanced") - Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") + deployValues := map[string]string{ + "reloader.reloadStrategy": "annotations", + "reloader.watchGlobally": "false", + } + if utils.IsCSIDriverInstalled(context.Background(), setupEnv.CSIClient) { + deployValues["reloader.enableCSIIntegration"] = "true" + GinkgoWriter.Println("Deploying Reloader with CSI integration support") + } - kubeClient = testEnv.KubeClient - csiClient = testEnv.CSIClient - restConfig = testEnv.RestConfig - testNamespace = testEnv.Namespace + Expect(setupEnv.DeployAndWait(deployValues)).To(Succeed(), "Failed to deploy Reloader") - deployValues := map[string]string{ - "reloader.reloadStrategy": "annotations", - "reloader.watchGlobally": "false", - } + data, err := json.Marshal(utils.SharedEnvData{ + Namespace: setupEnv.Namespace, + ReleaseName: setupEnv.ReleaseName, + }) + Expect(err).NotTo(HaveOccurred()) + return data + }, + // All processes (including #1): connect to the shared environment. + func(data []byte) { + var shared utils.SharedEnvData + Expect(json.Unmarshal(data, &shared)).To(Succeed()) - if utils.IsCSIDriverInstalled(ctx, csiClient) { - deployValues["reloader.enableCSIIntegration"] = "true" - GinkgoWriter.Println("Deploying Reloader with CSI integration support") - } + var err error + testEnv, err = utils.SetupSharedTestEnvironment(context.Background(), shared.Namespace, shared.ReleaseName) + Expect(err).NotTo(HaveOccurred(), "Failed to setup shared test environment") - err = testEnv.DeployAndWait(deployValues) - Expect(err).NotTo(HaveOccurred(), "Failed to deploy Reloader") -}) + kubeClient = testEnv.KubeClient + csiClient = testEnv.CSIClient + restConfig = testEnv.RestConfig + testNamespace = testEnv.Namespace + ctx = testEnv.Ctx + }, +) -var _ = AfterSuite(func() { - if testEnv != nil { - err := testEnv.Cleanup() - Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") - } - - GinkgoWriter.Println("Advanced E2E Suite cleanup complete") -}) +var _ = SynchronizedAfterSuite( + // All processes: cancel the per-process context. + func() { + if testEnv != nil { + testEnv.Cancel() + } + }, + // Process 1 only (runs last): undeploy Reloader and delete namespace. + func() { + if testEnv != nil { + err := testEnv.Cleanup() + Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") + } + GinkgoWriter.Println("Advanced E2E Suite cleanup complete") + }, +) diff --git a/test/e2e/annotations/annotations_suite_test.go b/test/e2e/annotations/annotations_suite_test.go index 586dfaf..7ce3de6 100644 --- a/test/e2e/annotations/annotations_suite_test.go +++ b/test/e2e/annotations/annotations_suite_test.go @@ -2,6 +2,7 @@ package annotations import ( "context" + "encoding/json" "testing" . "github.com/onsi/ginkgo/v2" @@ -19,7 +20,6 @@ var ( restConfig *rest.Config testNamespace string ctx context.Context - cancel context.CancelFunc testEnv *utils.TestEnvironment registry *utils.AdapterRegistry ) @@ -29,57 +29,77 @@ func TestAnnotations(t *testing.T) { RunSpecs(t, "Annotations Strategy E2E Suite") } -var _ = BeforeSuite(func() { - var err error - ctx, cancel = context.WithCancel(context.Background()) +// SynchronizedBeforeSuite ensures only process 1 deploys Reloader. +// The namespace and release name are forwarded to all other processes so they +// share a single Reloader instance, avoiding resource exhaustion on Kind. +var _ = SynchronizedBeforeSuite( + // Process 1 only: create namespace, deploy Reloader. + func() []byte { + setupEnv, err := utils.SetupTestEnvironment(context.Background(), "reloader-annotations-test") + Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") - testEnv, err = utils.SetupTestEnvironment(ctx, "reloader-annotations-test") - Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") + deployValues := map[string]string{ + "reloader.reloadStrategy": "annotations", + "reloader.watchGlobally": "false", + } + if utils.IsCSIDriverInstalled(context.Background(), setupEnv.CSIClient) { + deployValues["reloader.enableCSIIntegration"] = "true" + GinkgoWriter.Println("Deploying Reloader with CSI integration support") + } - kubeClient = testEnv.KubeClient - csiClient = testEnv.CSIClient - restConfig = testEnv.RestConfig - testNamespace = testEnv.Namespace + Expect(setupEnv.DeployAndWait(deployValues)).To(Succeed(), "Failed to deploy Reloader") - registry = utils.NewAdapterRegistry(kubeClient) + data, err := json.Marshal(utils.SharedEnvData{ + Namespace: setupEnv.Namespace, + ReleaseName: setupEnv.ReleaseName, + }) + Expect(err).NotTo(HaveOccurred()) + return data + }, + // All processes (including #1): connect to shared environment and build adapter registry. + func(data []byte) { + var shared utils.SharedEnvData + Expect(json.Unmarshal(data, &shared)).To(Succeed()) - if utils.IsArgoRolloutsInstalled(ctx, testEnv.RolloutsClient) { - GinkgoWriter.Println("Argo Rollouts detected, registering ArgoRolloutAdapter") - registry.RegisterAdapter(utils.NewArgoRolloutAdapter(testEnv.RolloutsClient)) - } else { - GinkgoWriter.Println("Argo Rollouts not detected, skipping ArgoRolloutAdapter registration") - } + var err error + testEnv, err = utils.SetupSharedTestEnvironment(context.Background(), shared.Namespace, shared.ReleaseName) + Expect(err).NotTo(HaveOccurred(), "Failed to setup shared test environment") - if utils.HasDeploymentConfigSupport(testEnv.DiscoveryClient) && testEnv.OpenShiftClient != nil { - GinkgoWriter.Println("OpenShift detected, registering DeploymentConfigAdapter") - registry.RegisterAdapter(utils.NewDeploymentConfigAdapter(testEnv.OpenShiftClient)) - } else { - GinkgoWriter.Println("OpenShift not detected, skipping DeploymentConfigAdapter registration") - } + kubeClient = testEnv.KubeClient + csiClient = testEnv.CSIClient + restConfig = testEnv.RestConfig + testNamespace = testEnv.Namespace + ctx = testEnv.Ctx - deployValues := map[string]string{ - "reloader.reloadStrategy": "annotations", - "reloader.watchGlobally": "false", - } + registry = utils.NewAdapterRegistry(kubeClient) + if utils.IsArgoRolloutsInstalled(ctx, testEnv.RolloutsClient) { + GinkgoWriter.Println("Argo Rollouts detected, registering ArgoRolloutAdapter") + registry.RegisterAdapter(utils.NewArgoRolloutAdapter(testEnv.RolloutsClient)) + } else { + GinkgoWriter.Println("Argo Rollouts not detected, skipping ArgoRolloutAdapter registration") + } + if utils.HasDeploymentConfigSupport(testEnv.DiscoveryClient) && testEnv.OpenShiftClient != nil { + GinkgoWriter.Println("OpenShift detected, registering DeploymentConfigAdapter") + registry.RegisterAdapter(utils.NewDeploymentConfigAdapter(testEnv.OpenShiftClient)) + } else { + GinkgoWriter.Println("OpenShift not detected, skipping DeploymentConfigAdapter registration") + } + }, +) - if utils.IsCSIDriverInstalled(ctx, csiClient) { - deployValues["reloader.enableCSIIntegration"] = "true" - GinkgoWriter.Println("Deploying Reloader with CSI integration support") - } - - err = testEnv.DeployAndWait(deployValues) - Expect(err).NotTo(HaveOccurred(), "Failed to deploy Reloader") -}) - -var _ = AfterSuite(func() { - if testEnv != nil { - err := testEnv.Cleanup() - Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") - } - - if cancel != nil { - cancel() - } - - GinkgoWriter.Println("Annotations E2E Suite cleanup complete") -}) +var _ = SynchronizedAfterSuite( + // All processes: cancel the per-process context. + func() { + if testEnv != nil { + testEnv.Cancel() + } + }, + // Process 1 only (runs last): undeploy Reloader and delete namespace. + func() { + if testEnv != nil { + err := testEnv.Cleanup() + Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") + } + GinkgoWriter.Println("Annotations E2E Suite cleanup complete") + }, +) diff --git a/test/e2e/argo/argo_suite_test.go b/test/e2e/argo/argo_suite_test.go index 0dcf616..6a59935 100644 --- a/test/e2e/argo/argo_suite_test.go +++ b/test/e2e/argo/argo_suite_test.go @@ -2,6 +2,7 @@ package argo import ( "context" + "encoding/json" "testing" rolloutsclient "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" @@ -25,34 +26,61 @@ func TestArgo(t *testing.T) { RunSpecs(t, "Argo Rollouts E2E Suite") } -var _ = BeforeSuite(func() { - var err error - ctx = context.Background() +// SynchronizedBeforeSuite ensures only process 1 deploys Reloader. +// Process 1 also checks for Argo Rollouts and calls Skip if not installed — +// Ginkgo propagates the skip to all processes. +var _ = SynchronizedBeforeSuite( + // Process 1 only: check prerequisites, create namespace, deploy Reloader. + func() []byte { + setupEnv, err := utils.SetupTestEnvironment(context.Background(), "reloader-argo") + Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") - testEnv, err = utils.SetupTestEnvironment(ctx, "reloader-argo") - Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") + if !utils.IsArgoRolloutsInstalled(context.Background(), setupEnv.RolloutsClient) { + Skip("Argo Rollouts is not installed. Run ./scripts/e2e-cluster-setup.sh first") + } + GinkgoWriter.Println("Argo Rollouts is installed") - kubeClient = testEnv.KubeClient - rolloutsClient = testEnv.RolloutsClient - testNamespace = testEnv.Namespace + Expect(setupEnv.DeployAndWait(map[string]string{ + "reloader.reloadStrategy": "annotations", + "reloader.isArgoRollouts": "true", + })).To(Succeed(), "Failed to deploy Reloader") - if !utils.IsArgoRolloutsInstalled(ctx, rolloutsClient) { - Skip("Argo Rollouts is not installed. Run ./scripts/e2e-cluster-setup.sh first") - } - GinkgoWriter.Println("Argo Rollouts is installed") + data, err := json.Marshal(utils.SharedEnvData{ + Namespace: setupEnv.Namespace, + ReleaseName: setupEnv.ReleaseName, + }) + Expect(err).NotTo(HaveOccurred()) + return data + }, + // All processes (including #1): connect to the shared environment. + func(data []byte) { + var shared utils.SharedEnvData + Expect(json.Unmarshal(data, &shared)).To(Succeed()) - err = testEnv.DeployAndWait(map[string]string{ - "reloader.reloadStrategy": "annotations", - "reloader.isArgoRollouts": "true", - }) - Expect(err).NotTo(HaveOccurred(), "Failed to deploy Reloader") -}) + var err error + testEnv, err = utils.SetupSharedTestEnvironment(context.Background(), shared.Namespace, shared.ReleaseName) + Expect(err).NotTo(HaveOccurred(), "Failed to setup shared test environment") -var _ = AfterSuite(func() { - if testEnv != nil { - err := testEnv.Cleanup() - Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") - } + kubeClient = testEnv.KubeClient + rolloutsClient = testEnv.RolloutsClient + testNamespace = testEnv.Namespace + ctx = testEnv.Ctx + }, +) - GinkgoWriter.Println("Argo Rollouts E2E Suite cleanup complete (Argo Rollouts preserved for other suites)") -}) +var _ = SynchronizedAfterSuite( + // All processes: cancel the per-process context. + func() { + if testEnv != nil { + testEnv.Cancel() + } + }, + // Process 1 only (runs last): undeploy Reloader and delete namespace. + func() { + if testEnv != nil { + err := testEnv.Cleanup() + Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") + } + GinkgoWriter.Println("Argo Rollouts E2E Suite cleanup complete (Argo Rollouts preserved for other suites)") + }, +) diff --git a/test/e2e/core/core_suite_test.go b/test/e2e/core/core_suite_test.go index d3449ba..82e8582 100644 --- a/test/e2e/core/core_suite_test.go +++ b/test/e2e/core/core_suite_test.go @@ -2,6 +2,7 @@ package core import ( "context" + "encoding/json" "testing" . "github.com/onsi/ginkgo/v2" @@ -19,7 +20,6 @@ var ( restConfig *rest.Config testNamespace string ctx context.Context - cancel context.CancelFunc testEnv *utils.TestEnvironment registry *utils.AdapterRegistry ) @@ -29,62 +29,81 @@ func TestCore(t *testing.T) { RunSpecs(t, "Core Workload E2E Suite") } -var _ = BeforeSuite(func() { - var err error - ctx, cancel = context.WithCancel(context.Background()) +// SynchronizedBeforeSuite ensures only process 1 deploys Reloader. +// The namespace and release name are forwarded to all other processes so they +// share a single Reloader instance, avoiding resource exhaustion on Kind. +var _ = SynchronizedBeforeSuite( + // Process 1 only: create namespace, deploy Reloader. + func() []byte { + setupEnv, err := utils.SetupTestEnvironment(context.Background(), "reloader-core-test") + Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") - testEnv, err = utils.SetupTestEnvironment(ctx, "reloader-core-test") - Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") + deployValues := map[string]string{ + "reloader.reloadStrategy": "annotations", + "reloader.watchGlobally": "false", + } + if utils.IsArgoRolloutsInstalled(context.Background(), setupEnv.RolloutsClient) { + deployValues["reloader.isArgoRollouts"] = "true" + GinkgoWriter.Println("Deploying Reloader with Argo Rollouts support") + } + if utils.IsCSIDriverInstalled(context.Background(), setupEnv.CSIClient) { + deployValues["reloader.enableCSIIntegration"] = "true" + GinkgoWriter.Println("Deploying Reloader with CSI integration support") + } - kubeClient = testEnv.KubeClient - csiClient = testEnv.CSIClient - restConfig = testEnv.RestConfig - testNamespace = testEnv.Namespace + Expect(setupEnv.DeployAndWait(deployValues)).To(Succeed(), "Failed to deploy Reloader") - registry = utils.NewAdapterRegistry(kubeClient) + data, err := json.Marshal(utils.SharedEnvData{ + Namespace: setupEnv.Namespace, + ReleaseName: setupEnv.ReleaseName, + }) + Expect(err).NotTo(HaveOccurred()) + return data + }, + // All processes (including #1): connect to shared environment and build adapter registry. + func(data []byte) { + var shared utils.SharedEnvData + Expect(json.Unmarshal(data, &shared)).To(Succeed()) - if utils.IsArgoRolloutsInstalled(ctx, testEnv.RolloutsClient) { - GinkgoWriter.Println("Argo Rollouts detected, registering ArgoRolloutAdapter") - registry.RegisterAdapter(utils.NewArgoRolloutAdapter(testEnv.RolloutsClient)) - } else { - GinkgoWriter.Println("Argo Rollouts not detected, skipping ArgoRolloutAdapter registration") - } + var err error + testEnv, err = utils.SetupSharedTestEnvironment(context.Background(), shared.Namespace, shared.ReleaseName) + Expect(err).NotTo(HaveOccurred(), "Failed to setup shared test environment") - if utils.HasDeploymentConfigSupport(testEnv.DiscoveryClient) && testEnv.OpenShiftClient != nil { - GinkgoWriter.Println("OpenShift detected, registering DeploymentConfigAdapter") - registry.RegisterAdapter(utils.NewDeploymentConfigAdapter(testEnv.OpenShiftClient)) - } else { - GinkgoWriter.Println("OpenShift not detected, skipping DeploymentConfigAdapter registration") - } + kubeClient = testEnv.KubeClient + csiClient = testEnv.CSIClient + restConfig = testEnv.RestConfig + testNamespace = testEnv.Namespace + ctx = testEnv.Ctx - deployValues := map[string]string{ - "reloader.reloadStrategy": "annotations", - "reloader.watchGlobally": "false", - } + registry = utils.NewAdapterRegistry(kubeClient) + if utils.IsArgoRolloutsInstalled(ctx, testEnv.RolloutsClient) { + GinkgoWriter.Println("Argo Rollouts detected, registering ArgoRolloutAdapter") + registry.RegisterAdapter(utils.NewArgoRolloutAdapter(testEnv.RolloutsClient)) + } else { + GinkgoWriter.Println("Argo Rollouts not detected, skipping ArgoRolloutAdapter registration") + } + if utils.HasDeploymentConfigSupport(testEnv.DiscoveryClient) && testEnv.OpenShiftClient != nil { + GinkgoWriter.Println("OpenShift detected, registering DeploymentConfigAdapter") + registry.RegisterAdapter(utils.NewDeploymentConfigAdapter(testEnv.OpenShiftClient)) + } else { + GinkgoWriter.Println("OpenShift not detected, skipping DeploymentConfigAdapter registration") + } + }, +) - if utils.IsArgoRolloutsInstalled(ctx, testEnv.RolloutsClient) { - deployValues["reloader.isArgoRollouts"] = "true" - GinkgoWriter.Println("Deploying Reloader with Argo Rollouts support") - } - - if utils.IsCSIDriverInstalled(ctx, csiClient) { - deployValues["reloader.enableCSIIntegration"] = "true" - GinkgoWriter.Println("Deploying Reloader with CSI integration support") - } - - err = testEnv.DeployAndWait(deployValues) - Expect(err).NotTo(HaveOccurred(), "Failed to deploy Reloader") -}) - -var _ = AfterSuite(func() { - if testEnv != nil { - err := testEnv.Cleanup() - Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") - } - - if cancel != nil { - cancel() - } - - GinkgoWriter.Println("Core E2E Suite cleanup complete") -}) +var _ = SynchronizedAfterSuite( + // All processes: cancel the per-process context. + func() { + if testEnv != nil { + testEnv.Cancel() + } + }, + // Process 1 only (runs last): undeploy Reloader and delete namespace. + func() { + if testEnv != nil { + err := testEnv.Cleanup() + Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") + } + GinkgoWriter.Println("Core E2E Suite cleanup complete") + }, +) diff --git a/test/e2e/csi/csi_suite_test.go b/test/e2e/csi/csi_suite_test.go index a8746bb..7ea4a36 100644 --- a/test/e2e/csi/csi_suite_test.go +++ b/test/e2e/csi/csi_suite_test.go @@ -2,6 +2,7 @@ package csi import ( "context" + "encoding/json" "testing" . "github.com/onsi/ginkgo/v2" @@ -19,7 +20,6 @@ var ( restConfig *rest.Config testNamespace string ctx context.Context - cancel context.CancelFunc testEnv *utils.TestEnvironment ) @@ -28,43 +28,65 @@ func TestCSI(t *testing.T) { RunSpecs(t, "CSI SecretProviderClass E2E Suite") } -var _ = BeforeSuite(func() { - var err error - ctx, cancel = context.WithCancel(context.Background()) +// SynchronizedBeforeSuite ensures only process 1 deploys Reloader. +// Process 1 also checks prerequisites (CSI driver, Vault) and calls Skip if +// they are not installed — Ginkgo propagates the skip to all processes. +var _ = SynchronizedBeforeSuite( + // Process 1 only: check prerequisites, create namespace, deploy Reloader. + func() []byte { + setupEnv, err := utils.SetupTestEnvironment(context.Background(), "reloader-csi-test") + Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") - testEnv, err = utils.SetupTestEnvironment(ctx, "reloader-csi-test") - Expect(err).NotTo(HaveOccurred(), "Failed to setup test environment") + if !utils.IsCSIDriverInstalled(context.Background(), setupEnv.CSIClient) { + Skip("CSI secrets store driver not installed - skipping CSI suite") + } + if !utils.IsVaultProviderInstalled(context.Background(), setupEnv.KubeClient) { + Skip("Vault CSI provider not installed - skipping CSI suite") + } - kubeClient = testEnv.KubeClient - csiClient = testEnv.CSIClient - restConfig = testEnv.RestConfig - testNamespace = testEnv.Namespace + Expect(setupEnv.DeployAndWait(map[string]string{ + "reloader.reloadStrategy": "annotations", + "reloader.watchGlobally": "false", + "reloader.enableCSIIntegration": "true", + })).To(Succeed(), "Failed to deploy Reloader") - if !utils.IsCSIDriverInstalled(ctx, csiClient) { - Skip("CSI secrets store driver not installed - skipping CSI suite") - } + data, err := json.Marshal(utils.SharedEnvData{ + Namespace: setupEnv.Namespace, + ReleaseName: setupEnv.ReleaseName, + }) + Expect(err).NotTo(HaveOccurred()) + return data + }, + // All processes (including #1): connect to the shared environment. + func(data []byte) { + var shared utils.SharedEnvData + Expect(json.Unmarshal(data, &shared)).To(Succeed()) - if !utils.IsVaultProviderInstalled(ctx, kubeClient) { - Skip("Vault CSI provider not installed - skipping CSI suite") - } + var err error + testEnv, err = utils.SetupSharedTestEnvironment(context.Background(), shared.Namespace, shared.ReleaseName) + Expect(err).NotTo(HaveOccurred(), "Failed to setup shared test environment") - err = testEnv.DeployAndWait(map[string]string{ - "reloader.reloadStrategy": "annotations", - "reloader.watchGlobally": "false", - "reloader.enableCSIIntegration": "true", - }) - Expect(err).NotTo(HaveOccurred(), "Failed to deploy Reloader") -}) + kubeClient = testEnv.KubeClient + csiClient = testEnv.CSIClient + restConfig = testEnv.RestConfig + testNamespace = testEnv.Namespace + ctx = testEnv.Ctx + }, +) -var _ = AfterSuite(func() { - if testEnv != nil { - err := testEnv.Cleanup() - Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") - } - - if cancel != nil { - cancel() - } - - GinkgoWriter.Println("CSI E2E Suite cleanup complete") -}) +var _ = SynchronizedAfterSuite( + // All processes: cancel the per-process context. + func() { + if testEnv != nil { + testEnv.Cancel() + } + }, + // Process 1 only (runs last): undeploy Reloader and delete namespace. + func() { + if testEnv != nil { + err := testEnv.Cleanup() + Expect(err).NotTo(HaveOccurred(), "Failed to cleanup test environment") + } + GinkgoWriter.Println("CSI E2E Suite cleanup complete") + }, +) diff --git a/test/e2e/utils/testenv.go b/test/e2e/utils/testenv.go index 063d449..cf95b7d 100644 --- a/test/e2e/utils/testenv.go +++ b/test/e2e/utils/testenv.go @@ -3,6 +3,7 @@ package utils import ( "context" "fmt" + "time" rolloutsclient "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned" "github.com/onsi/ginkgo/v2" @@ -31,6 +32,71 @@ type TestEnvironment struct { ProjectDir string } +// SharedEnvData is passed from process 1 to all other processes via +// SynchronizedBeforeSuite. It carries the namespace and release name that +// process 1 created so the other processes can reuse them. +type SharedEnvData struct { + Namespace string `json:"namespace"` + ReleaseName string `json:"releaseName"` +} + +// SetupSharedTestEnvironment creates a TestEnvironment that connects to an +// already-provisioned namespace and Helm release. It builds Kubernetes clients +// but does NOT create a new namespace or deploy Reloader. Use this in the +// allProcsBody of SynchronizedBeforeSuite so that processes 2-N can share the +// single Reloader instance that process 1 deployed. +func SetupSharedTestEnvironment(ctx context.Context, namespace, releaseName string) (*TestEnvironment, error) { + childCtx, cancel := context.WithCancel(ctx) + env := &TestEnvironment{ + Ctx: childCtx, + Cancel: cancel, + TestImage: GetTestImage(), + Namespace: namespace, + ReleaseName: releaseName, + } + + var err error + + env.ProjectDir, err = GetProjectDir() + if err != nil { + cancel() + return nil, fmt.Errorf("getting project directory: %w", err) + } + + kubeconfig := GetKubeconfig() + config, err := clientcmd.BuildConfigFromFlags("", kubeconfig) + if err != nil { + cancel() + return nil, fmt.Errorf("building config from kubeconfig: %w", err) + } + env.RestConfig = config + + env.KubeClient, err = kubernetes.NewForConfig(config) + if err != nil { + cancel() + return nil, fmt.Errorf("creating kubernetes client: %w", err) + } + + env.DiscoveryClient, err = discovery.NewDiscoveryClientForConfig(config) + if err != nil { + cancel() + return nil, fmt.Errorf("creating discovery client: %w", err) + } + + // Optional clients — failures are non-fatal. + if env.CSIClient, err = csiclient.NewForConfig(config); err != nil { + env.CSIClient = nil + } + if env.RolloutsClient, err = rolloutsclient.NewForConfig(config); err != nil { + env.RolloutsClient = nil + } + if env.OpenShiftClient, err = openshiftclient.NewForConfig(config); err != nil { + env.OpenShiftClient = nil + } + + return env, nil +} + // SetupTestEnvironment creates a new test environment with kubernetes clients. // It creates a unique namespace with the given prefix. The returned env.Cancel must be // called (e.g., in AfterSuite) to release the child context after env.Cleanup() completes. @@ -112,15 +178,22 @@ func SetupTestEnvironment(ctx context.Context, namespacePrefix string) (*TestEnv } // Cleanup cleans up the test environment resources. +// It uses a fresh context so it can run safely even after the suite context +// has been cancelled by SynchronizedAfterSuite. func (e *TestEnvironment) Cleanup() error { if e.Namespace == "" { return nil } + // Use a fresh context with a generous timeout so cleanup works even + // after the per-process context (e.Ctx) has been cancelled. + cleanupCtx, cleanupCancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cleanupCancel() + ginkgo.GinkgoWriter.Printf("Cleaning up test namespace: %s\n", e.Namespace) ginkgo.GinkgoWriter.Printf("Cleaning up Helm release: %s\n", e.ReleaseName) - logs, err := GetPodLogs(e.Ctx, e.KubeClient, e.Namespace, ReloaderPodSelector(e.ReleaseName)) + logs, err := GetPodLogs(cleanupCtx, e.KubeClient, e.Namespace, ReloaderPodSelector(e.ReleaseName)) if err == nil && logs != "" { ginkgo.GinkgoWriter.Println("Reloader logs:") ginkgo.GinkgoWriter.Println(logs) @@ -128,7 +201,7 @@ func (e *TestEnvironment) Cleanup() error { _ = UndeployReloader(e.Namespace, e.ReleaseName) - if err := DeleteNamespace(e.Ctx, e.KubeClient, e.Namespace); err != nil { + if err := DeleteNamespace(cleanupCtx, e.KubeClient, e.Namespace); err != nil { return fmt.Errorf("deleting namespace: %w", err) }