🌱 optimize the requeue timing for lease controller (#1236)
Signed-off-by: Yang Le <yangle@redhat.com>
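This changes the lease controller's requeue strategy: instead of always re-queueing a cluster after the full grace period, the delay now depends on whether the observed lease has already expired. Below is a minimal, self-contained sketch of that rule; the helper requeueDelay and the main function are illustrative only and not part of the ocm code base, and the 5x grace-period multiplier is the one stated in the test comments further down.

package main

import (
	"fmt"
	"time"
)

// requeueDelay illustrates the timing rule introduced by this commit:
// an expired lease is re-checked after a full grace period, while a
// fresh lease is re-checked exactly when it is due to expire.
func requeueDelay(renewTime time.Time, gracePeriod time.Duration, now time.Time) time.Duration {
	expiry := renewTime.Add(gracePeriod)
	if !now.Before(expiry) {
		// Lease already expired: back off for a grace period; recovery is
		// detected immediately via the lease watch rather than by this requeue.
		return gracePeriod
	}
	// Lease still fresh: requeue at the moment it would expire.
	return expiry.Sub(now)
}

func main() {
	now := time.Now()
	gracePeriod := 5 * 60 * time.Second // e.g. LeaseDurationSeconds=60 with the 5x multiplier used in the tests

	fmt.Println(requeueDelay(now.Add(-5*time.Minute), gracePeriod, now)) // expired lease -> 5m0s
	fmt.Println(requeueDelay(now.Add(-4*time.Minute), gracePeriod, now)) // expires in 1m -> 1m0s
}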
@@ -124,15 +124,21 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
 	}
 
 	now := time.Now()
-	if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {
+	leaseExpired := !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod))
+
+	if leaseExpired {
 		// the lease is not updated constantly, change the cluster available condition to unknown
 		if err := c.updateClusterStatus(ctx, cluster); err != nil {
 			return err
 		}
+		// Requeue after grace period. Recovery will be detected immediately via lease watch.
+		syncCtx.Queue().AddAfter(clusterName, gracePeriod)
+	} else {
+		// Lease is fresh, requeue exactly when it will expire to detect expiration immediately
+		timeUntilExpiry := observedLease.Spec.RenewTime.Add(gracePeriod).Sub(now)
+		syncCtx.Queue().AddAfter(clusterName, timeUntilExpiry)
 	}
 
-	// always requeue this cluster to check its lease constantly
-	syncCtx.Queue().AddAfter(clusterName, gracePeriod)
 	return nil
 }
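The trade-off relative to the removed unconditional requeue is spelled out in the comments above: a fresh lease is re-examined exactly when its grace period would lapse, so a missed renewal is detected essentially as soon as it happens, while an already-expired lease can safely wait another full grace period because a renewed lease triggers the controller again immediately through the lease watch.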
@@ -7,11 +7,14 @@ import (
 	"testing"
 	"time"
 
+	"github.com/openshift/library-go/pkg/operator/events"
 	"github.com/openshift/library-go/pkg/operator/events/eventstesting"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	kubeinformers "k8s.io/client-go/informers"
 	kubefake "k8s.io/client-go/kubernetes/fake"
 	clienttesting "k8s.io/client-go/testing"
+	"k8s.io/client-go/util/workqueue"
 
 	clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
+	clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
@@ -202,3 +205,128 @@ func newDeletingManagedCluster() *clusterv1.ManagedCluster {
 	cluster.DeletionTimestamp = &now
 	return cluster
 }
+
+// spyQueue wraps a real queue and captures AddAfter calls
+type spyQueue struct {
+	workqueue.RateLimitingInterface
+	addAfterDelay time.Duration
+	addAfterKey   interface{}
+}
+
+func (s *spyQueue) AddAfter(item interface{}, duration time.Duration) {
+	s.addAfterDelay = duration
+	s.addAfterKey = item
+	s.RateLimitingInterface.AddAfter(item, duration)
+}
+
+// testSyncContext is a custom sync context for testing requeue timing
+type testSyncContext struct {
+	queueKey string
+	recorder events.Recorder
+	queue    *spyQueue
+}
+
+func (t *testSyncContext) Queue() workqueue.RateLimitingInterface { return t.queue }
+func (t *testSyncContext) QueueKey() string                       { return t.queueKey }
+func (t *testSyncContext) Recorder() events.Recorder              { return t.recorder }
+
+func newManagedClusterWithLeaseDuration(seconds int32) *clusterv1.ManagedCluster {
+	cluster := testinghelpers.NewAvailableManagedCluster()
+	cluster.Spec.LeaseDurationSeconds = seconds
+	return cluster
+}
+
+func TestRequeueTime(t *testing.T) {
+	// Using 60 second lease duration (grace period = 5 * 60 = 300 seconds)
+	cases := []struct {
+		name               string
+		cluster            runtime.Object
+		clusterLease       runtime.Object
+		expectedRequeueMin time.Duration
+		expectedRequeueMax time.Duration
+	}{
+		{
+			name:    "requeue for expired lease - recovery detected via watch, use grace period",
+			cluster: newManagedClusterWithLeaseDuration(60),
+			// Lease expired 5 minutes ago (grace period is 5 * 60 = 300 seconds)
+			clusterLease:       testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-5*time.Minute)),
+			expectedRequeueMin: 295 * time.Second, // Should be ~300s (grace period)
+			expectedRequeueMax: 305 * time.Second, // Allow some tolerance
+		},
+		{
+			name:    "requeue for fresh lease - should use time until expiry for immediate detection",
+			cluster: newManagedClusterWithLeaseDuration(60),
+			// Lease renewed 4 minutes ago, will expire in 1 minute (grace period is 5 minutes)
+			clusterLease:       testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-4*time.Minute)),
+			expectedRequeueMin: 55 * time.Second, // Should be ~60s (1 minute)
+			expectedRequeueMax: 65 * time.Second, // Allow some tolerance
+		},
+		{
+			name:    "requeue for recently renewed lease - should check just before expiry",
+			cluster: newManagedClusterWithLeaseDuration(60),
+			// Lease renewed 30 seconds ago, will expire in 4.5 minutes
+			clusterLease:       testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(-30*time.Second)),
+			expectedRequeueMin: 265 * time.Second, // Should be ~270s (4.5 minutes)
+			expectedRequeueMax: 275 * time.Second, // Allow some tolerance
+		},
+	}
+
+	for _, c := range cases {
+		t.Run(c.name, func(t *testing.T) {
+			clusterClient := clusterfake.NewSimpleClientset(c.cluster)
+			clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
+			clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
+			if err := clusterStore.Add(c.cluster); err != nil {
+				t.Fatal(err)
+			}
+
+			hubClient := kubefake.NewSimpleClientset(c.clusterLease)
+			leaseInformerFactory := kubeinformers.NewSharedInformerFactory(hubClient, time.Minute*10)
+			leaseStore := leaseInformerFactory.Coordination().V1().Leases().Informer().GetStore()
+			if err := leaseStore.Add(c.clusterLease); err != nil {
+				t.Fatal(err)
+			}
+
+			ctx := context.TODO()
+			mcEventRecorder, err := helpers.NewEventRecorder(ctx, clusterscheme.Scheme, hubClient.EventsV1(), "test")
+			if err != nil {
+				t.Fatal(err)
+			}
+
+			// Create a custom sync context with spy queue to capture AddAfter calls
+			spyQ := &spyQueue{
+				RateLimitingInterface: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
+			}
+			syncCtx := &testSyncContext{
+				queueKey: testinghelpers.TestManagedClusterName,
+				recorder: eventstesting.NewTestingEventRecorder(t),
+				queue:    spyQ,
+			}
+
+			ctrl := &leaseController{
+				kubeClient: hubClient,
+				patcher: patcher.NewPatcher[
+					*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
+					clusterClient.ClusterV1().ManagedClusters()),
+				clusterLister:   clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
+				leaseLister:     leaseInformerFactory.Coordination().V1().Leases().Lister(),
+				mcEventRecorder: mcEventRecorder,
+			}
+
+			syncErr := ctrl.sync(context.TODO(), syncCtx)
+			if syncErr != nil {
+				t.Errorf("unexpected err: %v", syncErr)
+			}
+
+			// Verify the requeue delay captured by the spy
+			actualDelay := spyQ.addAfterDelay
+			if actualDelay < c.expectedRequeueMin || actualDelay > c.expectedRequeueMax {
+				t.Errorf("Unexpected requeue delay: got %v, expected between %v and %v",
+					actualDelay, c.expectedRequeueMin, c.expectedRequeueMax)
+			} else {
+				t.Logf("✓ Requeue delay %v is within expected range [%v, %v]",
+					actualDelay, c.expectedRequeueMin, c.expectedRequeueMax)
+			}
+		})
+	}
+}
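The expected ranges in the three cases follow directly from the timing rule: with a 60-second lease duration the grace period is 5 * 60 = 300 seconds, so an expired lease re-queues after ~300s, a lease renewed 4 minutes ago re-queues after 300s - 240s = 60s, and one renewed 30 seconds ago after 300s - 30s = 270s; each assertion allows roughly ±5 seconds of tolerance.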