diff --git a/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go b/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go index b362afbb6..e0c002970 100644 --- a/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go +++ b/pkg/controller/core.oam.dev/v1beta1/application/application_controller.go @@ -75,6 +75,10 @@ const ( const ( // baseWorkflowBackoffWaitTime is the time to wait gc check baseGCBackoffWaitTime = 3000 * time.Millisecond + + // minPerAppResyncPeriod is the minimum reconciliation interval that can be + // set via the per-application annotation to prevent excessive API server load. + minPerAppResyncPeriod = 10 * time.Second ) var ( @@ -408,7 +412,7 @@ func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler * cond := condition.Deleting() cond.Message = fmt.Sprintf("error encountered during garbage collection: %s", err.Error()) handler.app.Status.SetConditions(cond) - return r.result(statusUpdater(logCtx, handler.app, phase)).ret() + return r.result(statusUpdater(logCtx, handler.app, phase)).forApp(handler.app).ret() } if !finished { logCtx.Info("GarbageCollecting resourcetrackers unfinished") @@ -420,12 +424,13 @@ func (r *Reconciler) gcResourceTrackers(logCtx monitorContext.Context, handler * return r.result(statusUpdater(logCtx, handler.app, phase)).requeue(baseGCBackoffWaitTime).ret() } logCtx.Info("GarbageCollected resourcetrackers") - return r.result(statusUpdater(logCtx, handler.app, phase)).ret() + return r.result(statusUpdater(logCtx, handler.app, phase)).forApp(handler.app).ret() } type reconcileResult struct { time.Duration - err error + err error + defaultResync time.Duration } func (r *reconcileResult) requeue(d time.Duration) *reconcileResult { @@ -433,13 +438,37 @@ func (r *reconcileResult) requeue(d time.Duration) *reconcileResult { return r } +// forApp overrides the default resync period with the per-application value +// parsed from the AnnotationReconcileInterval annotation, if present and valid. +func (r *reconcileResult) forApp(app *v1beta1.Application) *reconcileResult { + if app == nil || app.Annotations == nil { + return r + } + v, ok := app.Annotations[oam.AnnotationReconcileInterval] + if !ok { + return r + } + d, err := time.ParseDuration(v) + switch { + case err != nil: + klog.Warningf("ignoring invalid %s annotation %q on application %s/%s, using global default", + oam.AnnotationReconcileInterval, v, app.Namespace, app.Name) + case d < minPerAppResyncPeriod: + klog.Warningf("ignoring %s annotation %q below minimum %s on application %s/%s, using global default", + oam.AnnotationReconcileInterval, v, minPerAppResyncPeriod, app.Namespace, app.Name) + default: + r.defaultResync = d + } + return r +} + func (r *reconcileResult) ret() (ctrl.Result, error) { if r.Duration.Seconds() != 0 { return ctrl.Result{RequeueAfter: r.Duration}, r.err } else if r.err != nil { return ctrl.Result{}, r.err } - return ctrl.Result{RequeueAfter: common2.ApplicationReSyncPeriod}, nil + return ctrl.Result{RequeueAfter: r.defaultResync}, nil } func (r *reconcileResult) end(endReconcile bool) (bool, ctrl.Result, error) { @@ -448,7 +477,7 @@ func (r *reconcileResult) end(endReconcile bool) (bool, ctrl.Result, error) { } func (r *Reconciler) result(err error) *reconcileResult { - return &reconcileResult{err: err} + return &reconcileResult{err: err, defaultResync: common2.ApplicationReSyncPeriod} } // NOTE Because resource tracker is cluster-scoped resources, we cannot garbage collect them diff --git a/pkg/controller/core.oam.dev/v1beta1/application/reconcile_result_test.go b/pkg/controller/core.oam.dev/v1beta1/application/reconcile_result_test.go new file mode 100644 index 000000000..ecceee9ee --- /dev/null +++ b/pkg/controller/core.oam.dev/v1beta1/application/reconcile_result_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2021 The KubeVela Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package application + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + common2 "github.com/oam-dev/kubevela/pkg/controller/common" + "github.com/oam-dev/kubevela/pkg/oam" +) + +func TestReconcileResultForApp(t *testing.T) { + globalDefault := common2.ApplicationReSyncPeriod + + tests := []struct { + name string + annotations map[string]string + explicitRequeue time.Duration + err error + wantRequeue time.Duration + wantErr bool + }{ + { + name: "no annotation uses global default", + annotations: nil, + wantRequeue: globalDefault, + }, + { + name: "valid annotation overrides global default", + annotations: map[string]string{oam.AnnotationReconcileInterval: "1m"}, + wantRequeue: 1 * time.Minute, + }, + { + name: "valid annotation with 15m", + annotations: map[string]string{oam.AnnotationReconcileInterval: "15m"}, + wantRequeue: 15 * time.Minute, + }, + { + name: "valid annotation with 30s", + annotations: map[string]string{oam.AnnotationReconcileInterval: "30s"}, + wantRequeue: 30 * time.Second, + }, + { + name: "annotation below minimum floor falls back to global default", + annotations: map[string]string{oam.AnnotationReconcileInterval: "5s"}, + wantRequeue: globalDefault, + }, + { + name: "annotation exactly at minimum floor is accepted", + annotations: map[string]string{oam.AnnotationReconcileInterval: "10s"}, + wantRequeue: 10 * time.Second, + }, + { + name: "invalid annotation value falls back to global default", + annotations: map[string]string{oam.AnnotationReconcileInterval: "not-a-duration"}, + wantRequeue: globalDefault, + }, + { + name: "empty annotation value falls back to global default", + annotations: map[string]string{oam.AnnotationReconcileInterval: ""}, + wantRequeue: globalDefault, + }, + { + name: "negative duration falls back to global default", + annotations: map[string]string{oam.AnnotationReconcileInterval: "-5m"}, + wantRequeue: globalDefault, + }, + { + name: "zero duration falls back to global default", + annotations: map[string]string{oam.AnnotationReconcileInterval: "0s"}, + wantRequeue: globalDefault, + }, + { + name: "explicit requeue takes precedence over annotation", + annotations: map[string]string{oam.AnnotationReconcileInterval: "1m"}, + explicitRequeue: 3 * time.Second, + wantRequeue: 3 * time.Second, + }, + { + name: "error path skips requeue duration", + err: fmt.Errorf("some error"), + wantErr: true, + }, + { + name: "other annotations do not interfere", + annotations: map[string]string{"app.oam.dev/other": "value", oam.AnnotationReconcileInterval: "2m"}, + wantRequeue: 2 * time.Minute, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + app := &v1beta1.Application{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "default", + Annotations: tt.annotations, + }, + } + + r := &Reconciler{} + res := r.result(tt.err) + if tt.explicitRequeue > 0 { + res.requeue(tt.explicitRequeue) + } + res.forApp(app) + + result, err := res.ret() + + if tt.wantErr { + assert.Error(t, err) + assert.Equal(t, time.Duration(0), result.RequeueAfter) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.wantRequeue, result.RequeueAfter) + } + }) + } +} + +func TestReconcileResultForAppNilApp(t *testing.T) { + r := &Reconciler{} + res := r.result(nil).forApp(nil) + result, err := res.ret() + + assert.NoError(t, err) + assert.Equal(t, common2.ApplicationReSyncPeriod, result.RequeueAfter) +} + +func TestReconcileResultWithoutForApp(t *testing.T) { + r := &Reconciler{} + result, err := r.result(nil).ret() + + assert.NoError(t, err) + assert.Equal(t, common2.ApplicationReSyncPeriod, result.RequeueAfter) +} diff --git a/pkg/oam/labels.go b/pkg/oam/labels.go index f69eca1e3..395b59c55 100644 --- a/pkg/oam/labels.go +++ b/pkg/oam/labels.go @@ -220,6 +220,12 @@ const ( // AnnotationSkipResume annotation indicates that the resource does not need to be resumed. AnnotationSkipResume = "controller.core.oam.dev/skip-resume" + + // AnnotationReconcileInterval overrides the global ApplicationReSyncPeriod on a + // per-application basis. The value must be a valid Go duration string (e.g. + // "1m", "15m", "30s"). Values below 10s are ignored and fall back to the + // global default. Invalid values are also ignored. + AnnotationReconcileInterval = "app.oam.dev/reconcile-interval" ) const ( diff --git a/test/e2e-test/resource_policy_test.go b/test/e2e-test/resource_policy_test.go index 21055c0a8..fe0f8e1fa 100644 --- a/test/e2e-test/resource_policy_test.go +++ b/test/e2e-test/resource_policy_test.go @@ -37,6 +37,7 @@ import ( common2 "github.com/oam-dev/kubevela/apis/core.oam.dev/common" "github.com/oam-dev/kubevela/apis/core.oam.dev/condition" "github.com/oam-dev/kubevela/apis/core.oam.dev/v1beta1" + "github.com/oam-dev/kubevela/pkg/oam" "github.com/oam-dev/kubevela/pkg/utils/common" ) @@ -109,6 +110,38 @@ var _ = Describe("Application Resource-Related Policy Tests", func() { Expect(deploy.Spec.Replicas).Should(Equal(ptr.To(int32(0)))) }) + It("Test per-application reconcile interval override", func() { + By("create app with custom reconcile interval") + app := &v1beta1.Application{} + Expect(common.ReadYamlToObject("testdata/app/app_apply_once.yaml", app)).Should(BeNil()) + app.SetNamespace(namespace) + app.SetAnnotations(map[string]string{oam.AnnotationReconcileInterval: "10s"}) + Expect(k8sClient.Create(ctx, app)).Should(Succeed()) + appKey := client.ObjectKeyFromObject(app) + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, appKey, app)).Should(Succeed()) + g.Expect(app.Status.Phase).Should(Equal(common2.ApplicationRunning)) + }, 30*time.Second, time.Second*3).Should(Succeed()) + // Let status-update reconciles from app creation drain before mutating + // the workload, so the repair depends on the scheduled interval. + time.Sleep(12 * time.Second) + + By("mutate managed workload without forcing application reconciliation") + deploy := &v13.Deployment{} + deployKey := types.NamespacedName{Namespace: namespace, Name: "hello-world"} + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, deployKey, deploy)).Should(Succeed()) + deploy.Spec.Replicas = ptr.To(int32(0)) + g.Expect(k8sClient.Update(ctx, deploy)).Should(Succeed()) + }, 10*time.Second, time.Second).Should(Succeed()) + + By("scheduled reconciliation restores the workload through state keep") + Eventually(func(g Gomega) { + g.Expect(k8sClient.Get(ctx, deployKey, deploy)).Should(Succeed()) + g.Expect(deploy.Spec.Replicas).Should(Equal(ptr.To(int32(1)))) + }, 45*time.Second, time.Second).Should(Succeed()) + }) + It("Test GarbageCollect Policy", func() { By("create garbage-collect app") app := &v1beta1.Application{}