Add jitter in requeue for status controller (#991)
Some checks failed
Post / coverage (push) Failing after 27m51s
Post / images (amd64) (push) Failing after 3m27s
Post / images (arm64) (push) Failing after 3m13s
Post / image manifest (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Scorecard supply-chain security / Scorecard analysis (push) Failing after 1m9s
Close stale issues and PRs / stale (push) Successful in 40s

Instead of requeue all each resyncInterval, we requeue
for each item separately with a jitter to avoud bursty request

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2025-05-14 15:09:27 +08:00
committed by GitHub
parent 4c790fb3a4
commit 4eda44f2b9

View File

@@ -12,9 +12,9 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
@@ -41,6 +41,7 @@ type AvailableStatusController struct {
manifestWorkLister worklister.ManifestWorkNamespaceLister
spokeDynamicClient dynamic.Interface
statusReader *statusfeedback.StatusReader
syncInterval time.Duration
}
// NewAvailableStatusController returns a AvailableStatusController
@@ -59,45 +60,34 @@ func NewAvailableStatusController(
manifestWorkClient),
manifestWorkLister: manifestWorkLister,
spokeDynamicClient: spokeDynamicClient,
syncInterval: syncInterval,
statusReader: statusfeedback.NewStatusReader().WithMaxJsonRawLength(maxJSONRawLength),
}
return factory.New().
WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, manifestWorkInformer.Informer()).
WithSync(controller.sync).ResyncEvery(syncInterval).ToController("AvailableStatusController", recorder)
WithSync(controller.sync).ToController("AvailableStatusController", recorder)
}
func (c *AvailableStatusController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
manifestWorkName := controllerContext.QueueKey()
if manifestWorkName != factory.DefaultQueueKey {
// sync a particular manifestwork
manifestWork, err := c.manifestWorkLister.Get(manifestWorkName)
if errors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
return nil
}
if err != nil {
return fmt.Errorf("unable to fetch manifestwork %q: %w", manifestWorkName, err)
}
err = c.syncManifestWork(ctx, manifestWork)
if err != nil {
return fmt.Errorf("unable to sync manifestwork %q: %w", manifestWork.Name, err)
}
// sync a particular manifestwork
manifestWork, err := c.manifestWorkLister.Get(manifestWorkName)
if errors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
return nil
}
// resync all manifestworks
klog.V(5).Infof("Resync all ManifestWorks by adding them to the queue")
manifestWorks, err := c.manifestWorkLister.List(labels.Everything())
if err != nil {
return fmt.Errorf("unable to list manifestworks: %w", err)
return fmt.Errorf("unable to fetch manifestwork %q: %w", manifestWorkName, err)
}
for _, manifestWork := range manifestWorks {
controllerContext.Queue().Add(manifestWork.Name)
err = c.syncManifestWork(ctx, manifestWork)
if err != nil {
return fmt.Errorf("unable to sync manifestwork %q: %w", manifestWork.Name, err)
}
// requeue with a certain jitter
controllerContext.Queue().AddAfter(manifestWorkName, wait.Jitter(c.syncInterval, 0.9))
return nil
}