Mirror of https://github.com/open-cluster-management-io/ocm.git (synced 2026-02-14 18:09:57 +00:00)
Add clock sync condition controller. (#312)
Signed-off-by: xuezhaojun <zxue@redhat.com>
pkg/registration/hub/lease/clocksynccontroller.go (new file, 150 lines)
@@ -0,0 +1,150 @@
package lease

import (
    "context"
    "time"

    "github.com/openshift/library-go/pkg/controller/factory"
    "github.com/openshift/library-go/pkg/operator/events"
    coordv1 "k8s.io/api/coordination/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    "k8s.io/apimachinery/pkg/api/meta"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/runtime"
    coordinformers "k8s.io/client-go/informers/coordination/v1"
    coordlisters "k8s.io/client-go/listers/coordination/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/util/workqueue"

    clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
    clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
    clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
    clusterv1 "open-cluster-management.io/api/cluster/v1"

    "open-cluster-management.io/ocm/pkg/common/patcher"
    "open-cluster-management.io/ocm/pkg/common/queue"
)

type clockSyncController struct {
    patcher       patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
    clusterLister clusterv1listers.ManagedClusterLister
    leaseLister   coordlisters.LeaseLister
    eventRecorder events.Recorder
}

const (
    clockSyncControllerName = "ClockSyncController"
)

func NewClockSyncController(
    clusterClient clientset.Interface,
    clusterInformer clusterv1informer.ManagedClusterInformer,
    leaseInformer coordinformers.LeaseInformer,
    recorder events.Recorder,
) factory.Controller {
    c := &clockSyncController{
        patcher: patcher.NewPatcher[
            *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
            clusterClient.ClusterV1().ManagedClusters()),
        clusterLister: clusterInformer.Lister(),
        leaseLister:   leaseInformer.Lister(),
        eventRecorder: recorder.WithComponentSuffix("managed-cluster-clock-sync-controller"),
    }

    syncCtx := factory.NewSyncContext(clockSyncControllerName, recorder)
    leaseRenewTimeUpdateInformer := renewUpdateInformer(syncCtx.Queue(), leaseInformer)

    return factory.New().WithSyncContext(syncCtx).
        WithBareInformers(leaseRenewTimeUpdateInformer).
        WithSync(c.sync).
        ToController(clockSyncControllerName, recorder)
}

func renewUpdateInformer(q workqueue.RateLimitingInterface, leaseInformer coordinformers.LeaseInformer) factory.Informer {
    leaseRenewTimeUpdateInformer := leaseInformer.Informer()
    queueKeyByLabel := queue.QueueKeyByLabel(clusterv1.ClusterNameLabelKey)
    _, err := leaseRenewTimeUpdateInformer.AddEventHandler(&cache.FilteringResourceEventHandler{
        FilterFunc: queue.UnionFilter(queue.FileterByLabel(clusterv1.ClusterNameLabelKey), queue.FilterByNames(leaseName)),
        Handler: &cache.ResourceEventHandlerFuncs{
            UpdateFunc: func(oldObj, newObj interface{}) {
                // only events that update the renew time are added to the queue
                oldLease := oldObj.(*coordv1.Lease)
                newLease := newObj.(*coordv1.Lease)
                if !oldLease.Spec.RenewTime.Equal(newLease.Spec.RenewTime) {
                    for _, queueKey := range queueKeyByLabel(newLease) {
                        q.Add(queueKey)
                    }
                }
            },
        },
    })
    if err != nil {
        runtime.HandleError(err)
    }
    return leaseRenewTimeUpdateInformer
}

func (c *clockSyncController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
    clusterName := syncCtx.QueueKey()

    // events caused by a resync are filtered out here because the cluster is not found
    cluster, err := c.clusterLister.Get(clusterName)
    if errors.IsNotFound(err) {
        // the cluster is not found, do nothing
        return nil
    }
    if err != nil {
        return err
    }

    now := time.Now()
    observedLease, err := c.leaseLister.Leases(cluster.Name).Get(leaseName)
    if err != nil {
        return err
    }
    // When the agent's lease gets renewed, "now" on the hub should be close to the RenewTime set by the agent.
    // If the two times are not close (more than one lease duration apart), we assume the clocks are out of sync.
    oneLeaseDuration := time.Duration(LeaseDurationSeconds) * time.Second
    if err := c.updateClusterStatusClockSynced(ctx, cluster,
        now.Sub(observedLease.Spec.RenewTime.Time) < oneLeaseDuration && observedLease.Spec.RenewTime.Time.Sub(now) < oneLeaseDuration); err != nil {
        return err
    }
    return nil
}

func (c *clockSyncController) updateClusterStatusClockSynced(ctx context.Context, cluster *clusterv1.ManagedCluster, synced bool) error {
    var desiredStatus metav1.ConditionStatus
    var condition metav1.Condition
    if synced {
        desiredStatus = metav1.ConditionTrue
        condition = metav1.Condition{
            Type:    clusterv1.ManagedClusterConditionClockSynced,
            Status:  metav1.ConditionTrue,
            Reason:  "ManagedClusterClockSynced",
            Message: "The clock of the managed cluster is synced with the hub.",
        }
    } else {
        desiredStatus = metav1.ConditionFalse
        condition = metav1.Condition{
            Type:    clusterv1.ManagedClusterConditionClockSynced,
            Status:  metav1.ConditionFalse,
            Reason:  "ManagedClusterClockOutOfSync",
            Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
        }
    }

    if meta.IsStatusConditionPresentAndEqual(cluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced, desiredStatus) {
        // the clock synced condition is already in the desired status, do nothing
        return nil
    }

    newCluster := cluster.DeepCopy()
    meta.SetStatusCondition(&newCluster.Status.Conditions, condition)

    updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
    if updated {
        c.eventRecorder.Eventf("ManagedClusterClockSyncedConditionUpdated",
            "update managed cluster %q clock synced condition to %v.", cluster.Name, desiredStatus)
    }
    return err
}
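The two-sided comparison passed to updateClusterStatusClockSynced above is an absolute-difference check: the hub's wall clock and the lease's RenewTime must be within one lease duration of each other, in either direction. Below is a minimal standalone sketch of that predicate, assuming the default 60-second lease duration; the helper name isClockSynced and the hard-coded values are illustrative only, not part of the controller.

package main

import (
    "fmt"
    "time"
)

// isClockSynced mirrors the controller's check: the hub's "now" and the
// agent's lease renew time must be less than one lease duration apart,
// regardless of which clock is ahead.
func isClockSynced(now, renewTime time.Time, leaseDuration time.Duration) bool {
    return now.Sub(renewTime) < leaseDuration && renewTime.Sub(now) < leaseDuration
}

func main() {
    leaseDuration := 60 * time.Second // assumed default LeaseDurationSeconds

    now := time.Now()
    fmt.Println(isClockSynced(now, now.Add(5*time.Second), leaseDuration))    // true: 5s of skew
    fmt.Println(isClockSynced(now, now.Add(-120*time.Second), leaseDuration)) // false: 2 minutes of skew
}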
pkg/registration/hub/lease/clocksynccontroller_test.go (new file, 125 lines)
@@ -0,0 +1,125 @@
package lease

import (
    "context"
    "encoding/json"
    "testing"
    "time"

    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/runtime"
    kubeinformers "k8s.io/client-go/informers"
    kubefake "k8s.io/client-go/kubernetes/fake"
    clienttesting "k8s.io/client-go/testing"

    clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
    clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
    clusterv1 "open-cluster-management.io/api/cluster/v1"
    v1 "open-cluster-management.io/api/cluster/v1"

    "open-cluster-management.io/ocm/pkg/common/patcher"
    testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
    testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)

func TestClockSyncController(t *testing.T) {
    // cases:
    // 1. hub and agent clock is close
    // 2. hub and agent clock is not close
    cases := []struct {
        name            string
        clusters        []runtime.Object
        leases          []runtime.Object
        validateActions func(t *testing.T, leaseActions, clusterActions []clienttesting.Action)
    }{
        {
            name: "hub and agent clock is close",
            clusters: []runtime.Object{
                testinghelpers.NewManagedCluster(),
            },
            leases: []runtime.Object{
                testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(5*time.Second)),
            },
            validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
                expected := metav1.Condition{
                    Type:    clusterv1.ManagedClusterConditionClockSynced,
                    Status:  metav1.ConditionTrue,
                    Reason:  "ManagedClusterClockSynced",
                    Message: "The clock of the managed cluster is synced with the hub.",
                }
                testingcommon.AssertActions(t, clusterActions, "patch")
                patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
                managedCluster := &v1.ManagedCluster{}
                err := json.Unmarshal(patch, managedCluster)
                if err != nil {
                    t.Fatal(err)
                }
                testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
            },
        },
        {
            name: "hub and agent clock is not close",
            clusters: []runtime.Object{
                testinghelpers.NewManagedCluster(),
            },
            leases: []runtime.Object{
                testinghelpers.NewManagedClusterLease("managed-cluster-lease", now.Add(61*time.Second)),
            },
            validateActions: func(t *testing.T, leaseActions, clusterActions []clienttesting.Action) {
                expected := metav1.Condition{
                    Type:    clusterv1.ManagedClusterConditionClockSynced,
                    Status:  metav1.ConditionFalse,
                    Reason:  "ManagedClusterClockOutOfSync",
                    Message: "The clock of hub and agent is out of sync. This may cause the Unknown status and affect agent functionalities.",
                }
                testingcommon.AssertActions(t, clusterActions, "patch")
                patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
                managedCluster := &v1.ManagedCluster{}
                err := json.Unmarshal(patch, managedCluster)
                if err != nil {
                    t.Fatal(err)
                }
                testingcommon.AssertCondition(t, managedCluster.Status.Conditions, expected)
            },
        },
    }

    for _, c := range cases {
        t.Run(c.name, func(t *testing.T) {
            clusterClient := clusterfake.NewSimpleClientset(c.clusters...)
            clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
            clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
            for _, cluster := range c.clusters {
                if err := clusterStore.Add(cluster); err != nil {
                    t.Fatal(err)
                }
            }

            leaseClient := kubefake.NewSimpleClientset(c.leases...)
            leaseInformerFactory := kubeinformers.NewSharedInformerFactory(leaseClient, time.Minute*10)
            leaseStore := leaseInformerFactory.Coordination().V1().Leases().Informer().GetStore()
            for _, lease := range c.leases {
                if err := leaseStore.Add(lease); err != nil {
                    t.Fatal(err)
                }
            }

            syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)

            controller := &clockSyncController{
                patcher: patcher.NewPatcher[
                    *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
                    clusterClient.ClusterV1().ManagedClusters()),
                clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
                leaseLister:   leaseInformerFactory.Coordination().V1().Leases().Lister(),
                eventRecorder: syncCtx.Recorder(),
            }
            syncErr := controller.sync(context.TODO(), syncCtx)
            if syncErr != nil {
                t.Errorf("unexpected err: %v", syncErr)
            }
            c.validateActions(t, leaseClient.Actions(), clusterClient.Actions())
        })
    }
}
@@ -196,6 +196,13 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
        controllerContext.EventRecorder,
    )

    clockSyncController := lease.NewClockSyncController(
        clusterClient,
        clusterInformers.Cluster().V1().ManagedClusters(),
        kubeInformers.Coordination().V1().Leases(),
        controllerContext.EventRecorder,
    )

    managedClusterSetController := managedclusterset.NewManagedClusterSetController(
        clusterClient,
        clusterInformers.Cluster().V1().ManagedClusters(),
@@ -268,6 +275,7 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
    go taintController.Run(ctx, 1)
    go csrController.Run(ctx, 1)
    go leaseController.Run(ctx, 1)
    go clockSyncController.Run(ctx, 1)
    go managedClusterSetController.Run(ctx, 1)
    go managedClusterSetBindingController.Run(ctx, 1)
    go clusterroleController.Run(ctx, 1)
@@ -165,6 +165,45 @@ var _ = ginkgo.Describe("Cluster Lease Update", func() {
        gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
        assertAvailableCondition(managedClusterName, metav1.ConditionUnknown, gracePeriod)
    })

    ginkgo.It("clock sync condition should work", func() {
        // run the registration agent
        agentOptions := &spoke.SpokeAgentOptions{
            BootstrapKubeconfig:      bootstrapKubeConfigFile,
            HubKubeconfigSecret:      hubKubeconfigSecret,
            ClusterHealthCheckPeriod: 1 * time.Minute,
        }
        commOptions := commonoptions.NewAgentOptions()
        commOptions.HubKubeconfigDir = hubKubeconfigDir
        commOptions.SpokeClusterName = managedClusterName

        stop := runAgent("cluster-leasetest", agentOptions, commOptions, spokeCfg)
        bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret, util.TestLeaseDurationSeconds)

        gracePeriod := 2 * 5 * util.TestLeaseDurationSeconds
        assertClockSyncedCondition(managedClusterName, metav1.ConditionTrue, gracePeriod)

        // stop the agent so that it no longer renews the lease.
        stop()

        // update the managed cluster lease renew time
        now := time.Now()
        gomega.Eventually(func() error {
            lease, err := util.GetManagedClusterLease(kubeClient, managedClusterName)
            if err != nil {
                return err
            }
            // The default lease duration is 60s.
            // The renewTime is 2 lease durations before the hub's "now", so the clock should be out of sync.
            // The renewTime + 5 * leaseDuration > now, so the available condition should still be true.
            lease.Spec.RenewTime = &metav1.MicroTime{Time: now.Add(-120 * time.Second)}
            _, err = kubeClient.CoordinationV1().Leases(managedClusterName).Update(context.TODO(), lease, metav1.UpdateOptions{})
            return err
        }, eventuallyInterval, eventuallyTimeout).ShouldNot(gomega.HaveOccurred())

        assertAvailableCondition(managedClusterName, metav1.ConditionTrue, 0)
        assertClockSyncedCondition(managedClusterName, metav1.ConditionFalse, 0)
    })
})

func bootstrapManagedCluster(managedClusterName, hubKubeconfigSecret string, leaseDuration int32) {
@@ -213,3 +252,21 @@ func updateManagedClusterLeaseDuration(clusterName string, leaseDuration int32)
    _, err = clusterClient.ClusterV1().ManagedClusters().Update(context.TODO(), cluster, metav1.UpdateOptions{})
    return err
}

func assertClockSyncedCondition(managedClusterName string, status metav1.ConditionStatus, d int) {
    <-time.After(time.Duration(d) * time.Second)
    gomega.Eventually(func() error {
        managedCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
        if err != nil {
            return err
        }
        cond := meta.FindStatusCondition(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced)
        if cond == nil {
            return fmt.Errorf("clock synced condition is not found")
        }
        if cond.Status != status {
            return fmt.Errorf("expected clock synced condition status %s, but got %v", status, cond)
        }
        return nil
    }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
}
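To spell out the arithmetic this integration test relies on: rewinding the lease RenewTime by 120 seconds exceeds the default 60-second lease duration, so the ClockSynced condition goes False, yet the skew is still inside the 5-lease-duration window the test comment cites for the available condition, so Available stays True. Below is a small standalone sketch of that reasoning; the variable names and the 60s/5x constants follow the test comments above and are illustrative only.

package main

import (
    "fmt"
    "time"
)

func main() {
    leaseDuration := 60 * time.Second // default lease duration referenced by the test
    skew := 120 * time.Second         // the test sets RenewTime to now.Add(-120 * time.Second)

    // Clock sync check: out of sync once the skew reaches one lease duration.
    fmt.Println("clock synced:", skew < leaseDuration) // false -> ClockSynced condition becomes False

    // Availability check: the test comment allows up to 5 lease durations
    // before the available condition would turn Unknown.
    fmt.Println("still available:", skew < 5*leaseDuration) // true -> available condition stays True
}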
@@ -3,6 +3,7 @@ package util
 import (
    "context"

    coordinationv1 "k8s.io/api/coordination/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -21,6 +22,14 @@ func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterNam
    return spokeCluster, nil
}

func GetManagedClusterLease(kubeClient kubernetes.Interface, spokeClusterName string) (*coordinationv1.Lease, error) {
    lease, err := kubeClient.CoordinationV1().Leases(spokeClusterName).Get(context.TODO(), "managed-cluster-lease", metav1.GetOptions{})
    if err != nil {
        return nil, err
    }
    return lease, nil
}

func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) error {
    return AcceptManagedClusterWithLeaseDuration(clusterClient, spokeClusterName, 60)
}