From 923cd19f379aecfa46037ce2a7e8426f475a3e07 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Thu, 8 Dec 2022 14:54:10 +0800 Subject: [PATCH] using requeue cluster instead of resync lease controller (#288) Signed-off-by: Wei Liu Signed-off-by: Wei Liu --- pkg/hub/lease/controller.go | 159 ++++++++++-------- pkg/hub/lease/controller_test.go | 5 +- pkg/hub/manager.go | 1 - test/integration/managedcluster_lease_test.go | 48 +++++- test/integration/util/util.go | 6 +- 5 files changed, 144 insertions(+), 75 deletions(-) diff --git a/pkg/hub/lease/controller.go b/pkg/hub/lease/controller.go index 6044cde48..a7938bb8b 100644 --- a/pkg/hub/lease/controller.go +++ b/pkg/hub/lease/controller.go @@ -17,7 +17,7 @@ import ( "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" coordinformers "k8s.io/client-go/informers/coordination/v1" "k8s.io/client-go/kubernetes" coordlisters "k8s.io/client-go/listers/coordination/v1" @@ -26,6 +26,7 @@ import ( const leaseDurationTimes = 5 const leaseName = "managed-cluster-lease" +const clusterNameLabel = "open-cluster-management.io/cluster-name" var ( // LeaseDurationSeconds is lease update time interval @@ -38,6 +39,7 @@ type leaseController struct { clusterClient clientset.Interface clusterLister clusterv1listers.ManagedClusterLister leaseLister coordlisters.LeaseLister + eventRecorder events.Recorder } // NewClusterLeaseController creates a cluster lease controller on hub cluster. 
@@ -46,105 +48,128 @@ func NewClusterLeaseController( clusterClient clientset.Interface, clusterInformer clusterv1informer.ManagedClusterInformer, leaseInformer coordinformers.LeaseInformer, - resyncInterval time.Duration, recorder events.Recorder) factory.Controller { c := &leaseController{ kubeClient: kubeClient, clusterClient: clusterClient, clusterLister: clusterInformer.Lister(), leaseLister: leaseInformer.Lister(), + eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"), } return factory.New(). - WithFilteredEventsInformers( + WithFilteredEventsInformersQueueKeyFunc( + func(obj runtime.Object) string { + accessor, _ := meta.Accessor(obj) + return accessor.GetLabels()[clusterNameLabel] + }, func(obj interface{}) bool { metaObj, ok := obj.(metav1.ObjectMetaAccessor) if !ok { return false } + // only handle the managed cluster lease // TODO instead of this by adding label filter in the SharedInformerFactory // see https://github.com/open-cluster-management-io/registration/issues/225 + if _, ok := metaObj.GetObjectMeta().GetLabels()[clusterNameLabel]; !ok { + return false + } + return metaObj.GetObjectMeta().GetName() == leaseName }, leaseInformer.Informer(), ). - WithInformers(clusterInformer.Informer()). + WithInformersQueueKeyFunc(func(obj runtime.Object) string { + accessor, _ := meta.Accessor(obj) + return accessor.GetName() + }, clusterInformer.Informer()). WithSync(c.sync). - ResyncEvery(resyncInterval). ToController("ManagedClusterLeaseController", recorder) } // sync checks the lease of each accepted cluster on hub to determine whether a managed cluster is available. 
func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext) error { - clusters, err := c.clusterLister.List(labels.Everything()) - if err != nil { + clusterName := syncCtx.QueueKey() + + cluster, err := c.clusterLister.Get(clusterName) + if errors.IsNotFound(err) { + // the cluster is not found, do nothing return nil } - for _, cluster := range clusters { + if err != nil { + return err + } + + if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) { // cluster is not accepted, skip it. - if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) { - continue + return nil + } + + observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName) + if errors.IsNotFound(err) { + if !cluster.DeletionTimestamp.IsZero() { + // the lease is not found and the cluster is deleting, update the cluster to unknown immediately + return c.updateClusterStatus(ctx, cluster) } - // get the lease of a cluster, if the lease is not found, create it - observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName) - switch { - case errors.IsNotFound(err): - if !cluster.DeletionTimestamp.IsZero() { - // the cluster is deleting, do nothing - break - } - lease := &coordv1.Lease{ - ObjectMeta: metav1.ObjectMeta{ - Name: leaseName, - Namespace: cluster.Name, - Labels: map[string]string{"open-cluster-management.io/cluster-name": cluster.Name}, - }, - Spec: coordv1.LeaseSpec{ - HolderIdentity: pointer.StringPtr(leaseName), - RenewTime: &metav1.MicroTime{Time: time.Now()}, - }, - } - if _, err := c.kubeClient.CoordinationV1().Leases(cluster.Name).Create(ctx, lease, metav1.CreateOptions{}); err != nil { - return err - } - continue - case err != nil: + // the lease is not found, try to create it + lease := &coordv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: leaseName, + Namespace: cluster.Name, + Labels: map[string]string{clusterNameLabel: cluster.Name}, + }, + 
Spec: coordv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr(leaseName), + RenewTime: &metav1.MicroTime{Time: time.Now()}, + }, + } + _, err := c.kubeClient.CoordinationV1().Leases(cluster.Name).Create(ctx, lease, metav1.CreateOptions{}) + return err + } + if err != nil { + return err + } + + gracePeriod := time.Duration(leaseDurationTimes*cluster.Spec.LeaseDurationSeconds) * time.Second + if gracePeriod == 0 { + // FIX: #183 avoid a zero gracePeriod, which would update the ManagedClusterLeaseUpdateStopped condition non-stop. + gracePeriod = time.Duration(leaseDurationTimes*LeaseDurationSeconds) * time.Second + } + + now := time.Now() + if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) { + // the lease is not updated constantly, change the cluster available condition to unknown + if err := c.updateClusterStatus(ctx, cluster); err != nil { return err - case err == nil: - gracePeriod := time.Duration(leaseDurationTimes*cluster.Spec.LeaseDurationSeconds) * time.Second - // FIX: #183 avoid gracePeriod is zero, will non-stop update ManagedClusterLeaseUpdateStopped condition. 
- if gracePeriod == 0 { - gracePeriod = time.Duration(leaseDurationTimes*LeaseDurationSeconds) * time.Second - } - // the lease is constantly updated, do nothing - now := time.Now() - if now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) { - continue - } - } - - if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) { - // the managed cluster available condition alreay is unknown, do nothing - continue - } - - // the lease is not constantly updated, update it to unknown - conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(metav1.Condition{ - Type: clusterv1.ManagedClusterConditionAvailable, - Status: metav1.ConditionUnknown, - Reason: "ManagedClusterLeaseUpdateStopped", - Message: "Registration agent stopped updating its lease.", - }) - _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn) - if err != nil { - return err - } - if updated { - syncCtx.Recorder().Eventf("ManagedClusterAvailableConditionUpdated", - "update managed cluster %q available condition to unknown, due to its lease is not updated constantly", - cluster.Name) } } + + // always requeue this cluster to check its lease constantly + syncCtx.Queue().AddAfter(clusterName, gracePeriod) return nil } + +func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clusterv1.ManagedCluster) error { + if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) { + // the managed cluster available condition already is unknown, do nothing + return nil + } + + // the lease is not constantly updated, update it to unknown + conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(metav1.Condition{ + Type: clusterv1.ManagedClusterConditionAvailable, + Status: metav1.ConditionUnknown, + Reason: "ManagedClusterLeaseUpdateStopped", + Message: "Registration agent stopped 
updating its lease.", + }) + + _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn) + if updated { + c.eventRecorder.Eventf("ManagedClusterAvailableConditionUpdated", + "update managed cluster %q available condition to unknown, due to its lease is not updated constantly", + cluster.Name) + } + + return err +} diff --git a/pkg/hub/lease/controller_test.go b/pkg/hub/lease/controller_test.go index ea965a08a..a82c1ec6d 100644 --- a/pkg/hub/lease/controller_test.go +++ b/pkg/hub/lease/controller_test.go @@ -129,13 +129,16 @@ func TestSync(t *testing.T) { } } + syncCtx := testinghelpers.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName) + ctrl := &leaseController{ kubeClient: leaseClient, clusterClient: clusterClient, clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(), + eventRecorder: syncCtx.Recorder(), } - syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, "")) + syncErr := ctrl.sync(context.TODO(), syncCtx) if syncErr != nil { t.Errorf("unexpected err: %v", syncErr) } diff --git a/pkg/hub/manager.go b/pkg/hub/manager.go index cddecd94c..25c23a19d 100644 --- a/pkg/hub/manager.go +++ b/pkg/hub/manager.go @@ -115,7 +115,6 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. 
clusterClient, clusterInformers.Cluster().V1().ManagedClusters(), kubeInfomers.Coordination().V1().Leases(), - ResyncInterval, //TODO: this interval time should be allowed to change from outside controllerContext.EventRecorder, ) diff --git a/test/integration/managedcluster_lease_test.go b/test/integration/managedcluster_lease_test.go index edf129aab..0ae8a0bd8 100644 --- a/test/integration/managedcluster_lease_test.go +++ b/test/integration/managedcluster_lease_test.go @@ -42,7 +42,7 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() { cancel := util.RunAgent("cluster-leasetest", agentOptions, spokeCfg) defer cancel() - bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret) + bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds) // after two grace period, make sure the managed cluster is available gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds assertAvailableCondition(managedClusterName, metav1.ConditionTrue, gracePeriod) @@ -59,7 +59,7 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() { } stop := util.RunAgent("cluster-availabletest", agentOptions, spokeCfg) - bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret) + bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds) assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0) // stop the current managed cluster @@ -96,7 +96,7 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() { cancel := util.RunAgent("cluster-leasetest", agentOptions, spokeCfg) defer cancel() - bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret) + bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds) assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0) // remove the cluster @@ -133,12 +133,38 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() { gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds 
assertAvailableCondition(managedClusterName, metav1.ConditionTrue, gracePeriod) }) + + ginkgo.It("should use a short lease duration", func() { + // run registration agent + agentOptions := spoke.SpokeAgentOptions{ + ClusterName: managedClusterName, + BootstrapKubeconfig: bootstrapKubeConfigFile, + HubKubeconfigSecret: hubKubeconfigSecret, + HubKubeconfigDir: hubKubeconfigDir, + ClusterHealthCheckPeriod: 1 * time.Minute, + } + stop := util.RunAgent("cluster-leasetest", agentOptions, spokeCfg) + + bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, 60) + assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0) + + // update the lease duration with a short duration (1s) + err := updateManagedClusterLeaseDuration(managedClusterName, util.TestLeaseDurationSeconds) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // stop the agent + stop() + + // after two short grace period, make sure the managed cluster is unknown + gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds + assertAvailableCondition(managedClusterName, metav1.ConditionUnknown, gracePeriod) + }) }) -func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string) { +func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string, leaseDuration int32) { // simulate hub cluster admin to accept the managed cluster and approve the csr gomega.Eventually(func() error { - return util.AcceptManagedCluster(clusterClient, managedClusterName) + return util.AcceptManagedClusterWithLeaseDuration(clusterClient, managedClusterName, leaseDuration) }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) gomega.Eventually(func() error { @@ -169,3 +195,15 @@ func assertAvailableCondition(managedClusterName string, status metav1.Condition return nil }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) } + +func updateManagedClusterLeaseDuration(clusterName string, leaseDuration int32) error { + cluster, err := 
clusterClient.ClusterV1().ManagedClusters().Get(context.TODO(), clusterName, metav1.GetOptions{}) + if err != nil { + return err + } + + cluster.Spec.LeaseDurationSeconds = leaseDuration + + _, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{}) + return err +} diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 034defdfc..2e12120ed 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -504,13 +504,17 @@ func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterNam } func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) error { + return AcceptManagedClusterWithLeaseDuration(clusterClient, spokeClusterName, 60) +} + +func AcceptManagedClusterWithLeaseDuration(clusterClient clusterclientset.Interface, spokeClusterName string, leaseDuration int32) error { return retry.RetryOnConflict(retry.DefaultBackoff, func() error { spokeCluster, err := GetManagedCluster(clusterClient, spokeClusterName) if err != nil { return err } spokeCluster.Spec.HubAcceptsClient = true - spokeCluster.Spec.LeaseDurationSeconds = TestLeaseDurationSeconds + spokeCluster.Spec.LeaseDurationSeconds = leaseDuration _, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), spokeCluster, metav1.UpdateOptions{}) return err })