grpc server (#1058)
Some checks failed
Post / coverage (push) Failing after 36m50s
Post / images (amd64) (push) Failing after 8m47s
Post / images (arm64) (push) Failing after 8m15s
Post / image manifest (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Scorecard supply-chain security / Scorecard analysis (push) Failing after 1m8s
Close stale issues and PRs / stale (push) Successful in 50s

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2025-07-09 16:59:10 +08:00
committed by GitHub
parent cbff56ad4b
commit 7924226eba
70 changed files with 5808 additions and 73 deletions

View File

@@ -46,6 +46,7 @@ import (
"open-cluster-management.io/ocm/pkg/registration/register"
awsirsa "open-cluster-management.io/ocm/pkg/registration/register/aws_irsa"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
"open-cluster-management.io/ocm/pkg/registration/register/grpc"
)
// HubManagerOptions holds configuration for hub manager controller
@@ -59,6 +60,8 @@ type HubManagerOptions struct {
AutoApprovedARNPatterns []string
AwsResourceTags []string
Labels string
GRPCCAFile string
GRPCCAKeyFile string
}
// NewHubManagerOptions returns a HubManagerOptions
@@ -93,6 +96,8 @@ func (m *HubManagerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&m.AwsResourceTags, "aws-resource-tags", m.AwsResourceTags, "A list of tags to apply to AWS resources created through the OCM controllers")
fs.StringVar(&m.Labels, "labels", m.Labels,
"Labels to be added to the resources created by registration controller. The format is key1=value1,key2=value2.")
fs.StringVar(&m.GRPCCAFile, "grpc-ca-file", m.GRPCCAFile, "ca file to sign client cert for grpc")
fs.StringVar(&m.GRPCCAKeyFile, "grpc-key-file", m.GRPCCAKeyFile, "ca key file to sign client cert for grpc")
m.ImportOption.AddFlags(fs)
}
@@ -195,6 +200,13 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
return err
}
drivers = append(drivers, awsIRSAHubDriver)
case commonhelpers.GRPCCAuthType:
grpcHubDriver, err := grpc.NewGRPCHubDriver(
kubeClient, kubeInformers, m.GRPCCAKeyFile, m.GRPCCAFile, 720*time.Hour, controllerContext.EventRecorder)
if err != nil {
return err
}
drivers = append(drivers, grpcHubDriver)
}
}
hubDriver := register.NewAggregatedHubDriver(drivers...)

View File

@@ -16,7 +16,7 @@ rules:
# TODO for backward compatible, we do not limit the resource name
# remove this after we no longer support lower versions kubernetes (less than 1.14)
#resourceNames: ["managed-cluster-lease"]
verbs: ["get", "update"]
verbs: ["get", "list", "watch", "update"]
# Allow agent to get/list/watch managed cluster addons
- apiGroups: ["addon.open-cluster-management.io"]
resources: ["managedclusteraddons"]

View File

@@ -9,7 +9,6 @@ import (
"math/rand"
"os"
"path"
"reflect"
"strings"
"time"
@@ -17,9 +16,9 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
certificates "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
@@ -281,7 +280,7 @@ func (c *CSRDriver) IsHubKubeConfigValid(ctx context.Context, secretOption regis
if secretOption.ClusterName != clusterNameInCert || secretOption.AgentName != agentNameInCert {
logger.V(4).Info("Certificate in file is issued for different agent",
"certPath", certPath,
"issuedFor", fmt.Sprintf("%s:%s", secretOption.ClusterName, secretOption.AgentName),
"issuedFor", fmt.Sprintf("%s:%s", clusterNameInCert, agentNameInCert),
"expectedFor", fmt.Sprintf("%s:%s", secretOption.ClusterName, secretOption.AgentName))
return false, nil
@@ -349,23 +348,25 @@ func (c *CSRDriver) BuildClients(ctx context.Context, secretOption register.Secr
return nil, fmt.Errorf("failed to create CSR control: %w", err)
}
err = csrControl.Informer().AddIndexers(cache.Indexers{
indexByCluster: indexByClusterFunc,
})
err = c.SetCSRControl(csrControl, secretOption.ClusterName)
if err != nil {
return nil, err
}
return clients, nil
}
err = csrControl.Informer().AddIndexers(cache.Indexers{
indexByAddon: indexByAddonFunc,
})
if err != nil {
utilruntime.HandleError(err)
func (c *CSRDriver) SetCSRControl(csrControl CSRControl, clusterName string) error {
if err := csrControl.Informer().AddIndexers(cache.Indexers{indexByCluster: indexByClusterFunc}); err != nil {
return err
}
if err := csrControl.Informer().AddIndexers(cache.Indexers{indexByAddon: indexByAddonFunc}); err != nil {
return err
}
c.csrControl = csrControl
c.haltCSRCreation = haltCSRCreationFunc(csrControl.Informer().GetIndexer(), secretOption.ClusterName)
return clients, nil
c.haltCSRCreation = haltCSRCreationFunc(csrControl.Informer().GetIndexer(), clusterName)
return nil
}
var _ register.RegisterDriver = &CSRDriver{}
@@ -481,7 +482,7 @@ func hasAdditionalSecretData(additionalSecretData map[string][]byte, secret *cor
return fmt.Errorf("key %q not found in secret %q", k, secret.Namespace+"/"+secret.Name)
}
if !reflect.DeepEqual(v, value) {
if !equality.Semantic.DeepEqual(v, value) {
return fmt.Errorf("key %q in secret %q does not match the expected value",
k, secret.Namespace+"/"+secret.Name)
}

View File

@@ -388,6 +388,16 @@ func TestIsHubKubeConfigValidFunc(t *testing.T) {
tlsCert: cert1.Cert,
isValid: false,
},
{
name: "invalid issuer",
clusterName: "cluster2",
agentName: "agent1",
kubeconfig: kubeconfig,
bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", "", nil, nil, nil),
tlsKey: cert1.Key,
tlsCert: cert1.Cert,
isValid: false,
},
{
name: "valid hub client config",
clusterName: "cluster1",

View File

@@ -7,18 +7,21 @@ import (
"open-cluster-management.io/ocm/pkg/registration/register"
awsirsa "open-cluster-management.io/ocm/pkg/registration/register/aws_irsa"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
"open-cluster-management.io/ocm/pkg/registration/register/grpc"
)
type Options struct {
RegistrationAuth string
CSROption *csr.Option
AWSISRAOption *awsirsa.AWSOption
GRPCOption *grpc.Option
}
func NewOptions() *Options {
return &Options{
CSROption: csr.NewCSROption(),
AWSISRAOption: awsirsa.NewAWSOption(),
GRPCOption: grpc.NewOptions(),
}
}
@@ -27,12 +30,15 @@ func (s *Options) AddFlags(fs *pflag.FlagSet) {
"The type of authentication to use to authenticate with hub.")
s.CSROption.AddFlags(fs)
s.AWSISRAOption.AddFlags(fs)
s.GRPCOption.AddFlags(fs)
}
func (s *Options) Validate() error {
switch s.RegistrationAuth {
case helpers.AwsIrsaAuthType:
return s.AWSISRAOption.Validate()
case helpers.GRPCCAuthType:
return s.GRPCOption.Validate()
default:
return s.CSROption.Validate()
}
@@ -42,6 +48,8 @@ func (s *Options) Driver(secretOption register.SecretOption) (register.RegisterD
switch s.RegistrationAuth {
case helpers.AwsIrsaAuthType:
return awsirsa.NewAWSIRSADriver(s.AWSISRAOption, secretOption), nil
case helpers.GRPCCAuthType:
return grpc.NewGRPCDriver(s.GRPCOption, s.CSROption, secretOption)
default:
return csr.NewCSRDriver(s.CSROption, secretOption)
}

View File

@@ -5,6 +5,7 @@ import (
awsirsa "open-cluster-management.io/ocm/pkg/registration/register/aws_irsa"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
"open-cluster-management.io/ocm/pkg/registration/register/grpc"
)
func TestValidate(t *testing.T) {
@@ -51,6 +52,34 @@ func TestValidate(t *testing.T) {
},
expectErr: false,
},
{
name: "grpc validate",
opt: &Options{
RegistrationAuth: "grpc",
GRPCOption: &grpc.Option{},
},
expectErr: true,
},
{
name: "grpc validate pass (bootstrap config)",
opt: &Options{
RegistrationAuth: "grpc",
GRPCOption: &grpc.Option{
BootstrapConfigFile: "test-bootstrap-config",
},
},
expectErr: false,
},
{
name: "grpc validate pass",
opt: &Options{
RegistrationAuth: "grpc",
GRPCOption: &grpc.Option{
ConfigFile: "test-config",
},
},
expectErr: false,
},
}
for _, tt := range tests {

View File

@@ -0,0 +1,158 @@
package grpc
import (
"fmt"
"os"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"golang.org/x/net/context"
certificatesv1 "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
certificatesv1informers "k8s.io/client-go/informers/certificates/v1"
"k8s.io/client-go/kubernetes"
certificatesv1listers "k8s.io/client-go/listers/certificates/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
sdkhelpers "open-cluster-management.io/sdk-go/pkg/helpers"
"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/registration/register"
)
type GRPCHubDriver struct {
controller factory.Controller
}
func (c *GRPCHubDriver) Run(ctx context.Context, workers int) {
c.controller.Run(ctx, workers)
}
func (c *GRPCHubDriver) Cleanup(_ context.Context, _ *clusterv1.ManagedCluster) error {
// noop
return nil
}
func NewGRPCHubDriver(
kubeClient kubernetes.Interface,
kubeInformers informers.SharedInformerFactory,
caKeyFile, caFile string,
duration time.Duration,
recorder events.Recorder) (register.HubDriver, error) {
caData, err := os.ReadFile(caFile)
if err != nil {
return nil, err
}
caKey, err := os.ReadFile(caKeyFile)
if err != nil {
return nil, err
}
return &GRPCHubDriver{
controller: newCSRSignController(
kubeClient,
kubeInformers.Certificates().V1().CertificateSigningRequests(),
caKey, caData, duration, recorder,
),
}, nil
}
func (a *GRPCHubDriver) CreatePermissions(_ context.Context, _ *clusterv1.ManagedCluster) error {
// noop
return nil
}
func (c *GRPCHubDriver) Accept(_ *clusterv1.ManagedCluster) bool {
return true
}
type csrSignController struct {
kubeClient kubernetes.Interface
csrLister certificatesv1listers.CertificateSigningRequestLister
caKey []byte
caData []byte
duration time.Duration
}
// newCSRSignController creates a new csr signing controller
func newCSRSignController(
kubeClient kubernetes.Interface,
csrInformer certificatesv1informers.CertificateSigningRequestInformer,
caKey, caData []byte,
duration time.Duration,
recorder events.Recorder,
) factory.Controller {
c := &csrSignController{
kubeClient: kubeClient,
csrLister: csrInformer.Lister(),
caKey: caKey,
caData: caData,
duration: duration,
}
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(
func(obj runtime.Object) []string {
accessor, _ := meta.Accessor(obj)
return []string{accessor.GetName()}
},
func(obj interface{}) bool {
accessor, _ := meta.Accessor(obj)
if len(accessor.GetLabels()) == 0 {
return false
}
labels := accessor.GetLabels()
if _, ok := labels[clusterv1.ClusterNameLabelKey]; !ok {
return false
}
return true
},
csrInformer.Informer()).
WithSync(c.sync).
ToController("CSRSignController", recorder)
}
func (c *csrSignController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
csrName := syncCtx.QueueKey()
csr, err := c.csrLister.Get(csrName)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
csr = csr.DeepCopy()
approved := false
for _, condition := range csr.Status.Conditions {
if condition.Type == certificatesv1.CertificateApproved {
approved = true
break
}
}
if !approved {
return nil
}
if len(csr.Status.Certificate) > 0 {
return nil
}
// Do not sign apiserver cert
if csr.Spec.SignerName != helpers.GRPCCAuthSigner {
return nil
}
signerFunc := sdkhelpers.CSRSignerWithExpiry(c.caKey, c.caData, c.duration)
csr.Status.Certificate = signerFunc(csr)
if len(csr.Status.Certificate) == 0 {
return fmt.Errorf("invalid client certificate generated for csr %q", csr.Name)
}
_, err = c.kubeClient.CertificatesV1().CertificateSigningRequests().UpdateStatus(ctx, csr, metav1.UpdateOptions{})
return err
}

View File

@@ -0,0 +1,153 @@
package grpc
import (
"context"
"crypto/x509/pkix"
"net"
"testing"
"time"
certificatesv1 "k8s.io/api/certificates/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
"open-cluster-management.io/ocm/pkg/common/helpers"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
)
func TestSignCSR(t *testing.T) {
cases := []struct {
name string
csrs []runtime.Object
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "no csr",
csrs: []runtime.Object{},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertNoActions(t, actions)
},
},
{
name: "unapproved csr",
csrs: []runtime.Object{
&certificatesv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{Name: "test_csr"},
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertNoActions(t, actions)
},
},
{
name: "approved csr with cert",
csrs: []runtime.Object{
&certificatesv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{Name: "test_csr"},
Status: certificatesv1.CertificateSigningRequestStatus{
Conditions: []certificatesv1.CertificateSigningRequestCondition{
{
Type: certificatesv1.CertificateApproved,
},
},
Certificate: []byte("cert"),
},
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertNoActions(t, actions)
},
},
{
name: "wrong signer",
csrs: []runtime.Object{
&certificatesv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{Name: "test_csr"},
Spec: certificatesv1.CertificateSigningRequestSpec{
SignerName: "wrong-signer",
},
Status: certificatesv1.CertificateSigningRequestStatus{
Conditions: []certificatesv1.CertificateSigningRequestCondition{
{
Type: certificatesv1.CertificateApproved,
},
},
},
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertNoActions(t, actions)
},
},
{
name: "approved csr without cert",
csrs: []runtime.Object{
func() *certificatesv1.CertificateSigningRequest {
clientKey, _ := keyutil.MakeEllipticPrivateKeyPEM()
privateKey, _ := keyutil.ParsePrivateKeyPEM(clientKey)
request, _ := certutil.MakeCSR(privateKey, &pkix.Name{CommonName: "test", Organization: []string{"test"}}, []string{"test.localhost"}, nil)
return &certificatesv1.CertificateSigningRequest{
ObjectMeta: metav1.ObjectMeta{
Name: "test_csr",
},
Spec: certificatesv1.CertificateSigningRequestSpec{
Usages: []certificatesv1.KeyUsage{
certificatesv1.UsageClientAuth,
},
SignerName: helpers.GRPCCAuthSigner,
Username: "system:open-cluster-management:test",
Request: request,
},
Status: certificatesv1.CertificateSigningRequestStatus{
Conditions: []certificatesv1.CertificateSigningRequestCondition{
{
Type: certificatesv1.CertificateApproved,
},
},
},
}
}(),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "update")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
ca, key, err := certutil.GenerateSelfSignedCertKey("test", []net.IP{}, []string{})
if err != nil {
t.Fatalf("Failed to generate self signed CA config: %v", err)
}
csrClient := kubefake.NewSimpleClientset(c.csrs...)
csrInformers := informers.NewSharedInformerFactory(csrClient, 10*time.Minute)
csrInformer := csrInformers.Certificates().V1().CertificateSigningRequests()
for _, csr := range c.csrs {
csrInformer.Informer().GetStore().Add(csr)
}
ctrl := &csrSignController{
kubeClient: csrClient,
csrLister: csrInformer.Lister(),
caData: ca,
caKey: key,
duration: 1 * time.Hour,
}
if err := ctrl.sync(context.Background(), testingcommon.NewFakeSyncContext(t, "test_csr")); err != nil {
t.Errorf("unexpected error: %v", err)
}
c.validateActions(t, csrClient.Actions())
})
}
}

View File

@@ -0,0 +1,29 @@
package grpc
import (
"fmt"
"github.com/spf13/pflag"
)
type Option struct {
BootstrapConfigFile string
ConfigFile string
}
func NewOptions() *Option {
return &Option{}
}
func (o *Option) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.BootstrapConfigFile, "grpc-bootstrap-config", o.BootstrapConfigFile, "")
fs.StringVar(&o.ConfigFile, "grpc-config", o.ConfigFile, "")
}
func (o *Option) Validate() error {
if o.ConfigFile == "" && o.BootstrapConfigFile == "" {
return fmt.Errorf("config file should be set")
}
return nil
}

View File

@@ -0,0 +1,44 @@
package grpc
import (
"testing"
)
func TestValidate(t *testing.T) {
cases := []struct {
name string
opt *Option
expectedErr bool
}{
{
name: "no config file",
opt: NewOptions(),
expectedErr: true,
},
{
name: "bootstrap config file is set",
opt: &Option{BootstrapConfigFile: "bootstrap-config.yaml"},
expectedErr: false,
},
{
name: "config file is set",
opt: &Option{ConfigFile: "config.yaml"},
expectedErr: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
err := c.opt.Validate()
if c.expectedErr {
if err == nil {
t.Errorf("expected an error, but failed")
}
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
})
}
}

View File

@@ -0,0 +1,166 @@
package grpc
import (
"context"
"os"
"path/filepath"
"testing"
"time"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
func TestIsHubKubeConfigValid(t *testing.T) {
tempDir, err := os.MkdirTemp("", "grpc-test-is-hub-kubeconfig-valid")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer os.RemoveAll(tempDir)
cert := testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 60*time.Second)
cases := []struct {
name string
clusterName string
agentName string
tlsCert []byte
tlsKey []byte
valid bool
}{
{
name: "no cert",
valid: false,
},
{
name: "no key",
tlsCert: cert.Cert,
valid: false,
},
{
name: "cert is not issued for cluster1:agent1",
clusterName: "cluster2",
agentName: "agent2",
tlsCert: cert.Cert,
tlsKey: cert.Key,
valid: false,
},
{
name: "valid",
tlsCert: cert.Cert,
tlsKey: cert.Key,
valid: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.tlsCert != nil {
if err := os.WriteFile(filepath.Join(tempDir, "tls.crt"), c.tlsCert, 0600); err != nil {
t.Fatal(err)
}
}
if c.tlsKey != nil {
if err := os.WriteFile(filepath.Join(tempDir, "tls.key"), c.tlsKey, 0600); err != nil {
t.Fatal(err)
}
}
secretOption := register.SecretOption{
ClusterName: c.clusterName,
AgentName: c.agentName,
HubKubeconfigDir: tempDir,
}
driver, err := NewGRPCDriver(nil, csr.NewCSROption(), secretOption)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
valid, err := driver.IsHubKubeConfigValid(context.Background(), secretOption)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if valid != c.valid {
t.Errorf("expected valid: %v, got: %v", c.valid, valid)
}
})
}
}
func TestLoadConfig(t *testing.T) {
tempDir, err := os.MkdirTemp("", "grpc-test-load-config")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer os.RemoveAll(tempDir)
configFile := filepath.Join(tempDir, "config.yaml")
if err := os.WriteFile(configFile, []byte(`url: "https://localhost:8443"`), 0600); err != nil {
t.Fatal(err)
}
cases := []struct {
name string
bootstrapped bool
bootstrapConfigFile string
configFile string
expectedErr bool
}{
{
name: "no bootstrap config file",
bootstrapped: true,
expectedErr: true,
},
{
name: "no config file",
bootstrapped: false,
expectedErr: true,
},
{
name: "load bootstrap config",
bootstrapped: true,
bootstrapConfigFile: configFile,
expectedErr: false,
},
{
name: "load bootstrap config",
bootstrapped: false,
configFile: configFile,
expectedErr: false,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
driver := &GRPCDriver{
opt: &Option{
BootstrapConfigFile: c.bootstrapConfigFile,
ConfigFile: c.configFile,
},
}
config, configData, err := driver.loadConfig(register.SecretOption{}, c.bootstrapped)
if c.expectedErr {
if err == nil {
t.Errorf("expected error, but failed")
}
return
}
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if _, ok := config.(*grpc.GRPCOptions); !ok {
t.Errorf("expected config to be a *grpc.GRPCOptions, got: %T", config)
}
if len(configData) == 0 {
t.Errorf("expected config data, but got empty")
}
})
}
}

View File

@@ -0,0 +1,324 @@
package grpc
import (
"context"
"fmt"
"os"
"path"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"gopkg.in/yaml.v2"
certificatesv1 "k8s.io/api/certificates/v1"
coordv1 "k8s.io/api/coordination/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
cloudeventsaddon "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon"
cloudeventscluster "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/cluster"
cloudeventscsr "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/csr"
cloudeventsevent "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/event"
cloudeventslease "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease"
cloudeventsoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
cloudeventsstore "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/constants"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
"open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
type GRPCDriver struct {
csrDriver *csr.CSRDriver
control *ceCSRControl
opt *Option
configTemplate []byte
}
var _ register.RegisterDriver = &GRPCDriver{}
var _ register.AddonDriver = &GRPCDriver{}
func NewGRPCDriver(opt *Option, csrOption *csr.Option, secretOption register.SecretOption) (register.RegisterDriver, error) {
secretOption.Signer = helpers.GRPCCAuthSigner
csrDriver, err := csr.NewCSRDriver(csrOption, secretOption)
if err != nil {
return nil, err
}
return &GRPCDriver{
csrDriver: csrDriver,
opt: opt,
}, nil
}
func (d *GRPCDriver) BuildClients(ctx context.Context, secretOption register.SecretOption, bootstrapped bool) (*register.Clients, error) {
config, configData, err := d.loadConfig(secretOption, bootstrapped)
if err != nil {
return nil, err
}
d.configTemplate = configData
clusterWatchStore := cloudeventsstore.NewAgentInformerWatcherStore[*clusterv1.ManagedCluster]()
clusterClientHolder, err := cloudeventscluster.NewClientHolder(
ctx,
cloudeventsoptions.NewGenericClientOptions(
config,
cloudeventscluster.NewManagedClusterCodec(),
secretOption.ClusterName,
).
WithClusterName(secretOption.ClusterName).
WithClientWatcherStore(clusterWatchStore))
if err != nil {
return nil, err
}
clusterClient := clusterClientHolder.ClusterInterface()
clusterInformers := clusterinformers.NewSharedInformerFactory(
clusterClient, 10*time.Minute).Cluster().V1().ManagedClusters()
clusterWatchStore.SetInformer(clusterInformers.Informer())
leaseWatchStore := cloudeventsstore.NewSimpleStore[*coordv1.Lease]()
leaseClient, err := cloudeventslease.NewLeaseClient(
ctx,
cloudeventsoptions.NewGenericClientOptions(
config,
cloudeventslease.NewLeaseCodec(),
secretOption.ClusterName,
).WithClusterName(secretOption.ClusterName).WithClientWatcherStore(leaseWatchStore),
secretOption.ClusterName,
)
if err != nil {
return nil, err
}
eventClient, err := cloudeventsevent.NewClientHolder(
ctx,
cloudeventsoptions.NewGenericClientOptions(
config,
cloudeventsevent.NewEventCodec(),
secretOption.ClusterName,
).WithClusterName(secretOption.ClusterName).WithSubscription(false).WithResyncEnabled(false),
)
if err != nil {
return nil, err
}
addonWatchStore := cloudeventsstore.NewAgentInformerWatcherStore[*addonapiv1alpha1.ManagedClusterAddOn]()
addonClient, err := cloudeventsaddon.ManagedClusterAddOnInterface(
ctx,
cloudeventsoptions.NewGenericClientOptions(
config,
cloudeventsaddon.NewManagedClusterAddOnCodec(),
secretOption.ClusterName,
).WithClusterName(secretOption.ClusterName).WithClientWatcherStore(addonWatchStore))
if err != nil {
return nil, err
}
addonInformer := addoninformers.NewSharedInformerFactoryWithOptions(
addonClient, 10*time.Minute, addoninformers.WithNamespace(secretOption.ClusterName)).
Addon().V1alpha1().ManagedClusterAddOns()
addonWatchStore.SetInformer(addonInformer.Informer())
csrClientHolder, err := cloudeventscsr.NewAgentClientHolder(ctx,
cloudeventsoptions.NewGenericClientOptions(
config,
cloudeventscsr.NewCSRCodec(),
secretOption.ClusterName,
).WithClusterName(secretOption.ClusterName),
)
if err != nil {
return nil, err
}
csrControl := &ceCSRControl{csrClientHolder: csrClientHolder}
if err := d.csrDriver.SetCSRControl(csrControl, secretOption.ClusterName); err != nil {
return nil, err
}
d.control = csrControl
clients := &register.Clients{
ClusterClient: clusterClient,
ClusterInformer: clusterInformers,
AddonClient: addonClient,
AddonInformer: addonInformer,
LeaseClient: leaseClient,
EventsClient: eventClient,
}
return clients, nil
}
func (d *GRPCDriver) Fork(addonName string, secretOption register.SecretOption) register.RegisterDriver {
csrDriver := d.csrDriver.Fork(addonName, secretOption)
return &GRPCDriver{
control: d.control,
opt: d.opt,
csrDriver: csrDriver.(*csr.CSRDriver),
}
}
func (d *GRPCDriver) Process(
ctx context.Context, controllerName string, secret *corev1.Secret, additionalSecretData map[string][]byte,
recorder events.Recorder) (*corev1.Secret, *metav1.Condition, error) {
additionalSecretData["config.yaml"] = d.configTemplate
return d.csrDriver.Process(ctx, controllerName, secret, additionalSecretData, recorder)
}
func (d *GRPCDriver) BuildKubeConfigFromTemplate(kubeConfig *clientcmdapi.Config) *clientcmdapi.Config {
kubeConfig.AuthInfos = map[string]*clientcmdapi.AuthInfo{register.DefaultKubeConfigAuth: {
ClientCertificate: "tls.crt",
ClientKey: "tls.key",
}}
return kubeConfig
}
func (d *GRPCDriver) InformerHandler() (cache.SharedIndexInformer, factory.EventFilterFunc) {
return d.control.Informer(), nil
}
func (d *GRPCDriver) IsHubKubeConfigValid(ctx context.Context, secretOption register.SecretOption) (bool, error) {
logger := klog.FromContext(ctx)
certPath := path.Join(secretOption.HubKubeconfigDir, csr.TLSCertFile)
certData, err := os.ReadFile(path.Clean(certPath))
if err != nil {
logger.V(4).Info("Unable to load TLS cert file", "certPath", certPath)
return false, nil
}
keyPath := path.Join(secretOption.HubKubeconfigDir, csr.TLSKeyFile)
if _, err := os.Stat(keyPath); os.IsNotExist(err) {
logger.V(4).Info("TLS key file not found", "keyPath", keyPath)
return false, nil
}
// only set when clustername/agentname are set
if len(secretOption.ClusterName) > 0 && len(secretOption.AgentName) > 0 {
// check if the tls certificate is issued for the current cluster/agent
clusterNameInCert, agentNameInCert, err := csr.GetClusterAgentNamesFromCertificate(certData)
if err != nil {
return false, nil
}
if secretOption.ClusterName != clusterNameInCert || secretOption.AgentName != agentNameInCert {
logger.V(4).Info("Certificate in file is issued for different agent",
"certPath", certPath,
"issuedFor", fmt.Sprintf("%s:%s", clusterNameInCert, agentNameInCert),
"expectedFor", fmt.Sprintf("%s:%s", secretOption.ClusterName, secretOption.AgentName))
return false, nil
}
}
return csr.IsCertificateValid(logger, certData, nil)
}
func (d *GRPCDriver) ManagedClusterDecorator(cluster *clusterv1.ManagedCluster) *clusterv1.ManagedCluster {
return cluster
}
func (d *GRPCDriver) loadConfig(secretOption register.SecretOption, bootstrapped bool) (any, []byte, error) {
var err error
var config any
var configFile string
if bootstrapped {
_, config, err = generic.NewConfigLoader(constants.ConfigTypeGRPC, d.opt.BootstrapConfigFile).LoadConfig()
if err != nil {
return nil, nil, fmt.Errorf(
"failed to load hub bootstrap registration config from file %q: %w",
d.opt.BootstrapConfigFile, err)
}
configFile = d.opt.BootstrapConfigFile
} else {
_, config, err = generic.NewConfigLoader(constants.ConfigTypeGRPC, d.opt.ConfigFile).LoadConfig()
if err != nil {
return nil, nil, fmt.Errorf(
"failed to load hub registration config from file %q: %w",
d.opt.ConfigFile, err)
}
configFile = d.opt.ConfigFile
}
grpcConfig, err := grpc.LoadConfig(configFile)
if err != nil {
return nil, nil, err
}
configData, err := yaml.Marshal(&grpc.GRPCConfig{
CertConfig: cert.CertConfig{
CAData: grpcConfig.CAData,
ClientKeyFile: path.Join(secretOption.HubKubeconfigDir, csr.TLSKeyFile),
ClientCertFile: path.Join(secretOption.HubKubeconfigDir, csr.TLSCertFile),
},
URL: grpcConfig.URL,
KeepAliveConfig: grpcConfig.KeepAliveConfig,
})
if err != nil {
return nil, nil, err
}
return config, configData, nil
}
type ceCSRControl struct {
csrClientHolder *cloudeventscsr.ClientHolder
}
var _ csr.CSRControl = &ceCSRControl{}
func (c *ceCSRControl) IsApproved(name string) (bool, error) {
csr, err := c.csrClientHolder.Clients().Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return false, err
}
for _, condition := range csr.Status.Conditions {
if condition.Type == certificatesv1.CertificateApproved {
return true, nil
}
}
return false, nil
}
func (c *ceCSRControl) GetIssuedCertificate(name string) ([]byte, error) {
csr, err := c.csrClientHolder.Clients().Get(context.Background(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return csr.Status.Certificate, nil
}
func (c *ceCSRControl) Create(ctx context.Context, recorder events.Recorder, objMeta metav1.ObjectMeta, csrData []byte,
signerName string, expirationSeconds *int32) (string, error) {
csr := &certificatesv1.CertificateSigningRequest{
ObjectMeta: objMeta,
Spec: certificatesv1.CertificateSigningRequestSpec{
Request: csrData,
Usages: []certificatesv1.KeyUsage{
certificatesv1.UsageDigitalSignature,
certificatesv1.UsageKeyEncipherment,
certificatesv1.UsageClientAuth,
},
SignerName: signerName,
ExpirationSeconds: expirationSeconds,
},
}
req, err := c.csrClientHolder.Clients().Create(ctx, csr, metav1.CreateOptions{})
if err != nil {
return "", err
}
recorder.Eventf("CSRCreated", "A csr %q is created", req.Name)
return req.Name, nil
}
func (c *ceCSRControl) Informer() cache.SharedIndexInformer {
return c.csrClientHolder.Informer()
}