From 6e14d95873a4cea3eb7500a65e4142696626c0e8 Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Tue, 28 Mar 2023 17:19:31 +0800 Subject: [PATCH] auto approve bootstrap csr (#301) Signed-off-by: Wei Liu --- pkg/cmd/hub/controller.go | 6 +- pkg/hub/csr/controller.go | 189 ++---------- pkg/hub/csr/controller_beta.go | 61 ++-- pkg/hub/csr/controller_beta_test.go | 9 +- pkg/hub/csr/controller_test.go | 127 +++++++-- pkg/hub/csr/reconciler.go | 269 ++++++++++++++++++ pkg/hub/manager.go | 36 ++- test/integration/disaster_recovery_test.go | 2 +- test/integration/integration_suite_test.go | 15 +- .../spokecluster_autoapproval_test.go | 83 ++++++ test/integration/util/util.go | 51 ++-- 11 files changed, 589 insertions(+), 259 deletions(-) create mode 100644 pkg/hub/csr/reconciler.go create mode 100644 test/integration/spokecluster_autoapproval_test.go diff --git a/pkg/cmd/hub/controller.go b/pkg/cmd/hub/controller.go index 92da0b636..70fcc29f7 100644 --- a/pkg/cmd/hub/controller.go +++ b/pkg/cmd/hub/controller.go @@ -7,14 +7,14 @@ import ( "github.com/openshift/library-go/pkg/controller/controllercmd" - "open-cluster-management.io/registration/pkg/features" "open-cluster-management.io/registration/pkg/hub" "open-cluster-management.io/registration/pkg/version" ) func NewController() *cobra.Command { + manager := hub.NewHubManagerOptions() cmdConfig := controllercmd. - NewControllerCommandConfig("registration-controller", version.Get(), hub.RunControllerManager) + NewControllerCommandConfig("registration-controller", version.Get(), manager.RunControllerManager) cmd := cmdConfig.NewCommand() cmd.Use = "controller" cmd.Short = "Start the Cluster Registration Controller" @@ -35,7 +35,7 @@ func NewController() *cobra.Command { "The duration the clients should wait between attempting acquisition and renewal "+ "of a leadership. This is only applicable if leader election is enabled.") - features.DefaultHubMutableFeatureGate.AddFlag(flags) + manager.AddFlags(cmd.Flags()) return cmd } diff --git a/pkg/hub/csr/controller.go b/pkg/hub/csr/controller.go index f17c5bd98..bce587d14 100644 --- a/pkg/hub/csr/controller.go +++ b/pkg/hub/csr/controller.go @@ -2,44 +2,36 @@ package csr import ( "context" - "crypto/x509" - "encoding/pem" - "fmt" - clusterv1 "open-cluster-management.io/api/cluster/v1" - "strings" "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" - authorizationv1 "k8s.io/api/authorization/v1" certificatesv1 "k8s.io/api/certificates/v1" - certificatesv1beta1 "k8s.io/api/certificates/v1beta1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" certificatesinformers "k8s.io/client-go/informers/certificates/v1" "k8s.io/client-go/kubernetes" certificateslisters "k8s.io/client-go/listers/certificates/v1" "k8s.io/klog/v2" "open-cluster-management.io/registration/pkg/helpers" - "open-cluster-management.io/registration/pkg/hub/user" ) // csrApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub. type csrApprovingController struct { - kubeClient kubernetes.Interface - csrLister certificateslisters.CertificateSigningRequestLister - eventRecorder events.Recorder + csrLister certificateslisters.CertificateSigningRequestLister + reconcilers []Reconciler } // NewCSRApprovingController creates a new csr approving controller -func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, recorder events.Recorder) factory.Controller { +func NewCSRApprovingController( + csrInformer certificatesinformers.CertificateSigningRequestInformer, + reconcilers []Reconciler, + recorder events.Recorder) factory.Controller { c := &csrApprovingController{ - kubeClient: kubeClient, - csrLister: csrInformer.Lister(), - eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"), + csrLister: csrInformer.Lister(), + reconcilers: reconcilers, } return factory.New(). WithInformersQueueKeyFunc(func(obj runtime.Object) string { @@ -53,6 +45,7 @@ func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer cert func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncContext) error { csrName := syncCtx.QueueKey() klog.V(4).Infof("Reconciling CertificateSigningRequests %q", csrName) + csr, err := c.csrLister.Get(csrName) if errors.IsNotFound(err) { return nil @@ -68,155 +61,29 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC } csrInfo := newCSRInfo(csr) - // Check whether current csr is a renewal spoker cluster csr. - isRenewal := isSpokeClusterClientCertRenewal(csrInfo) - if !isRenewal { - klog.V(4).Infof("CSR %q was not recognized", csr.Name) - return nil + for _, r := range c.reconcilers { + state, err := r.Reconcile(ctx, csrInfo, approveCSRV1Func(ctx, csr)) + if err != nil { + return err + } + if state == reconcileStop { + break + } } - // Authorize whether the current spoke agent has been authorized to renew its csr. - allowed, err := authorize(ctx, c.kubeClient, csrInfo) - if err != nil { - return err - } - if !allowed { - //TODO find a way to avoid looking at this CSR again. - klog.V(4).Infof("Managed cluster csr %q cannont be auto approved due to subject access review was not approved", csr.Name) - return nil - } - - // Auto approve the spoke cluster csr - csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1.CertificateSigningRequestCondition{ - Type: certificatesv1.CertificateApproved, - Status: corev1.ConditionTrue, - Reason: "AutoApprovedByHubCSRApprovingController", - Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.", - }) - _, err = c.kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) - if err != nil { - return err - } - c.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "spoke cluster csr %q is auto approved by hub csr controller", csr.Name) return nil } -// Using SubjectAccessReview API to check whether a spoke agent has been authorized to renew its csr, -// a spoke agent is authorized after its spoke cluster is accepted by hub cluster admin. -func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr csrInfo) (bool, error) { - sar := &authorizationv1.SubjectAccessReview{ - Spec: authorizationv1.SubjectAccessReviewSpec{ - User: csr.username, - UID: csr.uid, - Groups: csr.groups, - Extra: csr.extra, - ResourceAttributes: &authorizationv1.ResourceAttributes{ - Group: "register.open-cluster-management.io", - Resource: "managedclusters", - Verb: "renew", - Subresource: "clientcertificates", - }, - }, - } - - sar, err := kubeClient.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{}) - if err != nil { - return false, err - } - return sar.Status.Allowed, nil -} - -// To check a renewal managed cluster csr, we check -// 1. if the signer name in csr request is valid. -// 2. if organization field and commonName field in csr request is valid. -// 3. if user name in csr is the same as commonName field in csr request. -func isSpokeClusterClientCertRenewal(csr csrInfo) bool { - spokeClusterName, existed := csr.labels[clusterv1.ClusterNameLabelKey] - if !existed { - return false - } - - if csr.signerName != certificatesv1.KubeAPIServerClientSignerName { - return false - } - - block, _ := pem.Decode(csr.request) - if block == nil || block.Type != "CERTIFICATE REQUEST" { - klog.V(4).Infof("csr %q was not recognized: PEM block type is not CERTIFICATE REQUEST", csr.name) - return false - } - - x509cr, err := x509.ParseCertificateRequest(block.Bytes) - if err != nil { - klog.V(4).Infof("csr %q was not recognized: %v", csr.name, err) - return false - } - - requestingOrgs := sets.NewString(x509cr.Subject.Organization...) - if requestingOrgs.Has(user.ManagedClustersGroup) { // optional common group for backward-compatibility - requestingOrgs.Delete(user.ManagedClustersGroup) - } - if requestingOrgs.Len() != 1 { - return false - } - - expectedPerClusterOrg := fmt.Sprintf("%s%s", user.SubjectPrefix, spokeClusterName) - if !requestingOrgs.Has(expectedPerClusterOrg) { - return false - } - - if !strings.HasPrefix(x509cr.Subject.CommonName, expectedPerClusterOrg) { - return false - } - - return csr.username == x509cr.Subject.CommonName -} - -type csrInfo struct { - name string - labels map[string]string - signerName string - username string - uid string - groups []string - extra map[string]authorizationv1.ExtraValue - request []byte -} - -// newCSRInfo creates csrInfo from CertificateSigningRequest by api version(v1/v1beta1). -func newCSRInfo(csr any) csrInfo { - extra := make(map[string]authorizationv1.ExtraValue) - switch v := csr.(type) { - case *certificatesv1.CertificateSigningRequest: - for k, v := range v.Spec.Extra { - extra[k] = authorizationv1.ExtraValue(v) - } - return csrInfo{ - name: v.Name, - labels: v.Labels, - signerName: v.Spec.SignerName, - username: v.Spec.Username, - uid: v.Spec.UID, - groups: v.Spec.Groups, - extra: extra, - request: v.Spec.Request, - } - case *certificatesv1beta1.CertificateSigningRequest: - for k, v := range v.Spec.Extra { - extra[k] = authorizationv1.ExtraValue(v) - } - return csrInfo{ - name: v.Name, - labels: v.Labels, - signerName: *v.Spec.SignerName, - username: v.Spec.Username, - uid: v.Spec.UID, - groups: v.Spec.Groups, - extra: extra, - request: v.Spec.Request, - } - default: - klog.Errorf("Unsupported type %T", v) - return csrInfo{} +func approveCSRV1Func(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) approveCSRFunc { + return func(kubeClient kubernetes.Interface) error { + // Auto approve the spoke cluster csr + csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1.CertificateSigningRequestCondition{ + Type: certificatesv1.CertificateApproved, + Status: corev1.ConditionTrue, + Reason: "AutoApprovedByHubCSRApprovingController", + Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.", + }) + _, err := kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{}) + return err } } diff --git a/pkg/hub/csr/controller_beta.go b/pkg/hub/csr/controller_beta.go index 87d55cf22..bfff073ae 100644 --- a/pkg/hub/csr/controller_beta.go +++ b/pkg/hub/csr/controller_beta.go @@ -21,20 +21,18 @@ import ( // v1beta1CSRApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub. type v1beta1CSRApprovingController struct { - kubeClient kubernetes.Interface - csrLister certificatesv1beta1lister.CertificateSigningRequestLister - eventRecorder events.Recorder + csrLister certificatesv1beta1lister.CertificateSigningRequestLister + reconcilers []Reconciler } func NewV1beta1CSRApprovingController( - kubeClient kubernetes.Interface, v1beta1CSRInformer certificatesv1beta1informers.CertificateSigningRequestInformer, + reconcilers []Reconciler, recorder events.Recorder) factory.Controller { c := &v1beta1CSRApprovingController{ - kubeClient: kubeClient, - csrLister: v1beta1CSRInformer.Lister(), - eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"), + csrLister: v1beta1CSRInformer.Lister(), + reconcilers: reconcilers, } return factory.New().WithInformersQueueKeyFunc(func(obj runtime.Object) string { @@ -63,34 +61,29 @@ func (c *v1beta1CSRApprovingController) sync(ctx context.Context, syncCtx factor } csrInfo := newCSRInfo(csr) - // Check whether current csr is a renewal spoke cluster csr. - isRenewal := isSpokeClusterClientCertRenewal(csrInfo) - if !isRenewal { - klog.V(4).Infof("CSR %q was not recognized", csr.Name) - return nil + for _, r := range c.reconcilers { + state, err := r.Reconcile(ctx, csrInfo, approveCSRV1beta1Func(ctx, csr)) + if err != nil { + return err + } + if state == reconcileStop { + break + } } - allowed, err := authorize(ctx, c.kubeClient, csrInfo) - if err != nil { - return err - } - if !allowed { - //TODO find a way to avoid looking at this CSR again. - klog.V(4).Infof("Managed cluster csr %q cannont be auto approved due to subject access review was not approved", csr.Name) - return nil - } - - // Auto approve the spoke cluster csr - csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1beta1.CertificateSigningRequestCondition{ - Type: certificatesv1beta1.CertificateApproved, - Status: corev1.ConditionTrue, - Reason: "AutoApprovedByHubCSRApprovingController", - Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.", - }) - _, err = c.kubeClient.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{}) - if err != nil { - return err - } - c.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "spoke cluster csr %q is auto approved by hub csr controller", csr.Name) return nil } + +func approveCSRV1beta1Func(ctx context.Context, csr *certificatesv1beta1.CertificateSigningRequest) approveCSRFunc { + return func(kubeClient kubernetes.Interface) error { + // Auto approve the spoke cluster csr + csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1beta1.CertificateSigningRequestCondition{ + Type: certificatesv1beta1.CertificateApproved, + Status: corev1.ConditionTrue, + Reason: "AutoApprovedByHubCSRApprovingController", + Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.", + }) + _, err := kubeClient.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{}) + return err + } +} diff --git a/pkg/hub/csr/controller_beta_test.go b/pkg/hub/csr/controller_beta_test.go index 36aa03e4e..e6f235e78 100644 --- a/pkg/hub/csr/controller_beta_test.go +++ b/pkg/hub/csr/controller_beta_test.go @@ -144,9 +144,14 @@ func Test_v1beta1CSRApprovingController_sync(t *testing.T) { } ctrl := &v1beta1CSRApprovingController{ - kubeClient, informerFactory.Certificates().V1beta1().CertificateSigningRequests().Lister(), - eventstesting.NewTestingEventRecorder(t), + []Reconciler{ + &csrBootstrapReconciler{}, + &csrRenewalReconciler{ + kubeClient: kubeClient, + eventRecorder: eventstesting.NewTestingEventRecorder(t), + }, + }, } if err := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, validV1beta1CSR.Name)); (err != nil) != tt.wantErr { t.Errorf("v1beta1CSRApprovingController.sync() error = %v, wantErr %v", err, tt.wantErr) diff --git a/pkg/hub/csr/controller_test.go b/pkg/hub/csr/controller_test.go index 144e9cafe..1cabadfd1 100644 --- a/pkg/hub/csr/controller_test.go +++ b/pkg/hub/csr/controller_test.go @@ -5,6 +5,9 @@ import ( "testing" "time" + clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" + clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions" + clusterv1 "open-cluster-management.io/api/cluster/v1" testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing" "open-cluster-management.io/registration/pkg/hub/user" @@ -13,6 +16,7 @@ import ( authorizationv1 "k8s.io/api/authorization/v1" certificatesv1 "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/client-go/informers" @@ -35,33 +39,39 @@ var ( func TestSync(t *testing.T) { cases := []struct { name string + startingClusters []runtime.Object startingCSRs []runtime.Object + approvalUsers []string autoApprovingAllowed bool validateActions func(t *testing.T, actions []clienttesting.Action) }{ { - name: "sync a deleted csr", - startingCSRs: []runtime.Object{}, + name: "sync a deleted csr", + startingClusters: []runtime.Object{}, + startingCSRs: []runtime.Object{}, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertNoActions(t, actions) }, }, { - name: "sync a denied csr", - startingCSRs: []runtime.Object{testinghelpers.NewDeniedCSR(validCSR)}, + name: "sync a denied csr", + startingClusters: []runtime.Object{}, + startingCSRs: []runtime.Object{testinghelpers.NewDeniedCSR(validCSR)}, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertNoActions(t, actions) }, }, { - name: "sync an approved csr", - startingCSRs: []runtime.Object{testinghelpers.NewApprovedCSR(validCSR)}, + name: "sync an approved csr", + startingClusters: []runtime.Object{}, + startingCSRs: []runtime.Object{testinghelpers.NewApprovedCSR(validCSR)}, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertNoActions(t, actions) }, }, { - name: "sync an invalid csr", + name: "sync an invalid csr", + startingClusters: []runtime.Object{}, startingCSRs: []runtime.Object{testinghelpers.NewCSR(testinghelpers.CSRHolder{ Name: validCSR.Name, Labels: validCSR.Labels, @@ -76,8 +86,9 @@ func TestSync(t *testing.T) { }, }, { - name: "deny an auto approving csr", - startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)}, + name: "deny an auto approving csr", + startingClusters: []runtime.Object{}, + startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)}, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertActions(t, actions, "create") testinghelpers.AssertSubjectAccessReviewObj(t, actions[0].(clienttesting.CreateActionImpl).Object) @@ -85,6 +96,7 @@ func TestSync(t *testing.T) { }, { name: "allow an auto approving csr", + startingClusters: []runtime.Object{}, startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)}, autoApprovingAllowed: true, validateActions: func(t *testing.T, actions []clienttesting.Action) { @@ -100,7 +112,8 @@ func TestSync(t *testing.T) { }, }, { - name: "allow an auto approving csr w/o ManagedClusterGroup for backward-compatibility", + name: "allow an auto approving csr w/o ManagedClusterGroup for backward-compatibility", + startingClusters: []runtime.Object{}, startingCSRs: []runtime.Object{testinghelpers.NewCSR(testinghelpers.CSRHolder{ Name: validCSR.Name, Labels: validCSR.Labels, @@ -123,6 +136,34 @@ func TestSync(t *testing.T) { testinghelpers.AssertCSRCondition(t, actual.(*certificatesv1.CertificateSigningRequest).Status.Conditions, expectedCondition) }, }, + { + name: "auto approve a bootstrap csr request", + startingClusters: []runtime.Object{ + &clusterv1.ManagedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedcluster1", + }, + }, + }, + startingCSRs: []runtime.Object{func() *certificatesv1.CertificateSigningRequest { + csr := testinghelpers.NewCSR(validCSR) + csr.Spec.Username = "test" + return csr + }()}, + autoApprovingAllowed: true, + approvalUsers: []string{"test"}, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + expectedCondition := certificatesv1.CertificateSigningRequestCondition{ + Type: certificatesv1.CertificateApproved, + Status: corev1.ConditionTrue, + Reason: "AutoApprovedByHubCSRApprovingController", + Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.", + } + testinghelpers.AssertActions(t, actions, "update") + actual := actions[0].(clienttesting.UpdateActionImpl).Object + testinghelpers.AssertCSRCondition(t, actual.(*certificatesv1.CertificateSigningRequest).Status.Conditions, expectedCondition) + }, + }, } for _, c := range cases { @@ -147,7 +188,34 @@ func TestSync(t *testing.T) { } } - ctrl := &csrApprovingController{kubeClient, informerFactory.Certificates().V1().CertificateSigningRequests().Lister(), eventstesting.NewTestingEventRecorder(t)} + clusterClient := clusterfake.NewSimpleClientset(c.startingClusters...) + clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10) + clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore() + for _, cluster := range c.startingClusters { + if err := clusterStore.Add(cluster); err != nil { + t.Fatal(err) + } + } + + recorder := eventstesting.NewTestingEventRecorder(t) + ctrl := &csrApprovingController{ + informerFactory.Certificates().V1().CertificateSigningRequests().Lister(), + []Reconciler{ + &csrBootstrapReconciler{ + kubeClient: kubeClient, + eventRecorder: recorder, + approvalUsers: sets.Set[string]{}, + }, + NewCSRRenewalReconciler(kubeClient, recorder), + NewCSRBootstrapReconciler( + kubeClient, + clusterClient, + clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), + c.approvalUsers, + recorder, + ), + }, + } syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, validCSR.Name)) if syncErr != nil { t.Errorf("unexpected err: %v", syncErr) @@ -162,9 +230,11 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) { invalidSignerName := "invalidsigner" cases := []struct { - name string - csr testinghelpers.CSRHolder - isRenewal bool + name string + csr testinghelpers.CSRHolder + isRenewal bool + clusterName string + commonName string }{ { name: "a spoke cluster csr without labels", @@ -217,19 +287,6 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) { }, isRenewal: false, }, - { - name: "an common name does not equal user name", - csr: testinghelpers.CSRHolder{ - Name: validCSR.Name, - Labels: validCSR.Labels, - SignerName: validCSR.SignerName, - CN: "system:open-cluster-management:managedcluster1:invalidagent", - Orgs: validCSR.Orgs, - Username: validCSR.Username, - ReqBlockType: validCSR.ReqBlockType, - }, - isRenewal: false, - }, { name: "a renewal csr without signer name", csr: testinghelpers.CSRHolder{ @@ -244,18 +301,26 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) { isRenewal: false, }, { - name: "a renewal csr", - csr: validCSR, - isRenewal: true, + name: "a renewal csr", + csr: validCSR, + isRenewal: true, + clusterName: "managedcluster1", + commonName: validCSR.CN, }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { - isRenewal := isSpokeClusterClientCertRenewal(newCSRInfo(testinghelpers.NewCSR(c.csr))) + isRenewal, clusterName, commonName := validateCSR(newCSRInfo(testinghelpers.NewCSR(c.csr))) if isRenewal != c.isRenewal { t.Errorf("expected %t, but failed", c.isRenewal) } + if clusterName != c.clusterName { + t.Errorf("expected %s, but failed", commonName) + } + if commonName != c.commonName { + t.Errorf("expected %s, but failed", commonName) + } }) } } diff --git a/pkg/hub/csr/reconciler.go b/pkg/hub/csr/reconciler.go new file mode 100644 index 000000000..32644ebd0 --- /dev/null +++ b/pkg/hub/csr/reconciler.go @@ -0,0 +1,269 @@ +package csr + +import ( + "context" + "crypto/x509" + "encoding/pem" + "fmt" + "strings" + + "github.com/openshift/library-go/pkg/operator/events" + + authorizationv1 "k8s.io/api/authorization/v1" + certificatesv1 "k8s.io/api/certificates/v1" + certificatesv1beta1 "k8s.io/api/certificates/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/kubernetes" + "k8s.io/klog/v2" + + clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned" + clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" + clusterv1 "open-cluster-management.io/api/cluster/v1" + "open-cluster-management.io/registration/pkg/hub/user" +) + +type reconcileState int64 + +const ( + reconcileStop reconcileState = iota + reconcileContinue +) + +type csrInfo struct { + name string + labels map[string]string + signerName string + username string + uid string + groups []string + extra map[string]authorizationv1.ExtraValue + request []byte +} + +type approveCSRFunc func(kubernetes.Interface) error + +type Reconciler interface { + Reconcile(context.Context, csrInfo, approveCSRFunc) (reconcileState, error) +} + +type csrRenewalReconciler struct { + kubeClient kubernetes.Interface + eventRecorder events.Recorder +} + +func NewCSRRenewalReconciler(kubeClient kubernetes.Interface, recorder events.Recorder) Reconciler { + return &csrRenewalReconciler{ + kubeClient: kubeClient, + eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"), + } +} + +func (r *csrRenewalReconciler) Reconcile(ctx context.Context, csr csrInfo, approveCSR approveCSRFunc) (reconcileState, error) { + // Check whether current csr is a valid spoker cluster csr. + valid, _, commonName := validateCSR(csr) + if !valid { + klog.V(4).Infof("CSR %q was not recognized", csr.name) + return reconcileStop, nil + } + + // Check if user name in csr is the same as commonName field in csr request. + if csr.username != commonName { + return reconcileContinue, nil + } + + // Authorize whether the current spoke agent has been authorized to renew its csr. + allowed, err := authorize(ctx, r.kubeClient, csr) + if err != nil { + return reconcileContinue, err + } + if !allowed { + klog.V(4).Infof("Managed cluster csr %q cannont be auto approved due to subject access review was not approved", csr.name) + return reconcileStop, nil + } + + if err := approveCSR(r.kubeClient); err != nil { + return reconcileContinue, err + } + + r.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "spoke cluster csr %q is auto approved by hub csr controller", csr.name) + return reconcileStop, nil +} + +type csrBootstrapReconciler struct { + kubeClient kubernetes.Interface + clusterClient clusterclientset.Interface + clusterLister clusterv1listers.ManagedClusterLister + approvalUsers sets.Set[string] + eventRecorder events.Recorder +} + +func NewCSRBootstrapReconciler(kubeClient kubernetes.Interface, + clusterClient clusterclientset.Interface, + clusterLister clusterv1listers.ManagedClusterLister, + approvalUsers []string, + recorder events.Recorder) Reconciler { + return &csrBootstrapReconciler{ + kubeClient: kubeClient, + clusterClient: clusterClient, + clusterLister: clusterLister, + approvalUsers: sets.New(approvalUsers...), + eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"), + } +} + +func (b *csrBootstrapReconciler) Reconcile(ctx context.Context, csr csrInfo, approveCSR approveCSRFunc) (reconcileState, error) { + // Check whether current csr is a valid spoker cluster csr. + valid, clusterName, _ := validateCSR(csr) + if !valid { + klog.V(4).Infof("CSR %q was not recognized", csr.name) + return reconcileStop, nil + } + + // Check whether current csr can be approved. + if !b.approvalUsers.Has(csr.username) { + return reconcileContinue, nil + } + + err := b.accpetCluster(ctx, clusterName) + if errors.IsNotFound(err) { + // Current spoke cluster not found, could have been deleted, do nothing. + return reconcileStop, nil + } + if err != nil { + return reconcileContinue, err + } + + if err := approveCSR(b.kubeClient); err != nil { + return reconcileContinue, err + } + + b.eventRecorder.Eventf("ManagedClusterAutoApproved", "spoke cluster %q is auto approved.", clusterName) + return reconcileStop, nil +} + +func (b *csrBootstrapReconciler) accpetCluster(ctx context.Context, managedClusterName string) error { + managedCluster, err := b.clusterLister.Get(managedClusterName) + if err != nil { + return err + } + + if managedCluster.Spec.HubAcceptsClient { + return nil + } + + patch := []byte("{\"spec\": {\"hubAcceptsClient\": true}}") + _, err = b.clusterClient.ClusterV1().ManagedClusters().Patch( + ctx, managedCluster.Name, types.MergePatchType, patch, metav1.PatchOptions{}) + return err +} + +// To validate a managed cluster csr, we check +// 1. if the signer name in csr request is valid. +// 2. if organization field and commonName field in csr request is valid. +func validateCSR(csr csrInfo) (bool, string, string) { + spokeClusterName, existed := csr.labels[clusterv1.ClusterNameLabelKey] + if !existed { + return false, "", "" + } + + if csr.signerName != certificatesv1.KubeAPIServerClientSignerName { + return false, "", "" + } + + block, _ := pem.Decode(csr.request) + if block == nil || block.Type != "CERTIFICATE REQUEST" { + klog.V(4).Infof("csr %q was not recognized: PEM block type is not CERTIFICATE REQUEST", csr.name) + return false, "", "" + } + + x509cr, err := x509.ParseCertificateRequest(block.Bytes) + if err != nil { + klog.V(4).Infof("csr %q was not recognized: %v", csr.name, err) + return false, "", "" + } + + requestingOrgs := sets.New(x509cr.Subject.Organization...) + if requestingOrgs.Has(user.ManagedClustersGroup) { // optional common group for backward-compatibility + requestingOrgs.Delete(user.ManagedClustersGroup) + } + if requestingOrgs.Len() != 1 { + return false, "", "" + } + + expectedPerClusterOrg := fmt.Sprintf("%s%s", user.SubjectPrefix, spokeClusterName) + if !requestingOrgs.Has(expectedPerClusterOrg) { + return false, "", "" + } + + if !strings.HasPrefix(x509cr.Subject.CommonName, expectedPerClusterOrg) { + return false, "", "" + } + + return true, spokeClusterName, x509cr.Subject.CommonName +} + +// Using SubjectAccessReview API to check whether a spoke agent has been authorized to renew its csr, +// a spoke agent is authorized after its spoke cluster is accepted by hub cluster admin. +func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr csrInfo) (bool, error) { + sar := &authorizationv1.SubjectAccessReview{ + Spec: authorizationv1.SubjectAccessReviewSpec{ + User: csr.username, + UID: csr.uid, + Groups: csr.groups, + Extra: csr.extra, + ResourceAttributes: &authorizationv1.ResourceAttributes{ + Group: "register.open-cluster-management.io", + Resource: "managedclusters", + Verb: "renew", + Subresource: "clientcertificates", + }, + }, + } + + sar, err := kubeClient.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{}) + if err != nil { + return false, err + } + return sar.Status.Allowed, nil +} + +// newCSRInfo creates csrInfo from CertificateSigningRequest by api version(v1/v1beta1). +func newCSRInfo(csr any) csrInfo { + extra := make(map[string]authorizationv1.ExtraValue) + switch v := csr.(type) { + case *certificatesv1.CertificateSigningRequest: + for k, v := range v.Spec.Extra { + extra[k] = authorizationv1.ExtraValue(v) + } + return csrInfo{ + name: v.Name, + labels: v.Labels, + signerName: v.Spec.SignerName, + username: v.Spec.Username, + uid: v.Spec.UID, + groups: v.Spec.Groups, + extra: extra, + request: v.Spec.Request, + } + case *certificatesv1beta1.CertificateSigningRequest: + for k, v := range v.Spec.Extra { + extra[k] = authorizationv1.ExtraValue(v) + } + return csrInfo{ + name: v.Name, + labels: v.Labels, + signerName: *v.Spec.SignerName, + username: v.Spec.Username, + uid: v.Spec.UID, + groups: v.Spec.Groups, + extra: extra, + request: v.Spec.Request, + } + default: + klog.Errorf("Unsupported type %T", v) + return csrInfo{} + } +} diff --git a/pkg/hub/manager.go b/pkg/hub/manager.go index 25c23a19d..4268197ba 100644 --- a/pkg/hub/manager.go +++ b/pkg/hub/manager.go @@ -28,6 +28,7 @@ import ( "github.com/openshift/library-go/pkg/controller/controllercmd" "github.com/openshift/library-go/pkg/controller/factory" "github.com/pkg/errors" + "github.com/spf13/pflag" kubeinformers "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" @@ -37,8 +38,26 @@ import ( var ResyncInterval = 5 * time.Minute +// HubManagerOptions holds configuration for hub manager controller +type HubManagerOptions struct { + ClusterAutoApprovalUsers []string +} + +// NewHubManagerOptions returns a HubManagerOptions +func NewHubManagerOptions() *HubManagerOptions { + return &HubManagerOptions{} +} + +// AddFlags registers flags for manager +func (m *HubManagerOptions) AddFlags(fs *pflag.FlagSet) { + features.DefaultHubMutableFeatureGate.AddFlag(fs) + fs.StringArrayVar(&m.ClusterAutoApprovalUsers, "--cluster-auto-approval-users", m.ClusterAutoApprovalUsers, + "A list of reachable spoke cluster api server URLs for hub cluster.") + +} + // RunControllerManager starts the controllers on hub to manage spoke cluster registration. -func RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error { +func (m *HubManagerOptions) RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error { // If qps in kubconfig is not set, increase the qps and burst to enhance the ability of kube client to handle // requests in concurrent // TODO: Use ClientConnectionOverrides flags to change qps/burst when library-go exposes them in the future @@ -86,6 +105,17 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. controllerContext.EventRecorder, ) + csrReconciles := []csr.Reconciler{csr.NewCSRRenewalReconciler(kubeClient, controllerContext.EventRecorder)} + if features.DefaultHubMutableFeatureGate.Enabled(ocmfeature.ManagedClusterAutoApproval) { + csrReconciles = append(csrReconciles, csr.NewCSRBootstrapReconciler( + kubeClient, + clusterClient, + clusterInformers.Cluster().V1().ManagedClusters().Lister(), + m.ClusterAutoApprovalUsers, + controllerContext.EventRecorder, + )) + } + var csrController factory.Controller if features.DefaultHubMutableFeatureGate.Enabled(ocmfeature.V1beta1CSRAPICompatibility) { v1CSRSupported, v1beta1CSRSupported, err := helpers.IsCSRSupported(kubeClient) @@ -95,8 +125,8 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. if !v1CSRSupported && v1beta1CSRSupported { csrController = csr.NewV1beta1CSRApprovingController( - kubeClient, kubeInfomers.Certificates().V1beta1().CertificateSigningRequests(), + csrReconciles, controllerContext.EventRecorder, ) klog.Info("Using v1beta1 CSR api to manage spoke client certificate") @@ -104,8 +134,8 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. } if csrController == nil { csrController = csr.NewCSRApprovingController( - kubeClient, kubeInfomers.Certificates().V1().CertificateSigningRequests(), + csrReconciles, controllerContext.EventRecorder, ) } diff --git a/test/integration/disaster_recovery_test.go b/test/integration/disaster_recovery_test.go index 3af72b588..e03a7cb70 100644 --- a/test/integration/disaster_recovery_test.go +++ b/test/integration/disaster_recovery_test.go @@ -80,7 +80,7 @@ var _ = ginkgo.Describe("Disaster Recovery", func() { // start hub controller go func() { - err := hub.RunControllerManager(ctx, &controllercmd.ControllerContext{ + err := hub.NewHubManagerOptions().RunControllerManager(ctx, &controllercmd.ControllerContext{ KubeConfig: cfg, EventRecorder: util.NewIntegrationTestEventRecorder("hub"), }) diff --git a/test/integration/integration_suite_test.go b/test/integration/integration_suite_test.go index 3673c384c..86d92c2d0 100644 --- a/test/integration/integration_suite_test.go +++ b/test/integration/integration_suite_test.go @@ -3,7 +3,6 @@ package integration_test import ( "context" "fmt" - "io/ioutil" "os" "path" "path/filepath" @@ -66,7 +65,7 @@ func TestIntegration(t *testing.T) { ginkgo.RunSpecs(t, "Integration Suite") } -var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) { +var _ = ginkgo.BeforeSuite(func() { logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true))) ginkgo.By("bootstrapping test environment") @@ -141,7 +140,7 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) { gomega.Expect(clusterClient).ToNot(gomega.BeNil()) // prepare test namespace - nsBytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") + nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace") if err != nil { testNamespace = "open-cluster-management-agent" } else { @@ -154,16 +153,20 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) { err = features.DefaultHubMutableFeatureGate.Set("DefaultClusterSet=true") gomega.Expect(err).ToNot(gomega.HaveOccurred()) + // enable ManagedClusterAutoApproval feature gate + err = features.DefaultHubMutableFeatureGate.Set("ManagedClusterAutoApproval=true") + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + // start hub controller go func() { - err := hub.RunControllerManager(ctx, &controllercmd.ControllerContext{ + m := hub.NewHubManagerOptions() + m.ClusterAutoApprovalUsers = []string{util.AutoApprovalBootstrapUser} + err := m.RunControllerManager(ctx, &controllercmd.ControllerContext{ KubeConfig: cfg, EventRecorder: util.NewIntegrationTestEventRecorder("hub"), }) gomega.Expect(err).NotTo(gomega.HaveOccurred()) }() - - close(done) }) var _ = ginkgo.AfterSuite(func() { diff --git a/test/integration/spokecluster_autoapproval_test.go b/test/integration/spokecluster_autoapproval_test.go new file mode 100644 index 000000000..acb2ccf4f --- /dev/null +++ b/test/integration/spokecluster_autoapproval_test.go @@ -0,0 +1,83 @@ +package integration_test + +import ( + "fmt" + "path" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + certificates "k8s.io/api/certificates/v1" + "k8s.io/apimachinery/pkg/api/meta" + clusterv1 "open-cluster-management.io/api/cluster/v1" + "open-cluster-management.io/registration/pkg/spoke" + "open-cluster-management.io/registration/test/integration/util" +) + +var _ = ginkgo.Describe("Cluster Auto Approval", func() { + ginkgo.It("Cluster should be automatically approved", func() { + var err error + + managedClusterName := "autoapprovaltest-spokecluster" + hubKubeconfigSecret := "autoapprovaltest-hub-kubeconfig-secret" + hubKubeconfigDir := path.Join(util.TestDir, "autoapprovaltest", "hub-kubeconfig") + + bootstrapFile := path.Join(util.TestDir, "bootstrap-autoapprovaltest", "kubeconfig") + err = authn.CreateBootstrapKubeConfigWithUser(bootstrapFile, serverCertFile, securePort, util.AutoApprovalBootstrapUser) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + agentOptions := spoke.SpokeAgentOptions{ + ClusterName: managedClusterName, + BootstrapKubeconfig: bootstrapFile, + HubKubeconfigSecret: hubKubeconfigSecret, + HubKubeconfigDir: hubKubeconfigDir, + ClusterHealthCheckPeriod: 1 * time.Minute, + } + + // run registration agent + cancel := util.RunAgent("autoapprovaltest", agentOptions, spokeCfg) + defer cancel() + + gomega.Eventually(func() error { + if _, err := util.GetManagedCluster(clusterClient, managedClusterName); err != nil { + return err + } + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + + var approvedCSR *certificates.CertificateSigningRequest + // after bootstrap the spokecluster csr should be auto approved + gomega.Eventually(func() error { + approvedCSR, err = util.FindAutoApprovedSpokeCSR(kubeClient, managedClusterName) + if err != nil { + return err + } + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + + // simulate hub cluster to fill a certificate + now := time.Now() + err = authn.FillCertificateToApprovedCSR(kubeClient, approvedCSR, now.UTC(), now.Add(30*time.Second).UTC()) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // the hub kubeconfig secret should be filled after the csr is approved + gomega.Eventually(func() error { + if _, err := util.GetFilledHubKubeConfigSecret(kubeClient, testNamespace, hubKubeconfigSecret); err != nil { + return err + } + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + + // the spoke cluster should have joined condition finally + gomega.Eventually(func() error { + spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName) + if err != nil { + return err + } + if !meta.IsStatusConditionTrue(spokeCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) { + return fmt.Errorf("cluster should be joined") + } + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + }) +}) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index 2e12120ed..e6a6d610b 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -45,6 +45,8 @@ const ( TestDir = "/tmp/registration-integration-test" ) +const AutoApprovalBootstrapUser = "autoapproval-user" + var ( CertDir = path.Join(TestDir, "client-certs") caFile = path.Join(CertDir, "ca.crt") @@ -171,6 +173,14 @@ func (t *TestAuthn) Stop() error { } func (t *TestAuthn) CreateBootstrapKubeConfigWithCertAge(configFileName, serverCertFile, securePort string, certAge time.Duration) error { + return t.CreateBootstrapKubeConfig(configFileName, serverCertFile, securePort, bootstrapUser, certAge) +} + +func (t *TestAuthn) CreateBootstrapKubeConfigWithUser(configFileName, serverCertFile, securePort, bootstrapUser string) error { + return t.CreateBootstrapKubeConfig(configFileName, serverCertFile, securePort, bootstrapUser, 24*time.Hour) +} + +func (t *TestAuthn) CreateBootstrapKubeConfig(configFileName, serverCertFile, securePort, bootstrapUser string, certAge time.Duration) error { certData, keyData, err := t.signClientCertKeyWithCA(bootstrapUser, bootstrapGroups, certAge) if err != nil { return err @@ -418,6 +428,27 @@ func (t *TestAuthn) ApproveSpokeClusterCSR(kubeClient kubernetes.Interface, spok } func (t *TestAuthn) ApproveCSR(kubeClient kubernetes.Interface, csr *certificates.CertificateSigningRequest, notBefore, notAfter time.Time) error { + if err := t.FillCertificateToApprovedCSR(kubeClient, csr, notBefore, notAfter); err != nil { + return err + } + + // approve the csr + approved, err := kubeClient.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), csr.Name, metav1.GetOptions{}) + if err != nil { + return err + } + approved.Status.Conditions = append(approved.Status.Conditions, certificates.CertificateSigningRequestCondition{ + Type: certificates.CertificateApproved, + Status: corev1.ConditionTrue, + Reason: "Approved", + Message: "CSR Approved.", + LastUpdateTime: metav1.Now(), + }) + _, err = kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.TODO(), approved.Name, approved, metav1.UpdateOptions{}) + return err +} + +func (t *TestAuthn) FillCertificateToApprovedCSR(kubeClient kubernetes.Interface, csr *certificates.CertificateSigningRequest, notBefore, notAfter time.Time) error { block, _ := pem.Decode(csr.Spec.Request) cr, err := x509.ParseCertificateRequest(block.Bytes) if err != nil { @@ -470,29 +501,13 @@ func (t *TestAuthn) ApproveCSR(kubeClient kubernetes.Interface, csr *certificate } // set cert - csr.Status = certificates.CertificateSigningRequestStatus{ - Certificate: certBuffer.Bytes(), - Conditions: []certificates.CertificateSigningRequestCondition{}, - } + csr.Status.Certificate = certBuffer.Bytes() _, err = kubeClient.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{}) if err != nil { return err } - // approve the csr - approved, err := kubeClient.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), csr.Name, metav1.GetOptions{}) - if err != nil { - return err - } - approved.Status.Conditions = append(approved.Status.Conditions, certificates.CertificateSigningRequestCondition{ - Type: certificates.CertificateApproved, - Status: corev1.ConditionTrue, - Reason: "Approved", - Message: "CSR Approved.", - LastUpdateTime: metav1.Now(), - }) - _, err = kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.TODO(), approved.Name, approved, metav1.UpdateOptions{}) - return err + return nil } func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) (*clusterv1.ManagedCluster, error) {