mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-19 23:57:57 +00:00
se Lister to reduce the number of calls to api server
Signed-off-by: Yang Le <yangle@redhat.com>
This commit is contained in:
@@ -15,7 +15,9 @@ import (
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
certificatesinformers "k8s.io/client-go/informers/certificates/v1beta1"
|
||||
"k8s.io/client-go/kubernetes"
|
||||
certificateslisters "k8s.io/client-go/listers/certificates/v1beta1"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
"github.com/open-cluster-management/registration/pkg/helpers"
|
||||
@@ -29,20 +31,22 @@ const (
|
||||
// csrApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub.
|
||||
type csrApprovingController struct {
|
||||
kubeClient kubernetes.Interface
|
||||
csrLister certificateslisters.CertificateSigningRequestLister
|
||||
eventRecorder events.Recorder
|
||||
}
|
||||
|
||||
// NewCSRApprovingController creates a new csr approving controller
|
||||
func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer factory.Informer, recorder events.Recorder) factory.Controller {
|
||||
func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, recorder events.Recorder) factory.Controller {
|
||||
c := &csrApprovingController{
|
||||
kubeClient: kubeClient,
|
||||
csrLister: csrInformer.Lister(),
|
||||
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
|
||||
}
|
||||
return factory.New().
|
||||
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
|
||||
accessor, _ := meta.Accessor(obj)
|
||||
return accessor.GetName()
|
||||
}, csrInformer).
|
||||
}, csrInformer.Informer()).
|
||||
WithSync(c.sync).
|
||||
ToController("CSRApprovingController", recorder)
|
||||
}
|
||||
@@ -50,7 +54,7 @@ func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer fact
|
||||
func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
csrName := syncCtx.QueueKey()
|
||||
klog.V(4).Infof("Reconciling CertificateSigningRequests %q", csrName)
|
||||
csr, err := c.kubeClient.CertificatesV1beta1().CertificateSigningRequests().Get(ctx, csrName, metav1.GetOptions{})
|
||||
csr, err := c.csrLister.Get(csrName)
|
||||
if errors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
@@ -58,6 +62,7 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC
|
||||
return err
|
||||
}
|
||||
|
||||
csr = csr.DeepCopy()
|
||||
// Current csr is in terminal state, do nothing.
|
||||
if helpers.IsCSRInTerminalState(&csr.Status) {
|
||||
return nil
|
||||
|
||||
@@ -3,6 +3,7 @@ package csr
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
testinghelpers "github.com/open-cluster-management/registration/pkg/helpers/testing"
|
||||
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
authorizationv1 "k8s.io/api/authorization/v1"
|
||||
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/client-go/informers"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
)
|
||||
@@ -39,21 +41,21 @@ func TestSync(t *testing.T) {
|
||||
name: "sync a deleted csr",
|
||||
startingCSRs: []runtime.Object{},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get")
|
||||
testinghelpers.AssertNoActions(t, actions)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync a denied csr",
|
||||
startingCSRs: []runtime.Object{testinghelpers.NewDeniedCSR(validCSR)},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get")
|
||||
testinghelpers.AssertNoActions(t, actions)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync an approved csr",
|
||||
startingCSRs: []runtime.Object{testinghelpers.NewApprovedCSR(validCSR)},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get")
|
||||
testinghelpers.AssertNoActions(t, actions)
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -68,15 +70,15 @@ func TestSync(t *testing.T) {
|
||||
ReqBlockType: validCSR.ReqBlockType,
|
||||
})},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get")
|
||||
testinghelpers.AssertNoActions(t, actions)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "deny an auto approving csr",
|
||||
startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get", "create")
|
||||
testinghelpers.AssertSubjectAccessReviewObj(t, actions[1].(clienttesting.CreateActionImpl).Object)
|
||||
testinghelpers.AssertActions(t, actions, "create")
|
||||
testinghelpers.AssertSubjectAccessReviewObj(t, actions[0].(clienttesting.CreateActionImpl).Object)
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -89,8 +91,8 @@ func TestSync(t *testing.T) {
|
||||
Reason: "AutoApprovedByHubCSRApprovingController",
|
||||
Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.",
|
||||
}
|
||||
testinghelpers.AssertActions(t, actions, "get", "create", "update")
|
||||
actual := actions[2].(clienttesting.UpdateActionImpl).Object
|
||||
testinghelpers.AssertActions(t, actions, "create", "update")
|
||||
actual := actions[1].(clienttesting.UpdateActionImpl).Object
|
||||
testinghelpers.AssertCSRCondition(t, actual.(*certificatesv1beta1.CertificateSigningRequest).Status.Conditions, expectedCondition)
|
||||
},
|
||||
},
|
||||
@@ -110,8 +112,13 @@ func TestSync(t *testing.T) {
|
||||
}, nil
|
||||
},
|
||||
)
|
||||
informerFactory := informers.NewSharedInformerFactory(kubeClient, 3*time.Minute)
|
||||
csrStore := informerFactory.Certificates().V1beta1().CertificateSigningRequests().Informer().GetStore()
|
||||
for _, csr := range c.startingCSRs {
|
||||
csrStore.Add(csr)
|
||||
}
|
||||
|
||||
ctrl := &csrApprovingController{kubeClient, eventstesting.NewTestingEventRecorder(t)}
|
||||
ctrl := &csrApprovingController{kubeClient, informerFactory.Certificates().V1beta1().CertificateSigningRequests().Lister(), eventstesting.NewTestingEventRecorder(t)}
|
||||
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, validCSR.Name))
|
||||
if syncErr != nil {
|
||||
t.Errorf("unexpected err: %v", syncErr)
|
||||
|
||||
@@ -5,6 +5,8 @@ import (
|
||||
"fmt"
|
||||
|
||||
clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
|
||||
informerv1 "github.com/open-cluster-management/api/client/cluster/informers/externalversions/cluster/v1"
|
||||
listerv1 "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1"
|
||||
v1 "github.com/open-cluster-management/api/cluster/v1"
|
||||
"github.com/open-cluster-management/registration/pkg/helpers"
|
||||
|
||||
@@ -39,6 +41,7 @@ var staticFiles = []string{
|
||||
type managedClusterController struct {
|
||||
kubeClient kubernetes.Interface
|
||||
clusterClient clientset.Interface
|
||||
clusterLister listerv1.ManagedClusterLister
|
||||
eventRecorder events.Recorder
|
||||
}
|
||||
|
||||
@@ -46,18 +49,19 @@ type managedClusterController struct {
|
||||
func NewManagedClusterController(
|
||||
kubeClient kubernetes.Interface,
|
||||
clusterClient clientset.Interface,
|
||||
clusterInformer factory.Informer,
|
||||
clusterInformer informerv1.ManagedClusterInformer,
|
||||
recorder events.Recorder) factory.Controller {
|
||||
c := &managedClusterController{
|
||||
kubeClient: kubeClient,
|
||||
clusterClient: clusterClient,
|
||||
clusterLister: clusterInformer.Lister(),
|
||||
eventRecorder: recorder.WithComponentSuffix("managed-cluster-controller"),
|
||||
}
|
||||
return factory.New().
|
||||
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
|
||||
accessor, _ := meta.Accessor(obj)
|
||||
return accessor.GetName()
|
||||
}, clusterInformer).
|
||||
}, clusterInformer.Informer()).
|
||||
WithSync(c.sync).
|
||||
ToController("ManagedClusterController", recorder)
|
||||
}
|
||||
@@ -65,7 +69,7 @@ func NewManagedClusterController(
|
||||
func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
managedClusterName := syncCtx.QueueKey()
|
||||
klog.V(4).Infof("Reconciling ManagedCluster %s", managedClusterName)
|
||||
managedCluster, err := c.clusterClient.ClusterV1().ManagedClusters().Get(ctx, managedClusterName, metav1.GetOptions{})
|
||||
managedCluster, err := c.clusterLister.Get(managedClusterName)
|
||||
if errors.IsNotFound(err) {
|
||||
// Spoke cluster not found, could have been deleted, do nothing.
|
||||
return nil
|
||||
@@ -74,6 +78,7 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn
|
||||
return err
|
||||
}
|
||||
|
||||
managedCluster = managedCluster.DeepCopy()
|
||||
if managedCluster.DeletionTimestamp.IsZero() {
|
||||
hasFinalizer := false
|
||||
for i := range managedCluster.Finalizers {
|
||||
|
||||
@@ -3,8 +3,10 @@ package managedcluster
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
clusterfake "github.com/open-cluster-management/api/client/cluster/clientset/versioned/fake"
|
||||
clusterinformers "github.com/open-cluster-management/api/client/cluster/informers/externalversions"
|
||||
v1 "github.com/open-cluster-management/api/cluster/v1"
|
||||
testinghelpers "github.com/open-cluster-management/registration/pkg/helpers/testing"
|
||||
|
||||
@@ -26,15 +28,15 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
name: "sync a deleted spoke cluster",
|
||||
startingObjects: []runtime.Object{},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get")
|
||||
testinghelpers.AssertNoActions(t, actions)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "create a new spoke cluster",
|
||||
startingObjects: []runtime.Object{testinghelpers.NewManagedCluster()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get", "update")
|
||||
managedCluster := (actions[1].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
|
||||
testinghelpers.AssertActions(t, actions, "update")
|
||||
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
|
||||
testinghelpers.AssertFinalizers(t, managedCluster, []string{managedClusterFinalizer})
|
||||
},
|
||||
},
|
||||
@@ -48,8 +50,8 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
Reason: "HubClusterAdminAccepted",
|
||||
Message: "Accepted by hub cluster admin",
|
||||
}
|
||||
testinghelpers.AssertActions(t, actions, "get", "get", "update")
|
||||
actual := actions[2].(clienttesting.UpdateActionImpl).Object
|
||||
testinghelpers.AssertActions(t, actions, "get", "update")
|
||||
actual := actions[1].(clienttesting.UpdateActionImpl).Object
|
||||
managedCluster := actual.(*v1.ManagedCluster)
|
||||
testinghelpers.AssertManagedClusterCondition(t, managedCluster.Status.Conditions, expectedCondition)
|
||||
},
|
||||
@@ -58,7 +60,7 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
name: "sync an accepted spoke cluster",
|
||||
startingObjects: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get", "get")
|
||||
testinghelpers.AssertActions(t, actions, "get")
|
||||
},
|
||||
},
|
||||
{
|
||||
@@ -71,8 +73,8 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
Reason: "HubClusterAdminDenied",
|
||||
Message: "Denied by hub cluster admin",
|
||||
}
|
||||
testinghelpers.AssertActions(t, actions, "get", "get", "update")
|
||||
actual := actions[2].(clienttesting.UpdateActionImpl).Object
|
||||
testinghelpers.AssertActions(t, actions, "get", "update")
|
||||
actual := actions[1].(clienttesting.UpdateActionImpl).Object
|
||||
managedCluster := actual.(*v1.ManagedCluster)
|
||||
testinghelpers.AssertManagedClusterCondition(t, managedCluster.Status.Conditions, expectedCondition)
|
||||
},
|
||||
@@ -81,8 +83,8 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
name: "delete a spoke cluster",
|
||||
startingObjects: []runtime.Object{testinghelpers.NewDeletingManagedCluster()},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testinghelpers.AssertActions(t, actions, "get", "update")
|
||||
managedCluster := (actions[1].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
|
||||
testinghelpers.AssertActions(t, actions, "update")
|
||||
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
|
||||
testinghelpers.AssertFinalizers(t, managedCluster, []string{})
|
||||
},
|
||||
},
|
||||
@@ -92,8 +94,13 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
|
||||
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
|
||||
for _, cluster := range c.startingObjects {
|
||||
clusterStore.Add(cluster)
|
||||
}
|
||||
|
||||
ctrl := managedClusterController{kubeClient, clusterClient, eventstesting.NewTestingEventRecorder(t)}
|
||||
ctrl := managedClusterController{kubeClient, clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)}
|
||||
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName))
|
||||
if syncErr != nil {
|
||||
t.Errorf("unexpected err: %v", syncErr)
|
||||
|
||||
@@ -54,13 +54,13 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
|
||||
managedClusterController := managedcluster.NewManagedClusterController(
|
||||
kubeClient,
|
||||
clusterClient,
|
||||
clusterInformers.Cluster().V1().ManagedClusters().Informer(),
|
||||
clusterInformers.Cluster().V1().ManagedClusters(),
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
csrController := csr.NewCSRApprovingController(
|
||||
kubeClient,
|
||||
kubeInfomers.Certificates().V1beta1().CertificateSigningRequests().Informer(),
|
||||
kubeInfomers.Certificates().V1beta1().CertificateSigningRequests(),
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
|
||||
|
||||
@@ -11,9 +11,9 @@ import (
|
||||
|
||||
"github.com/openshift/library-go/pkg/controller/factory"
|
||||
"github.com/openshift/library-go/pkg/operator/events"
|
||||
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
@@ -43,9 +43,10 @@ func NewManagedClusterCreatingController(
|
||||
spokeCABundle: spokeCABundle,
|
||||
hubClusterClient: hubClusterClient,
|
||||
}
|
||||
|
||||
return factory.New().
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(CreatingControllerSyncInterval).
|
||||
ResyncEvery(wait.Jitter(CreatingControllerSyncInterval, 1.0)).
|
||||
ToController("ManagedClusterCreatingController", recorder)
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user