auto approve bootstrap csr (#301)

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2023-03-28 17:19:31 +08:00
committed by GitHub
parent e5972b44e1
commit 6e14d95873
11 changed files with 589 additions and 259 deletions

View File

@@ -7,14 +7,14 @@ import (
"github.com/openshift/library-go/pkg/controller/controllercmd"
"open-cluster-management.io/registration/pkg/features"
"open-cluster-management.io/registration/pkg/hub"
"open-cluster-management.io/registration/pkg/version"
)
func NewController() *cobra.Command {
manager := hub.NewHubManagerOptions()
cmdConfig := controllercmd.
NewControllerCommandConfig("registration-controller", version.Get(), hub.RunControllerManager)
NewControllerCommandConfig("registration-controller", version.Get(), manager.RunControllerManager)
cmd := cmdConfig.NewCommand()
cmd.Use = "controller"
cmd.Short = "Start the Cluster Registration Controller"
@@ -35,7 +35,7 @@ func NewController() *cobra.Command {
"The duration the clients should wait between attempting acquisition and renewal "+
"of a leadership. This is only applicable if leader election is enabled.")
features.DefaultHubMutableFeatureGate.AddFlag(flags)
manager.AddFlags(cmd.Flags())
return cmd
}

View File

@@ -2,44 +2,36 @@ package csr
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"strings"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
authorizationv1 "k8s.io/api/authorization/v1"
certificatesv1 "k8s.io/api/certificates/v1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
certificatesinformers "k8s.io/client-go/informers/certificates/v1"
"k8s.io/client-go/kubernetes"
certificateslisters "k8s.io/client-go/listers/certificates/v1"
"k8s.io/klog/v2"
"open-cluster-management.io/registration/pkg/helpers"
"open-cluster-management.io/registration/pkg/hub/user"
)
// csrApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub.
type csrApprovingController struct {
kubeClient kubernetes.Interface
csrLister certificateslisters.CertificateSigningRequestLister
eventRecorder events.Recorder
csrLister certificateslisters.CertificateSigningRequestLister
reconcilers []Reconciler
}
// NewCSRApprovingController creates a new csr approving controller
func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer certificatesinformers.CertificateSigningRequestInformer, recorder events.Recorder) factory.Controller {
func NewCSRApprovingController(
csrInformer certificatesinformers.CertificateSigningRequestInformer,
reconcilers []Reconciler,
recorder events.Recorder) factory.Controller {
c := &csrApprovingController{
kubeClient: kubeClient,
csrLister: csrInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
csrLister: csrInformer.Lister(),
reconcilers: reconcilers,
}
return factory.New().
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
@@ -53,6 +45,7 @@ func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer cert
func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
csrName := syncCtx.QueueKey()
klog.V(4).Infof("Reconciling CertificateSigningRequests %q", csrName)
csr, err := c.csrLister.Get(csrName)
if errors.IsNotFound(err) {
return nil
@@ -68,155 +61,29 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC
}
csrInfo := newCSRInfo(csr)
// Check whether current csr is a renewal spoker cluster csr.
isRenewal := isSpokeClusterClientCertRenewal(csrInfo)
if !isRenewal {
klog.V(4).Infof("CSR %q was not recognized", csr.Name)
return nil
for _, r := range c.reconcilers {
state, err := r.Reconcile(ctx, csrInfo, approveCSRV1Func(ctx, csr))
if err != nil {
return err
}
if state == reconcileStop {
break
}
}
// Authorize whether the current spoke agent has been authorized to renew its csr.
allowed, err := authorize(ctx, c.kubeClient, csrInfo)
if err != nil {
return err
}
if !allowed {
//TODO find a way to avoid looking at this CSR again.
klog.V(4).Infof("Managed cluster csr %q cannont be auto approved due to subject access review was not approved", csr.Name)
return nil
}
// Auto approve the spoke cluster csr
csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1.CertificateSigningRequestCondition{
Type: certificatesv1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApprovedByHubCSRApprovingController",
Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.",
})
_, err = c.kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
if err != nil {
return err
}
c.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "spoke cluster csr %q is auto approved by hub csr controller", csr.Name)
return nil
}
// Using SubjectAccessReview API to check whether a spoke agent has been authorized to renew its csr,
// a spoke agent is authorized after its spoke cluster is accepted by hub cluster admin.
func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr csrInfo) (bool, error) {
sar := &authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
User: csr.username,
UID: csr.uid,
Groups: csr.groups,
Extra: csr.extra,
ResourceAttributes: &authorizationv1.ResourceAttributes{
Group: "register.open-cluster-management.io",
Resource: "managedclusters",
Verb: "renew",
Subresource: "clientcertificates",
},
},
}
sar, err := kubeClient.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}
return sar.Status.Allowed, nil
}
// To check a renewal managed cluster csr, we check
// 1. if the signer name in csr request is valid.
// 2. if organization field and commonName field in csr request is valid.
// 3. if user name in csr is the same as commonName field in csr request.
func isSpokeClusterClientCertRenewal(csr csrInfo) bool {
spokeClusterName, existed := csr.labels[clusterv1.ClusterNameLabelKey]
if !existed {
return false
}
if csr.signerName != certificatesv1.KubeAPIServerClientSignerName {
return false
}
block, _ := pem.Decode(csr.request)
if block == nil || block.Type != "CERTIFICATE REQUEST" {
klog.V(4).Infof("csr %q was not recognized: PEM block type is not CERTIFICATE REQUEST", csr.name)
return false
}
x509cr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
klog.V(4).Infof("csr %q was not recognized: %v", csr.name, err)
return false
}
requestingOrgs := sets.NewString(x509cr.Subject.Organization...)
if requestingOrgs.Has(user.ManagedClustersGroup) { // optional common group for backward-compatibility
requestingOrgs.Delete(user.ManagedClustersGroup)
}
if requestingOrgs.Len() != 1 {
return false
}
expectedPerClusterOrg := fmt.Sprintf("%s%s", user.SubjectPrefix, spokeClusterName)
if !requestingOrgs.Has(expectedPerClusterOrg) {
return false
}
if !strings.HasPrefix(x509cr.Subject.CommonName, expectedPerClusterOrg) {
return false
}
return csr.username == x509cr.Subject.CommonName
}
type csrInfo struct {
name string
labels map[string]string
signerName string
username string
uid string
groups []string
extra map[string]authorizationv1.ExtraValue
request []byte
}
// newCSRInfo creates csrInfo from CertificateSigningRequest by api version(v1/v1beta1).
func newCSRInfo(csr any) csrInfo {
extra := make(map[string]authorizationv1.ExtraValue)
switch v := csr.(type) {
case *certificatesv1.CertificateSigningRequest:
for k, v := range v.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csrInfo{
name: v.Name,
labels: v.Labels,
signerName: v.Spec.SignerName,
username: v.Spec.Username,
uid: v.Spec.UID,
groups: v.Spec.Groups,
extra: extra,
request: v.Spec.Request,
}
case *certificatesv1beta1.CertificateSigningRequest:
for k, v := range v.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csrInfo{
name: v.Name,
labels: v.Labels,
signerName: *v.Spec.SignerName,
username: v.Spec.Username,
uid: v.Spec.UID,
groups: v.Spec.Groups,
extra: extra,
request: v.Spec.Request,
}
default:
klog.Errorf("Unsupported type %T", v)
return csrInfo{}
func approveCSRV1Func(ctx context.Context, csr *certificatesv1.CertificateSigningRequest) approveCSRFunc {
return func(kubeClient kubernetes.Interface) error {
// Auto approve the spoke cluster csr
csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1.CertificateSigningRequestCondition{
Type: certificatesv1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApprovedByHubCSRApprovingController",
Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.",
})
_, err := kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(ctx, csr.Name, csr, metav1.UpdateOptions{})
return err
}
}

View File

@@ -21,20 +21,18 @@ import (
// v1beta1CSRApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub.
type v1beta1CSRApprovingController struct {
kubeClient kubernetes.Interface
csrLister certificatesv1beta1lister.CertificateSigningRequestLister
eventRecorder events.Recorder
csrLister certificatesv1beta1lister.CertificateSigningRequestLister
reconcilers []Reconciler
}
func NewV1beta1CSRApprovingController(
kubeClient kubernetes.Interface,
v1beta1CSRInformer certificatesv1beta1informers.CertificateSigningRequestInformer,
reconcilers []Reconciler,
recorder events.Recorder) factory.Controller {
c := &v1beta1CSRApprovingController{
kubeClient: kubeClient,
csrLister: v1beta1CSRInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
csrLister: v1beta1CSRInformer.Lister(),
reconcilers: reconcilers,
}
return factory.New().WithInformersQueueKeyFunc(func(obj runtime.Object) string {
@@ -63,34 +61,29 @@ func (c *v1beta1CSRApprovingController) sync(ctx context.Context, syncCtx factor
}
csrInfo := newCSRInfo(csr)
// Check whether current csr is a renewal spoke cluster csr.
isRenewal := isSpokeClusterClientCertRenewal(csrInfo)
if !isRenewal {
klog.V(4).Infof("CSR %q was not recognized", csr.Name)
return nil
for _, r := range c.reconcilers {
state, err := r.Reconcile(ctx, csrInfo, approveCSRV1beta1Func(ctx, csr))
if err != nil {
return err
}
if state == reconcileStop {
break
}
}
allowed, err := authorize(ctx, c.kubeClient, csrInfo)
if err != nil {
return err
}
if !allowed {
//TODO find a way to avoid looking at this CSR again.
klog.V(4).Infof("Managed cluster csr %q cannont be auto approved due to subject access review was not approved", csr.Name)
return nil
}
// Auto approve the spoke cluster csr
csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1beta1.CertificateSigningRequestCondition{
Type: certificatesv1beta1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApprovedByHubCSRApprovingController",
Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.",
})
_, err = c.kubeClient.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{})
if err != nil {
return err
}
c.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "spoke cluster csr %q is auto approved by hub csr controller", csr.Name)
return nil
}
func approveCSRV1beta1Func(ctx context.Context, csr *certificatesv1beta1.CertificateSigningRequest) approveCSRFunc {
return func(kubeClient kubernetes.Interface) error {
// Auto approve the spoke cluster csr
csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1beta1.CertificateSigningRequestCondition{
Type: certificatesv1beta1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApprovedByHubCSRApprovingController",
Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.",
})
_, err := kubeClient.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{})
return err
}
}

View File

@@ -144,9 +144,14 @@ func Test_v1beta1CSRApprovingController_sync(t *testing.T) {
}
ctrl := &v1beta1CSRApprovingController{
kubeClient,
informerFactory.Certificates().V1beta1().CertificateSigningRequests().Lister(),
eventstesting.NewTestingEventRecorder(t),
[]Reconciler{
&csrBootstrapReconciler{},
&csrRenewalReconciler{
kubeClient: kubeClient,
eventRecorder: eventstesting.NewTestingEventRecorder(t),
},
},
}
if err := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, validV1beta1CSR.Name)); (err != nil) != tt.wantErr {
t.Errorf("v1beta1CSRApprovingController.sync() error = %v, wantErr %v", err, tt.wantErr)

View File

@@ -5,6 +5,9 @@ import (
"testing"
"time"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing"
"open-cluster-management.io/registration/pkg/hub/user"
@@ -13,6 +16,7 @@ import (
authorizationv1 "k8s.io/api/authorization/v1"
certificatesv1 "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/informers"
@@ -35,33 +39,39 @@ var (
func TestSync(t *testing.T) {
cases := []struct {
name string
startingClusters []runtime.Object
startingCSRs []runtime.Object
approvalUsers []string
autoApprovingAllowed bool
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "sync a deleted csr",
startingCSRs: []runtime.Object{},
name: "sync a deleted csr",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertNoActions(t, actions)
},
},
{
name: "sync a denied csr",
startingCSRs: []runtime.Object{testinghelpers.NewDeniedCSR(validCSR)},
name: "sync a denied csr",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{testinghelpers.NewDeniedCSR(validCSR)},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertNoActions(t, actions)
},
},
{
name: "sync an approved csr",
startingCSRs: []runtime.Object{testinghelpers.NewApprovedCSR(validCSR)},
name: "sync an approved csr",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{testinghelpers.NewApprovedCSR(validCSR)},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertNoActions(t, actions)
},
},
{
name: "sync an invalid csr",
name: "sync an invalid csr",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{testinghelpers.NewCSR(testinghelpers.CSRHolder{
Name: validCSR.Name,
Labels: validCSR.Labels,
@@ -76,8 +86,9 @@ func TestSync(t *testing.T) {
},
},
{
name: "deny an auto approving csr",
startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)},
name: "deny an auto approving csr",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "create")
testinghelpers.AssertSubjectAccessReviewObj(t, actions[0].(clienttesting.CreateActionImpl).Object)
@@ -85,6 +96,7 @@ func TestSync(t *testing.T) {
},
{
name: "allow an auto approving csr",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{testinghelpers.NewCSR(validCSR)},
autoApprovingAllowed: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
@@ -100,7 +112,8 @@ func TestSync(t *testing.T) {
},
},
{
name: "allow an auto approving csr w/o ManagedClusterGroup for backward-compatibility",
name: "allow an auto approving csr w/o ManagedClusterGroup for backward-compatibility",
startingClusters: []runtime.Object{},
startingCSRs: []runtime.Object{testinghelpers.NewCSR(testinghelpers.CSRHolder{
Name: validCSR.Name,
Labels: validCSR.Labels,
@@ -123,6 +136,34 @@ func TestSync(t *testing.T) {
testinghelpers.AssertCSRCondition(t, actual.(*certificatesv1.CertificateSigningRequest).Status.Conditions, expectedCondition)
},
},
{
name: "auto approve a bootstrap csr request",
startingClusters: []runtime.Object{
&clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "managedcluster1",
},
},
},
startingCSRs: []runtime.Object{func() *certificatesv1.CertificateSigningRequest {
csr := testinghelpers.NewCSR(validCSR)
csr.Spec.Username = "test"
return csr
}()},
autoApprovingAllowed: true,
approvalUsers: []string{"test"},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := certificatesv1.CertificateSigningRequestCondition{
Type: certificatesv1.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "AutoApprovedByHubCSRApprovingController",
Message: "Auto approving Managed cluster agent certificate after SubjectAccessReview.",
}
testinghelpers.AssertActions(t, actions, "update")
actual := actions[0].(clienttesting.UpdateActionImpl).Object
testinghelpers.AssertCSRCondition(t, actual.(*certificatesv1.CertificateSigningRequest).Status.Conditions, expectedCondition)
},
},
}
for _, c := range cases {
@@ -147,7 +188,34 @@ func TestSync(t *testing.T) {
}
}
ctrl := &csrApprovingController{kubeClient, informerFactory.Certificates().V1().CertificateSigningRequests().Lister(), eventstesting.NewTestingEventRecorder(t)}
clusterClient := clusterfake.NewSimpleClientset(c.startingClusters...)
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
for _, cluster := range c.startingClusters {
if err := clusterStore.Add(cluster); err != nil {
t.Fatal(err)
}
}
recorder := eventstesting.NewTestingEventRecorder(t)
ctrl := &csrApprovingController{
informerFactory.Certificates().V1().CertificateSigningRequests().Lister(),
[]Reconciler{
&csrBootstrapReconciler{
kubeClient: kubeClient,
eventRecorder: recorder,
approvalUsers: sets.Set[string]{},
},
NewCSRRenewalReconciler(kubeClient, recorder),
NewCSRBootstrapReconciler(
kubeClient,
clusterClient,
clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
c.approvalUsers,
recorder,
),
},
}
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, validCSR.Name))
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
@@ -162,9 +230,11 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) {
invalidSignerName := "invalidsigner"
cases := []struct {
name string
csr testinghelpers.CSRHolder
isRenewal bool
name string
csr testinghelpers.CSRHolder
isRenewal bool
clusterName string
commonName string
}{
{
name: "a spoke cluster csr without labels",
@@ -217,19 +287,6 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) {
},
isRenewal: false,
},
{
name: "an common name does not equal user name",
csr: testinghelpers.CSRHolder{
Name: validCSR.Name,
Labels: validCSR.Labels,
SignerName: validCSR.SignerName,
CN: "system:open-cluster-management:managedcluster1:invalidagent",
Orgs: validCSR.Orgs,
Username: validCSR.Username,
ReqBlockType: validCSR.ReqBlockType,
},
isRenewal: false,
},
{
name: "a renewal csr without signer name",
csr: testinghelpers.CSRHolder{
@@ -244,18 +301,26 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) {
isRenewal: false,
},
{
name: "a renewal csr",
csr: validCSR,
isRenewal: true,
name: "a renewal csr",
csr: validCSR,
isRenewal: true,
clusterName: "managedcluster1",
commonName: validCSR.CN,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
isRenewal := isSpokeClusterClientCertRenewal(newCSRInfo(testinghelpers.NewCSR(c.csr)))
isRenewal, clusterName, commonName := validateCSR(newCSRInfo(testinghelpers.NewCSR(c.csr)))
if isRenewal != c.isRenewal {
t.Errorf("expected %t, but failed", c.isRenewal)
}
if clusterName != c.clusterName {
t.Errorf("expected %s, but failed", commonName)
}
if commonName != c.commonName {
t.Errorf("expected %s, but failed", commonName)
}
})
}
}

269
pkg/hub/csr/reconciler.go Normal file
View File

@@ -0,0 +1,269 @@
package csr
import (
"context"
"crypto/x509"
"encoding/pem"
"fmt"
"strings"
"github.com/openshift/library-go/pkg/operator/events"
authorizationv1 "k8s.io/api/authorization/v1"
certificatesv1 "k8s.io/api/certificates/v1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/registration/pkg/hub/user"
)
type reconcileState int64
const (
reconcileStop reconcileState = iota
reconcileContinue
)
type csrInfo struct {
name string
labels map[string]string
signerName string
username string
uid string
groups []string
extra map[string]authorizationv1.ExtraValue
request []byte
}
type approveCSRFunc func(kubernetes.Interface) error
type Reconciler interface {
Reconcile(context.Context, csrInfo, approveCSRFunc) (reconcileState, error)
}
type csrRenewalReconciler struct {
kubeClient kubernetes.Interface
eventRecorder events.Recorder
}
func NewCSRRenewalReconciler(kubeClient kubernetes.Interface, recorder events.Recorder) Reconciler {
return &csrRenewalReconciler{
kubeClient: kubeClient,
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
}
}
func (r *csrRenewalReconciler) Reconcile(ctx context.Context, csr csrInfo, approveCSR approveCSRFunc) (reconcileState, error) {
// Check whether current csr is a valid spoker cluster csr.
valid, _, commonName := validateCSR(csr)
if !valid {
klog.V(4).Infof("CSR %q was not recognized", csr.name)
return reconcileStop, nil
}
// Check if user name in csr is the same as commonName field in csr request.
if csr.username != commonName {
return reconcileContinue, nil
}
// Authorize whether the current spoke agent has been authorized to renew its csr.
allowed, err := authorize(ctx, r.kubeClient, csr)
if err != nil {
return reconcileContinue, err
}
if !allowed {
klog.V(4).Infof("Managed cluster csr %q cannont be auto approved due to subject access review was not approved", csr.name)
return reconcileStop, nil
}
if err := approveCSR(r.kubeClient); err != nil {
return reconcileContinue, err
}
r.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "spoke cluster csr %q is auto approved by hub csr controller", csr.name)
return reconcileStop, nil
}
type csrBootstrapReconciler struct {
kubeClient kubernetes.Interface
clusterClient clusterclientset.Interface
clusterLister clusterv1listers.ManagedClusterLister
approvalUsers sets.Set[string]
eventRecorder events.Recorder
}
func NewCSRBootstrapReconciler(kubeClient kubernetes.Interface,
clusterClient clusterclientset.Interface,
clusterLister clusterv1listers.ManagedClusterLister,
approvalUsers []string,
recorder events.Recorder) Reconciler {
return &csrBootstrapReconciler{
kubeClient: kubeClient,
clusterClient: clusterClient,
clusterLister: clusterLister,
approvalUsers: sets.New(approvalUsers...),
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
}
}
func (b *csrBootstrapReconciler) Reconcile(ctx context.Context, csr csrInfo, approveCSR approveCSRFunc) (reconcileState, error) {
// Check whether current csr is a valid spoker cluster csr.
valid, clusterName, _ := validateCSR(csr)
if !valid {
klog.V(4).Infof("CSR %q was not recognized", csr.name)
return reconcileStop, nil
}
// Check whether current csr can be approved.
if !b.approvalUsers.Has(csr.username) {
return reconcileContinue, nil
}
err := b.accpetCluster(ctx, clusterName)
if errors.IsNotFound(err) {
// Current spoke cluster not found, could have been deleted, do nothing.
return reconcileStop, nil
}
if err != nil {
return reconcileContinue, err
}
if err := approveCSR(b.kubeClient); err != nil {
return reconcileContinue, err
}
b.eventRecorder.Eventf("ManagedClusterAutoApproved", "spoke cluster %q is auto approved.", clusterName)
return reconcileStop, nil
}
func (b *csrBootstrapReconciler) accpetCluster(ctx context.Context, managedClusterName string) error {
managedCluster, err := b.clusterLister.Get(managedClusterName)
if err != nil {
return err
}
if managedCluster.Spec.HubAcceptsClient {
return nil
}
patch := []byte("{\"spec\": {\"hubAcceptsClient\": true}}")
_, err = b.clusterClient.ClusterV1().ManagedClusters().Patch(
ctx, managedCluster.Name, types.MergePatchType, patch, metav1.PatchOptions{})
return err
}
// To validate a managed cluster csr, we check
// 1. if the signer name in csr request is valid.
// 2. if organization field and commonName field in csr request is valid.
func validateCSR(csr csrInfo) (bool, string, string) {
spokeClusterName, existed := csr.labels[clusterv1.ClusterNameLabelKey]
if !existed {
return false, "", ""
}
if csr.signerName != certificatesv1.KubeAPIServerClientSignerName {
return false, "", ""
}
block, _ := pem.Decode(csr.request)
if block == nil || block.Type != "CERTIFICATE REQUEST" {
klog.V(4).Infof("csr %q was not recognized: PEM block type is not CERTIFICATE REQUEST", csr.name)
return false, "", ""
}
x509cr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
klog.V(4).Infof("csr %q was not recognized: %v", csr.name, err)
return false, "", ""
}
requestingOrgs := sets.New(x509cr.Subject.Organization...)
if requestingOrgs.Has(user.ManagedClustersGroup) { // optional common group for backward-compatibility
requestingOrgs.Delete(user.ManagedClustersGroup)
}
if requestingOrgs.Len() != 1 {
return false, "", ""
}
expectedPerClusterOrg := fmt.Sprintf("%s%s", user.SubjectPrefix, spokeClusterName)
if !requestingOrgs.Has(expectedPerClusterOrg) {
return false, "", ""
}
if !strings.HasPrefix(x509cr.Subject.CommonName, expectedPerClusterOrg) {
return false, "", ""
}
return true, spokeClusterName, x509cr.Subject.CommonName
}
// Using SubjectAccessReview API to check whether a spoke agent has been authorized to renew its csr,
// a spoke agent is authorized after its spoke cluster is accepted by hub cluster admin.
func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr csrInfo) (bool, error) {
sar := &authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
User: csr.username,
UID: csr.uid,
Groups: csr.groups,
Extra: csr.extra,
ResourceAttributes: &authorizationv1.ResourceAttributes{
Group: "register.open-cluster-management.io",
Resource: "managedclusters",
Verb: "renew",
Subresource: "clientcertificates",
},
},
}
sar, err := kubeClient.AuthorizationV1().SubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
if err != nil {
return false, err
}
return sar.Status.Allowed, nil
}
// newCSRInfo creates csrInfo from CertificateSigningRequest by api version(v1/v1beta1).
func newCSRInfo(csr any) csrInfo {
extra := make(map[string]authorizationv1.ExtraValue)
switch v := csr.(type) {
case *certificatesv1.CertificateSigningRequest:
for k, v := range v.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csrInfo{
name: v.Name,
labels: v.Labels,
signerName: v.Spec.SignerName,
username: v.Spec.Username,
uid: v.Spec.UID,
groups: v.Spec.Groups,
extra: extra,
request: v.Spec.Request,
}
case *certificatesv1beta1.CertificateSigningRequest:
for k, v := range v.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csrInfo{
name: v.Name,
labels: v.Labels,
signerName: *v.Spec.SignerName,
username: v.Spec.Username,
uid: v.Spec.UID,
groups: v.Spec.Groups,
extra: extra,
request: v.Spec.Request,
}
default:
klog.Errorf("Unsupported type %T", v)
return csrInfo{}
}
}

View File

@@ -28,6 +28,7 @@ import (
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/pkg/errors"
"github.com/spf13/pflag"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -37,8 +38,26 @@ import (
var ResyncInterval = 5 * time.Minute
// HubManagerOptions holds configuration for hub manager controller
type HubManagerOptions struct {
ClusterAutoApprovalUsers []string
}
// NewHubManagerOptions returns a HubManagerOptions
func NewHubManagerOptions() *HubManagerOptions {
return &HubManagerOptions{}
}
// AddFlags registers flags for manager
func (m *HubManagerOptions) AddFlags(fs *pflag.FlagSet) {
features.DefaultHubMutableFeatureGate.AddFlag(fs)
fs.StringArrayVar(&m.ClusterAutoApprovalUsers, "--cluster-auto-approval-users", m.ClusterAutoApprovalUsers,
"A list of reachable spoke cluster api server URLs for hub cluster.")
}
// RunControllerManager starts the controllers on hub to manage spoke cluster registration.
func RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
func (m *HubManagerOptions) RunControllerManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
// If qps in kubconfig is not set, increase the qps and burst to enhance the ability of kube client to handle
// requests in concurrent
// TODO: Use ClientConnectionOverrides flags to change qps/burst when library-go exposes them in the future
@@ -86,6 +105,17 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
controllerContext.EventRecorder,
)
csrReconciles := []csr.Reconciler{csr.NewCSRRenewalReconciler(kubeClient, controllerContext.EventRecorder)}
if features.DefaultHubMutableFeatureGate.Enabled(ocmfeature.ManagedClusterAutoApproval) {
csrReconciles = append(csrReconciles, csr.NewCSRBootstrapReconciler(
kubeClient,
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters().Lister(),
m.ClusterAutoApprovalUsers,
controllerContext.EventRecorder,
))
}
var csrController factory.Controller
if features.DefaultHubMutableFeatureGate.Enabled(ocmfeature.V1beta1CSRAPICompatibility) {
v1CSRSupported, v1beta1CSRSupported, err := helpers.IsCSRSupported(kubeClient)
@@ -95,8 +125,8 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
if !v1CSRSupported && v1beta1CSRSupported {
csrController = csr.NewV1beta1CSRApprovingController(
kubeClient,
kubeInfomers.Certificates().V1beta1().CertificateSigningRequests(),
csrReconciles,
controllerContext.EventRecorder,
)
klog.Info("Using v1beta1 CSR api to manage spoke client certificate")
@@ -104,8 +134,8 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
}
if csrController == nil {
csrController = csr.NewCSRApprovingController(
kubeClient,
kubeInfomers.Certificates().V1().CertificateSigningRequests(),
csrReconciles,
controllerContext.EventRecorder,
)
}

View File

@@ -80,7 +80,7 @@ var _ = ginkgo.Describe("Disaster Recovery", func() {
// start hub controller
go func() {
err := hub.RunControllerManager(ctx, &controllercmd.ControllerContext{
err := hub.NewHubManagerOptions().RunControllerManager(ctx, &controllercmd.ControllerContext{
KubeConfig: cfg,
EventRecorder: util.NewIntegrationTestEventRecorder("hub"),
})

View File

@@ -3,7 +3,6 @@ package integration_test
import (
"context"
"fmt"
"io/ioutil"
"os"
"path"
"path/filepath"
@@ -66,7 +65,7 @@ func TestIntegration(t *testing.T) {
ginkgo.RunSpecs(t, "Integration Suite")
}
var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {
var _ = ginkgo.BeforeSuite(func() {
logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true)))
ginkgo.By("bootstrapping test environment")
@@ -141,7 +140,7 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {
gomega.Expect(clusterClient).ToNot(gomega.BeNil())
// prepare test namespace
nsBytes, err := ioutil.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
nsBytes, err := os.ReadFile("/var/run/secrets/kubernetes.io/serviceaccount/namespace")
if err != nil {
testNamespace = "open-cluster-management-agent"
} else {
@@ -154,16 +153,20 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) {
err = features.DefaultHubMutableFeatureGate.Set("DefaultClusterSet=true")
gomega.Expect(err).ToNot(gomega.HaveOccurred())
// enable ManagedClusterAutoApproval feature gate
err = features.DefaultHubMutableFeatureGate.Set("ManagedClusterAutoApproval=true")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// start hub controller
go func() {
err := hub.RunControllerManager(ctx, &controllercmd.ControllerContext{
m := hub.NewHubManagerOptions()
m.ClusterAutoApprovalUsers = []string{util.AutoApprovalBootstrapUser}
err := m.RunControllerManager(ctx, &controllercmd.ControllerContext{
KubeConfig: cfg,
EventRecorder: util.NewIntegrationTestEventRecorder("hub"),
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}()
close(done)
})
var _ = ginkgo.AfterSuite(func() {

View File

@@ -0,0 +1,83 @@
package integration_test
import (
"fmt"
"path"
"time"
"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
certificates "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/api/meta"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/registration/pkg/spoke"
"open-cluster-management.io/registration/test/integration/util"
)
var _ = ginkgo.Describe("Cluster Auto Approval", func() {
ginkgo.It("Cluster should be automatically approved", func() {
var err error
managedClusterName := "autoapprovaltest-spokecluster"
hubKubeconfigSecret := "autoapprovaltest-hub-kubeconfig-secret"
hubKubeconfigDir := path.Join(util.TestDir, "autoapprovaltest", "hub-kubeconfig")
bootstrapFile := path.Join(util.TestDir, "bootstrap-autoapprovaltest", "kubeconfig")
err = authn.CreateBootstrapKubeConfigWithUser(bootstrapFile, serverCertFile, securePort, util.AutoApprovalBootstrapUser)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
agentOptions := spoke.SpokeAgentOptions{
ClusterName: managedClusterName,
BootstrapKubeconfig: bootstrapFile,
HubKubeconfigSecret: hubKubeconfigSecret,
HubKubeconfigDir: hubKubeconfigDir,
ClusterHealthCheckPeriod: 1 * time.Minute,
}
// run registration agent
cancel := util.RunAgent("autoapprovaltest", agentOptions, spokeCfg)
defer cancel()
gomega.Eventually(func() error {
if _, err := util.GetManagedCluster(clusterClient, managedClusterName); err != nil {
return err
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
var approvedCSR *certificates.CertificateSigningRequest
// after bootstrap the spokecluster csr should be auto approved
gomega.Eventually(func() error {
approvedCSR, err = util.FindAutoApprovedSpokeCSR(kubeClient, managedClusterName)
if err != nil {
return err
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
// simulate hub cluster to fill a certificate
now := time.Now()
err = authn.FillCertificateToApprovedCSR(kubeClient, approvedCSR, now.UTC(), now.Add(30*time.Second).UTC())
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// the hub kubeconfig secret should be filled after the csr is approved
gomega.Eventually(func() error {
if _, err := util.GetFilledHubKubeConfigSecret(kubeClient, testNamespace, hubKubeconfigSecret); err != nil {
return err
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
// the spoke cluster should have joined condition finally
gomega.Eventually(func() error {
spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return err
}
if !meta.IsStatusConditionTrue(spokeCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) {
return fmt.Errorf("cluster should be joined")
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
})
})

View File

@@ -45,6 +45,8 @@ const (
TestDir = "/tmp/registration-integration-test"
)
const AutoApprovalBootstrapUser = "autoapproval-user"
var (
CertDir = path.Join(TestDir, "client-certs")
caFile = path.Join(CertDir, "ca.crt")
@@ -171,6 +173,14 @@ func (t *TestAuthn) Stop() error {
}
func (t *TestAuthn) CreateBootstrapKubeConfigWithCertAge(configFileName, serverCertFile, securePort string, certAge time.Duration) error {
return t.CreateBootstrapKubeConfig(configFileName, serverCertFile, securePort, bootstrapUser, certAge)
}
func (t *TestAuthn) CreateBootstrapKubeConfigWithUser(configFileName, serverCertFile, securePort, bootstrapUser string) error {
return t.CreateBootstrapKubeConfig(configFileName, serverCertFile, securePort, bootstrapUser, 24*time.Hour)
}
func (t *TestAuthn) CreateBootstrapKubeConfig(configFileName, serverCertFile, securePort, bootstrapUser string, certAge time.Duration) error {
certData, keyData, err := t.signClientCertKeyWithCA(bootstrapUser, bootstrapGroups, certAge)
if err != nil {
return err
@@ -418,6 +428,27 @@ func (t *TestAuthn) ApproveSpokeClusterCSR(kubeClient kubernetes.Interface, spok
}
func (t *TestAuthn) ApproveCSR(kubeClient kubernetes.Interface, csr *certificates.CertificateSigningRequest, notBefore, notAfter time.Time) error {
if err := t.FillCertificateToApprovedCSR(kubeClient, csr, notBefore, notAfter); err != nil {
return err
}
// approve the csr
approved, err := kubeClient.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), csr.Name, metav1.GetOptions{})
if err != nil {
return err
}
approved.Status.Conditions = append(approved.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "Approved",
Message: "CSR Approved.",
LastUpdateTime: metav1.Now(),
})
_, err = kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.TODO(), approved.Name, approved, metav1.UpdateOptions{})
return err
}
func (t *TestAuthn) FillCertificateToApprovedCSR(kubeClient kubernetes.Interface, csr *certificates.CertificateSigningRequest, notBefore, notAfter time.Time) error {
block, _ := pem.Decode(csr.Spec.Request)
cr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
@@ -470,29 +501,13 @@ func (t *TestAuthn) ApproveCSR(kubeClient kubernetes.Interface, csr *certificate
}
// set cert
csr.Status = certificates.CertificateSigningRequestStatus{
Certificate: certBuffer.Bytes(),
Conditions: []certificates.CertificateSigningRequestCondition{},
}
csr.Status.Certificate = certBuffer.Bytes()
_, err = kubeClient.CertificatesV1().CertificateSigningRequests().UpdateStatus(context.TODO(), csr, metav1.UpdateOptions{})
if err != nil {
return err
}
// approve the csr
approved, err := kubeClient.CertificatesV1().CertificateSigningRequests().Get(context.TODO(), csr.Name, metav1.GetOptions{})
if err != nil {
return err
}
approved.Status.Conditions = append(approved.Status.Conditions, certificates.CertificateSigningRequestCondition{
Type: certificates.CertificateApproved,
Status: corev1.ConditionTrue,
Reason: "Approved",
Message: "CSR Approved.",
LastUpdateTime: metav1.Now(),
})
_, err = kubeClient.CertificatesV1().CertificateSigningRequests().UpdateApproval(context.TODO(), approved.Name, approved, metav1.UpdateOptions{})
return err
return nil
}
func GetManagedCluster(clusterClient clusterclientset.Interface, spokeClusterName string) (*clusterv1.ManagedCluster, error) {