support cert auto approve for grpc (#1134)
Some checks failed
Scorecard supply-chain security / Scorecard analysis (push) Failing after 2m35s
Post / coverage (push) Failing after 41m11s
Post / images (amd64, addon-manager) (push) Failing after 8m58s
Post / images (amd64, placement) (push) Failing after 7m57s
Post / images (amd64, registration) (push) Failing after 8m0s
Post / images (amd64, registration-operator) (push) Failing after 7m59s
Post / images (amd64, work) (push) Failing after 7m32s
Post / images (arm64, addon-manager) (push) Failing after 8m3s
Post / images (arm64, placement) (push) Failing after 7m41s
Post / images (arm64, registration) (push) Failing after 7m20s
Post / images (arm64, registration-operator) (push) Failing after 7m41s
Post / images (arm64, work) (push) Failing after 7m42s
Post / image manifest (addon-manager) (push) Has been skipped
Post / image manifest (placement) (push) Has been skipped
Post / image manifest (registration) (push) Has been skipped
Post / image manifest (registration-operator) (push) Has been skipped
Post / image manifest (work) (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Close stale issues and PRs / stale (push) Successful in 31s

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2025-08-25 15:44:21 +08:00
committed by GitHub
parent 28040f5d9a
commit ef24cbbab4
11 changed files with 441 additions and 100 deletions

2
go.mod
View File

@@ -26,7 +26,6 @@ require (
github.com/spf13/pflag v1.0.7
github.com/stretchr/testify v1.10.0
github.com/valyala/fasttemplate v1.2.2
golang.org/x/net v0.43.0
gopkg.in/yaml.v2 v2.4.0
helm.sh/helm/v3 v3.18.5
k8s.io/api v0.33.4
@@ -156,6 +155,7 @@ require (
go.yaml.in/yaml/v3 v3.0.3 // indirect
golang.org/x/crypto v0.41.0 // indirect
golang.org/x/exp v0.0.0-20241217172543-b2144cdd0a67 // indirect
golang.org/x/net v0.43.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sync v0.16.0 // indirect
golang.org/x/sys v0.35.0 // indirect

View File

@@ -7,3 +7,6 @@ const (
)
const GRPCCAuthSigner = "open-cluster-management.io/grpc"
// CSRUserAnnotation will be added to a CSR and used to identify the user who request the CSR
const CSRUserAnnotation = "open-cluster-management.io/csruser"

View File

@@ -60,8 +60,11 @@ type HubManagerOptions struct {
AutoApprovedARNPatterns []string
AwsResourceTags []string
Labels string
GRPCCAFile string
GRPCCAKeyFile string
// TODO (skeeey) introduce hub options for different drives to group these options
AutoApprovedGRPCUsers []string
GRPCCAFile string
GRPCCAKeyFile string
GRPCSigningDuration time.Duration
}
// NewHubManagerOptions returns a HubManagerOptions
@@ -71,6 +74,7 @@ func NewHubManagerOptions() *HubManagerOptions {
"work.open-cluster-management.io/v1/manifestworks"},
ImportOption: importeroptions.New(),
EnabledRegistrationDrivers: []string{commonhelpers.CSRAuthType},
GRPCSigningDuration: 720 * time.Hour,
}
}
@@ -93,11 +97,14 @@ func (m *HubManagerOptions) AddFlags(fs *pflag.FlagSet) {
"A bootstrap user list whose cluster registration requests can be automatically approved.")
fs.StringSliceVar(&m.AutoApprovedARNPatterns, "auto-approved-arn-patterns", m.AutoApprovedARNPatterns,
"A list of AWS EKS ARN patterns such that an EKS cluster will be auto approved if its ARN matches with any of the patterns")
fs.StringSliceVar(&m.AutoApprovedGRPCUsers, "auto-approved-grpc-users", m.AutoApprovedGRPCUsers,
"A bootstrap user list via gRPC whose cluster registration requests can be automatically approved.")
fs.StringSliceVar(&m.AwsResourceTags, "aws-resource-tags", m.AwsResourceTags, "A list of tags to apply to AWS resources created through the OCM controllers")
fs.StringVar(&m.Labels, "labels", m.Labels,
"Labels to be added to the resources created by registration controller. The format is key1=value1,key2=value2.")
fs.StringVar(&m.GRPCCAFile, "grpc-ca-file", m.GRPCCAFile, "ca file to sign client cert for grpc")
fs.StringVar(&m.GRPCCAKeyFile, "grpc-key-file", m.GRPCCAKeyFile, "ca key file to sign client cert for grpc")
fs.DurationVar(&m.GRPCSigningDuration, "grpc-signing-duration", m.GRPCSigningDuration, "The max length of duration signed certificates will be given.")
m.ImportOption.AddFlags(fs)
}
@@ -202,7 +209,10 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
drivers = append(drivers, awsIRSAHubDriver)
case commonhelpers.GRPCCAuthType:
grpcHubDriver, err := grpc.NewGRPCHubDriver(
kubeClient, kubeInformers, m.GRPCCAKeyFile, m.GRPCCAFile, 720*time.Hour, controllerContext.EventRecorder)
kubeClient, kubeInformers,
m.GRPCCAKeyFile, m.GRPCCAFile, m.GRPCSigningDuration,
m.AutoApprovedGRPCUsers,
controllerContext.EventRecorder)
if err != nil {
return err
}

View File

@@ -28,46 +28,48 @@ const (
reconcileContinue
)
type csrInfo struct {
name string
labels map[string]string
signerName string
username string
uid string
groups []string
extra map[string]authorizationv1.ExtraValue
request []byte
type CSRInfo struct {
Name string
Labels map[string]string
SignerName string
Username string
UID string
Groups []string
Extra map[string]authorizationv1.ExtraValue
Request []byte
}
type approveCSRFunc func(kubernetes.Interface) error
type Reconciler interface {
Reconcile(context.Context, csrInfo, approveCSRFunc) (reconcileState, error)
Reconcile(context.Context, CSRInfo, approveCSRFunc) (reconcileState, error)
}
type csrRenewalReconciler struct {
signer string
kubeClient kubernetes.Interface
eventRecorder events.Recorder
}
func NewCSRRenewalReconciler(kubeClient kubernetes.Interface, recorder events.Recorder) Reconciler {
func NewCSRRenewalReconciler(kubeClient kubernetes.Interface, signer string, recorder events.Recorder) Reconciler {
return &csrRenewalReconciler{
signer: signer,
kubeClient: kubeClient,
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
}
}
func (r *csrRenewalReconciler) Reconcile(ctx context.Context, csr csrInfo, approveCSR approveCSRFunc) (reconcileState, error) {
func (r *csrRenewalReconciler) Reconcile(ctx context.Context, csr CSRInfo, approveCSR approveCSRFunc) (reconcileState, error) {
logger := klog.FromContext(ctx)
// Check whether current csr is a valid spoker cluster csr.
valid, _, commonName := validateCSR(logger, csr)
valid, _, commonName := validateCSR(logger, r.signer, csr)
if !valid {
logger.V(4).Info("CSR was not recognized", "csrName", csr.name)
logger.V(4).Info("CSR was not recognized", "csrName", csr.Name)
return reconcileStop, nil
}
// Check if user name in csr is the same as commonName field in csr request.
if csr.username != commonName {
if csr.Username != commonName {
return reconcileContinue, nil
}
@@ -77,7 +79,7 @@ func (r *csrRenewalReconciler) Reconcile(ctx context.Context, csr csrInfo, appro
return reconcileContinue, err
}
if !allowed {
logger.V(4).Info("Managed cluster csr cannot be auto approved due to subject access review not approved", "csrName", csr.name)
logger.V(4).Info("Managed cluster csr cannot be auto approved due to subject access review not approved", "csrName", csr.Name)
return reconcileStop, nil
}
@@ -85,37 +87,40 @@ func (r *csrRenewalReconciler) Reconcile(ctx context.Context, csr csrInfo, appro
return reconcileContinue, err
}
r.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "managed cluster csr %q is auto approved by hub csr controller", csr.name)
r.eventRecorder.Eventf("ManagedClusterCSRAutoApproved", "managed cluster csr %q is auto approved by hub csr controller", csr.Name)
return reconcileStop, nil
}
type csrBootstrapReconciler struct {
signer string
kubeClient kubernetes.Interface
approvalUsers sets.Set[string]
eventRecorder events.Recorder
}
func NewCSRBootstrapReconciler(kubeClient kubernetes.Interface,
signer string,
approvalUsers []string,
recorder events.Recorder) Reconciler {
return &csrBootstrapReconciler{
signer: signer,
kubeClient: kubeClient,
approvalUsers: sets.New(approvalUsers...),
eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"),
}
}
func (b *csrBootstrapReconciler) Reconcile(ctx context.Context, csr csrInfo, approveCSR approveCSRFunc) (reconcileState, error) {
func (b *csrBootstrapReconciler) Reconcile(ctx context.Context, csr CSRInfo, approveCSR approveCSRFunc) (reconcileState, error) {
logger := klog.FromContext(ctx)
// Check whether current csr is a valid spoker cluster csr.
valid, clusterName, _ := validateCSR(logger, csr)
valid, clusterName, _ := validateCSR(logger, b.signer, csr)
if !valid {
logger.V(4).Info("CSR was not recognized", "csrName", csr.name)
logger.V(4).Info("CSR was not recognized", "csrName", csr.Name)
return reconcileStop, nil
}
// Check whether current csr can be approved.
if !b.approvalUsers.Has(csr.username) {
if !b.approvalUsers.Has(csr.Username) {
return reconcileContinue, nil
}
@@ -130,25 +135,25 @@ func (b *csrBootstrapReconciler) Reconcile(ctx context.Context, csr csrInfo, app
// To validate a managed cluster csr, we check
// 1. if the signer name in csr request is valid.
// 2. if organization field and commonName field in csr request is valid.
func validateCSR(logger klog.Logger, csr csrInfo) (bool, string, string) {
spokeClusterName, existed := csr.labels[clusterv1.ClusterNameLabelKey]
func validateCSR(logger klog.Logger, signer string, csr CSRInfo) (bool, string, string) {
spokeClusterName, existed := csr.Labels[clusterv1.ClusterNameLabelKey]
if !existed {
return false, "", ""
}
if csr.signerName != certificatesv1.KubeAPIServerClientSignerName {
if csr.SignerName != signer {
return false, "", ""
}
block, _ := pem.Decode(csr.request)
block, _ := pem.Decode(csr.Request)
if block == nil || block.Type != "CERTIFICATE REQUEST" {
logger.V(4).Info("CSR was not recognized: PEM block type is not CERTIFICATE REQUEST", "csrName", csr.name)
logger.V(4).Info("CSR was not recognized: PEM block type is not CERTIFICATE REQUEST", "csrName", csr.Name)
return false, "", ""
}
x509cr, err := x509.ParseCertificateRequest(block.Bytes)
if err != nil {
logger.Error(err, "CSR was not recognized", "csrName", csr.name)
logger.Error(err, "CSR was not recognized", "csrName", csr.Name)
return false, "", ""
}
@@ -174,13 +179,13 @@ func validateCSR(logger klog.Logger, csr csrInfo) (bool, string, string) {
// Using SubjectAccessReview API to check whether a spoke agent has been authorized to renew its csr,
// a spoke agent is authorized after its spoke cluster is accepted by hub cluster admin.
func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr csrInfo) (bool, error) {
func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr CSRInfo) (bool, error) {
sar := &authorizationv1.SubjectAccessReview{
Spec: authorizationv1.SubjectAccessReviewSpec{
User: csr.username,
UID: csr.uid,
Groups: csr.groups,
Extra: csr.extra,
User: csr.Username,
UID: csr.UID,
Groups: csr.Groups,
Extra: csr.Extra,
ResourceAttributes: &authorizationv1.ResourceAttributes{
Group: "register.open-cluster-management.io",
Resource: "managedclusters",
@@ -197,40 +202,47 @@ func authorize(ctx context.Context, kubeClient kubernetes.Interface, csr csrInfo
return sar.Status.Allowed, nil
}
// newCSRInfo creates csrInfo from CertificateSigningRequest by api version(v1/v1beta1).
func newCSRInfo(logger klog.Logger, csr any) csrInfo {
func getCSRInfo(csr *certificatesv1.CertificateSigningRequest) CSRInfo {
extra := make(map[string]authorizationv1.ExtraValue)
switch v := csr.(type) {
case *certificatesv1.CertificateSigningRequest:
for k, v := range v.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csrInfo{
name: v.Name,
labels: v.Labels,
signerName: v.Spec.SignerName,
username: v.Spec.Username,
uid: v.Spec.UID,
groups: v.Spec.Groups,
extra: extra,
request: v.Spec.Request,
}
case *certificatesv1beta1.CertificateSigningRequest:
for k, v := range v.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csrInfo{
name: v.Name,
labels: v.Labels,
signerName: *v.Spec.SignerName,
username: v.Spec.Username,
uid: v.Spec.UID,
groups: v.Spec.Groups,
extra: extra,
request: v.Spec.Request,
}
default:
logger.Error(nil, "Unsupported Type", "valueType", v)
return csrInfo{}
for k, v := range csr.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return CSRInfo{
Name: csr.Name,
Labels: csr.Labels,
SignerName: csr.Spec.SignerName,
Username: csr.Spec.Username,
UID: csr.Spec.UID,
Groups: csr.Spec.Groups,
Extra: extra,
Request: csr.Spec.Request,
}
}
func getCSRv1beta1Info(csr *certificatesv1beta1.CertificateSigningRequest) CSRInfo {
extra := make(map[string]authorizationv1.ExtraValue)
for k, v := range csr.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return CSRInfo{
Name: csr.Name,
Labels: csr.Labels,
SignerName: *csr.Spec.SignerName,
Username: csr.Spec.Username,
UID: csr.Spec.UID,
Groups: csr.Spec.Groups,
Extra: extra,
Request: csr.Spec.Request,
}
}
func eventFilter(csr any) bool {
switch v := csr.(type) {
case *certificatesv1.CertificateSigningRequest:
return v.Spec.SignerName == certificatesv1.KubeAPIServerClientSignerName
case *certificatesv1beta1.CertificateSigningRequest:
return *v.Spec.SignerName == certificatesv1beta1.KubeAPIServerClientSignerName
default:
return false
}
}

View File

@@ -146,11 +146,15 @@ func Test_v1beta1CSRApprovingController_sync(t *testing.T) {
}
ctrl := &csrApprovingController[*certificatesv1beta1.CertificateSigningRequest]{
lister: informerFactory.Certificates().V1beta1().CertificateSigningRequests().Lister(),
approver: newCSRV1beta1Approver(kubeClient),
lister: informerFactory.Certificates().V1beta1().CertificateSigningRequests().Lister(),
approver: newCSRV1beta1Approver(kubeClient),
csrInfoGetter: getCSRv1beta1Info,
reconcilers: []Reconciler{
&csrBootstrapReconciler{},
&csrBootstrapReconciler{
signer: certificatesv1beta1.KubeAPIServerClientSignerName,
},
&csrRenewalReconciler{
signer: certificatesv1beta1.KubeAPIServerClientSignerName,
kubeClient: kubeClient,
eventRecorder: eventstesting.NewTestingEventRecorder(t),
},

View File

@@ -9,6 +9,7 @@ import (
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
authorizationv1 "k8s.io/api/authorization/v1"
certificatesv1 "k8s.io/api/certificates/v1"
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -205,17 +206,20 @@ func TestSync(t *testing.T) {
recorder := eventstesting.NewTestingEventRecorder(t)
ctrl := &csrApprovingController[*certificatesv1.CertificateSigningRequest]{
lister: informerFactory.Certificates().V1().CertificateSigningRequests().Lister(),
approver: newCSRV1Approver(kubeClient),
lister: informerFactory.Certificates().V1().CertificateSigningRequests().Lister(),
approver: NewCSRV1Approver(kubeClient),
csrInfoGetter: getCSRInfo,
reconcilers: []Reconciler{
&csrBootstrapReconciler{
signer: certificatesv1.KubeAPIServerClientSignerName,
kubeClient: kubeClient,
eventRecorder: recorder,
approvalUsers: sets.Set[string]{},
},
NewCSRRenewalReconciler(kubeClient, recorder),
NewCSRRenewalReconciler(kubeClient, certificatesv1.KubeAPIServerClientSignerName, recorder),
NewCSRBootstrapReconciler(
kubeClient,
certificatesv1.KubeAPIServerClientSignerName,
c.approvalUsers,
recorder,
),
@@ -317,7 +321,7 @@ func TestIsSpokeClusterClientCertRenewal(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
isRenewal, clusterName, commonName := validateCSR(logger, newCSRInfo(logger, testinghelpers.NewCSR(c.csr)))
isRenewal, clusterName, commonName := validateCSR(logger, certificatesv1.KubeAPIServerClientSignerName, getCSRInfo(testinghelpers.NewCSR(c.csr)))
if isRenewal != c.isRenewal {
t.Errorf("expected %t, but failed", c.isRenewal)
}
@@ -347,3 +351,67 @@ func TestNewApprover(t *testing.T) {
t.Error(err)
}
}
func TestEventFilter(t *testing.T) {
tests := []struct {
name string
input any
expected bool
}{
{
name: "nil input",
input: nil,
expected: false,
},
{
name: "v1 CSR with matching signer",
input: &certificatesv1.CertificateSigningRequest{
Spec: certificatesv1.CertificateSigningRequestSpec{
SignerName: certificatesv1.KubeAPIServerClientSignerName,
},
},
expected: true,
},
{
name: "v1 CSR with non-matching signer",
input: &certificatesv1.CertificateSigningRequest{
Spec: certificatesv1.CertificateSigningRequestSpec{
SignerName: "example.com/custom",
},
},
expected: false,
},
{
name: "v1beta1 CSR with matching signer",
input: func() *certificatesv1beta1.CertificateSigningRequest {
signer := certificatesv1beta1.KubeAPIServerClientSignerName
return &certificatesv1beta1.CertificateSigningRequest{
Spec: certificatesv1beta1.CertificateSigningRequestSpec{
SignerName: &signer,
},
}
}(),
expected: true,
},
{
name: "v1beta1 CSR with non-matching signer",
input: func() *certificatesv1beta1.CertificateSigningRequest {
signer := "example.com/custom"
return &certificatesv1beta1.CertificateSigningRequest{
Spec: certificatesv1beta1.CertificateSigningRequestSpec{
SignerName: &signer,
},
}
}(),
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := eventFilter(tt.input); got != tt.expected {
t.Errorf("eventFilter() = %v, want %v", got, tt.expected)
}
})
}
}

View File

@@ -29,6 +29,8 @@ type CSR interface {
*certificatesv1.CertificateSigningRequest | *certificatesv1beta1.CertificateSigningRequest
}
type CSRInfoGetter[T CSR] func(c T) CSRInfo
type CSRLister[T CSR] interface {
Get(name string) (T, error)
}
@@ -40,26 +42,30 @@ type csrApprover[T CSR] interface {
// csrApprovingController auto approve the renewal CertificateSigningRequests for an accepted spoke cluster on the hub.
type csrApprovingController[T CSR] struct {
lister CSRLister[T]
approver csrApprover[T]
reconcilers []Reconciler
lister CSRLister[T]
approver csrApprover[T]
csrInfoGetter CSRInfoGetter[T]
reconcilers []Reconciler
}
// NewCSRApprovingController creates a new csr approving controller
func NewCSRApprovingController[T CSR](
csrInformer cache.SharedIndexInformer,
lister CSRLister[T],
eventFilter factory.EventFilterFunc,
approver csrApprover[T],
csrInfoGetter CSRInfoGetter[T],
reconcilers []Reconciler,
recorder events.Recorder) factory.Controller {
c := &csrApprovingController[T]{
lister: lister,
approver: approver,
reconcilers: reconcilers,
lister: lister,
approver: approver,
csrInfoGetter: csrInfoGetter,
reconcilers: reconcilers,
}
return factory.New().
WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, csrInformer).
WithFilteredEventsInformersQueueKeysFunc(queue.QueueKeyByMetaName, eventFilter, csrInformer).
WithSync(c.sync).
ToController("CSRApprovingController", recorder)
}
@@ -81,7 +87,7 @@ func (c *csrApprovingController[T]) sync(ctx context.Context, syncCtx factory.Sy
return nil
}
csrInfo := newCSRInfo(logger, csr)
csrInfo := c.csrInfoGetter(csr)
for _, r := range c.reconcilers {
state, err := r.Reconcile(ctx, csrInfo, c.approver.approve(ctx, csr))
if err != nil {
@@ -102,7 +108,7 @@ type csrV1Approver struct {
kubeClient kubernetes.Interface
}
func newCSRV1Approver(client kubernetes.Interface) *csrV1Approver {
func NewCSRV1Approver(client kubernetes.Interface) *csrV1Approver {
return &csrV1Approver{kubeClient: client}
}
@@ -155,8 +161,7 @@ func (c *csrV1beta1Approver) approve(ctx context.Context, csr *certificatesv1bet
}
type CSRHubDriver struct {
controller factory.Controller
autoApprovedCSRUsers []string
controller factory.Controller
}
func (c *CSRHubDriver) Run(ctx context.Context, workers int) {
@@ -174,13 +179,13 @@ func NewCSRHubDriver(
kubeInformers informers.SharedInformerFactory,
autoApprovedCSRUsers []string,
recorder events.Recorder) (register.HubDriver, error) {
csrDriverForHub := &CSRHubDriver{
autoApprovedCSRUsers: autoApprovedCSRUsers,
}
csrReconciles := []Reconciler{NewCSRRenewalReconciler(kubeClient, recorder)}
csrDriverForHub := &CSRHubDriver{}
csrReconciles := []Reconciler{NewCSRRenewalReconciler(kubeClient, certificatesv1.KubeAPIServerClientSignerName, recorder)}
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManagedClusterAutoApproval) {
csrReconciles = append(csrReconciles, NewCSRBootstrapReconciler(
kubeClient,
certificatesv1.KubeAPIServerClientSignerName,
autoApprovedCSRUsers,
recorder,
))
@@ -193,10 +198,12 @@ func NewCSRHubDriver(
}
if !v1CSRSupported && v1beta1CSRSupported {
csrDriverForHub.controller = NewCSRApprovingController[*certificatesv1beta1.CertificateSigningRequest](
csrDriverForHub.controller = NewCSRApprovingController(
kubeInformers.Certificates().V1beta1().CertificateSigningRequests().Informer(),
kubeInformers.Certificates().V1beta1().CertificateSigningRequests().Lister(),
eventFilter,
newCSRV1beta1Approver(kubeClient),
getCSRv1beta1Info,
csrReconciles,
recorder,
)
@@ -205,10 +212,12 @@ func NewCSRHubDriver(
}
}
csrDriverForHub.controller = NewCSRApprovingController[*certificatesv1.CertificateSigningRequest](
csrDriverForHub.controller = NewCSRApprovingController(
kubeInformers.Certificates().V1().CertificateSigningRequests().Informer(),
kubeInformers.Certificates().V1().CertificateSigningRequests().Lister(),
newCSRV1Approver(kubeClient),
eventFilter,
NewCSRV1Approver(kubeClient),
getCSRInfo,
csrReconciles,
recorder,
)

View File

@@ -1,13 +1,14 @@
package grpc
import (
"context"
"fmt"
"os"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"golang.org/x/net/context"
authorizationv1 "k8s.io/api/authorization/v1"
certificatesv1 "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -19,18 +20,23 @@ import (
certificatesv1listers "k8s.io/client-go/listers/certificates/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"
sdkhelpers "open-cluster-management.io/sdk-go/pkg/helpers"
"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
type GRPCHubDriver struct {
controller factory.Controller
csrApprovingController factory.Controller
csrSignController factory.Controller
}
func (c *GRPCHubDriver) Run(ctx context.Context, workers int) {
c.controller.Run(ctx, workers)
go c.csrApprovingController.Run(ctx, workers)
c.csrSignController.Run(ctx, workers)
}
func (c *GRPCHubDriver) Cleanup(_ context.Context, _ *clusterv1.ManagedCluster) error {
@@ -43,7 +49,18 @@ func NewGRPCHubDriver(
kubeInformers informers.SharedInformerFactory,
caKeyFile, caFile string,
duration time.Duration,
autoApprovedCSRUsers []string,
recorder events.Recorder) (register.HubDriver, error) {
csrReconciles := []csr.Reconciler{csr.NewCSRRenewalReconciler(kubeClient, helpers.GRPCCAuthSigner, recorder)}
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManagedClusterAutoApproval) {
csrReconciles = append(csrReconciles, csr.NewCSRBootstrapReconciler(
kubeClient,
helpers.GRPCCAuthSigner,
autoApprovedCSRUsers,
recorder,
))
}
caData, err := os.ReadFile(caFile)
if err != nil {
return nil, err
@@ -53,7 +70,16 @@ func NewGRPCHubDriver(
return nil, err
}
return &GRPCHubDriver{
controller: newCSRSignController(
csrApprovingController: csr.NewCSRApprovingController(
kubeInformers.Certificates().V1().CertificateSigningRequests().Informer(),
kubeInformers.Certificates().V1().CertificateSigningRequests().Lister(),
eventFilter,
csr.NewCSRV1Approver(kubeClient),
getCSRInfo,
csrReconciles,
recorder,
),
csrSignController: newCSRSignController(
kubeClient,
kubeInformers.Certificates().V1().CertificateSigningRequests(),
caKey, caData, duration, recorder,
@@ -156,3 +182,29 @@ func (c *csrSignController) sync(ctx context.Context, syncCtx factory.SyncContex
_, err = c.kubeClient.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{})
return err
}
func getCSRInfo(c *certificatesv1.CertificateSigningRequest) csr.CSRInfo {
extra := make(map[string]authorizationv1.ExtraValue)
for k, v := range c.Spec.Extra {
extra[k] = authorizationv1.ExtraValue(v)
}
return csr.CSRInfo{
Name: c.Name,
Labels: c.Labels,
SignerName: c.Spec.SignerName,
Username: c.Annotations[helpers.CSRUserAnnotation],
UID: c.Spec.UID,
Groups: c.Spec.Groups,
Extra: extra,
Request: c.Spec.Request,
}
}
func eventFilter(csr any) bool {
switch v := csr.(type) {
case *certificatesv1.CertificateSigningRequest:
return v.Spec.SignerName == helpers.GRPCCAuthSigner
default:
return false
}
}

View File

@@ -151,3 +151,61 @@ func TestSignCSR(t *testing.T) {
})
}
}
func TestEventFilter(t *testing.T) {
tests := []struct {
name string
input any
expected bool
}{
{
name: "nil input",
input: nil,
expected: false,
},
{
name: "v1 CSR with matching signer",
input: &certificatesv1.CertificateSigningRequest{
Spec: certificatesv1.CertificateSigningRequestSpec{
SignerName: helpers.GRPCCAuthSigner,
},
},
expected: true,
},
{
name: "v1 CSR with non-matching signer",
input: &certificatesv1.CertificateSigningRequest{
Spec: certificatesv1.CertificateSigningRequestSpec{
SignerName: "example.com/custom",
},
},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := eventFilter(tt.input); got != tt.expected {
t.Errorf("eventFilter() = %v, want %v", got, tt.expected)
}
})
}
}
func TestGetCSRInfo(t *testing.T) {
csr := &certificatesv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Annotations: map[string]string{
helpers.CSRUserAnnotation: "test",
},
},
Spec: certificatesv1.CertificateSigningRequestSpec{
SignerName: helpers.GRPCCAuthSigner,
},
}
info := getCSRInfo(csr)
if info.Username != "test" {
t.Errorf("unexpected username %s", info.Username)
}
}

View File

@@ -19,7 +19,9 @@ import (
csrce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/csr"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
"open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authn"
"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/server/services"
)
@@ -84,6 +86,11 @@ func (c *CSRService) HandleStatusUpdate(ctx context.Context, evt *cloudevents.Ev
switch eventType.Action {
case types.CreateRequestAction:
// The agent requests a CSR, and the gRPC server creates the CSR on the hub. As a result,
// the username in the csr is the service account of gRPC server rather than the user of agent.
// The approver controller in the registration will not be able to know where this CSR originates
// from. Therefore, this annotation with the agent's username is added for CSR approval checks.
csr.Annotations = map[string]string{helpers.CSRUserAnnotation: fmt.Sprintf("%v", ctx.Value(authn.ContextUserKey))}
_, err := c.csrClient.CertificatesV1().CertificateSigningRequests().Create(ctx, csr, metav1.CreateOptions{})
return err
default:

View File

@@ -19,6 +19,7 @@ import (
csrce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/csr"
eventce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/event"
leasece "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease"
grpcauthn "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authn"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options"
"open-cluster-management.io/ocm/pkg/common/helpers"
@@ -78,7 +79,9 @@ var _ = ginkgo.Describe("Registration using GRPC", ginkgo.Ordered, ginkgo.Label(
hook, err := util.NewGRPCServerRegistrationHook(hubCfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
server := grpcoptions.NewServer(gRPCServerOptions).WithPreStartHooks(hook).WithService(
server := grpcoptions.NewServer(gRPCServerOptions).WithAuthenticator(
grpcauthn.NewMtlsAuthenticator(),
).WithPreStartHooks(hook).WithService(
clusterce.ManagedClusterEventDataType,
cluster.NewClusterService(hook.ClusterClient, hook.ClusterInformers.Cluster().V1().ManagedClusters()),
).WithService(
@@ -200,6 +203,121 @@ var _ = ginkgo.Describe("Registration using GRPC", ginkgo.Ordered, ginkgo.Label(
})
})
})
ginkgo.Context("Certificate rotation", func() {
ginkgo.BeforeEach(func() {
postfix = rand.String(5)
hubOptionWithGRPC = hub.NewHubManagerOptions()
hubOptionWithGRPC.EnabledRegistrationDrivers = []string{helpers.GRPCCAuthType}
hubOptionWithGRPC.GRPCCAFile = gRPCServerOptions.ClientCAFile
hubOptionWithGRPC.GRPCCAKeyFile = gRPCCAKeyFile
hubOptionWithGRPC.GRPCSigningDuration = 5 * time.Second
startHub(hubOptionWithGRPC)
grpcManagedClusterName = fmt.Sprintf("%s-cert-rotation-%s", grpcTest, postfix)
hubGRPCConfigSecret = fmt.Sprintf("%s-hub-grpcconfig-secret-%s", grpcTest, postfix)
hubGRPCConfigDir = path.Join(util.TestDir, fmt.Sprintf("%s-grpc-%s", grpcTest, postfix), "hub-kubeconfig")
grpcDriverOption := factory.NewOptions()
grpcDriverOption.RegistrationAuth = helpers.GRPCCAuthType
grpcDriverOption.GRPCOption = &grpc.Option{
BootstrapConfigFile: bootstrapGRPCConfigFile,
ConfigFile: path.Join(hubGRPCConfigDir, "config.yaml"),
}
grpcAgentOptions := &spoke.SpokeAgentOptions{
BootstrapKubeconfig: bootstrapKubeConfigFile,
HubKubeconfigSecret: hubGRPCConfigSecret,
ClusterHealthCheckPeriod: 1 * time.Minute,
RegisterDriverOption: grpcDriverOption,
}
grpcCommOptions := commonoptions.NewAgentOptions()
grpcCommOptions.HubKubeconfigDir = hubGRPCConfigDir
grpcCommOptions.SpokeClusterName = grpcManagedClusterName
stopGRPCAgent = runAgent(fmt.Sprintf("%s-cert-rotation-agent", grpcTest), grpcAgentOptions, grpcCommOptions, spokeCfg)
})
ginkgo.AfterEach(func() {
stopGRPCAgent()
stopHub()
})
ginkgo.It("should automatically rotate the certificate when it is about to expire", func() {
ginkgo.By("getting managedclusters and csrs after bootstrap", func() {
assertManagedCluster(grpcManagedClusterName)
})
// simulate hub cluster admin to accept the managedcluster and approve the csr
ginkgo.By("accept managedclusters and approve csrs", func() {
err = util.AcceptManagedCluster(clusterClient, grpcManagedClusterName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// for gpc, the hub controller will sign the client certs, we just approve
// the csr here
csr, err := util.FindUnapprovedSpokeCSR(kubeClient, grpcManagedClusterName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = util.ApproveCSR(kubeClient, csr)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
})
ginkgo.By("getting managedclusters joined condition", func() {
assertManagedClusterJoined(grpcManagedClusterName, hubGRPCConfigSecret)
})
// the agent should rotate the certificate because the certificate with a short valid time
// the hub controller should auto approve it
gomega.Eventually(func() error {
if _, err := util.FindAutoApprovedSpokeCSR(kubeClient, grpcManagedClusterName); err != nil {
return err
}
return nil
}, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred())
})
})
ginkgo.Context("Auto approval", func() {
ginkgo.BeforeEach(func() {
postfix = rand.String(5)
hubOptionWithGRPC = hub.NewHubManagerOptions()
hubOptionWithGRPC.EnabledRegistrationDrivers = []string{helpers.GRPCCAuthType}
hubOptionWithGRPC.GRPCCAFile = gRPCServerOptions.ClientCAFile
hubOptionWithGRPC.GRPCCAKeyFile = gRPCCAKeyFile
hubOptionWithGRPC.AutoApprovedGRPCUsers = []string{"test-client"}
startHub(hubOptionWithGRPC)
grpcManagedClusterName = fmt.Sprintf("%s-auto-approval-%s", grpcTest, postfix)
hubGRPCConfigSecret = fmt.Sprintf("%s-hub-grpcconfig-secret-%s", grpcTest, postfix)
hubGRPCConfigDir = path.Join(util.TestDir, fmt.Sprintf("%s-grpc-%s", grpcTest, postfix), "hub-kubeconfig")
grpcDriverOption := factory.NewOptions()
grpcDriverOption.RegistrationAuth = helpers.GRPCCAuthType
grpcDriverOption.GRPCOption = &grpc.Option{
BootstrapConfigFile: bootstrapGRPCConfigFile,
ConfigFile: path.Join(hubGRPCConfigDir, "config.yaml"),
}
grpcAgentOptions := &spoke.SpokeAgentOptions{
BootstrapKubeconfig: bootstrapKubeConfigFile,
HubKubeconfigSecret: hubGRPCConfigSecret,
ClusterHealthCheckPeriod: 1 * time.Minute,
RegisterDriverOption: grpcDriverOption,
}
grpcCommOptions := commonoptions.NewAgentOptions()
grpcCommOptions.HubKubeconfigDir = hubGRPCConfigDir
grpcCommOptions.SpokeClusterName = grpcManagedClusterName
stopGRPCAgent = runAgent(fmt.Sprintf("%s-auto-approval-agent", grpcTest), grpcAgentOptions, grpcCommOptions, spokeCfg)
})
ginkgo.AfterEach(func() {
stopGRPCAgent()
stopHub()
})
ginkgo.It("should automatically approve managedcluster", func() {
ginkgo.By("getting managedclusters joined condition", func() {
assertManagedClusterJoined(grpcManagedClusterName, hubGRPCConfigSecret)
})
})
})
})
func assertManagedCluster(clusterName string) {