From 4eda44f2b9b734d605d9cf90419bab7cc0f3ea99 Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Wed, 14 May 2025 15:09:27 +0800 Subject: [PATCH] Add jitter in requeue for status controller (#991) Instead of requeue all each resyncInterval, we requeue for each item separately with a jitter to avoud bursty request Signed-off-by: Jian Qiu --- .../availablestatus_controller.go | 38 +++++++------------ 1 file changed, 14 insertions(+), 24 deletions(-) diff --git a/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go b/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go index 1fcba6d64..1c11a2a92 100644 --- a/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go +++ b/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go @@ -12,9 +12,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/dynamic" "k8s.io/klog/v2" @@ -41,6 +41,7 @@ type AvailableStatusController struct { manifestWorkLister worklister.ManifestWorkNamespaceLister spokeDynamicClient dynamic.Interface statusReader *statusfeedback.StatusReader + syncInterval time.Duration } // NewAvailableStatusController returns a AvailableStatusController @@ -59,45 +60,34 @@ func NewAvailableStatusController( manifestWorkClient), manifestWorkLister: manifestWorkLister, spokeDynamicClient: spokeDynamicClient, + syncInterval: syncInterval, statusReader: statusfeedback.NewStatusReader().WithMaxJsonRawLength(maxJSONRawLength), } return factory.New(). WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, manifestWorkInformer.Informer()). - WithSync(controller.sync).ResyncEvery(syncInterval).ToController("AvailableStatusController", recorder) + WithSync(controller.sync).ToController("AvailableStatusController", recorder) } func (c *AvailableStatusController) sync(ctx context.Context, controllerContext factory.SyncContext) error { manifestWorkName := controllerContext.QueueKey() - if manifestWorkName != factory.DefaultQueueKey { - // sync a particular manifestwork - manifestWork, err := c.manifestWorkLister.Get(manifestWorkName) - if errors.IsNotFound(err) { - // work not found, could have been deleted, do nothing. - return nil - } - if err != nil { - return fmt.Errorf("unable to fetch manifestwork %q: %w", manifestWorkName, err) - } - - err = c.syncManifestWork(ctx, manifestWork) - if err != nil { - return fmt.Errorf("unable to sync manifestwork %q: %w", manifestWork.Name, err) - } + // sync a particular manifestwork + manifestWork, err := c.manifestWorkLister.Get(manifestWorkName) + if errors.IsNotFound(err) { + // work not found, could have been deleted, do nothing. return nil } - - // resync all manifestworks - klog.V(5).Infof("Resync all ManifestWorks by adding them to the queue") - manifestWorks, err := c.manifestWorkLister.List(labels.Everything()) if err != nil { - return fmt.Errorf("unable to list manifestworks: %w", err) + return fmt.Errorf("unable to fetch manifestwork %q: %w", manifestWorkName, err) } - for _, manifestWork := range manifestWorks { - controllerContext.Queue().Add(manifestWork.Name) + err = c.syncManifestWork(ctx, manifestWork) + if err != nil { + return fmt.Errorf("unable to sync manifestwork %q: %w", manifestWork.Name, err) } + // requeue with a certain jitter + controllerContext.Queue().AddAfter(manifestWorkName, wait.Jitter(c.syncInterval, 0.9)) return nil }