mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-19 23:57:57 +00:00
resovle the remain comments in pr40
This commit is contained in:
@@ -3,7 +3,6 @@ package lease
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
|
||||
@@ -29,11 +28,10 @@ const leaseDurationTimes = 5
|
||||
|
||||
// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
|
||||
type leaseController struct {
|
||||
kubeClient kubernetes.Interface
|
||||
clusterClient clientset.Interface
|
||||
clusterLister clusterv1listers.ManagedClusterLister
|
||||
leaseLister coordlisters.LeaseLister
|
||||
clusterLeaseMap *clusterLeaseMap
|
||||
kubeClient kubernetes.Interface
|
||||
clusterClient clientset.Interface
|
||||
clusterLister clusterv1listers.ManagedClusterLister
|
||||
leaseLister coordlisters.LeaseLister
|
||||
}
|
||||
|
||||
// NewClusterLeaseController creates a cluster lease controller on hub cluster.
|
||||
@@ -45,11 +43,10 @@ func NewClusterLeaseController(
|
||||
resyncInterval time.Duration,
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &leaseController{
|
||||
kubeClient: kubeClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformer.Lister(),
|
||||
leaseLister: leaseInformer.Lister(),
|
||||
clusterLeaseMap: newClusterLeaseMap(),
|
||||
kubeClient: kubeClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformer.Lister(),
|
||||
leaseLister: leaseInformer.Lister(),
|
||||
}
|
||||
return factory.New().
|
||||
WithInformers(clusterInformer.Informer(), leaseInformer.Informer()).
|
||||
@@ -72,7 +69,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
|
||||
}
|
||||
|
||||
// get the lease of a cluster, if the lease is not found, create it
|
||||
leaseName := fmt.Sprintf("cluster-%s-lease", cluster.Name)
|
||||
leaseName := fmt.Sprintf("cluster-lease-%s", cluster.Name)
|
||||
observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName)
|
||||
switch {
|
||||
case errors.IsNotFound(err):
|
||||
@@ -84,6 +81,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
|
||||
},
|
||||
Spec: coordv1.LeaseSpec{
|
||||
HolderIdentity: pointer.StringPtr(leaseName),
|
||||
RenewTime: &metav1.MicroTime{Time: time.Now()},
|
||||
},
|
||||
}
|
||||
if _, err := c.kubeClient.CoordinationV1().Leases(cluster.Name).Create(ctx, lease, metav1.CreateOptions{}); err != nil {
|
||||
@@ -94,18 +92,6 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
|
||||
return err
|
||||
}
|
||||
|
||||
// update the lease probe time and last lease on hub cluster if the managed cluster lease is constantly
|
||||
// updated, in next, we will use hub probe time to determine whether the managed cluster lease is constantly
|
||||
// updated in a grace period to avoid the problem of time synchronization between hub and managed
|
||||
// cluster.
|
||||
lastClusterLease := c.clusterLeaseMap.get(observedLease.Name)
|
||||
if lastClusterLease.lease == nil || (lastClusterLease.lease.Spec.RenewTime == nil ||
|
||||
lastClusterLease.lease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
|
||||
lastClusterLease.lease = observedLease.DeepCopy()
|
||||
lastClusterLease.probeTimestamp = metav1.Now()
|
||||
c.clusterLeaseMap.set(observedLease.Name, lastClusterLease)
|
||||
}
|
||||
|
||||
leaseDurationSeconds := cluster.Spec.LeaseDurationSeconds
|
||||
// TODO: use CRDs defaulting or mutating admission webhook to eliminate this code.
|
||||
if leaseDurationSeconds == 0 {
|
||||
@@ -114,8 +100,8 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
|
||||
|
||||
gracePeriod := time.Duration(leaseDurationTimes*leaseDurationSeconds) * time.Second
|
||||
// the lease is constantly updated, do nothing
|
||||
now := metav1.Now()
|
||||
if now.Time.Before(lastClusterLease.probeTimestamp.Add(gracePeriod)) {
|
||||
now := time.Now()
|
||||
if now.Before(observedLease.Spec.RenewTime.Add(gracePeriod)) {
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -125,7 +111,7 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
|
||||
Status: metav1.ConditionUnknown,
|
||||
Reason: "ManagedClusterLeaseUpdateStopped",
|
||||
Message: fmt.Sprintf("Registration agent stopped updating its lease within %.0f minutes.",
|
||||
now.Sub(lastClusterLease.probeTimestamp.Time).Minutes()),
|
||||
now.Sub(observedLease.Spec.RenewTime.Time).Minutes()),
|
||||
})
|
||||
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn)
|
||||
if err != nil {
|
||||
@@ -139,34 +125,3 @@ func (c *leaseController) sync(ctx context.Context, syncCtx factory.SyncContext)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type clusterLease struct {
|
||||
probeTimestamp metav1.Time
|
||||
lease *coordv1.Lease
|
||||
}
|
||||
|
||||
type clusterLeaseMap struct {
|
||||
lock sync.RWMutex
|
||||
clusterLeases map[string]*clusterLease
|
||||
}
|
||||
|
||||
func newClusterLeaseMap() *clusterLeaseMap {
|
||||
return &clusterLeaseMap{
|
||||
clusterLeases: make(map[string]*clusterLease),
|
||||
}
|
||||
}
|
||||
|
||||
func (n *clusterLeaseMap) get(name string) *clusterLease {
|
||||
n.lock.RLock()
|
||||
defer n.lock.RUnlock()
|
||||
if leaseData, ok := n.clusterLeases[name]; ok {
|
||||
return leaseData
|
||||
}
|
||||
return &clusterLease{}
|
||||
}
|
||||
|
||||
func (n *clusterLeaseMap) set(name string, data *clusterLease) {
|
||||
n.lock.Lock()
|
||||
defer n.lock.Unlock()
|
||||
n.clusterLeases[name] = data
|
||||
}
|
||||
|
||||
@@ -25,12 +25,11 @@ var now = time.Now()
|
||||
|
||||
func TestSync(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
clusters []runtime.Object
|
||||
lastClusterLease *clusterLease
|
||||
clusterLeases []runtime.Object
|
||||
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
|
||||
expectedErr string
|
||||
name string
|
||||
clusters []runtime.Object
|
||||
clusterLeases []runtime.Object
|
||||
validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "sync unaccepted managed cluster",
|
||||
@@ -51,12 +50,8 @@ func TestSync(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "managed cluster stop update lease",
|
||||
clusters: []runtime.Object{newManagedCluster(newAcceptedCondtion(), newAvailableCondtion())},
|
||||
lastClusterLease: &clusterLease{
|
||||
probeTimestamp: metav1.Time{Time: now.Add(-5 * time.Minute)},
|
||||
lease: newClusterLease(now.Add(-5 * time.Minute)),
|
||||
},
|
||||
name: "managed cluster stop update lease",
|
||||
clusters: []runtime.Object{newManagedCluster(newAcceptedCondtion(), newAvailableCondtion())},
|
||||
clusterLeases: []runtime.Object{newClusterLease(now.Add(-5 * time.Minute))},
|
||||
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
|
||||
assertActions(t, clusterActions, "get", "update")
|
||||
@@ -71,12 +66,8 @@ func TestSync(t *testing.T) {
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "managed cluster is available",
|
||||
clusters: []runtime.Object{newManagedCluster(newAcceptedCondtion(), newAvailableCondtion())},
|
||||
lastClusterLease: &clusterLease{
|
||||
probeTimestamp: metav1.Time{Time: now.Add(-1 * time.Minute)},
|
||||
lease: newClusterLease(now.Add(-1 * time.Minute)),
|
||||
},
|
||||
name: "managed cluster is available",
|
||||
clusters: []runtime.Object{newManagedCluster(newAcceptedCondtion(), newAvailableCondtion())},
|
||||
clusterLeases: []runtime.Object{newClusterLease(now)},
|
||||
validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
|
||||
assertActions(t, clusterActions)
|
||||
@@ -100,15 +91,11 @@ func TestSync(t *testing.T) {
|
||||
leaseStore.Add(lease)
|
||||
}
|
||||
|
||||
clusterLeaseMap := newClusterLeaseMap()
|
||||
clusterLeaseMap.set("cluster-testmanagedcluster-lease", c.lastClusterLease)
|
||||
|
||||
ctrl := &leaseController{
|
||||
kubeClient: leaseClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
|
||||
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
|
||||
clusterLeaseMap: clusterLeaseMap,
|
||||
kubeClient: leaseClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
|
||||
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
|
||||
}
|
||||
syncErr := ctrl.sync(context.TODO(), newFakeSyncContext(t))
|
||||
if len(c.expectedErr) > 0 && syncErr == nil {
|
||||
@@ -187,7 +174,7 @@ func newAvailableCondtion() clusterv1.StatusCondition {
|
||||
func newClusterLease(renewTime time.Time) *coordv1.Lease {
|
||||
return &coordv1.Lease{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-testmanagedcluster-lease",
|
||||
Name: "cluster-lease-testmanagedcluster",
|
||||
Namespace: "testmanagedcluster",
|
||||
},
|
||||
Spec: coordv1.LeaseSpec{
|
||||
|
||||
@@ -158,7 +158,7 @@ rules:
|
||||
# Allow spoke registration agent to get/update coordination.k8s.io/lease
|
||||
- apiGroups: ["coordination.k8s.io"]
|
||||
resources: ["leases"]
|
||||
resourceNames: ["cluster-{{ .ManagedClusterName }}-lease"]
|
||||
resourceNames: ["cluster-lease-{{ .ManagedClusterName }}"]
|
||||
verbs: ["get", "update"]
|
||||
`)
|
||||
|
||||
|
||||
@@ -7,5 +7,5 @@ rules:
|
||||
# Allow spoke registration agent to get/update coordination.k8s.io/lease
|
||||
- apiGroups: ["coordination.k8s.io"]
|
||||
resources: ["leases"]
|
||||
resourceNames: ["cluster-{{ .ManagedClusterName }}-lease"]
|
||||
resourceNames: ["cluster-lease-{{ .ManagedClusterName }}"]
|
||||
verbs: ["get", "update"]
|
||||
|
||||
@@ -21,10 +21,10 @@ import (
|
||||
|
||||
// managedClusterHealthCheckController checks the kube-apiserver health on managed cluster to determine it whether is available.
|
||||
type managedClusterHealthCheckController struct {
|
||||
clusterName string
|
||||
hubClusterClient clientset.Interface
|
||||
hubClusterLister clusterv1listers.ManagedClusterLister
|
||||
discoveryClient discovery.DiscoveryInterface
|
||||
clusterName string
|
||||
hubClusterClient clientset.Interface
|
||||
hubClusterLister clusterv1listers.ManagedClusterLister
|
||||
managedClusterDiscoveryClient discovery.DiscoveryInterface
|
||||
}
|
||||
|
||||
// NewManagedClusterHealthCheckController creates a managed cluster health check controller on managed cluster.
|
||||
@@ -32,14 +32,14 @@ func NewManagedClusterHealthCheckController(
|
||||
clusterName string,
|
||||
hubClusterClient clientset.Interface,
|
||||
hubClusterInformer clusterv1informer.ManagedClusterInformer,
|
||||
discoveryClient discovery.DiscoveryInterface,
|
||||
managedClusterDiscoveryClient discovery.DiscoveryInterface,
|
||||
resyncInterval time.Duration,
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &managedClusterHealthCheckController{
|
||||
clusterName: clusterName,
|
||||
hubClusterClient: hubClusterClient,
|
||||
hubClusterLister: hubClusterInformer.Lister(),
|
||||
discoveryClient: discoveryClient,
|
||||
clusterName: clusterName,
|
||||
hubClusterClient: hubClusterClient,
|
||||
hubClusterLister: hubClusterInformer.Lister(),
|
||||
managedClusterDiscoveryClient: managedClusterDiscoveryClient,
|
||||
}
|
||||
|
||||
return factory.New().
|
||||
@@ -73,7 +73,7 @@ func (c *managedClusterHealthCheckController) sync(ctx context.Context, syncCtx
|
||||
func (c *managedClusterHealthCheckController) checkKubeAPIServerStatus(ctx context.Context) clusterv1.StatusCondition {
|
||||
statusCode := 0
|
||||
condition := clusterv1.StatusCondition{Type: clusterv1.ManagedClusterConditionAvailable}
|
||||
result := c.discoveryClient.RESTClient().Get().AbsPath("/readyz").Do(ctx).StatusCode(&statusCode)
|
||||
result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/readyz").Do(ctx).StatusCode(&statusCode)
|
||||
if statusCode == http.StatusOK {
|
||||
condition.Status = metav1.ConditionTrue
|
||||
condition.Reason = "ManagedClusterAvailable"
|
||||
|
||||
@@ -100,10 +100,10 @@ func TestHealthCheck(t *testing.T) {
|
||||
serverResponse.responseMsg = c.responseMsg
|
||||
|
||||
ctrl := &managedClusterHealthCheckController{
|
||||
clusterName: testManagedClusterName,
|
||||
hubClusterClient: clusterClient,
|
||||
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
|
||||
discoveryClient: discoveryClient,
|
||||
clusterName: testManagedClusterName,
|
||||
hubClusterClient: clusterClient,
|
||||
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
|
||||
managedClusterDiscoveryClient: discoveryClient,
|
||||
}
|
||||
syncErr := ctrl.sync(context.TODO(), newFakeSyncContext(t))
|
||||
if len(c.expectedErr) > 0 && syncErr == nil {
|
||||
|
||||
@@ -43,7 +43,7 @@ func NewManagedClusterLeaseController(
|
||||
leaseUpdater: &leaseUpdater{
|
||||
hubClient: hubClient,
|
||||
clusterName: clusterName,
|
||||
leaseName: fmt.Sprintf("cluster-%s-lease", clusterName),
|
||||
leaseName: fmt.Sprintf("cluster-lease-%s", clusterName),
|
||||
recorder: recorder,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -69,7 +69,7 @@ func TestLeaseUpdate(t *testing.T) {
|
||||
|
||||
hubClient := kubefake.NewSimpleClientset(&coordinationv1.Lease{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "cluster-testmanagedcluster-lease",
|
||||
Name: "cluster-lease-testmanagedcluster",
|
||||
Namespace: "testmanagedcluster",
|
||||
},
|
||||
})
|
||||
@@ -77,7 +77,7 @@ func TestLeaseUpdate(t *testing.T) {
|
||||
leaseUpdater := &leaseUpdater{
|
||||
hubClient: hubClient,
|
||||
clusterName: testManagedClusterName,
|
||||
leaseName: fmt.Sprintf("cluster-%s-lease", testManagedClusterName),
|
||||
leaseName: fmt.Sprintf("cluster-lease-%s", testManagedClusterName),
|
||||
recorder: eventstesting.NewTestingEventRecorder(t),
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user