From 1649df933e54f0882a8aeb02b8ca2d9f2090a59d Mon Sep 17 00:00:00 2001 From: Yang Le Date: Thu, 21 Jan 2021 14:55:03 +0800 Subject: [PATCH] se Lister to reduce the number of calls to api server Signed-off-by: Yang Le --- pkg/hub/csr/controller.go | 11 +++++-- pkg/hub/csr/controller_test.go | 25 ++++++++++------ pkg/hub/managedcluster/controller.go | 11 +++++-- pkg/hub/managedcluster/controller_test.go | 29 ++++++++++++------- pkg/hub/manager.go | 4 +-- .../managedcluster/creating_controller.go | 5 ++-- 6 files changed, 55 insertions(+), 30 deletions(-) diff --git a/pkg/hub/csr/controller.go b/pkg/hub/csr/controller.go index fb57209be..be3deed24 100644 --- a/pkg/hub/csr/controller.go +++ b/pkg/hub/csr/controller.go @@ -15,7 +15,9 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + certificatesinformers "k8s.io/client-go/informers/certificates/v1beta1" "k8s.io/client-go/kubernetes" + certificateslisters "k8s.io/client-go/listers/certificates/v1beta1" "k8s.io/klog/v2" "github.com/open-cluster-management/registration/pkg/helpers" @@ -29,20 +31,22 @@ const ( // csrApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub. type csrApprovingController struct { kubeClient kubernetes.Interface + csrLister certificateslisters.CertificateSigningRequestLister eventRecorder events.Recorder } // NewCSRApprovingController creates a new csr approving controller -func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer factory.Informer, recorder events.Recorder) factory.Controller { +func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, recorder events.Recorder) factory.Controller { c := &csrApprovingController{ kubeClient: kubeClient, + csrLister: csrInformer.Lister(), eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"), } return factory.New(). WithInformersQueueKeyFunc(func(obj runtime.Object) string { accessor, _ := meta.Accessor(obj) return accessor.GetName() - }, csrInformer). + }, csrInformer.Informer()). WithSync(c.sync). ToController("CSRApprovingController", recorder) } @@ -50,7 +54,7 @@ func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer fact func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncContext) error { csrName := syncCtx.QueueKey() klog.V(4).Infof("Reconciling CertificateSigningRequests %q", csrName) - csr, err := c.kubeClient.CertificatesV1beta1().CertificateSigningRequests().Get(ctx, csrName, metav1.GetOptions{}) + csr, err := c.csrLister.Get(csrName) if errors.IsNotFound(err) { return nil } @@ -58,6 +62,7 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC return err } + csr = csr.DeepCopy() // Current csr is in terminal state, do nothing. if helpers.IsCSRInTerminalState(&csr.Status) { return nil diff --git a/pkg/hub/csr/controller_test.go b/pkg/hub/csr/controller_test.go index b76272eec..e417ba3f0 100644 --- a/pkg/hub/csr/controller_test.go +++ b/pkg/hub/csr/controller_test.go @@ -3,6 +3,7 @@ package csr import ( "context" "testing" + "time" testinghelpers "github.com/open-cluster-management/registration/pkg/helpers/testing" @@ -11,6 +12,7 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" certificatesv1beta1 "k8s.io/api/certificates/v1beta1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" kubefake "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" ) @@ -39,21 +41,21 @@ func TestSync(t *testing.T) { name: "sync a deleted csr", startingCSRs: []runtime.Object{}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get") + testinghelpers.AssertNoActions(t, actions) }, }, { name: "sync a denied csr", startingCSRs: []runtime.Object{testinghelpers.NewDeniedCSR(validCSR)}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get") + testinghelpers.AssertNoActions(t, actions) }, }, { name: "sync an approved csr", startingCSRs: []runtime.Object{testinghelpers.NewApprovedCSR(validCSR)}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get") + testinghelpers.AssertNoActions(t, actions) }, }, { @@ -68,15 +70,15 @@ func TestSync(t *testing.T) { ReqBlockType: validCSR.ReqBlockType, })}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get") + testinghelpers.AssertNoActions(t, actions) }, }, { name: "deny an auto approving csr", startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get", "create") - testinghelpers.AssertSubjectAccessReviewObj(t, actions[1].(clienttesting.CreateActionImpl).Object) + testinghelpers.AssertActions(t, actions, "create") + testinghelpers.AssertSubjectAccessReviewObj(t, actions[0].(clienttesting.CreateActionImpl).Object) }, }, { @@ -89,8 +91,8 @@ func TestSync(t *testing.T) { Reason: "AutoApprovedByHubCSRApprovingController", Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.", } - testinghelpers.AssertActions(t, actions, "get", "create", "update") - actual := actions[2].(clienttesting.UpdateActionImpl).Object + testinghelpers.AssertActions(t, actions, "create", "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object testinghelpers.AssertCSRCondition(t, actual.(*certificatesv1beta1.CertificateSigningRequest).Status.Conditions, expectedCondition) }, }, @@ -110,8 +112,13 @@ func TestSync(t *testing.T) { }, nil }, ) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 3*time.Minute) + csrStore := informerFactory.Certificates().V1beta1().CertificateSigningRequests().Informer().GetStore() + for _, csr := range c.startingCSRs { + csrStore.Add(csr) + } - ctrl := &csrApprovingController{kubeClient, eventstesting.NewTestingEventRecorder(t)} + ctrl := &csrApprovingController{kubeClient, informerFactory.Certificates().V1beta1().CertificateSigningRequests().Lister(), eventstesting.NewTestingEventRecorder(t)} syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, validCSR.Name)) if syncErr != nil { t.Errorf("unexpected err: %v", syncErr) diff --git a/pkg/hub/managedcluster/controller.go b/pkg/hub/managedcluster/controller.go index 35254e846..060fa522e 100644 --- a/pkg/hub/managedcluster/controller.go +++ b/pkg/hub/managedcluster/controller.go @@ -5,6 +5,8 @@ import ( "fmt" clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned" + informerv1 "github.com/open-cluster-management/api/client/cluster/informers/externalversions/cluster/v1" + listerv1 "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1" v1 "github.com/open-cluster-management/api/cluster/v1" "github.com/open-cluster-management/registration/pkg/helpers" @@ -39,6 +41,7 @@ var staticFiles = []string{ type managedClusterController struct { kubeClient kubernetes.Interface clusterClient clientset.Interface + clusterLister listerv1.ManagedClusterLister eventRecorder events.Recorder } @@ -46,18 +49,19 @@ type managedClusterController struct { func NewManagedClusterController( kubeClient kubernetes.Interface, clusterClient clientset.Interface, - clusterInformer factory.Informer, + clusterInformer informerv1.ManagedClusterInformer, recorder events.Recorder) factory.Controller { c := &managedClusterController{ kubeClient: kubeClient, clusterClient: clusterClient, + clusterLister: clusterInformer.Lister(), eventRecorder: recorder.WithComponentSuffix("managed-cluster-controller"), } return factory.New(). WithInformersQueueKeyFunc(func(obj runtime.Object) string { accessor, _ := meta.Accessor(obj) return accessor.GetName() - }, clusterInformer). + }, clusterInformer.Informer()). WithSync(c.sync). ToController("ManagedClusterController", recorder) } @@ -65,7 +69,7 @@ func NewManagedClusterController( func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error { managedClusterName := syncCtx.QueueKey() klog.V(4).Infof("Reconciling ManagedCluster %s", managedClusterName) - managedCluster, err := c.clusterClient.ClusterV1().ManagedClusters().Get(ctx, managedClusterName, metav1.GetOptions{}) + managedCluster, err := c.clusterLister.Get(managedClusterName) if errors.IsNotFound(err) { // Spoke cluster not found, could have been deleted, do nothing. return nil @@ -74,6 +78,7 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn return err } + managedCluster = managedCluster.DeepCopy() if managedCluster.DeletionTimestamp.IsZero() { hasFinalizer := false for i := range managedCluster.Finalizers { diff --git a/pkg/hub/managedcluster/controller_test.go b/pkg/hub/managedcluster/controller_test.go index adb3997a8..466a5512c 100644 --- a/pkg/hub/managedcluster/controller_test.go +++ b/pkg/hub/managedcluster/controller_test.go @@ -3,8 +3,10 @@ package managedcluster import ( "context" "testing" + "time" clusterfake "github.com/open-cluster-management/api/client/cluster/clientset/versioned/fake" + clusterinformers "github.com/open-cluster-management/api/client/cluster/informers/externalversions" v1 "github.com/open-cluster-management/api/cluster/v1" testinghelpers "github.com/open-cluster-management/registration/pkg/helpers/testing" @@ -26,15 +28,15 @@ func TestSyncManagedCluster(t *testing.T) { name: "sync a deleted spoke cluster", startingObjects: []runtime.Object{}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get") + testinghelpers.AssertNoActions(t, actions) }, }, { name: "create a new spoke cluster", startingObjects: []runtime.Object{testinghelpers.NewManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get", "update") - managedCluster := (actions[1].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) + testinghelpers.AssertActions(t, actions, "update") + managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) testinghelpers.AssertFinalizers(t, managedCluster, []string{managedClusterFinalizer}) }, }, @@ -48,8 +50,8 @@ func TestSyncManagedCluster(t *testing.T) { Reason: "HubClusterAdminAccepted", Message: "Accepted by hub cluster admin", } - testinghelpers.AssertActions(t, actions, "get", "get", "update") - actual := actions[2].(clienttesting.UpdateActionImpl).Object + testinghelpers.AssertActions(t, actions, "get", "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object managedCluster := actual.(*v1.ManagedCluster) testinghelpers.AssertManagedClusterCondition(t, managedCluster.Status.Conditions, expectedCondition) }, @@ -58,7 +60,7 @@ func TestSyncManagedCluster(t *testing.T) { name: "sync an accepted spoke cluster", startingObjects: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get", "get") + testinghelpers.AssertActions(t, actions, "get") }, }, { @@ -71,8 +73,8 @@ func TestSyncManagedCluster(t *testing.T) { Reason: "HubClusterAdminDenied", Message: "Denied by hub cluster admin", } - testinghelpers.AssertActions(t, actions, "get", "get", "update") - actual := actions[2].(clienttesting.UpdateActionImpl).Object + testinghelpers.AssertActions(t, actions, "get", "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object managedCluster := actual.(*v1.ManagedCluster) testinghelpers.AssertManagedClusterCondition(t, managedCluster.Status.Conditions, expectedCondition) }, @@ -81,8 +83,8 @@ func TestSyncManagedCluster(t *testing.T) { name: "delete a spoke cluster", startingObjects: []runtime.Object{testinghelpers.NewDeletingManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get", "update") - managedCluster := (actions[1].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) + testinghelpers.AssertActions(t, actions, "update") + managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) testinghelpers.AssertFinalizers(t, managedCluster, []string{}) }, }, @@ -92,8 +94,13 @@ func TestSyncManagedCluster(t *testing.T) { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...) kubeClient := kubefake.NewSimpleClientset() + clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10) + clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore() + for _, cluster := range c.startingObjects { + clusterStore.Add(cluster) + } - ctrl := managedClusterController{kubeClient, clusterClient, eventstesting.NewTestingEventRecorder(t)} + ctrl := managedClusterController{kubeClient, clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)} syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)) if syncErr != nil { t.Errorf("unexpected err: %v", syncErr) diff --git a/pkg/hub/manager.go b/pkg/hub/manager.go index 4275acb63..9bb4c9098 100644 --- a/pkg/hub/manager.go +++ b/pkg/hub/manager.go @@ -54,13 +54,13 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. managedClusterController := managedcluster.NewManagedClusterController( kubeClient, clusterClient, - clusterInformers.Cluster().V1().ManagedClusters().Informer(), + clusterInformers.Cluster().V1().ManagedClusters(), controllerContext.EventRecorder, ) csrController := csr.NewCSRApprovingController( kubeClient, - kubeInfomers.Certificates().V1beta1().CertificateSigningRequests().Informer(), + kubeInfomers.Certificates().V1beta1().CertificateSigningRequests(), controllerContext.EventRecorder, ) diff --git a/pkg/spoke/managedcluster/creating_controller.go b/pkg/spoke/managedcluster/creating_controller.go index 660f8925d..85cb774f1 100644 --- a/pkg/spoke/managedcluster/creating_controller.go +++ b/pkg/spoke/managedcluster/creating_controller.go @@ -11,9 +11,9 @@ import ( "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" - "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/klog/v2" ) @@ -43,9 +43,10 @@ func NewManagedClusterCreatingController( spokeCABundle: spokeCABundle, hubClusterClient: hubClusterClient, } + return factory.New(). WithSync(c.sync). - ResyncEvery(CreatingControllerSyncInterval). + ResyncEvery(wait.Jitter(CreatingControllerSyncInterval, 1.0)). ToController("ManagedClusterCreatingController", recorder) }