mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-22 00:54:00 +00:00
using requeue cluster instead of resync lease controller (#288)
Signed-off-by: Wei Liu <liuweixa@redhat.com> Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
@@ -17,7 +17,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
coordinformers "k8s.io/client-go/informers/coordination/v1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
coordlisters "k8s.io/client-go/listers/coordination/v1"
|
||||
@@ -26,6 +26,7 @@ import (
|
||||
|
||||
const leaseDurationTimes = 5
|
||||
const leaseName = "managed-cluster-lease"
|
||||
const clusterNameLabel = "open-cluster-management.io/cluster-name"
|
||||
|
||||
var (
|
||||
// LeaseDurationSeconds is lease update time interval
|
||||
@@ -38,6 +39,7 @@ type leaseController struct {
|
||||
clusterClient clientset.Interface
|
||||
clusterLister clusterv1listers.ManagedClusterLister
|
||||
leaseLister coordlisters.LeaseLister
|
||||
eventRecorder events.Recorder
|
||||
}
|
||||
|
||||
// NewClusterLeaseController creates a cluster lease controller on hub cluster.
|
||||
@@ -46,105 +48,128 @@ func NewClusterLeaseController(
|
||||
clusterClient clientset.Interface,
|
||||
clusterInformer clusterv1informer.ManagedClusterInformer,
|
||||
leaseInformer coordinformers.LeaseInformer,
|
||||
resyncInterval time.Duration,
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &leaseController{
|
||||
kubeClient: kubeClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformer.Lister(),
|
||||
leaseLister: leaseInformer.Lister(),
|
||||
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
|
||||
}
|
||||
return factory.New().
|
||||
WithFilteredEventsInformers(
|
||||
WithFilteredEventsInformersQueueKeyFunc(
|
||||
func(obj runtime.Object) string {
|
||||
accessor, _ := meta.Accessor(obj)
|
||||
return accessor.GetLabels()[clusterNameLabel]
|
||||
},
|
||||
func(obj interface{}) bool {
|
||||
metaObj, ok := obj.(metav1.ObjectMetaAccessor)
|
||||
if !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
// only handle the managed cluster lease
|
||||
// TODO: replace this check by adding a label filter to the SharedInformerFactory
|
||||
// see https://github.com/open-cluster-management-io/registration/issues/225
|
||||
if _, ok := metaObj.GetObjectMeta().GetLabels()[clusterNameLabel]; !ok {
|
||||
return false
|
||||
}
|
||||
|
||||
return metaObj.GetObjectMeta().GetName() == leaseName
|
||||
},
|
||||
leaseInformer.Informer(),
|
||||
).
|
||||
WithInformers(clusterInformer.Informer()).
|
||||
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
|
||||
accessor, _ := meta.Accessor(obj)
|
||||
return accessor.GetName()
|
||||
}, clusterInformer.Informer()).
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(resyncInterval).
|
||||
ToController("ManagedClusterLeaseController", recorder)
|
||||
}
|
||||
|
||||
// sync checks the lease of each accepted cluster on hub to determine whether a managed cluster is available.
|
||||
func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
clusters, err := c.clusterLister.List(labels.Everything())
|
||||
if err != nil {
|
||||
clusterName := syncCtx.QueueKey()
|
||||
|
||||
cluster, err := c.clusterLister.Get(clusterName)
|
||||
if errors.IsNotFound(err) {
|
||||
// the cluster is not found, do nothing
|
||||
return nil
|
||||
}
|
||||
for _, cluster := range clusters {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) {
|
||||
// cluster is not accepted, skip it.
|
||||
if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) {
|
||||
continue
|
||||
return nil
|
||||
}
|
||||
|
||||
observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName)
|
||||
if errors.IsNotFound(err) {
|
||||
if !cluster.DeletionTimestamp.IsZero() {
|
||||
// the lease is not found and the cluster is deleting, update the cluster to unknown immediately
|
||||
return c.updateClusterStatus(ctx, cluster)
|
||||
}
|
||||
|
||||
// get the lease of a cluster, if the lease is not found, create it
|
||||
observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName)
|
||||
switch {
|
||||
case errors.IsNotFound(err):
|
||||
if !cluster.DeletionTimestamp.IsZero() {
|
||||
// the cluster is deleting, do nothing
|
||||
break
|
||||
}
|
||||
lease := &coordv1.Lease{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: leaseName,
|
||||
Namespace: cluster.Name,
|
||||
Labels: map[string]string{"open-cluster-management.io/cluster-name": cluster.Name},
|
||||
},
|
||||
Spec: coordv1.LeaseSpec{
|
||||
HolderIdentity: pointer.StringPtr(leaseName),
|
||||
RenewTime: &metav1.MicroTime{Time: time.Now()},
|
||||
},
|
||||
}
|
||||
if _, err := c.kubeClient.CoordinationV1().Leases(cluster.Name).Create(ctx, lease, metav1.CreateOptions{}); err != nil {
|
||||
return err
|
||||
}
|
||||
continue
|
||||
case err != nil:
|
||||
// the lease is not found, try to create it
|
||||
lease := &coordv1.Lease{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: leaseName,
|
||||
Namespace: cluster.Name,
|
||||
Labels: map[string]string{clusterNameLabel: cluster.Name},
|
||||
},
|
||||
Spec: coordv1.LeaseSpec{
|
||||
HolderIdentity: pointer.StringPtr(leaseName),
|
||||
RenewTime: &metav1.MicroTime{Time: time.Now()},
|
||||
},
|
||||
}
|
||||
_, err := c.kubeClient.CoordinationV1().Leases(cluster.Name).Create(ctx, lease, metav1.CreateOptions{})
|
||||
return err
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gracePeriod := time.Duration(leaseDurationTimes*cluster.Spec.LeaseDurationSeconds) * time.Second
|
||||
if gracePeriod == 0 {
|
||||
// FIX: #183 avoid a zero gracePeriod, which would cause the ManagedClusterLeaseUpdateStopped condition to be updated non-stop.
|
||||
gracePeriod = time.Duration(leaseDurationTimes*LeaseDurationSeconds) * time.Second
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
if !now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {
|
||||
// the lease is not updated constantly, change the cluster available condition to unknown
|
||||
if err := c.updateClusterStatus(ctx, cluster); err != nil {
|
||||
return err
|
||||
case err == nil:
|
||||
gracePeriod := time.Duration(leaseDurationTimes*cluster.Spec.LeaseDurationSeconds) * time.Second
|
||||
// FIX: #183 avoid a zero gracePeriod, which would cause the ManagedClusterLeaseUpdateStopped condition to be updated non-stop.
|
||||
if gracePeriod == 0 {
|
||||
gracePeriod = time.Duration(leaseDurationTimes*LeaseDurationSeconds) * time.Second
|
||||
}
|
||||
// the lease is constantly updated, do nothing
|
||||
now := time.Now()
|
||||
if now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) {
|
||||
// the managed cluster available condition is already unknown, do nothing
|
||||
continue
|
||||
}
|
||||
|
||||
// the lease is not constantly updated, update it to unknown
|
||||
conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(metav1.Condition{
|
||||
Type: clusterv1.ManagedClusterConditionAvailable,
|
||||
Status: metav1.ConditionUnknown,
|
||||
Reason: "ManagedClusterLeaseUpdateStopped",
|
||||
Message: "Registration agent stopped updating its lease.",
|
||||
})
|
||||
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if updated {
|
||||
syncCtx.Recorder().Eventf("ManagedClusterAvailableConditionUpdated",
|
||||
"update managed cluster %q available condition to unknown, due to its lease is not updated constantly",
|
||||
cluster.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// always requeue this cluster to check its lease constantly
|
||||
syncCtx.Queue().AddAfter(clusterName, gracePeriod)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
|
||||
if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionAvailable, metav1.ConditionUnknown) {
|
||||
// the managed cluster available condition alreay is unknown, do nothing
|
||||
return nil
|
||||
}
|
||||
|
||||
// the lease is not constantly updated, update it to unknown
|
||||
conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(metav1.Condition{
|
||||
Type: clusterv1.ManagedClusterConditionAvailable,
|
||||
Status: metav1.ConditionUnknown,
|
||||
Reason: "ManagedClusterLeaseUpdateStopped",
|
||||
Message: "Registration agent stopped updating its lease.",
|
||||
})
|
||||
|
||||
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn)
|
||||
if updated {
|
||||
c.eventRecorder.Eventf("ManagedClusterAvailableConditionUpdated",
|
||||
"update managed cluster %q available condition to unknown, due to its lease is not updated constantly",
|
||||
cluster.Name)
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -129,13 +129,16 @@ func TestSync(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
syncCtx := testinghelpers.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)
|
||||
|
||||
ctrl := &leaseController{
|
||||
kubeClient: leaseClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
|
||||
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
|
||||
eventRecorder: syncCtx.Recorder(),
|
||||
}
|
||||
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, ""))
|
||||
syncErr := ctrl.sync(context.TODO(), syncCtx)
|
||||
if syncErr != nil {
|
||||
t.Errorf("unexpected err: %v", syncErr)
|
||||
}
|
||||
|
||||
@@ -115,7 +115,6 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
|
||||
clusterClient,
|
||||
clusterInformers.Cluster().V1().ManagedClusters(),
|
||||
kubeInfomers.Coordination().V1().Leases(),
|
||||
ResyncInterval, //TODO: this interval time should be allowed to change from outside
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
|
||||
@@ -42,7 +42,7 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
|
||||
cancel := util.RunAgent("cluster-leasetest", agentOptions, spokeCfg)
|
||||
defer cancel()
|
||||
|
||||
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret)
|
||||
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds)
|
||||
// after two grace periods, make sure the managed cluster is available
|
||||
gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
|
||||
assertAvailableCondition(managedClusterName, metav1.ConditionTrue, gracePeriod)
|
||||
@@ -59,7 +59,7 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
|
||||
}
|
||||
stop := util.RunAgent("cluster-availabletest", agentOptions, spokeCfg)
|
||||
|
||||
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret)
|
||||
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds)
|
||||
assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0)
|
||||
|
||||
// stop the current managed cluster
|
||||
@@ -96,7 +96,7 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
|
||||
cancel := util.RunAgent("cluster-leasetest", agentOptions, spokeCfg)
|
||||
defer cancel()
|
||||
|
||||
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret)
|
||||
bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds)
|
||||
assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0)
|
||||
|
||||
// remove the cluster
|
||||
@@ -133,12 +133,38 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
|
||||
gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
|
||||
assertAvailableCondition(managedClusterName, metav1.ConditionTrue, gracePeriod)
|
||||
})
|
||||
|
||||
ginkgo.It("should use a short lease duration", func() {
	// start the registration agent; keep the returned function so the agent can be stopped later
	stopAgent := util.RunAgent("cluster-leasetest", spoke.SpokeAgentOptions{
		ClusterName:              managedClusterName,
		BootstrapKubeconfig:      bootstrapKubeConfigFile,
		HubKubeconfigSecret:      hubKubeconfigSecret,
		HubKubeconfigDir:         hubKubeconfigDir,
		ClusterHealthCheckPeriod: 1 * time.Minute,
	}, spokeCfg)

	// accept the cluster with a lease duration of 60 and wait for it to become available
	bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, 60)
	assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0)

	// switch the cluster to the short test lease duration
	gomega.Expect(
		updateManagedClusterLeaseDuration(managedClusterName, util.TestLeaseDurationSeconds),
	).NotTo(gomega.HaveOccurred())

	// stop the agent so its lease is no longer renewed
	stopAgent()

	// after two short grace periods, the managed cluster must be reported as unknown
	shortGracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
	assertAvailableCondition(managedClusterName, metav1.ConditionUnknown, shortGracePeriod)
})
|
||||
})
|
||||
|
||||
func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string) {
|
||||
func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string, leaseDuration int32) {
|
||||
// simulate hub cluster admin to accept the managed cluster and approve the csr
|
||||
gomega.Eventually(func() error {
|
||||
return util.AcceptManagedCluster(clusterClient, managedClusterName)
|
||||
return util.AcceptManagedClusterWithLeaseDuration(clusterClient, managedClusterName, leaseDuration)
|
||||
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
|
||||
|
||||
gomega.Eventually(func() error {
|
||||
@@ -169,3 +195,15 @@ func assertAvailableCondition(managedClusterName string, status metav1.Condition
|
||||
return nil
|
||||
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
|
||||
}
|
||||
|
||||
func updateManagedClusterLeaseDuration(clusterName string, leaseDuration int32) error {
|
||||
cluster, err := clusterClient.ClusterV1().ManagedClusters().Get(context.TODO(), clusterName, metav1.GetOptions{})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
cluster.Spec.LeaseDurationSeconds = leaseDuration
|
||||
|
||||
_, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -504,13 +504,17 @@ func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterNam
|
||||
}
|
||||
|
||||
func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) error {
|
||||
return AcceptManagedClusterWithLeaseDuration(clusterClient, spokeClusterName, 60)
|
||||
}
|
||||
|
||||
func AcceptManagedClusterWithLeaseDuration(clusterClient clusterclientset.Interface, spokeClusterName string, leaseDuration int32) error {
|
||||
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
spokeCluster, err := GetManagedCluster(clusterClient, spokeClusterName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
spokeCluster.Spec.HubAcceptsClient = true
|
||||
spokeCluster.Spec.LeaseDurationSeconds = TestLeaseDurationSeconds
|
||||
spokeCluster.Spec.LeaseDurationSeconds = leaseDuration
|
||||
_, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), spokeCluster, metav1.UpdateOptions{})
|
||||
return err
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user