Refactor registration (#535)

* Refactor registration

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Fix integration test

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Refactor cert controller to secret controller

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Update health check func

Signed-off-by: Jian Qiu <jqiu@redhat.com>

---------

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2024-07-17 22:14:11 +08:00
committed by GitHub
parent 110a804c08
commit 8c1d286b11
35 changed files with 1556 additions and 1334 deletions

View File

@@ -62,7 +62,7 @@ func NewKlusterletAgentCmd() *cobra.Command {
agentConfig := singletonspoke.NewAgentConfig(commonOptions, registrationOption, workOptions)
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("klusterlet", version.Get(), agentConfig.RunSpokeAgent).
WithHealthChecks(registrationOption.GetHealthCheckers()...)
WithHealthChecks(agentConfig.HealthCheckers()...)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = agentCmdName
cmd.Short = "Start the klusterlet agent"

View File

@@ -20,7 +20,7 @@ func NewRegistrationAgent() *cobra.Command {
cfg := spoke.NewSpokeAgentConfig(commonOptions, agentOptions)
cmdConfig := commonOptions.CommonOpts.
NewControllerCommandConfig("registration-agent", version.Get(), cfg.RunSpokeAgent).
WithHealthChecks(agentOptions.GetHealthCheckers()...)
WithHealthChecks(cfg.HealthCheckers()...)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = agentCmdName

View File

@@ -12,10 +12,8 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/spoke/registration"
"open-cluster-management.io/ocm/pkg/registration/register"
)
const (
@@ -96,7 +94,7 @@ func (o *AgentOptions) Validate() error {
// Complete fills in missing values.
func (o *AgentOptions) Complete() error {
if len(o.HubKubeconfigFile) == 0 {
o.HubKubeconfigFile = path.Join(o.HubKubeconfigDir, clientcert.KubeconfigFile)
o.HubKubeconfigFile = path.Join(o.HubKubeconfigDir, register.KubeconfigFile)
}
// load or generate cluster/agent names
@@ -122,13 +120,6 @@ func (o *AgentOptions) getOrGenerateClusterAgentID() (string, string) {
if len(o.SpokeClusterName) > 0 && len(o.AgentID) > 0 {
return o.SpokeClusterName, o.AgentID
}
// try to load cluster/agent name from tls certification
var clusterNameInCert, agentNameInCert string
certPath := path.Join(o.HubKubeconfigDir, clientcert.TLSCertFile)
certData, certErr := os.ReadFile(path.Clean(certPath))
if certErr == nil {
clusterNameInCert, agentNameInCert, _ = registration.GetClusterAgentNamesFromCertificate(certData)
}
clusterName := o.SpokeClusterName
// if cluster name is not specified with input argument, try to load it from file
@@ -136,15 +127,9 @@ func (o *AgentOptions) getOrGenerateClusterAgentID() (string, string) {
// TODO, read cluster name from openshift struct if the spoke agent is running in an openshift cluster
// and then load the cluster name from the mounted secret
clusterNameFilePath := path.Join(o.HubKubeconfigDir, clientcert.ClusterNameFile)
clusterNameFilePath := path.Join(o.HubKubeconfigDir, register.ClusterNameFile)
clusterNameBytes, err := os.ReadFile(path.Clean(clusterNameFilePath))
switch {
case len(clusterNameInCert) > 0:
// use cluster name loaded from the tls certification
clusterName = clusterNameInCert
if clusterNameInCert != string(clusterNameBytes) {
klog.Warningf("Use cluster name %q in certification instead of %q in the mounted secret", clusterNameInCert, string(clusterNameBytes))
}
case err == nil:
// use cluster name load from the mounted secret
clusterName = string(clusterNameBytes)
@@ -157,17 +142,9 @@ func (o *AgentOptions) getOrGenerateClusterAgentID() (string, string) {
agentID := o.AgentID
// try to load agent name from the mounted secret
if len(agentID) == 0 {
agentIDFilePath := path.Join(o.HubKubeconfigDir, clientcert.AgentNameFile)
agentIDFilePath := path.Join(o.HubKubeconfigDir, register.AgentNameFile)
agentIDBytes, err := os.ReadFile(path.Clean(agentIDFilePath))
switch {
case len(agentNameInCert) > 0:
// use agent name loaded from the tls certification
agentID = agentNameInCert
if agentNameInCert != agentID {
klog.Warningf(
"Use agent name %q in certification instead of %q in the mounted secret",
agentNameInCert, agentID)
}
case err == nil:
// use agent name loaded from the mounted secret
agentID = string(agentIDBytes)

View File

@@ -5,7 +5,6 @@ import (
"os"
"path"
"testing"
"time"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
@@ -14,8 +13,8 @@ import (
"k8s.io/apimachinery/pkg/runtime"
kubefake "k8s.io/client-go/kubernetes/fake"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/spoke/registration"
"open-cluster-management.io/ocm/pkg/version"
)
@@ -55,19 +54,6 @@ func TestComplete(t *testing.T) {
expectedClusterName: "cluster1",
expectedAgentName: "agent2",
},
{
name: "override cluster name in cert with specified value",
clusterName: "cluster1",
secret: testinghelpers.NewHubKubeconfigSecret(
componentNamespace, "hub-kubeconfig-secret", "",
testinghelpers.NewTestCert("system:open-cluster-management:cluster2:agent2", 60*time.Second), map[string][]byte{
"kubeconfig": testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
"cluster-name": []byte("cluster3"),
"agent-name": []byte("agent3"),
}),
expectedClusterName: "cluster1",
expectedAgentName: "agent2",
},
{
name: "take cluster/agent name from secret",
secret: testinghelpers.NewHubKubeconfigSecret(
@@ -78,25 +64,6 @@ func TestComplete(t *testing.T) {
expectedClusterName: "cluster1",
expectedAgentName: "agent1",
},
{
name: "take cluster/agent name from cert",
secret: testinghelpers.NewHubKubeconfigSecret(
componentNamespace, "hub-kubeconfig-secret", "",
testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 60*time.Second), map[string][]byte{}),
expectedClusterName: "cluster1",
expectedAgentName: "agent1",
},
{
name: "override cluster name in secret with value from cert",
secret: testinghelpers.NewHubKubeconfigSecret(
componentNamespace, "hub-kubeconfig-secret", "",
testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 60*time.Second), map[string][]byte{
"cluster-name": []byte("cluster2"),
"agent-name": []byte("agent2"),
}),
expectedClusterName: "cluster1",
expectedAgentName: "agent1",
},
}
for _, c := range cases {
@@ -213,8 +180,8 @@ func TestGetOrGenerateClusterAgentNames(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
if c.options.HubKubeconfigDir != "" {
testinghelpers.WriteFile(path.Join(tempDir, clientcert.ClusterNameFile), []byte(c.expectedClusterName))
testinghelpers.WriteFile(path.Join(tempDir, clientcert.AgentNameFile), []byte(c.expectedAgentName))
testinghelpers.WriteFile(path.Join(tempDir, register.ClusterNameFile), []byte(c.expectedClusterName))
testinghelpers.WriteFile(path.Join(tempDir, register.AgentNameFile), []byte(c.expectedAgentName))
}
clusterName, agentName := c.options.getOrGenerateClusterAgentID()
if clusterName != c.expectedClusterName {

View File

@@ -1,308 +0,0 @@
package clientcert
import (
"context"
"crypto/x509/pkix"
"fmt"
"testing"
"time"
"github.com/openshift/library-go/pkg/operator/events"
certificates "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2/ktesting"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/hub/user"
)
const (
testNamespace = "testns"
testAgentName = "testagent"
testSecretName = "testsecret"
testCSRName = "testcsr"
)
var commonName = fmt.Sprintf("%s%s:%s", user.SubjectPrefix, testinghelpers.TestManagedClusterName, testAgentName)
func TestSync(t *testing.T) {
testSubject := &pkix.Name{
CommonName: commonName,
}
cases := []struct {
name string
queueKey string
secrets []runtime.Object
approvedCSRCert *testinghelpers.TestCert
keyDataExpected bool
csrNameExpected bool
expectedCondition *metav1.Condition
validateActions func(t *testing.T, hubActions, agentActions []clienttesting.Action)
}{
{
name: "agent bootstrap",
secrets: []runtime.Object{},
queueKey: "key",
keyDataExpected: true,
csrNameExpected: true,
validateActions: func(t *testing.T, hubActions, agentActions []clienttesting.Action) {
testingcommon.AssertActions(t, hubActions, "create")
actual := hubActions[0].(clienttesting.CreateActionImpl).Object
if _, ok := actual.(*unstructured.Unstructured); !ok {
t.Errorf("expected csr was created, but failed")
}
testingcommon.AssertActions(t, agentActions, "get")
},
},
{
name: "syc csr after bootstrap",
queueKey: testSecretName,
secrets: []runtime.Object{
testinghelpers.NewHubKubeconfigSecret(testNamespace, testSecretName, "1", nil, map[string][]byte{
ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
AgentNameFile: []byte(testAgentName),
},
),
},
expectedCondition: &metav1.Condition{
Type: ClusterCertificateRotatedCondition,
Status: metav1.ConditionTrue,
},
approvedCSRCert: testinghelpers.NewTestCert(commonName, 10*time.Second),
validateActions: func(t *testing.T, hubActions, agentActions []clienttesting.Action) {
logger, _ := ktesting.NewTestContext(t)
testingcommon.AssertActions(t, hubActions, "get", "get")
testingcommon.AssertActions(t, agentActions, "get", "update")
actual := agentActions[1].(clienttesting.UpdateActionImpl).Object
secret := actual.(*corev1.Secret)
valid, err := IsCertificateValid(logger, secret.Data[TLSCertFile], testSubject)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if !valid {
t.Error("client certificate is invalid")
}
},
},
{
name: "sync a valid hub kubeconfig secret",
queueKey: testSecretName,
secrets: []runtime.Object{
testinghelpers.NewHubKubeconfigSecret(testNamespace, testSecretName, "1", testinghelpers.NewTestCert(commonName, 10000*time.Second), map[string][]byte{
ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
AgentNameFile: []byte(testAgentName),
KubeconfigFile: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
}),
},
validateActions: func(t *testing.T, hubActions, agentActions []clienttesting.Action) {
testingcommon.AssertNoActions(t, hubActions)
testingcommon.AssertActions(t, agentActions, "get")
},
},
{
name: "sync an expiring hub kubeconfig secret",
queueKey: testSecretName,
secrets: []runtime.Object{
testinghelpers.NewHubKubeconfigSecret(testNamespace, testSecretName, "1", testinghelpers.NewTestCert(commonName, -3*time.Second), map[string][]byte{
ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
AgentNameFile: []byte(testAgentName),
KubeconfigFile: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
}),
},
keyDataExpected: true,
csrNameExpected: true,
validateActions: func(t *testing.T, hubActions, agentActions []clienttesting.Action) {
testingcommon.AssertActions(t, hubActions, "create")
actual := hubActions[0].(clienttesting.CreateActionImpl).Object
if _, ok := actual.(*unstructured.Unstructured); !ok {
t.Errorf("expected csr was created, but failed")
}
testingcommon.AssertActions(t, agentActions, "get")
},
},
{
name: "sync when additional secret data changes",
queueKey: testSecretName,
secrets: []runtime.Object{
testinghelpers.NewHubKubeconfigSecret(testNamespace, testSecretName, "1", testinghelpers.NewTestCert(commonName, 10000*time.Second), map[string][]byte{
ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
AgentNameFile: []byte("invalid-name"),
}),
},
keyDataExpected: true,
csrNameExpected: true,
validateActions: func(t *testing.T, hubActions, agentActions []clienttesting.Action) {
testingcommon.AssertActions(t, hubActions, "create")
actual := hubActions[0].(clienttesting.CreateActionImpl).Object
if _, ok := actual.(*unstructured.Unstructured); !ok {
t.Errorf("expected csr was created, but failed")
}
testingcommon.AssertActions(t, agentActions, "get")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
ctrl := &mockCSRControl{}
var csrs []runtime.Object
if c.approvedCSRCert != nil {
csr := testinghelpers.NewApprovedCSR(testinghelpers.CSRHolder{Name: testCSRName})
csr.Status.Certificate = c.approvedCSRCert.Cert
csrs = append(csrs, csr)
ctrl.approved = true
ctrl.issuedCertData = c.approvedCSRCert.Cert
}
hubKubeClient := kubefake.NewSimpleClientset(csrs...)
ctrl.csrClient = &hubKubeClient.Fake
// GenerateName is not working for fake clent, we set the name with prepend reactor
hubKubeClient.PrependReactor(
"create",
"certificatesigningrequests",
func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
return true, testinghelpers.NewCSR(testinghelpers.CSRHolder{Name: testCSRName}), nil
},
)
agentKubeClient := kubefake.NewSimpleClientset(c.secrets...)
clientCertOption := ClientCertOption{
SecretNamespace: testNamespace,
SecretName: testSecretName,
AdditionalSecretData: map[string][]byte{
ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
AgentNameFile: []byte(testAgentName),
},
}
csrOption := CSROption{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-",
},
Subject: testSubject,
SignerName: certificates.KubeAPIServerClientSignerName,
HaltCSRCreation: func() bool { return false },
}
updater := &fakeStatusUpdater{}
controller := &clientCertificateController{
ClientCertOption: clientCertOption,
CSROption: csrOption,
csrControl: ctrl,
managementCoreClient: agentKubeClient.CoreV1(),
controllerName: "test-agent",
statusUpdater: updater.update,
}
if c.approvedCSRCert != nil {
controller.csrName = testCSRName
controller.keyData = c.approvedCSRCert.Key
}
err := controller.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, c.queueKey))
if err != nil {
t.Errorf("unexpected error %v", err)
}
hasKeyData := controller.keyData != nil
if c.keyDataExpected != hasKeyData {
t.Error("controller.keyData should be set")
}
hasCSRName := controller.csrName != ""
if c.csrNameExpected != hasCSRName {
t.Error("controller.csrName should be set")
}
if !conditionEqual(c.expectedCondition, updater.cond) {
t.Errorf("condition is not correct, expected %v, got %v", c.expectedCondition, updater.cond)
}
c.validateActions(t, hubKubeClient.Actions(), agentKubeClient.Actions())
})
}
}
var _ CSRControl = &mockCSRControl{}
func conditionEqual(expected, actual *metav1.Condition) bool {
if expected == nil && actual == nil {
return true
}
if expected == nil || actual == nil {
return false
}
if expected.Type != actual.Type {
return false
}
if string(expected.Status) != string(actual.Status) {
return false
}
return true
}
type fakeStatusUpdater struct {
cond *metav1.Condition
}
func (f *fakeStatusUpdater) update(_ context.Context, cond metav1.Condition) error {
f.cond = cond.DeepCopy()
return nil
}
type mockCSRControl struct {
approved bool
issuedCertData []byte
csrClient *clienttesting.Fake
}
func (m *mockCSRControl) create(
_ context.Context, _ events.Recorder, objMeta metav1.ObjectMeta, _ []byte, _ string, _ *int32) (string, error) {
mockCSR := &unstructured.Unstructured{}
_, err := m.csrClient.Invokes(clienttesting.CreateActionImpl{
ActionImpl: clienttesting.ActionImpl{
Verb: "create",
},
Object: mockCSR,
}, nil)
return objMeta.Name + rand.String(4), err
}
func (m *mockCSRControl) isApproved(name string) (bool, error) {
_, err := m.csrClient.Invokes(clienttesting.GetActionImpl{
ActionImpl: clienttesting.ActionImpl{
Verb: "get",
Resource: certificates.SchemeGroupVersion.WithResource("certificatesigningrequests"),
},
Name: name,
}, nil)
return m.approved, err
}
func (m *mockCSRControl) getIssuedCertificate(name string) ([]byte, error) {
_, err := m.csrClient.Invokes(clienttesting.GetActionImpl{
ActionImpl: clienttesting.ActionImpl{
Verb: "get",
Resource: certificates.SchemeGroupVersion.WithResource("certificatesigningrequests"),
},
Name: name,
}, nil)
return m.issuedCertData, err
}
func (m *mockCSRControl) Informer() cache.SharedIndexInformer {
panic("implement me")
}

View File

@@ -0,0 +1,89 @@
package register
import (
"fmt"
"reflect"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// BaseKubeConfigFromBootStrap builds a kubeconfig from the bootstrap kubeconfig,
// copying the current cluster stanza while leaving out any authInfo configuration.
func BaseKubeConfigFromBootStrap(bootstrapConfig *clientcmdapi.Config) (*clientcmdapi.Config, error) {
	curCtx, curCluster, err := currentKubeConfigCluster(bootstrapConfig)
	if err != nil {
		return nil, err
	}

	// Carry over the connection details of the bootstrap kubeconfig's current
	// cluster and wire them to the default context/auth names used by the agent.
	return &clientcmdapi.Config{
		Clusters: map[string]*clientcmdapi.Cluster{
			curCtx.Cluster: {
				Server:                   curCluster.Server,
				InsecureSkipTLSVerify:    false,
				CertificateAuthorityData: curCluster.CertificateAuthorityData,
				ProxyURL:                 curCluster.ProxyURL,
			},
		},
		// Define a context that connects the auth info and cluster, and set it as the default.
		Contexts: map[string]*clientcmdapi.Context{
			DefaultKubeConfigContext: {
				Cluster:   curCtx.Cluster,
				AuthInfo:  DefaultKubeConfigAuth,
				Namespace: "configuration",
			},
		},
		CurrentContext: DefaultKubeConfigContext,
	}, nil
}
// currentKubeConfigCluster returns the context entry referenced by the config's
// CurrentContext and the cluster entry that context points at, or an error if
// either lookup fails.
func currentKubeConfigCluster(config *clientcmdapi.Config) (*clientcmdapi.Context, *clientcmdapi.Cluster, error) {
	kubeConfigCtx, ok := config.Contexts[config.CurrentContext]
	if !ok {
		return nil, nil, fmt.Errorf("kubeconfig does not contain context: %s", config.CurrentContext)
	}
	cluster, ok := config.Clusters[kubeConfigCtx.Cluster]
	if !ok {
		return nil, nil, fmt.Errorf("kubeconfig does not contain cluster: %s", kubeConfigCtx.Cluster)
	}
	return kubeConfigCtx, cluster, nil
}
// IsHubKubeconfigValid reports whether the hub kubeconfig is still usable.
// The hub kubeconfig is valid when it shares the same value of the following with the
// bootstrap hub kubeconfig.
// 1. The hub server
// 2. The proxy url
// 3. The CA bundle
// 4. The current context cluster name
// A nil bootstrap or hub kubeconfig is treated as invalid (false, nil), not as an error.
func IsHubKubeconfigValid(bootstrapKubeConfig, hubKubeConfig *clientcmdapi.Config) (bool, error) {
	if bootstrapKubeConfig == nil {
		return false, nil
	}
	bootstrapCtx, bootstrapCluster, err := currentKubeConfigCluster(bootstrapKubeConfig)
	if err != nil {
		return false, err
	}

	if hubKubeConfig == nil {
		return false, nil
	}
	hubKubeConfigCtx, hubKubeConfigCluster, err := currentKubeConfigCluster(hubKubeConfig)
	switch {
	case err != nil:
		return false, err
	case bootstrapCluster.Server != hubKubeConfigCluster.Server,
		bootstrapCluster.ProxyURL != hubKubeConfigCluster.ProxyURL,
		!reflect.DeepEqual(bootstrapCluster.CertificateAuthorityData, hubKubeConfigCluster.CertificateAuthorityData),
		// Here in addition to the server, proxyURL and CA bundle, we also need to compare the cluster name,
		// because in some cases even the hub cluster API server serving certificate(kubeconfig ca bundle)
		// is the same, but the signer certificate may be different(i.e the hub kubernetes cluster is rebuilt
		// with a same serving certificate and url), so setting the cluster name in the bootstrap kubeconfig
		// can help to distinguish the different clusters(signer certificate). And comparing the cluster name
		// can help to avoid the situation that the hub kubeconfig is valid but not for the current cluster.
		bootstrapCtx.Cluster != hubKubeConfigCtx.Cluster:
		return false, nil
	default:
		return true, nil
	}
}

View File

@@ -0,0 +1,105 @@
package register
import (
"reflect"
"testing"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
)
// TestBaseKubeConfigFromBootStrap verifies that BaseKubeConfigFromBootStrap copies
// the server, CA data and proxy URL of the bootstrap kubeconfig's current cluster
// into the generated kubeconfig, with and without a proxy URL configured.
func TestBaseKubeConfigFromBootStrap(t *testing.T) {
	server1 := "https://127.0.0.1:6443"
	server2 := "https://api.cluster1.example.com:6443"
	caData1 := []byte("fake-ca-data1")
	caData2 := []byte("fake-ca-data2")
	proxyURL := "https://127.0.0.1:3129"

	cases := []struct {
		name             string
		kubeconfig       *clientcmdapi.Config
		expectedServer   string
		expectedCAData   []byte
		expectedProxyURL string
	}{
		{
			name: "without proxy url",
			kubeconfig: &clientcmdapi.Config{
				Clusters: map[string]*clientcmdapi.Cluster{
					"test-cluster": {
						Server:                   server1,
						CertificateAuthorityData: caData1,
					}},
				// Define a context that connects the auth info and cluster, and set it as the default
				Contexts: map[string]*clientcmdapi.Context{DefaultKubeConfigContext: {
					Cluster:   "test-cluster",
					AuthInfo:  DefaultKubeConfigAuth,
					Namespace: "configuration",
				}},
				CurrentContext: DefaultKubeConfigContext,
				AuthInfos: map[string]*clientcmdapi.AuthInfo{
					DefaultKubeConfigAuth: {
						ClientCertificate: "tls.crt",
						ClientKey:         "tls.key",
					},
				},
			},
			expectedServer:   server1,
			expectedCAData:   caData1,
			expectedProxyURL: "",
		},
		{
			name: "with proxy url",
			kubeconfig: &clientcmdapi.Config{
				Clusters: map[string]*clientcmdapi.Cluster{
					"test-cluster": {
						Server:                   server2,
						CertificateAuthorityData: caData2,
						ProxyURL:                 proxyURL,
					}},
				// Define a context that connects the auth info and cluster, and set it as the default
				Contexts: map[string]*clientcmdapi.Context{DefaultKubeConfigContext: {
					Cluster:   "test-cluster",
					AuthInfo:  DefaultKubeConfigAuth,
					Namespace: "configuration",
				}},
				CurrentContext: DefaultKubeConfigContext,
				AuthInfos: map[string]*clientcmdapi.AuthInfo{
					DefaultKubeConfigAuth: {
						ClientCertificate: "tls.crt",
						ClientKey:         "tls.key",
					},
				},
			},
			expectedServer:   server2,
			expectedCAData:   caData2,
			expectedProxyURL: proxyURL,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			kubeConfig, err := BaseKubeConfigFromBootStrap(c.kubeconfig)
			if err != nil {
				t.Fatalf("unexpected error: %v", err)
			}

			cluster := kubeConfig.Contexts[DefaultKubeConfigContext].Cluster
			if cluster != "test-cluster" {
				t.Errorf("expect context cluster %s, but %s", "test-cluster",
					kubeConfig.Contexts[DefaultKubeConfigContext].Cluster)
			}
			if c.expectedServer != kubeConfig.Clusters[cluster].Server {
				t.Errorf("expect server %s, but %s", c.expectedServer, kubeConfig.Clusters[cluster].Server)
			}
			// Report the actual ProxyURL from the generated kubeconfig; previously this
			// printed the fixture proxyURL, which made failure messages useless.
			if c.expectedProxyURL != kubeConfig.Clusters[cluster].ProxyURL {
				t.Errorf("expect proxy url %s, but %s", c.expectedProxyURL, kubeConfig.Clusters[cluster].ProxyURL)
			}
			if !reflect.DeepEqual(c.expectedCAData, kubeConfig.Clusters[cluster].CertificateAuthorityData) {
				t.Errorf("expect ca data %v, but %v", c.expectedCAData, kubeConfig.Clusters[cluster].CertificateAuthorityData)
			}
		})
	}
}

View File

@@ -1,4 +1,4 @@
package clientcert
package csr
import (
"context"
@@ -8,7 +8,6 @@ import (
certificates "k8s.io/api/certificates/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
certificatesinformers "k8s.io/client-go/informers/certificates/v1beta1"
csrclient "k8s.io/client-go/kubernetes/typed/certificates/v1beta1"
certificateslisters "k8s.io/client-go/listers/certificates/v1beta1"
"k8s.io/client-go/tools/cache"
@@ -17,7 +16,7 @@ import (
var _ CSRControl = &v1beta1CSRControl{}
type v1beta1CSRControl struct {
hubCSRInformer certificatesinformers.CertificateSigningRequestInformer
hubCSRInformer cache.SharedIndexInformer
hubCSRLister certificateslisters.CertificateSigningRequestLister
hubCSRClient csrclient.CertificateSigningRequestInterface
}
@@ -77,7 +76,7 @@ func (v *v1beta1CSRControl) create(ctx context.Context, recorder events.Recorder
}
func (v *v1beta1CSRControl) Informer() cache.SharedIndexInformer {
return v.hubCSRInformer.Informer()
return v.hubCSRInformer
}
func (v *v1beta1CSRControl) get(name string) (metav1.Object, error) {

View File

@@ -1,4 +1,4 @@
package clientcert
package csr
import (
"context"
@@ -6,6 +6,7 @@ import (
"crypto/x509/pkix"
"errors"
"fmt"
"strings"
"time"
"github.com/openshift/library-go/pkg/operator/events"
@@ -15,12 +16,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
certificatesinformers "k8s.io/client-go/informers/certificates"
certificatesv1informers "k8s.io/client-go/informers/certificates/v1"
"k8s.io/client-go/kubernetes"
csrclient "k8s.io/client-go/kubernetes/typed/certificates/v1"
certificatesv1listers "k8s.io/client-go/listers/certificates/v1"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"
@@ -28,15 +27,16 @@ import (
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"open-cluster-management.io/ocm/pkg/registration/hub/user"
)
// IsCertificateValid return true if
// isCertificateValid returns true if
// 1) All certs in client certificate are not expired.
// 2) At least one cert matches the given subject if specified
func IsCertificateValid(logger klog.Logger, certData []byte, subject *pkix.Name) (bool, error) {
func isCertificateValid(logger klog.Logger, certData []byte, subject *pkix.Name) (bool, error) {
certs, err := certutil.ParseCertsPEM(certData)
if err != nil {
return false, errors.New("unable to parse certificate")
return false, fmt.Errorf("unable to parse certificate: %v", err)
}
if len(certs) == 0 {
@@ -126,34 +126,26 @@ func getCertValidityPeriod(secret *corev1.Secret) (*time.Time, *time.Time, error
return notBefore, notAfter, nil
}
// BuildKubeconfig builds a kubeconfig based on a rest config template with a cert/key pair
func BuildKubeconfig(clusterName, server string, caData []byte,
proxyURL, clientCertPath, clientKeyPath string) clientcmdapi.Config {
// Build kubeconfig.
kubeconfig := clientcmdapi.Config{
// Define a cluster stanza based on the bootstrap kubeconfig.
Clusters: map[string]*clientcmdapi.Cluster{
clusterName: {
Server: server,
InsecureSkipTLSVerify: false,
CertificateAuthorityData: caData,
ProxyURL: proxyURL,
}},
// Define auth based on the obtained client cert.
AuthInfos: map[string]*clientcmdapi.AuthInfo{"default-auth": {
ClientCertificate: clientCertPath,
ClientKey: clientKeyPath,
}},
// Define a context that connects the auth info and cluster, and set it as the default
Contexts: map[string]*clientcmdapi.Context{"default-context": {
Cluster: clusterName,
AuthInfo: "default-auth",
Namespace: "configuration",
}},
CurrentContext: "default-context",
// GetClusterAgentNamesFromCertificate returns the cluster name and agent name by parsing
// the common name of the certification
func GetClusterAgentNamesFromCertificate(certData []byte) (clusterName, agentName string, err error) {
certs, err := certutil.ParseCertsPEM(certData)
if err != nil {
return "", "", fmt.Errorf("unable to parse certificate: %w", err)
}
return kubeconfig
for _, cert := range certs {
if ok := strings.HasPrefix(cert.Subject.CommonName, user.SubjectPrefix); !ok {
continue
}
names := strings.Split(strings.TrimPrefix(cert.Subject.CommonName, user.SubjectPrefix), ":")
if len(names) != 2 {
continue
}
return names[0], names[1], nil
}
return "", "", nil
}
type CSRControl interface {
@@ -168,7 +160,7 @@ type CSRControl interface {
var _ CSRControl = &v1CSRControl{}
type v1CSRControl struct {
hubCSRInformer certificatesv1informers.CertificateSigningRequestInformer
hubCSRInformer cache.SharedIndexInformer
hubCSRLister certificatesv1listers.CertificateSigningRequestLister
hubCSRClient csrclient.CertificateSigningRequestInterface
}
@@ -224,7 +216,7 @@ func (v *v1CSRControl) create(ctx context.Context, recorder events.Recorder, obj
}
func (v *v1CSRControl) Informer() cache.SharedIndexInformer {
return v.hubCSRInformer.Informer()
return v.hubCSRInformer
}
func (v *v1CSRControl) get(name string) (metav1.Object, error) {
@@ -250,7 +242,7 @@ func NewCSRControl(logger klog.Logger, hubCSRInformer certificatesinformers.Inte
}
if !v1CSRSupported && v1beta1CSRSupported {
csrCtrl := &v1beta1CSRControl{
hubCSRInformer: hubCSRInformer.V1beta1().CertificateSigningRequests(),
hubCSRInformer: hubCSRInformer.V1beta1().CertificateSigningRequests().Informer(),
hubCSRLister: hubCSRInformer.V1beta1().CertificateSigningRequests().Lister(),
hubCSRClient: hubKubeClient.CertificatesV1beta1().CertificateSigningRequests(),
}
@@ -260,7 +252,7 @@ func NewCSRControl(logger klog.Logger, hubCSRInformer certificatesinformers.Inte
}
return &v1CSRControl{
hubCSRInformer: hubCSRInformer.V1().CertificateSigningRequests(),
hubCSRInformer: hubCSRInformer.V1().CertificateSigningRequests().Informer(),
hubCSRLister: hubCSRInformer.V1().CertificateSigningRequests().Lister(),
hubCSRClient: hubKubeClient.CertificatesV1().CertificateSigningRequests(),
}, nil

View File

@@ -1,4 +1,4 @@
package clientcert
package csr
import (
"testing"

View File

@@ -1,4 +1,4 @@
package clientcert
package csr
import (
"crypto/x509/pkix"
@@ -12,11 +12,13 @@ import (
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/client-go/listers/certificates/v1"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2/ktesting"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/register"
)
func TestIsCSRApproved(t *testing.T) {
@@ -145,7 +147,7 @@ func TestIsCertificateValid(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
logger, _ := ktesting.NewTestContext(t)
isValid, _ := IsCertificateValid(logger, c.testCert.Cert, c.subject)
isValid, _ := isCertificateValid(logger, c.testCert.Cert, c.subject)
if isValid != c.isValid {
t.Errorf("expected %t, but got %t", c.isValid, isValid)
}
@@ -239,8 +241,31 @@ func TestBuildKubeconfig(t *testing.T) {
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
kubeconfig := BuildKubeconfig("default-cluster",
c.server, c.caData, c.proxyURL, c.clientCertFile, c.clientKeyFile)
bootstrapKubeconfig := &clientcmdapi.Config{
Clusters: map[string]*clientcmdapi.Cluster{
"default-cluster": {
Server: c.server,
InsecureSkipTLSVerify: false,
CertificateAuthorityData: c.caData,
ProxyURL: c.proxyURL,
}},
// Define a context that connects the auth info and cluster, and set it as the default
Contexts: map[string]*clientcmdapi.Context{register.DefaultKubeConfigContext: {
Cluster: "default-cluster",
AuthInfo: register.DefaultKubeConfigAuth,
Namespace: "configuration",
}},
CurrentContext: register.DefaultKubeConfigContext,
AuthInfos: map[string]*clientcmdapi.AuthInfo{
register.DefaultKubeConfigAuth: {
ClientCertificate: c.clientCertFile,
ClientKey: c.clientKeyFile,
},
},
}
registerImpl := &CSRDriver{}
kubeconfig := registerImpl.BuildKubeConfigFromTemplate(bootstrapKubeconfig)
currentContext, ok := kubeconfig.Contexts[kubeconfig.CurrentContext]
if !ok {
t.Errorf("current context %q not found: %v", kubeconfig.CurrentContext, kubeconfig)

View File

@@ -1,4 +1,4 @@
package clientcert
package csr
import (
"context"
@@ -6,41 +6,35 @@ import (
"crypto/x509/pkix"
"fmt"
"math/rand"
"os"
"path"
"reflect"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
certutil "k8s.io/client-go/util/cert"
"k8s.io/client-go/util/keyutil"
"k8s.io/klog/v2"
"open-cluster-management.io/ocm/pkg/registration/register"
)
const (
// KubeconfigFile is the name of the kubeconfig file in kubeconfigSecret
KubeconfigFile = "kubeconfig"
// TLSKeyFile is the name of tls key file in kubeconfigSecret
TLSKeyFile = "tls.key"
// TLSCertFile is the name of the tls cert file in kubeconfigSecret
TLSCertFile = "tls.crt"
ClusterNameFile = "cluster-name"
AgentNameFile = "agent-name"
// ClusterCertificateRotatedCondition is a condition type that client certificate is rotated
ClusterCertificateRotatedCondition = "ClusterCertificateRotated"
)
// ControllerResyncInterval is exposed so that integration tests can crank up the constroller sync speed.
var ControllerResyncInterval = 5 * time.Minute
// CSROption includes options that is used to create and monitor csrs
type CSROption struct {
// ObjectMeta is the ObjectMeta shared by all created csrs. It should use GenerateName instead of Name
@@ -68,37 +62,13 @@ type CSROption struct {
// EventFilterFunc matches csrs created with above options
EventFilterFunc factory.EventFilterFunc
CSRControl CSRControl
// HaltCSRCreation halt the csr creation
HaltCSRCreation func() bool
}
// ClientCertOption includes options that is used to create client certificate
type ClientCertOption struct {
// SecretNamespace is the namespace of the secret containing client certificate.
SecretNamespace string
// SecretName is the name of the secret containing client certificate. The secret will be created if
// it does not exist.
SecretName string
// AdditionalSecretData contains data that will be added into client certificate secret besides tls.key/tls.crt
// Once AdditionalSecretData changes, the client cert will be recreated.
AdditionalSecretData map[string][]byte
}
type StatusUpdateFunc func(ctx context.Context, cond metav1.Condition) error
// clientCertificateController implements the common logic of hub client certification creation/rotation. It
// creates a client certificate and rotates it before it becomes expired by using csrs. The client
// certificate generated is stored in a specific secret with the keys below:
// 1). tls.key: tls key file
// 2). tls.crt: tls cert file
type clientCertificateController struct {
ClientCertOption
CSROption
csrControl CSRControl
// managementCoreClient is used to create/delete hub kubeconfig secret on the management cluster
managementCoreClient corev1client.CoreV1Interface
controllerName string
type CSRDriver struct {
// csrName is the name of csr created by controller and waiting for approval.
csrName string
@@ -110,66 +80,15 @@ type clientCertificateController struct {
// 3. csrName set, keyData set: we are waiting for a new cert to be signed.
// 4. csrName empty, keydata set: the CSR failed to create, this shouldn't happen, it's a bug.
keyData []byte
statusUpdater StatusUpdateFunc
}
// NewClientCertificateController return an instance of clientCertificateController
func NewClientCertificateController(
clientCertOption ClientCertOption,
csrOption CSROption,
csrControl CSRControl,
managementSecretInformer corev1informers.SecretInformer,
managementCoreClient corev1client.CoreV1Interface,
statusUpdater StatusUpdateFunc,
recorder events.Recorder,
controllerName string,
) factory.Controller {
c := clientCertificateController{
ClientCertOption: clientCertOption,
CSROption: csrOption,
csrControl: csrControl,
managementCoreClient: managementCoreClient,
controllerName: controllerName,
statusUpdater: statusUpdater,
}
return factory.New().
WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
return factory.DefaultQueueKey
}, func(obj interface{}) bool {
accessor, err := meta.Accessor(obj)
if err != nil {
return false
}
// only enqueue a specific secret
if accessor.GetNamespace() == c.SecretNamespace && accessor.GetName() == c.SecretName {
return true
}
return false
}, managementSecretInformer.Informer()).
WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
return factory.DefaultQueueKey
}, c.EventFilterFunc, csrControl.Informer()).
WithSync(c.sync).
ResyncEvery(ControllerResyncInterval).
ToController(controllerName, recorder)
}
func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
func (c *CSRDriver) Process(
ctx context.Context, controllerName string, secret *corev1.Secret, additionalSecretData map[string][]byte,
recorder events.Recorder, opt any) (*corev1.Secret, *metav1.Condition, error) {
logger := klog.FromContext(ctx)
// get secret containing client certificate
secret, err := c.managementCoreClient.Secrets(c.SecretNamespace).Get(ctx, c.SecretName, metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
secret = &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Namespace: c.SecretNamespace,
Name: c.SecretName,
},
}
case err != nil:
return fmt.Errorf("unable to get secret %q: %w", c.SecretNamespace+"/"+c.SecretName, err)
csrOption, ok := opt.(*CSROption)
if !ok {
return nil, nil, fmt.Errorf("option type is not correct")
}
// reconcile pending csr if exists
@@ -182,7 +101,7 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
}
// skip if csr is not approved yet
isApproved, err := c.csrControl.isApproved(c.csrName)
isApproved, err := csrOption.CSRControl.isApproved(c.csrName)
if err != nil {
return nil, err
}
@@ -191,7 +110,7 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
}
// skip if csr is not issued
certData, err := c.csrControl.getIssuedCertificate(c.csrName)
certData, err := csrOption.CSRControl.getIssuedCertificate(c.csrName)
if err != nil {
return nil, err
}
@@ -199,7 +118,7 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
return nil, nil
}
logger.V(4).Info("Sync csr", "name", c.csrName)
logger.Info("Sync csr", "name", c.csrName)
// check if cert in csr status matches with the corresponding private key
if c.keyData == nil {
return nil, fmt.Errorf("no private key found for certificate in csr: %s", c.csrName)
@@ -219,40 +138,24 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
if err != nil {
c.reset()
if updateErr := c.statusUpdater(ctx, metav1.Condition{
return secret, &metav1.Condition{
Type: "ClusterCertificateRotated",
Status: metav1.ConditionFalse,
Reason: "ClientCertificateUpdateFailed",
Message: fmt.Sprintf("Failed to rotated client certificate %v", err),
}); updateErr != nil {
return updateErr
}
return err
}, err
}
if len(newSecretConfig) == 0 {
return nil
return nil, nil, nil
}
// append additional data into client certificate secret
for k, v := range c.AdditionalSecretData {
newSecretConfig[k] = v
}
secret.Data = newSecretConfig
// save the changes into secret
if err := saveSecret(c.managementCoreClient, c.SecretNamespace, secret); err != nil {
if updateErr := c.statusUpdater(ctx, metav1.Condition{
Type: "ClusterCertificateRotated",
Status: metav1.ConditionFalse,
Reason: "ClientCertificateUpdateFailed",
Message: fmt.Sprintf("Failed to rotated client certificate %v", err),
}); updateErr != nil {
return updateErr
}
return err
for k, v := range newSecretConfig {
secret.Data[k] = v
}
notBefore, notAfter, err := getCertValidityPeriod(secret)
cond := metav1.Condition{
cond := &metav1.Condition{
Type: "ClusterCertificateRotated",
Status: metav1.ConditionTrue,
Reason: "ClientCertificateUpdated",
@@ -260,25 +163,17 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
}
if err != nil {
cond = metav1.Condition{
cond = &metav1.Condition{
Type: "ClusterCertificateRotated",
Status: metav1.ConditionFalse,
Reason: "ClientCertificateUpdateFailed",
Message: fmt.Sprintf("Failed to rotated client certificate %v", err),
}
} else {
recorder.Eventf("ClientCertificateCreated", "A new client certificate for %s is available", controllerName)
}
if updateErr := c.statusUpdater(ctx, cond); updateErr != nil {
return updateErr
}
if err != nil {
c.reset()
return err
}
syncCtx.Recorder().Eventf("ClientCertificateCreated", "A new client certificate for %s is available", c.controllerName)
c.reset()
return nil
return secret, cond, err
}
// create a csr to request new client certificate if
@@ -287,30 +182,28 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
// c. client certificate exists and has less than a random percentage range from 20% to 25% of its life remaining;
shouldCreate, err := shouldCreateCSR(
logger,
c.controllerName,
controllerName,
secret,
syncCtx.Recorder(),
c.Subject,
c.AdditionalSecretData)
recorder,
csrOption.Subject,
additionalSecretData)
if err != nil {
return err
return secret, nil, err
}
if !shouldCreate {
return nil
return nil, nil, nil
}
shouldHalt := c.CSROption.HaltCSRCreation()
shouldHalt := csrOption.HaltCSRCreation()
if shouldHalt {
if updateErr := c.statusUpdater(ctx, metav1.Condition{
recorder.Eventf("ClientCertificateCreationHalted",
"Stop creating csr since there are too many csr created already on hub", controllerName)
return nil, &metav1.Condition{
Type: "ClusterCertificateRotated",
Status: metav1.ConditionFalse,
Reason: "ClientCertificateUpdateFailed",
Message: "Stop creating csr since there are too many csr created already on hub",
}); updateErr != nil {
return updateErr
}
syncCtx.Recorder().Eventf("ClientCertificateCreationHalted", "Stop creating csr since there are too many csr created already on hub", c.controllerName)
return nil
}, nil
}
keyData, createdCSRName, err := func() ([]byte, string, error) {
@@ -324,48 +217,92 @@ func (c *clientCertificateController) sync(ctx context.Context, syncCtx factory.
if err != nil {
return keyData, "", fmt.Errorf("invalid private key for certificate request: %w", err)
}
csrData, err := certutil.MakeCSR(privateKey, c.Subject, c.DNSNames, nil)
csrData, err := certutil.MakeCSR(privateKey, csrOption.Subject, csrOption.DNSNames, nil)
if err != nil {
return keyData, "", fmt.Errorf("unable to generate certificate request: %w", err)
}
createdCSRName, err := c.csrControl.create(ctx, syncCtx.Recorder(), c.ObjectMeta, csrData, c.SignerName, c.ExpirationSeconds)
createdCSRName, err := csrOption.CSRControl.create(
ctx, recorder, csrOption.ObjectMeta, csrData, csrOption.SignerName, csrOption.ExpirationSeconds)
if err != nil {
return keyData, "", err
}
return keyData, createdCSRName, nil
}()
if err != nil {
if updateErr := c.statusUpdater(ctx, metav1.Condition{
return nil, &metav1.Condition{
Type: "ClusterCertificateRotated",
Status: metav1.ConditionFalse,
Reason: "ClientCertificateUpdateFailed",
Message: fmt.Sprintf("Failed to create CSR %v", err),
}); updateErr != nil {
return updateErr
}
return err
}, err
}
c.keyData = keyData
c.csrName = createdCSRName
return nil
return nil, nil, nil
}
func saveSecret(spokeCoreClient corev1client.CoreV1Interface, secretNamespace string, secret *corev1.Secret) error {
var err error
if secret.ResourceVersion == "" {
_, err = spokeCoreClient.Secrets(secretNamespace).Create(context.Background(), secret, metav1.CreateOptions{})
return err
}
_, err = spokeCoreClient.Secrets(secretNamespace).Update(context.Background(), secret, metav1.UpdateOptions{})
return err
}
func (c *clientCertificateController) reset() {
func (c *CSRDriver) reset() {
c.csrName = ""
c.keyData = nil
}
func (c *CSRDriver) BuildKubeConfigFromTemplate(kubeConfig *clientcmdapi.Config) *clientcmdapi.Config {
kubeConfig.AuthInfos = map[string]*clientcmdapi.AuthInfo{register.DefaultKubeConfigAuth: {
ClientCertificate: TLSCertFile,
ClientKey: TLSKeyFile,
}}
return kubeConfig
}
func (c *CSRDriver) InformerHandler(option any) (cache.SharedIndexInformer, factory.EventFilterFunc) {
csrOption, ok := option.(*CSROption)
if !ok {
utilruntime.Must(fmt.Errorf("option type is not correct"))
}
return csrOption.CSRControl.Informer(), csrOption.EventFilterFunc
}
func (c *CSRDriver) IsHubKubeConfigValid(ctx context.Context, secretOption register.SecretOption) (bool, error) {
logger := klog.FromContext(ctx)
keyPath := path.Join(secretOption.HubKubeconfigDir, TLSKeyFile)
if _, err := os.Stat(keyPath); os.IsNotExist(err) {
logger.V(4).Info("TLS key file not found", "keyPath", keyPath)
return false, nil
}
certPath := path.Join(secretOption.HubKubeconfigDir, TLSCertFile)
certData, err := os.ReadFile(path.Clean(certPath))
if err != nil {
logger.V(4).Info("Unable to load TLS cert file", "certPath", certPath)
return false, nil
}
// only set when clustername/agentname are set
if len(secretOption.ClusterName) > 0 && len(secretOption.AgentName) > 0 {
// check if the tls certificate is issued for the current cluster/agent
clusterNameInCert, agentNameInCert, err := GetClusterAgentNamesFromCertificate(certData)
if err != nil {
return false, nil
}
if secretOption.ClusterName != clusterNameInCert || secretOption.AgentName != agentNameInCert {
logger.V(4).Info("Certificate in file is issued for different agent",
"certPath", certPath,
"issuedFor", fmt.Sprintf("%s:%s", secretOption.ClusterName, secretOption.AgentName),
"expectedFor", fmt.Sprintf("%s:%s", secretOption.ClusterName, secretOption.AgentName))
return false, nil
}
}
return isCertificateValid(logger, certData, nil)
}
func NewCSRDriver() register.RegisterDriver {
return &CSRDriver{}
}
func shouldCreateCSR(
logger klog.Logger,
controllerName string,
@@ -375,7 +312,7 @@ func shouldCreateCSR(
additionalSecretData map[string][]byte) (bool, error) {
// create a csr to request new client certificate if
// a.there is no valid client certificate issued for the current cluster/agent
valid, err := IsCertificateValid(logger, secret.Data[TLSCertFile], subject)
valid, err := isCertificateValid(logger, secret.Data[TLSCertFile], subject)
if err != nil {
recorder.Eventf("CertificateValidationFailed", "Failed to validate client certificate for %s: %v", controllerName, err)
return true, nil

View File

@@ -0,0 +1,412 @@
package csr
import (
"context"
"crypto/x509/pkix"
"fmt"
"os"
"path"
"testing"
"time"
"github.com/openshift/library-go/pkg/operator/events"
certificates "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2/ktesting"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/hub/user"
"open-cluster-management.io/ocm/pkg/registration/register"
)
// Shared fixtures for the CSR driver tests.
const (
	testNamespace  = "testns"
	testAgentName  = "testagent"
	testSecretName = "testsecret"
	testCSRName    = "testcsr"
)

// commonName is the subject common name expected in certificates issued for the
// test agent, i.e. "<subject-prefix><cluster-name>:<agent-name>".
var commonName = fmt.Sprintf("%s%s:%s", user.SubjectPrefix, testinghelpers.TestManagedClusterName, testAgentName)
// TestProcess exercises CSRDriver.Process against a fake hub client, covering
// the bootstrap flow (approved CSR turned into a client certificate), a
// still-valid kubeconfig secret (no-op), an expiring certificate (new CSR
// created) and a change in additional secret data (new CSR created).
func TestProcess(t *testing.T) {
	testSubject := &pkix.Name{
		CommonName: commonName,
	}

	cases := []struct {
		name     string
		queueKey string
		// secret is the pre-existing hub kubeconfig secret handed to Process.
		secret *corev1.Secret
		// approvedCSRCert, when set, simulates a CSR that was already approved
		// and issued; the driver is primed with its key and CSR name.
		approvedCSRCert *testinghelpers.TestCert
		// keyDataExpected/csrNameExpected assert the driver's internal state
		// after Process returns (pending CSR or not).
		keyDataExpected   bool
		csrNameExpected   bool
		expectedCondition *metav1.Condition
		validateActions   func(t *testing.T, hubActions []clienttesting.Action, secret *corev1.Secret)
	}{
		{
			name:     "syc csr after bootstrap",
			queueKey: testSecretName,
			secret: testinghelpers.NewHubKubeconfigSecret(
				testNamespace, testSecretName, "1", nil,
				map[string][]byte{
					register.ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
					register.AgentNameFile:   []byte(testAgentName),
				}),
			expectedCondition: &metav1.Condition{
				Type:   ClusterCertificateRotatedCondition,
				Status: metav1.ConditionTrue,
			},
			approvedCSRCert: testinghelpers.NewTestCert(commonName, 10*time.Second),
			validateActions: func(t *testing.T, hubActions []clienttesting.Action, secret *corev1.Secret) {
				logger, _ := ktesting.NewTestContext(t)
				// two "get" actions: approval check and issued-cert fetch
				testingcommon.AssertActions(t, hubActions, "get", "get")
				valid, err := isCertificateValid(logger, secret.Data[TLSCertFile], testSubject)
				if err != nil {
					t.Errorf("unexpected error: %v", err)
				}
				if !valid {
					t.Error("client certificate is invalid")
				}
			},
		},
		{
			name:     "sync a valid hub kubeconfig secret",
			queueKey: testSecretName,
			secret: testinghelpers.NewHubKubeconfigSecret(
				testNamespace, testSecretName, "1",
				testinghelpers.NewTestCert(commonName, 10000*time.Second), map[string][]byte{
					register.ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
					register.AgentNameFile:   []byte(testAgentName),
					register.KubeconfigFile: testinghelpers.NewKubeconfig(
						"c1", "https://127.0.0.1:6001", "", nil, nil, nil),
				}),
			validateActions: func(t *testing.T, hubActions []clienttesting.Action, secret *corev1.Secret) {
				// certificate still valid: nothing should happen
				testingcommon.AssertNoActions(t, hubActions)
				if secret != nil {
					t.Errorf("expect the secret not to be generated")
				}
			},
		},
		{
			name:     "sync an expiring hub kubeconfig secret",
			queueKey: testSecretName,
			secret: testinghelpers.NewHubKubeconfigSecret(
				testNamespace, testSecretName, "1",
				testinghelpers.NewTestCert(commonName, -3*time.Second),
				map[string][]byte{
					register.ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
					register.AgentNameFile:   []byte(testAgentName),
					register.KubeconfigFile: testinghelpers.NewKubeconfig(
						"c1", "https://127.0.0.1:6001", "", nil, nil, nil),
				}),
			keyDataExpected: true,
			csrNameExpected: true,
			validateActions: func(t *testing.T, hubActions []clienttesting.Action, secret *corev1.Secret) {
				// expired certificate: a new CSR must be created
				testingcommon.AssertActions(t, hubActions, "create")
				actual := hubActions[0].(clienttesting.CreateActionImpl).Object
				if _, ok := actual.(*unstructured.Unstructured); !ok {
					t.Errorf("expected csr was created, but failed")
				}
			},
		},
		{
			name:     "sync when additional secret data changes",
			queueKey: testSecretName,
			secret: testinghelpers.NewHubKubeconfigSecret(
				testNamespace, testSecretName, "1",
				testinghelpers.NewTestCert(commonName, 10000*time.Second),
				map[string][]byte{
					register.ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
					// agent name differs from the one in additionalSecretData,
					// which must trigger re-issuance even with a valid cert
					register.AgentNameFile: []byte("invalid-name"),
				}),
			keyDataExpected: true,
			csrNameExpected: true,
			validateActions: func(t *testing.T, hubActions []clienttesting.Action, secret *corev1.Secret) {
				testingcommon.AssertActions(t, hubActions, "create")
				actual := hubActions[0].(clienttesting.CreateActionImpl).Object
				if _, ok := actual.(*unstructured.Unstructured); !ok {
					t.Errorf("expected csr was created, but failed")
				}
			},
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			ctrl := &mockCSRControl{}
			var csrs []runtime.Object
			if c.approvedCSRCert != nil {
				csr := testinghelpers.NewApprovedCSR(testinghelpers.CSRHolder{Name: testCSRName})
				csr.Status.Certificate = c.approvedCSRCert.Cert
				csrs = append(csrs, csr)
				ctrl.approved = true
				ctrl.issuedCertData = c.approvedCSRCert.Cert
			}
			hubKubeClient := kubefake.NewSimpleClientset(csrs...)
			ctrl.csrClient = &hubKubeClient.Fake

			// GenerateName is not working for fake client, we set the name with prepend reactor
			hubKubeClient.PrependReactor(
				"create",
				"certificatesigningrequests",
				func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
					return true, testinghelpers.NewCSR(testinghelpers.CSRHolder{Name: testCSRName}), nil
				},
			)
			additionalSecretData := map[string][]byte{
				register.ClusterNameFile: []byte(testinghelpers.TestManagedClusterName),
				register.AgentNameFile:   []byte(testAgentName),
			}
			csrOption := &CSROption{
				ObjectMeta: metav1.ObjectMeta{
					GenerateName: "test-",
				},
				Subject:         testSubject,
				SignerName:      certificates.KubeAPIServerClientSignerName,
				HaltCSRCreation: func() bool { return false },
				CSRControl:      ctrl,
			}
			driver := &CSRDriver{}
			// prime the driver as if a CSR had already been created and approved
			if c.approvedCSRCert != nil {
				driver.csrName = testCSRName
				driver.keyData = c.approvedCSRCert.Key
			}
			syncCtx := testingcommon.NewFakeSyncContext(t, "test")
			secret, cond, err := driver.Process(
				context.TODO(), "test", c.secret, additionalSecretData, syncCtx.Recorder(), csrOption)
			if err != nil {
				t.Errorf("unexpected error %v", err)
			}
			hasKeyData := driver.keyData != nil
			if c.keyDataExpected != hasKeyData {
				t.Error("controller.keyData should be set")
			}
			hasCSRName := driver.csrName != ""
			if c.csrNameExpected != hasCSRName {
				t.Error("controller.csrName should be set")
			}
			if !conditionEqual(c.expectedCondition, cond) {
				t.Errorf("condition is not correct, expected %v, got %v", c.expectedCondition, cond)
			}
			c.validateActions(t, hubKubeClient.Actions(), secret)
		})
	}
}
// Compile-time check that mockCSRControl implements CSRControl.
var _ CSRControl = &mockCSRControl{}
// conditionEqual reports whether two conditions agree on Type and Status.
// Reason, Message and timestamps are intentionally ignored. Two nil
// conditions compare equal; a nil and a non-nil condition do not.
func conditionEqual(expected, actual *metav1.Condition) bool {
	switch {
	case expected == nil && actual == nil:
		return true
	case expected == nil || actual == nil:
		return false
	default:
		return expected.Type == actual.Type &&
			string(expected.Status) == string(actual.Status)
	}
}
// mockCSRControl is a test double for CSRControl. It records every call as an
// action on the embedded fake client and returns canned approval and
// certificate data.
type mockCSRControl struct {
	// approved is the value returned by isApproved.
	approved bool
	// issuedCertData is the value returned by getIssuedCertificate.
	issuedCertData []byte
	// csrClient records the actions so tests can assert on them.
	csrClient *clienttesting.Fake
}
// create records a "create" action on the fake client and returns a
// pseudo-random CSR name derived from the supplied object meta.
func (m *mockCSRControl) create(
	_ context.Context, _ events.Recorder, objMeta metav1.ObjectMeta, _ []byte, _ string, _ *int32) (string, error) {
	action := clienttesting.CreateActionImpl{
		ActionImpl: clienttesting.ActionImpl{Verb: "create"},
		Object:     &unstructured.Unstructured{},
	}
	_, err := m.csrClient.Invokes(action, nil)
	return objMeta.Name + rand.String(4), err
}
// isApproved records a "get" action for the named CSR and returns the
// preconfigured approval state.
func (m *mockCSRControl) isApproved(name string) (bool, error) {
	get := clienttesting.GetActionImpl{
		ActionImpl: clienttesting.ActionImpl{
			Verb:     "get",
			Resource: certificates.SchemeGroupVersion.WithResource("certificatesigningrequests"),
		},
		Name: name,
	}
	_, err := m.csrClient.Invokes(get, nil)
	return m.approved, err
}
// getIssuedCertificate records a "get" action for the named CSR and returns
// the preconfigured certificate bytes.
func (m *mockCSRControl) getIssuedCertificate(name string) ([]byte, error) {
	get := clienttesting.GetActionImpl{
		ActionImpl: clienttesting.ActionImpl{
			Verb:     "get",
			Resource: certificates.SchemeGroupVersion.WithResource("certificatesigningrequests"),
		},
		Name: name,
	}
	_, err := m.csrClient.Invokes(get, nil)
	return m.issuedCertData, err
}
// Informer is required by the CSRControl interface but is not exercised by
// these tests; calling it is a programming error.
func (m *mockCSRControl) Informer() cache.SharedIndexInformer {
	panic("implement me")
}
// TestIsHubKubeConfigValidFunc verifies register.IsHubKubeConfigValidFunc with
// the CSR driver: the hub kubeconfig on disk is valid only when the kubeconfig
// file, tls key and tls cert all exist, the cert is issued for the expected
// cluster/agent, and the on-disk kubeconfig still matches the bootstrap
// kubeconfig (context cluster, server URL, proxy URL, CA bundle).
func TestIsHubKubeConfigValidFunc(t *testing.T) {
	tempDir, err := os.MkdirTemp("", "testvalidhubclientconfig")
	if err != nil {
		t.Errorf("unexpected error: %v", err)
	}
	defer os.RemoveAll(tempDir)

	// cert1 matches cluster1:agent1; cert2 is issued for an unrelated subject.
	cert1 := testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 60*time.Second)
	cert2 := testinghelpers.NewTestCert("test", 60*time.Second)

	kubeconfig := testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil)

	cases := []struct {
		name               string
		clusterName        string
		agentName          string
		kubeconfig         []byte
		bootstapKubeconfig []byte
		tlsCert            []byte
		tlsKey             []byte
		isValid            bool
	}{
		{
			name:    "no kubeconfig",
			isValid: false,
		},
		{
			name:       "no tls key",
			kubeconfig: kubeconfig,
			isValid:    false,
		},
		{
			name:       "no tls cert",
			kubeconfig: kubeconfig,
			tlsKey:     cert1.Key,
			isValid:    false,
		},
		{
			name:        "cert is not issued for cluster1:agent1",
			clusterName: "cluster1",
			agentName:   "agent1",
			kubeconfig:  kubeconfig,
			tlsKey:      cert2.Key,
			tlsCert:     cert2.Cert,
			isValid:     false,
		},
		{
			name:               "context cluster changes",
			clusterName:        "cluster1",
			agentName:          "agent1",
			kubeconfig:         kubeconfig,
			bootstapKubeconfig: testinghelpers.NewKubeconfig("c2", "https://127.0.0.1:6001", "", nil, nil, nil),
			tlsKey:             cert1.Key,
			tlsCert:            cert1.Cert,
			isValid:            false,
		},
		{
			name:               "hub server url changes",
			clusterName:        "cluster1",
			agentName:          "agent1",
			kubeconfig:         kubeconfig,
			bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.2:6001", "", nil, nil, nil),
			tlsKey:             cert1.Key,
			tlsCert:            cert1.Cert,
			isValid:            false,
		},
		{
			name:               "proxy url changes",
			clusterName:        "cluster1",
			agentName:          "agent1",
			kubeconfig:         kubeconfig,
			bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "https://127.0.0.1:3129", nil, nil, nil),
			tlsKey:             cert1.Key,
			tlsCert:            cert1.Cert,
			isValid:            false,
		},
		{
			name:               "ca bundle changes",
			clusterName:        "cluster1",
			agentName:          "agent1",
			kubeconfig:         kubeconfig,
			bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", []byte("test"), nil, nil),
			tlsKey:             cert1.Key,
			tlsCert:            cert1.Cert,
			isValid:            false,
		},
		{
			name:               "valid hub client config",
			clusterName:        "cluster1",
			agentName:          "agent1",
			kubeconfig:         kubeconfig,
			bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
			tlsKey:             cert1.Key,
			tlsCert:            cert1.Cert,
			isValid:            true,
		},
	}
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			driver := NewCSRDriver()
			secretOption := register.SecretOption{
				ClusterName:       c.clusterName,
				AgentName:         c.agentName,
				HubKubeconfigDir:  tempDir,
				HubKubeconfigFile: path.Join(tempDir, "kubeconfig"),
			}
			// lay out only the files the case provides; missing files are part
			// of what is being tested
			if c.kubeconfig != nil {
				testinghelpers.WriteFile(path.Join(tempDir, "kubeconfig"), c.kubeconfig)
			}
			if c.tlsKey != nil {
				testinghelpers.WriteFile(path.Join(tempDir, "tls.key"), c.tlsKey)
			}
			if c.tlsCert != nil {
				testinghelpers.WriteFile(path.Join(tempDir, "tls.crt"), c.tlsCert)
			}
			if c.bootstapKubeconfig != nil {
				bootstrapKubeconfig, err := clientcmd.Load(c.bootstapKubeconfig)
				if err != nil {
					t.Fatal(err)
				}
				secretOption.BootStrapKubeConfig = bootstrapKubeconfig
			}

			valid, err := register.IsHubKubeConfigValidFunc(driver, secretOption)(context.TODO())
			if err != nil {
				t.Errorf("unexpected error: %v", err)
			}
			if c.isValid != valid {
				t.Errorf("expect %t, but %t", c.isValid, valid)
			}
		})
	}
}

View File

@@ -0,0 +1,99 @@
package register
import (
"context"
"os"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
)
const (
	// DefaultKubeConfigContext is the context name used in generated hub kubeconfigs.
	DefaultKubeConfigContext = "default-context"
	// DefaultKubeConfigAuth is the auth info name used in generated hub kubeconfigs.
	DefaultKubeConfigAuth = "default-auth"

	// ClusterNameFile is the key of the cluster name value in the hub kubeconfig secret.
	ClusterNameFile = "cluster-name"
	// AgentNameFile is the key of the agent name value in the hub kubeconfig secret.
	AgentNameFile = "agent-name"
	// KubeconfigFile is the name of the kubeconfig file in kubeconfigSecret
	KubeconfigFile = "kubeconfig"
)
// SecretOption describes the secret that stores the hub credential on the
// management cluster and the inputs used to populate and validate it.
type SecretOption struct {
	// SecretNamespace is the namespace of the secret containing client certificate.
	SecretNamespace string
	// SecretName is the name of the secret containing client certificate. The secret will be created if
	// it does not exist.
	SecretName string

	// BootStrapKubeConfig is the kubeconfig to generate hubkubeconfig, if set, create kubeconfig value
	// in the secret.
	BootStrapKubeConfig *clientcmdapi.Config

	// ClusterName is the cluster name, and it is set as a secret value if it is set.
	ClusterName string
	// AgentName is the agent name and it is set as a secret value if it is set.
	AgentName string

	// HubKubeconfigFile is the path of the hub kubeconfig file whose validity is checked.
	HubKubeconfigFile string
	// HubKubeconfigDir is the directory holding the hub kubeconfig and related
	// credential files (e.g. tls.key/tls.crt for the CSR driver).
	HubKubeconfigDir string

	// ManagementSecretInformer watches the target secret on the management cluster.
	ManagementSecretInformer cache.SharedIndexInformer
	// ManagementCoreClient reads and writes the target secret on the management cluster.
	ManagementCoreClient corev1client.CoreV1Interface
}
// StatusUpdateFunc is a function to update the condition of the corresponding object.
type StatusUpdateFunc func(ctx context.Context, cond metav1.Condition) error
// RegisterDriver is the interface that each registration driver should
// implement. A driver encapsulates one mechanism for obtaining and refreshing
// the credential used to talk to the hub (e.g. CSR-based client certificates).
type RegisterDriver interface {
	// IsHubKubeConfigValid is to check if the current hub kubeconfig is valid. It is called before
	// and after bootstrap to confirm if the bootstrap is finished.
	IsHubKubeConfigValid(ctx context.Context, secretOption SecretOption) (bool, error)

	// BuildKubeConfigFromTemplate builds the kubeconfig from the template kubeconfig
	BuildKubeConfigFromTemplate(template *clientcmdapi.Config) *clientcmdapi.Config

	// Process updates the secret with credentials; it returns the updated secret
	// (nil when there is nothing to persist), an optional status condition to
	// report, and an error.
	Process(
		ctx context.Context,
		name string,
		secret *corev1.Secret,
		additionalSecretData map[string][]byte,
		recorder events.Recorder, opt any) (*corev1.Secret, *metav1.Condition, error)

	// InformerHandler returns informer of the related object. If no object needs to be watched, the func could
	// return nil, nil.
	InformerHandler(option any) (cache.SharedIndexInformer, factory.EventFilterFunc)
}
// IsHubKubeConfigValidFunc returns a condition func reporting whether the hub
// kubeconfig on disk exists and is still usable, delegating the driver-specific
// credential check to the given RegisterDriver.
func IsHubKubeConfigValidFunc(driver RegisterDriver, secretOption SecretOption) wait.ConditionWithContextFunc {
	return func(ctx context.Context) (bool, error) {
		logger := klog.FromContext(ctx)
		kubeconfigPath := secretOption.HubKubeconfigFile
		if _, statErr := os.Stat(kubeconfigPath); os.IsNotExist(statErr) {
			logger.V(4).Info("Kubeconfig file not found", "kubeconfigPath", kubeconfigPath)
			return false, nil
		}
		// Load the kubeconfig that references the key/cert files from the same secret.
		loaded, loadErr := clientcmd.LoadFromFile(kubeconfigPath)
		if loadErr != nil {
			return false, loadErr
		}
		// When a bootstrap kubeconfig is supplied, the on-disk kubeconfig must
		// still agree with it before the driver-specific check runs.
		if bootstrap := secretOption.BootStrapKubeConfig; bootstrap != nil {
			valid, err := IsHubKubeconfigValid(bootstrap, loaded)
			if !valid || err != nil {
				return valid, err
			}
		}
		return driver.IsHubKubeConfigValid(ctx, secretOption)
	}
}

View File

@@ -0,0 +1,165 @@
package register
import (
"context"
"fmt"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/clientcmd"
)
// ControllerResyncInterval is exposed so that integration tests can crank up the controller sync speed.
var ControllerResyncInterval = 5 * time.Minute
// secretController runs the driver's Process to obtain credentials and keeps
// the secret defined in SecretOption up to date.
type secretController struct {
	SecretOption
	// option is the driver-specific option forwarded to Process/InformerHandler.
	option any
	driver RegisterDriver
	controllerName string
	statusUpdater  StatusUpdateFunc
	// additionalSecretData is merged into the secret on every successful Process.
	additionalSecretData map[string][]byte
	// secretToSave caches a generated secret whose save failed, so the next
	// sync retries the save without re-running the driver.
	secretToSave *corev1.Secret
}
// NewSecretController returns a controller that keeps the hub credential
// secret described by secretOption up to date. It precomputes the additional
// secret data (generated kubeconfig, cluster name, agent name) once, watches
// the target secret plus an optional driver-provided informer, and reconciles
// through the given RegisterDriver.
func NewSecretController(
	secretOption SecretOption,
	option any,
	driver RegisterDriver,
	statusUpdater StatusUpdateFunc,
	recorder events.Recorder,
	controllerName string,
) factory.Controller {
	additionalSecretData := map[string][]byte{}
	if secretOption.BootStrapKubeConfig != nil {
		kubeConfigTemplate, err := BaseKubeConfigFromBootStrap(secretOption.BootStrapKubeConfig)
		// Must is a no-op on nil error and panics otherwise; a malformed
		// bootstrap kubeconfig leaves the agent unable to make progress.
		utilruntime.Must(err)
		kubeConfig := driver.BuildKubeConfigFromTemplate(kubeConfigTemplate)
		if kubeConfig != nil {
			kubeconfigData, err := clientcmd.Write(*kubeConfig)
			utilruntime.Must(err)
			additionalSecretData[KubeconfigFile] = kubeconfigData
		}
	}
	if len(secretOption.ClusterName) > 0 {
		additionalSecretData[ClusterNameFile] = []byte(secretOption.ClusterName)
	}
	if len(secretOption.AgentName) > 0 {
		additionalSecretData[AgentNameFile] = []byte(secretOption.AgentName)
	}
	c := secretController{
		SecretOption:         secretOption,
		driver:               driver,
		controllerName:       controllerName,
		statusUpdater:        statusUpdater,
		additionalSecretData: additionalSecretData,
		option:               option,
	}
	f := factory.New().
		WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
			return factory.DefaultQueueKey
		}, func(obj interface{}) bool {
			accessor, err := meta.Accessor(obj)
			if err != nil {
				return false
			}
			// only enqueue the specific secret this controller manages
			return accessor.GetNamespace() == c.SecretNamespace && accessor.GetName() == c.SecretName
		}, secretOption.ManagementSecretInformer)
	// let the driver watch its own resource (e.g. CSRs) when it provides one
	driverInformer, driverFilter := driver.InformerHandler(option)
	if driverInformer != nil && driverFilter != nil {
		f = f.WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
			return factory.DefaultQueueKey
		}, driverFilter, driverInformer)
	}
	return f.WithSync(c.sync).
		ResyncEvery(ControllerResyncInterval).
		ToController(controllerName, recorder)
}
// sync reconciles the managed secret: it lets the driver process credentials
// into the secret and then persists it on the management cluster. A secret
// that was generated but failed to save is cached in c.secretToSave so the
// next sync retries the save without re-running the driver.
func (c *secretController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
	// get secret containing client certificate
	secret, err := c.ManagementCoreClient.Secrets(c.SecretNamespace).Get(ctx, c.SecretName, metav1.GetOptions{})
	switch {
	case apierrors.IsNotFound(err):
		// secret does not exist yet; start empty and let saveSecret create it
		// (ResourceVersion stays empty, which saveSecret uses as the signal)
		secret = &corev1.Secret{
			ObjectMeta: metav1.ObjectMeta{
				Namespace: c.SecretNamespace,
				Name:      c.SecretName,
			},
		}
	case err != nil:
		return fmt.Errorf("unable to get secret %q: %w", c.SecretNamespace+"/"+c.SecretName, err)
	}

	if secret.Data == nil {
		secret.Data = map[string][]byte{}
	}

	if c.secretToSave == nil {
		secret, cond, err := c.driver.Process(ctx, c.controllerName, secret, c.additionalSecretData, syncCtx.Recorder(), c.option)
		// report the driver's condition even when processing failed
		if cond != nil {
			if updateErr := c.statusUpdater(ctx, *cond); updateErr != nil {
				return updateErr
			}
		}
		if err != nil {
			return err
		}
		// a nil secret means the driver has nothing to persist this round
		if secret == nil {
			return nil
		}

		if len(c.additionalSecretData) > 0 {
			// append additional data into client certificate secret
			for k, v := range c.additionalSecretData {
				secret.Data[k] = v
			}
		}
		c.secretToSave = secret
	}

	// save the changes into secret
	if err := saveSecret(c.ManagementCoreClient, c.SecretNamespace, c.secretToSave); err != nil {
		return err
	}
	syncCtx.Recorder().Eventf("SecretSave", "Secret %s/%s for %s is updated",
		c.SecretNamespace, c.SecretName, c.controllerName)
	// clean the cached secret.
	c.secretToSave = nil
	return nil
}
// saveSecret persists the given secret in the supplied namespace. A secret
// without a ResourceVersion has never been stored, so it is created;
// otherwise the existing object is updated in place.
//
// NOTE(review): this uses context.Background() rather than a caller-supplied
// context, so controller shutdown does not cancel an in-flight API call —
// confirm whether that is intentional.
func saveSecret(spokeCoreClient corev1client.CoreV1Interface, secretNamespace string, secret *corev1.Secret) error {
	secrets := spokeCoreClient.Secrets(secretNamespace)
	if len(secret.ResourceVersion) == 0 {
		// Never persisted before; create a fresh object.
		_, createErr := secrets.Create(context.Background(), secret, metav1.CreateOptions{})
		return createErr
	}
	// Already present on the server; replace its content.
	_, updateErr := secrets.Update(context.Background(), secret, metav1.UpdateOptions{})
	return updateErr
}

View File

@@ -0,0 +1,197 @@
package register
import (
"context"
"testing"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
corev1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
// TestSync verifies the secret controller sync flow: the controller should
// create or update the managed secret with the content produced by the
// register driver, append any additional secret data (cluster name, agent
// name, kubeconfig), and report the driver's condition via the status
// updater. (Fix: renamed the mistyped test-table field validatAction to
// validateAction.)
func TestSync(t *testing.T) {
	commonName := "test"
	testCases := []struct {
		name           string
		option         SecretOption
		secrets        []runtime.Object
		driver         *fakeDriver
		expectedCond   *metav1.Condition
		validateAction func(t *testing.T, actions []clienttesting.Action)
	}{
		{
			name: "create secret without additional data",
			option: SecretOption{
				SecretName:      "test",
				SecretNamespace: "test",
			},
			secrets: []runtime.Object{},
			driver: newFakeDriver(
				testinghelpers.NewHubKubeconfigSecret(
					"test", "test", "",
					testinghelpers.NewTestCert(commonName, 100*time.Second), map[string][]byte{}),
				&metav1.Condition{Type: "Created", Status: metav1.ConditionTrue}, nil,
			),
			validateAction: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "get", "create")
			},
			expectedCond: &metav1.Condition{Type: "Created", Status: metav1.ConditionTrue},
		},
		{
			name: "update secret without additional data",
			option: SecretOption{
				SecretName:      "test",
				SecretNamespace: "test",
			},
			secrets: []runtime.Object{
				testinghelpers.NewHubKubeconfigSecret(
					"test", "test", "0",
					testinghelpers.NewTestCert(commonName, 100*time.Second), map[string][]byte{}),
			},
			driver: newFakeDriver(
				testinghelpers.NewHubKubeconfigSecret(
					"test", "test", "1",
					testinghelpers.NewTestCert(commonName, 200*time.Second), map[string][]byte{}),
				&metav1.Condition{Type: "Created", Status: metav1.ConditionTrue}, nil,
			),
			validateAction: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "get", "update")
			},
			expectedCond: &metav1.Condition{Type: "Created", Status: metav1.ConditionTrue},
		},
		{
			name: "nothing to create if there is no secret generated",
			option: SecretOption{
				SecretName:      "test",
				SecretNamespace: "test",
			},
			secrets: []runtime.Object{},
			driver:  newFakeDriver(nil, nil, nil),
			validateAction: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "get")
			},
		},
		{
			name: "addition secret data",
			option: SecretOption{
				SecretName:      "test",
				SecretNamespace: "test",
				ClusterName:     "cluster1",
				AgentName:       "agent1",
				BootStrapKubeConfig: &clientcmdapi.Config{
					Clusters: map[string]*clientcmdapi.Cluster{"test-cluster": {
						Server:                "localhost",
						InsecureSkipTLSVerify: true,
					}},
					Contexts: map[string]*clientcmdapi.Context{"test-context": {
						Cluster:  "test-cluster",
						AuthInfo: "test-user",
					}},
					AuthInfos: map[string]*clientcmdapi.AuthInfo{
						"test-user": {
							Token: "test-token",
						},
					},
					CurrentContext: "test-context",
				},
			},
			secrets: []runtime.Object{},
			driver: newFakeDriver(testinghelpers.NewHubKubeconfigSecret(
				"test", "test", "",
				testinghelpers.NewTestCert(commonName, 100*time.Second), map[string][]byte{}), nil, nil),
			validateAction: func(t *testing.T, actions []clienttesting.Action) {
				testingcommon.AssertActions(t, actions, "get", "create")
				secret := actions[1].(clienttesting.CreateActionImpl).Object.(*corev1.Secret)
				cluster, ok := secret.Data[ClusterNameFile]
				if !ok || string(cluster) != "cluster1" {
					t.Errorf("cluster name not correct")
				}
				agent, ok := secret.Data[AgentNameFile]
				if !ok || string(agent) != "agent1" {
					t.Errorf("agent name not correct")
				}
				_, ok = secret.Data[KubeconfigFile]
				if !ok {
					t.Errorf("kubeconfig file should exist")
				}
			},
		},
	}
	for _, c := range testCases {
		t.Run(c.name, func(t *testing.T) {
			syncCtx := testingcommon.NewFakeSyncContext(t, "test")
			kubeClient := kubefake.NewSimpleClientset(c.secrets...)
			c.option.ManagementCoreClient = kubeClient.CoreV1()
			informerFactory := informers.NewSharedInformerFactory(kubeClient, 10*time.Minute)
			c.option.ManagementSecretInformer = informerFactory.Core().V1().Secrets().Informer()
			updater := &fakeStatusUpdater{}
			ctrl := NewSecretController(
				c.option, nil, c.driver, updater.update, syncCtx.Recorder(), "test")
			err := ctrl.Sync(context.Background(), syncCtx)
			if err != nil {
				t.Fatal(err)
			}
			c.validateAction(t, kubeClient.Actions())
			if !apiequality.Semantic.DeepEqual(c.expectedCond, updater.cond) {
				t.Errorf("Condition update not correct")
			}
		})
	}
}
// fakeStatusUpdater records the most recent condition passed to update so a
// test can assert on what the controller reported.
type fakeStatusUpdater struct {
	cond *metav1.Condition
}

// update captures a deep copy of the given condition and always succeeds.
func (u *fakeStatusUpdater) update(_ context.Context, cond metav1.Condition) error {
	copied := cond.DeepCopy()
	u.cond = copied
	return nil
}
// fakeDriver is a stub register driver. Process returns the canned secret,
// condition, and error it was constructed with; every hub kubeconfig is
// reported as valid; no extra informer is provided.
type fakeDriver struct {
	secret *corev1.Secret
	cond   *metav1.Condition
	err    error
}

// newFakeDriver builds a fakeDriver whose Process returns the given values.
func newFakeDriver(secret *corev1.Secret, cond *metav1.Condition, err error) *fakeDriver {
	return &fakeDriver{secret: secret, cond: cond, err: err}
}

// IsHubKubeConfigValid always reports the hub kubeconfig as valid.
func (d *fakeDriver) IsHubKubeConfigValid(_ context.Context, _ SecretOption) (bool, error) {
	return true, nil
}

// BuildKubeConfigFromTemplate returns the template config unchanged.
func (d *fakeDriver) BuildKubeConfigFromTemplate(config *clientcmdapi.Config) *clientcmdapi.Config {
	return config
}

// Process ignores its inputs and returns the canned secret, condition, and error.
func (d *fakeDriver) Process(
	_ context.Context,
	_ string,
	_ *corev1.Secret,
	_ map[string][]byte,
	_ events.Recorder, _ any) (*corev1.Secret, *metav1.Condition, error) {
	return d.secret, d.cond, d.err
}

// InformerHandler provides no additional informer or event filter.
func (d *fakeDriver) InformerHandler(_ any) (cache.SharedIndexInformer, factory.EventFilterFunc) {
	return nil, nil
}

View File

@@ -16,6 +16,7 @@ import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
"k8s.io/klog/v2"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
@@ -26,7 +27,8 @@ import (
"open-cluster-management.io/sdk-go/pkg/patcher"
"open-cluster-management.io/ocm/pkg/common/queue"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
const (
@@ -43,13 +45,13 @@ const (
type addOnRegistrationController struct {
clusterName string
agentName string
kubeconfigData []byte
kubeconfig *clientcmdapi.Config
managementKubeClient kubernetes.Interface // in-cluster local management kubeClient
spokeKubeClient kubernetes.Interface
hubAddOnLister addonlisterv1alpha1.ManagedClusterAddOnLister
patcher patcher.Patcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]
csrControl clientcert.CSRControl
csrControl csr.CSRControl
recorder events.Recorder
csrIndexer cache.Indexer
@@ -64,18 +66,18 @@ type addOnRegistrationController struct {
func NewAddOnRegistrationController(
clusterName string,
agentName string,
kubeconfigData []byte,
kubeconfig *clientcmdapi.Config,
addOnClient addonclient.Interface,
managementKubeClient kubernetes.Interface,
managedKubeClient kubernetes.Interface,
csrControl clientcert.CSRControl,
csrControl csr.CSRControl,
hubAddOnInformers addoninformerv1alpha1.ManagedClusterAddOnInformer,
recorder events.Recorder,
) factory.Controller {
c := &addOnRegistrationController{
clusterName: clusterName,
agentName: agentName,
kubeconfigData: kubeconfigData,
kubeconfig: kubeconfig,
managementKubeClient: managementKubeClient,
spokeKubeClient: managedKubeClient,
hubAddOnLister: hubAddOnInformers.Lister(),
@@ -210,19 +212,19 @@ func (c *addOnRegistrationController) startRegistration(ctx context.Context, con
kubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(
kubeClient, 10*time.Minute, informers.WithNamespace(config.InstallationNamespace))
additionalSecretData := map[string][]byte{}
secretOption := register.SecretOption{
SecretNamespace: config.InstallationNamespace,
SecretName: config.secretName,
ManagementCoreClient: kubeClient.CoreV1(),
ManagementSecretInformer: kubeInformerFactory.Core().V1().Secrets().Informer(),
}
if config.registration.SignerName == certificatesv1.KubeAPIServerClientSignerName {
additionalSecretData[clientcert.KubeconfigFile] = c.kubeconfigData
secretOption.BootStrapKubeConfig = c.kubeconfig
}
// build and start a client cert controller
clientCertOption := clientcert.ClientCertOption{
SecretNamespace: config.InstallationNamespace,
SecretName: config.secretName,
AdditionalSecretData: additionalSecretData,
}
csrOption := clientcert.CSROption{
driver := csr.NewCSRDriver()
csrOption := &csr.CSROption{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("addon-%s-%s-", c.clusterName, config.addOnName),
Labels: map[string]string{
@@ -236,25 +238,16 @@ func (c *addOnRegistrationController) startRegistration(ctx context.Context, con
SignerName: config.registration.SignerName,
EventFilterFunc: createCSREventFilterFunc(c.clusterName, config.addOnName, config.registration.SignerName),
HaltCSRCreation: c.haltCSRCreationFunc(config.addOnName),
CSRControl: c.csrControl,
}
controllerName := fmt.Sprintf("ClientCertController@addon:%s:signer:%s", config.addOnName, config.registration.SignerName)
statusUpdater := c.generateStatusUpdate(c.clusterName, config.addOnName)
clientCertController := clientcert.NewClientCertificateController(
clientCertOption,
csrOption,
c.csrControl,
kubeInformerFactory.Core().V1().Secrets(),
kubeClient.CoreV1(),
statusUpdater,
c.recorder,
controllerName,
)
secretController := register.NewSecretController(
secretOption, csrOption, driver, statusUpdater, c.recorder, controllerName)
go kubeInformerFactory.Start(ctx.Done())
go clientCertController.Run(ctx, 1)
go secretController.Run(ctx, 1)
return stopFunc
}
@@ -274,7 +267,7 @@ func (c *addOnRegistrationController) haltCSRCreationFunc(addonName string) func
}
}
func (c *addOnRegistrationController) generateStatusUpdate(clusterName, addonName string) clientcert.StatusUpdateFunc {
func (c *addOnRegistrationController) generateStatusUpdate(clusterName, addonName string) register.StatusUpdateFunc {
return func(ctx context.Context, cond metav1.Condition) error {
addon, err := c.hubAddOnLister.ManagedClusterAddOns(clusterName).Get(addonName)
if errors.IsNotFound(err) {

View File

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net/http"
"os"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -56,19 +57,23 @@ func selectBootstrapKubeConfigs(ctx context.Context,
}
func compareServerEndpoint(bootstrapKubeConfigFilePath, hubKubeConfigFilePath string) (bool, error) {
_, bootstrapServer, _, _, err := parseKubeconfig(
bootstrapKubeConfigFilePath)
bootstrapKubeConfig, err := clientcmd.BuildConfigFromFlags("", bootstrapKubeConfigFilePath)
if os.IsNotExist(err) {
return false, nil
}
if err != nil {
return false, err
}
_, hubServer, _, _, err := parseKubeconfig(
hubKubeConfigFilePath)
hubKubeConfig, err := clientcmd.BuildConfigFromFlags("", hubKubeConfigFilePath)
if os.IsNotExist(err) {
return false, nil
}
if err != nil {
return false, err
}
return bootstrapServer == hubServer, nil
return bootstrapKubeConfig.Host == hubKubeConfig.Host, nil
}
// An "valid" bootstrap kubeconfig means:

View File

@@ -4,61 +4,48 @@ import (
"context"
"fmt"
"net/http"
"os"
"reflect"
"time"
"sync"
"github.com/pkg/errors"
corev1 "k8s.io/api/core/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
certutil "k8s.io/client-go/util/cert"
)
type clientCertHealthChecker struct {
interval time.Duration
expired bool
type hubKubeConfigHealthChecker struct {
checkFunc wait.ConditionWithContextFunc
bootstrapped bool
lock sync.RWMutex
}
func (hc *clientCertHealthChecker) Name() string {
return "hub-client-certificate"
func (hc *hubKubeConfigHealthChecker) Name() string {
return "hub-kube-config"
}
// clientCertHealthChecker returns an error when the client certificate created for the
// hub kubeconfig exists and expires.
func (hc *clientCertHealthChecker) Check(_ *http.Request) error {
if hc.expired {
func (hc *hubKubeConfigHealthChecker) Check(_ *http.Request) error {
hc.lock.RLock()
defer hc.lock.RUnlock()
if !hc.bootstrapped {
return nil
}
valid, err := hc.checkFunc(context.Background())
if err != nil {
return err
}
if !valid {
return errors.New("the client certificate expires and rebootstrap is required.")
}
return nil
}
func (hc *clientCertHealthChecker) start(ctx context.Context, tlsCertFile string) {
if err := wait.PollUntilContextCancel(ctx, hc.interval, false, func(ctx context.Context) (bool, error) {
data, err := os.ReadFile(tlsCertFile)
if err != nil {
// no work because the client certificate file may not exist yet.
return false, nil
}
certs, err := certutil.ParseCertsPEM(data)
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to parse client certificates %q: %w", tlsCertFile, err))
return false, nil
}
now := time.Now()
for _, cert := range certs {
if now.After(cert.NotAfter) {
hc.expired = true
}
}
return false, nil
}); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to check the health of the client certificate %q: %w", tlsCertFile, err))
}
func (hc *hubKubeConfigHealthChecker) setBootstrapped() {
hc.lock.Lock()
defer hc.lock.Unlock()
hc.bootstrapped = true
}
type bootstrapKubeconfigHealthChecker struct {

View File

@@ -1,8 +1,6 @@
package spoke
import (
"context"
"io/ioutil"
"os"
"path"
"testing"
@@ -12,10 +10,12 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
func TestClientCertHealthChecker(t *testing.T) {
testDir, err := os.MkdirTemp("", "ClientCertHealthChecker")
func TestHubKubeConfigHealthChecker(t *testing.T) {
testDir, err := os.MkdirTemp("", "HubKubeConfigHealthChecker")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -26,44 +26,52 @@ func TestClientCertHealthChecker(t *testing.T) {
}
}()
validCertFile := path.Join(testDir, "cert1.crt")
cert1 := testinghelpers.NewTestCert("cert1", 10*time.Minute)
err = ioutil.WriteFile(validCertFile, cert1.Cert, 0600)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
kubeconfig := testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil)
testinghelpers.WriteFile(path.Join(testDir, "kubeconfig"), kubeconfig)
expiredCertFile := path.Join(testDir, "cert2.crt")
cert2 := testinghelpers.NewTestCert("cert2", -1*time.Minute)
err = ioutil.WriteFile(expiredCertFile, cert2.Cert, 0600)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
validCert := testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 10*time.Minute)
expiredCert := testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", -1*time.Minute)
cases := []struct {
name string
tlsCertFile string
unhealthy bool
name string
tlsCert []byte
tlsKey []byte
unhealthy bool
}{
{
name: "valid client cert",
tlsCertFile: validCertFile,
name: "valid client cert",
tlsKey: validCert.Key,
tlsCert: validCert.Cert,
},
{
name: "expired client cert",
tlsCertFile: expiredCertFile,
unhealthy: true,
name: "expired client cert",
tlsKey: expiredCert.Key,
tlsCert: expiredCert.Cert,
unhealthy: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
hc := &clientCertHealthChecker{
interval: 1 * time.Second,
if c.tlsKey != nil {
testinghelpers.WriteFile(path.Join(testDir, "tls.key"), c.tlsKey)
}
if c.tlsCert != nil {
testinghelpers.WriteFile(path.Join(testDir, "tls.crt"), c.tlsCert)
}
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
hc.start(ctx, c.tlsCertFile)
driver := csr.NewCSRDriver()
secretOption := register.SecretOption{
ClusterName: "cluster1",
AgentName: "agent1",
HubKubeconfigDir: testDir,
HubKubeconfigFile: path.Join(testDir, "kubeconfig"),
}
hc := &hubKubeConfigHealthChecker{
checkFunc: register.IsHubKubeConfigValidFunc(driver, secretOption),
bootstrapped: true,
}
err := hc.Check(nil)
if c.unhealthy && err == nil {

View File

@@ -6,7 +6,6 @@ import (
"github.com/pkg/errors"
"github.com/spf13/pflag"
"k8s.io/apiserver/pkg/server/healthz"
ocmfeature "open-cluster-management.io/api/feature"
@@ -38,28 +37,15 @@ type SpokeAgentOptions struct {
MaxCustomClusterClaims int
ClientCertExpirationSeconds int32
ClusterAnnotations map[string]string
clientCertHealthChecker *clientCertHealthChecker
bootstrapKubeconfigHealthChecker *bootstrapKubeconfigHealthChecker
reSelectChecker *reSelectChecker
}
func NewSpokeAgentOptions() *SpokeAgentOptions {
options := &SpokeAgentOptions{
BootstrapKubeconfigSecret: "bootstrap-hub-kubeconfig",
HubKubeconfigSecret: "hub-kubeconfig-secret",
ClusterHealthCheckPeriod: 1 * time.Minute,
MaxCustomClusterClaims: 20,
clientCertHealthChecker: &clientCertHealthChecker{
interval: ClientCertHealthCheckInterval,
},
BootstrapKubeconfigSecret: "bootstrap-hub-kubeconfig",
HubKubeconfigSecret: "hub-kubeconfig-secret",
ClusterHealthCheckPeriod: 1 * time.Minute,
MaxCustomClusterClaims: 20,
HubConnectionTimeoutSeconds: 600, // by default, the timeout is 10 minutes
reSelectChecker: &reSelectChecker{shouldReSelect: false},
}
options.bootstrapKubeconfigHealthChecker = &bootstrapKubeconfigHealthChecker{
bootstrapKubeconfigSecretName: &options.BootstrapKubeconfigSecret,
}
return options
@@ -122,11 +108,3 @@ func (o *SpokeAgentOptions) Validate() error {
return nil
}
func (o *SpokeAgentOptions) GetHealthCheckers() []healthz.HealthChecker {
return []healthz.HealthChecker{
o.bootstrapKubeconfigHealthChecker,
o.clientCertHealthChecker,
o.reSelectChecker,
}
}

View File

@@ -5,18 +5,15 @@ import (
"fmt"
"strings"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"golang.org/x/net/context"
certificates "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
corev1informers "k8s.io/client-go/informers/core/v1"
certificatesinformers "k8s.io/client-go/informers/certificates"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
certutil "k8s.io/client-go/util/cert"
"k8s.io/klog/v2"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
@@ -24,8 +21,9 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/hub/user"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
const (
@@ -35,57 +33,40 @@ const (
clusterCSRThreshold = 10
)
// NewClientCertForHubController returns a controller to
// 1). Create a new client certificate and build a hub kubeconfig for the registration agent;
// 2). Or rotate the client certificate referenced by the hub kubeconfig before it become expired;
func NewClientCertForHubController(
clusterName string,
agentName string,
clientCertSecretNamespace string,
clientCertSecretName string,
kubeconfigData []byte,
spokeSecretInformer corev1informers.SecretInformer,
csrControl clientcert.CSRControl,
func NewCSROption(
logger klog.Logger,
secretOption register.SecretOption,
csrExpirationSeconds int32,
spokeKubeClient kubernetes.Interface,
statusUpdater clientcert.StatusUpdateFunc,
recorder events.Recorder,
controllerName string,
) factory.Controller {
err := csrControl.Informer().AddIndexers(cache.Indexers{
indexByCluster: indexByClusterFunc,
})
hubCSRInformer certificatesinformers.Interface,
hubKubeClient kubernetes.Interface) (*csr.CSROption, error) {
csrControl, err := csr.NewCSRControl(logger, hubCSRInformer, hubKubeClient)
if err != nil {
utilruntime.HandleError(err)
return nil, fmt.Errorf("failed to create CSR control: %w", err)
}
clientCertOption := clientcert.ClientCertOption{
SecretNamespace: clientCertSecretNamespace,
SecretName: clientCertSecretName,
AdditionalSecretData: map[string][]byte{
clientcert.ClusterNameFile: []byte(clusterName),
clientcert.AgentNameFile: []byte(agentName),
clientcert.KubeconfigFile: kubeconfigData,
},
}
var csrExpirationSecondsInCSROption *int32
if csrExpirationSeconds != 0 {
csrExpirationSecondsInCSROption = &csrExpirationSeconds
}
csrOption := clientcert.CSROption{
err = csrControl.Informer().AddIndexers(cache.Indexers{
indexByCluster: indexByClusterFunc,
})
if err != nil {
return nil, err
}
return &csr.CSROption{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", clusterName),
GenerateName: fmt.Sprintf("%s-", secretOption.ClusterName),
Labels: map[string]string{
// the label is only a hint for the cluster name. Anyone could set/modify it.
clusterv1.ClusterNameLabelKey: clusterName,
clusterv1.ClusterNameLabelKey: secretOption.ClusterName,
},
},
Subject: &pkix.Name{
Organization: []string{
fmt.Sprintf("%s%s", user.SubjectPrefix, clusterName),
fmt.Sprintf("%s%s", user.SubjectPrefix, secretOption.ClusterName),
user.ManagedClustersGroup,
},
CommonName: fmt.Sprintf("%s%s:%s", user.SubjectPrefix, clusterName, agentName),
CommonName: fmt.Sprintf("%s%s:%s", user.SubjectPrefix, secretOption.ClusterName, secretOption.AgentName),
},
SignerName: certificates.KubeAPIServerClientSignerName,
EventFilterFunc: func(obj interface{}) bool {
@@ -95,7 +76,7 @@ func NewClientCertForHubController(
}
labels := accessor.GetLabels()
// only enqueue csr from a specific managed cluster
if labels[clusterv1.ClusterNameLabelKey] != clusterName {
if labels[clusterv1.ClusterNameLabelKey] != secretOption.ClusterName {
return false
}
@@ -106,22 +87,12 @@ func NewClientCertForHubController(
}
// only enqueue csr whose name starts with the cluster name
return strings.HasPrefix(accessor.GetName(), fmt.Sprintf("%s-", clusterName))
return strings.HasPrefix(accessor.GetName(), fmt.Sprintf("%s-", secretOption.ClusterName))
},
HaltCSRCreation: haltCSRCreationFunc(csrControl.Informer().GetIndexer(), clusterName),
HaltCSRCreation: haltCSRCreationFunc(csrControl.Informer().GetIndexer(), secretOption.ClusterName),
ExpirationSeconds: csrExpirationSecondsInCSROption,
}
return clientcert.NewClientCertificateController(
clientCertOption,
csrOption,
csrControl,
spokeSecretInformer,
spokeKubeClient.CoreV1(),
statusUpdater,
recorder,
controllerName,
)
CSRControl: csrControl,
}, nil
}
func haltCSRCreationFunc(indexer cache.Indexer, clusterName string) func() bool {
@@ -138,7 +109,7 @@ func haltCSRCreationFunc(indexer cache.Indexer, clusterName string) func() bool
}
}
func GenerateBootstrapStatusUpdater() clientcert.StatusUpdateFunc {
func GenerateBootstrapStatusUpdater() register.StatusUpdateFunc {
return func(ctx context.Context, cond metav1.Condition) error {
return nil
}
@@ -146,7 +117,7 @@ func GenerateBootstrapStatusUpdater() clientcert.StatusUpdateFunc {
// GenerateStatusUpdater generates status update func for the certificate management
func GenerateStatusUpdater(hubClusterClient clientset.Interface,
hubClusterLister clusterv1listers.ManagedClusterLister, clusterName string) clientcert.StatusUpdateFunc {
hubClusterLister clusterv1listers.ManagedClusterLister, clusterName string) register.StatusUpdateFunc {
return func(ctx context.Context, cond metav1.Condition) error {
cluster, err := hubClusterLister.Get(clusterName)
if errors.IsNotFound(err) {
@@ -165,28 +136,6 @@ func GenerateStatusUpdater(hubClusterClient clientset.Interface,
}
}
// GetClusterAgentNamesFromCertificate returns the cluster name and agent name by parsing
// the common name of the certification
func GetClusterAgentNamesFromCertificate(certData []byte) (clusterName, agentName string, err error) {
certs, err := certutil.ParseCertsPEM(certData)
if err != nil {
return "", "", fmt.Errorf("unable to parse certificate: %w", err)
}
for _, cert := range certs {
if ok := strings.HasPrefix(cert.Subject.CommonName, user.SubjectPrefix); !ok {
continue
}
names := strings.Split(strings.TrimPrefix(cert.Subject.CommonName, user.SubjectPrefix), ":")
if len(names) != 2 {
continue
}
return names[0], names[1], nil
}
return "", "", nil
}
func indexByClusterFunc(obj interface{}) ([]string, error) {
accessor, err := meta.Accessor(obj)
if err != nil {

View File

@@ -1,49 +1 @@
package registration
import (
"testing"
"time"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
func TestGetClusterAgentNamesFromCertificate(t *testing.T) {
cases := []struct {
name string
certData []byte
expectedClusterName string
expectedAgentName string
expectedErrorPrefix string
}{
{
name: "cert data is invalid",
certData: []byte("invalid cert"),
expectedErrorPrefix: "unable to parse certificate:",
},
{
name: "cert with invalid commmon name",
certData: testinghelpers.NewTestCert("test", 60*time.Second).Cert,
},
{
name: "valid cert with correct common name",
certData: testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 60*time.Second).Cert,
expectedClusterName: "cluster1",
expectedAgentName: "agent1",
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterName, agentName, err := GetClusterAgentNamesFromCertificate(c.certData)
testingcommon.AssertErrorWithPrefix(t, err, c.expectedErrorPrefix)
if clusterName != c.expectedClusterName {
t.Errorf("expect %v, but got %v", c.expectedClusterName, clusterName)
}
if agentName != c.expectedAgentName {
t.Errorf("expect %v, but got %v", c.expectedAgentName, agentName)
}
})
}
}

View File

@@ -13,8 +13,9 @@ import (
"k8s.io/apimachinery/pkg/util/rand"
kubefake "k8s.io/client-go/kubernetes/fake"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
const (
@@ -64,42 +65,42 @@ func TestDumpSecret(t *testing.T) {
testNamespace, testSecretName, "",
testinghelpers.NewTestCert("test", 60*time.Second),
map[string][]byte{
clientcert.ClusterNameFile: []byte("test"),
clientcert.AgentNameFile: []byte("test"),
clientcert.KubeconfigFile: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
register.ClusterNameFile: []byte("test"),
register.AgentNameFile: []byte("test"),
register.KubeconfigFile: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
},
),
validateFiles: func(t *testing.T, hubKubeconfigDir string) {
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, clientcert.ClusterNameFile), []byte("test"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, clientcert.AgentNameFile), []byte("test"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, clientcert.KubeconfigFile), kubeConfigFile)
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, clientcert.TLSKeyFile))
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, clientcert.TLSCertFile))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, register.ClusterNameFile), []byte("test"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, register.AgentNameFile), []byte("test"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, register.KubeconfigFile), kubeConfigFile)
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, csr.TLSKeyFile))
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, csr.TLSCertFile))
},
},
{
name: "secret is updated",
queueKey: testSecretName,
oldConfigData: map[string][]byte{
clientcert.ClusterNameFile: []byte("test"),
clientcert.AgentNameFile: []byte("test"),
clientcert.KubeconfigFile: []byte("test"),
register.ClusterNameFile: []byte("test"),
register.AgentNameFile: []byte("test"),
register.KubeconfigFile: []byte("test"),
},
secret: testinghelpers.NewHubKubeconfigSecret(
testNamespace, testSecretName, "",
testinghelpers.NewTestCert("test", 60*time.Second),
map[string][]byte{
clientcert.ClusterNameFile: []byte("test1"),
clientcert.AgentNameFile: []byte("test"),
clientcert.KubeconfigFile: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
register.ClusterNameFile: []byte("test1"),
register.AgentNameFile: []byte("test"),
register.KubeconfigFile: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
},
),
validateFiles: func(t *testing.T, hubKubeconfigDir string) {
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, clientcert.ClusterNameFile), []byte("test1"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, clientcert.AgentNameFile), []byte("test"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, clientcert.KubeconfigFile), kubeConfigFile)
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, clientcert.TLSKeyFile))
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, clientcert.TLSCertFile))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, register.ClusterNameFile), []byte("test1"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, register.AgentNameFile), []byte("test"))
testinghelpers.AssertFileContent(t, path.Join(hubKubeconfigDir, register.KubeconfigFile), kubeConfigFile)
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, csr.TLSKeyFile))
testinghelpers.AssertFileExist(t, path.Join(hubKubeconfigDir, csr.TLSCertFile))
},
},
}

View File

@@ -4,8 +4,6 @@ import (
"context"
"fmt"
"os"
"path"
"reflect"
"time"
"github.com/openshift/library-go/pkg/controller/controllercmd"
@@ -14,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -31,7 +30,8 @@ import (
"open-cluster-management.io/ocm/pkg/common/helpers"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
"open-cluster-management.io/ocm/pkg/registration/spoke/addon"
"open-cluster-management.io/ocm/pkg/registration/spoke/lease"
"open-cluster-management.io/ocm/pkg/registration/spoke/managedcluster"
@@ -46,16 +46,43 @@ type SpokeAgentConfig struct {
agentOptions *commonoptions.AgentOptions
registrationOption *SpokeAgentOptions
driver register.RegisterDriver
// currentBootstrapKubeConfig is the selected bootstrap kubeconfig file path.
// Only used in MultipleHubs feature.
currentBootstrapKubeConfig string
internalHubConfigValidFunc wait.ConditionWithContextFunc
hubKubeConfigChecker *hubKubeConfigHealthChecker
bootstrapKubeconfigHealthChecker *bootstrapKubeconfigHealthChecker
reSelectChecker *reSelectChecker
}
// NewSpokeAgentConfig returns a SpokeAgentConfig
func NewSpokeAgentConfig(commonOpts *commonoptions.AgentOptions, opts *SpokeAgentOptions) *SpokeAgentConfig {
return &SpokeAgentConfig{
registerDriver := csr.NewCSRDriver()
cfg := &SpokeAgentConfig{
agentOptions: commonOpts,
registrationOption: opts,
driver: registerDriver,
reSelectChecker: &reSelectChecker{shouldReSelect: false},
bootstrapKubeconfigHealthChecker: &bootstrapKubeconfigHealthChecker{
bootstrapKubeconfigSecretName: &opts.BootstrapKubeconfigSecret,
},
}
cfg.hubKubeConfigChecker = &hubKubeConfigHealthChecker{
checkFunc: cfg.IsHubKubeConfigValid,
}
return cfg
}
// HealthCheckers returns the healthz checkers owned by this spoke agent
// config: the bootstrap kubeconfig checker, the hub kubeconfig checker
// and the bootstrap kubeconfig re-select checker.
func (o *SpokeAgentConfig) HealthCheckers() []healthz.HealthChecker {
	checkers := make([]healthz.HealthChecker, 0, 3)
	checkers = append(checkers,
		o.bootstrapKubeconfigHealthChecker,
		o.hubKubeConfigChecker,
		o.reSelectChecker)
	return checkers
}
@@ -85,7 +112,7 @@ func NewSpokeAgentConfig(commonOpts *commonoptions.AgentOptions, opts *SpokeAgen
// #3. Bootstrap kubeconfig is invalid (e.g. certificate expired) and hub kubeconfig is valid
// #4. Neither bootstrap kubeconfig nor hub kubeconfig is valid
//
// A temporary ClientCertForHubController with bootstrap kubeconfig is created
// A temporary BootstrapController with bootstrap kubeconfig is created
// and started if the hub kubeconfig does not exist or is invalid and used to
// create a valid hub kubeconfig. Once the hub kubeconfig is valid, the
// temporary controller is stopped and the main controllers are started.
@@ -214,9 +241,9 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
go spokeClusterCreatingController.Run(ctx, 1)
secretInformer := namespacedManagementKubeInformerFactory.Core().V1().Secrets()
if o.registrationOption.bootstrapKubeconfigHealthChecker != nil {
if o.bootstrapKubeconfigHealthChecker != nil {
// register bootstrapKubeconfigHealthChecker as an event handler of the secret informer
if _, err = secretInformer.Informer().AddEventHandler(o.registrationOption.bootstrapKubeconfigHealthChecker); err != nil {
if _, err = secretInformer.Informer().AddEventHandler(o.bootstrapKubeconfigHealthChecker); err != nil {
return err
}
}
@@ -232,13 +259,29 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
go namespacedManagementKubeInformerFactory.Start(ctx.Done())
// check if there already exists a valid client config for hub
ok, err := o.HasValidHubClientConfig(ctx)
kubeconfig, err := clientcmd.LoadFromFile(o.currentBootstrapKubeConfig)
if err != nil {
return err
}
secretOption := register.SecretOption{
SecretNamespace: o.agentOptions.ComponentNamespace,
SecretName: o.registrationOption.HubKubeconfigSecret,
ClusterName: o.agentOptions.SpokeClusterName,
AgentName: o.agentOptions.AgentID,
ManagementSecretInformer: namespacedManagementKubeInformerFactory.Core().V1().Secrets().Informer(),
ManagementCoreClient: managementKubeClient.CoreV1(),
HubKubeconfigFile: o.agentOptions.HubKubeconfigFile,
HubKubeconfigDir: o.agentOptions.HubKubeconfigDir,
BootStrapKubeConfig: kubeconfig,
}
o.internalHubConfigValidFunc = register.IsHubKubeConfigValidFunc(o.driver, secretOption)
ok, err := o.internalHubConfigValidFunc(ctx)
if err != nil {
return err
}
// create and start a ClientCertForHubController for spoke agent bootstrap to deal with scenario #1 and #4.
// Running the bootstrap ClientCertForHubController is optional. If always run it no matter if there already
// create and start a BootstrapController for spoke agent bootstrap to deal with scenario #1 and #4.
// Running the BootstrapController is optional. If always run it no matter if there already
// exists a valid client config for hub or not, the controller will be started and then stopped immediately
// in scenario #2 and #3, which results in an error message in log: 'Observed a panic: timeout waiting for
// informer cache'
@@ -246,55 +289,27 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
// create a ClientCertForHubController for spoke agent bootstrap
// the bootstrap informers are supposed to be terminated after completing the bootstrap process.
bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapKubeClient, 10*time.Minute)
bootstrapNamespacedManagementKubeInformerFactory := informers.NewSharedInformerFactoryWithOptions(
managementKubeClient, 10*time.Minute, informers.WithNamespace(o.agentOptions.ComponentNamespace))
// create a kubeconfig with references to the key/cert files in the same secret
contextClusterName, server, proxyURL, caData, err := parseKubeconfig(o.currentBootstrapKubeConfig)
if err != nil {
return err
}
kubeconfig := clientcert.BuildKubeconfig(contextClusterName, server, caData,
proxyURL, clientcert.TLSCertFile, clientcert.TLSKeyFile)
kubeconfigData, err := clientcmd.Write(kubeconfig)
if err != nil {
return err
}
csrControl, err := clientcert.NewCSRControl(logger, bootstrapInformerFactory.Certificates(), bootstrapKubeClient)
if err != nil {
return err
}
controllerName := fmt.Sprintf("BootstrapClientCertController@cluster:%s", o.agentOptions.SpokeClusterName)
clientCertForHubController := registration.NewClientCertForHubController(
o.agentOptions.SpokeClusterName, o.agentOptions.AgentID, o.agentOptions.ComponentNamespace, o.registrationOption.HubKubeconfigSecret,
kubeconfigData,
// store the secret in the cluster where the agent pod runs
bootstrapNamespacedManagementKubeInformerFactory.Core().V1().Secrets(),
csrControl,
csrOption, err := registration.NewCSROption(logger,
secretOption,
o.registrationOption.ClientCertExpirationSeconds,
managementKubeClient,
registration.GenerateBootstrapStatusUpdater(),
recorder,
controllerName,
)
bootstrapInformerFactory.Certificates(),
bootstrapKubeClient)
if err != nil {
return err
}
controllerName := fmt.Sprintf("BootstrapController@cluster:%s", o.agentOptions.SpokeClusterName)
bootstrapCtx, stopBootstrap := context.WithCancel(ctx)
secretController := register.NewSecretController(
secretOption, csrOption, o.driver, registration.GenerateBootstrapStatusUpdater(), recorder, controllerName)
go bootstrapInformerFactory.Start(bootstrapCtx.Done())
go bootstrapNamespacedManagementKubeInformerFactory.Start(bootstrapCtx.Done())
go secretController.Run(bootstrapCtx, 1)
go clientCertForHubController.Run(bootstrapCtx, 1)
// wait until a valid hub kubeconfig is in place.
// If no hub kube kubeconfig exits, the controller will create the kubeconfig and the client cert; otherwise there
// exits an invalid hub kube kubeconfig, the controller will update both the kubeconfig and the referenced client
// cert with correct content.
// It's difficult to set a timeout for the waiting because a manual approval (approve the CSR) is required on the
// hub cluster to create a client cert with the bootstrap hub kubeconfig.
logger.Info("Waiting for hub client config for managed cluster to be ready")
if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.HasValidHubClientConfig); err != nil {
// wait for the hub client config is ready.
logger.Info("Waiting for hub client config and managed cluster to be ready")
if err := wait.PollUntilContextCancel(bootstrapCtx, 1*time.Second, true, o.internalHubConfigValidFunc); err != nil {
// TODO need run the bootstrap CSR forever to re-establish the client-cert if it is ever lost.
stopBootstrap()
return fmt.Errorf("failed to wait for hub client config for managed cluster to be ready: %w", err)
@@ -348,42 +363,22 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
recorder.Event("HubClientConfigReady", "Client config for hub is ready.")
// create a kubeconfig with references to the key/cert files in the same secret
contextClusterName, server, proxyURL, caData, err := parseKubeconfig(o.agentOptions.HubKubeconfigFile)
if err != nil {
return fmt.Errorf("failed to parse hub kubeconfig: %w", err)
}
kubeconfig := clientcert.BuildKubeconfig(contextClusterName, server, caData,
proxyURL, clientcert.TLSCertFile, clientcert.TLSKeyFile)
kubeconfigData, err := clientcmd.Write(kubeconfig)
if err != nil {
return fmt.Errorf("failed to write hub kubeconfig: %w", err)
}
csrControl, err := clientcert.NewCSRControl(logger, hubKubeInformerFactory.Certificates(), hubKubeClient)
if err != nil {
return fmt.Errorf("failed to create CSR control: %w", err)
}
// create another ClientCertForHubController for client certificate rotation
controllerName := fmt.Sprintf("ClientCertController@cluster:%s", o.agentOptions.SpokeClusterName)
clientCertForHubController := registration.NewClientCertForHubController(
o.agentOptions.SpokeClusterName, o.agentOptions.AgentID, o.agentOptions.ComponentNamespace, o.registrationOption.HubKubeconfigSecret,
kubeconfigData,
namespacedManagementKubeInformerFactory.Core().V1().Secrets(),
csrControl,
csrOption, err := registration.NewCSROption(logger,
secretOption,
o.registrationOption.ClientCertExpirationSeconds,
managementKubeClient,
registration.GenerateStatusUpdater(
hubKubeInformerFactory.Certificates(),
hubKubeClient)
if err != nil {
return fmt.Errorf("failed to create CSR option: %w", err)
}
// create another RegisterController for registration credential rotation
controllerName := fmt.Sprintf("RegisterController@cluster:%s", o.agentOptions.SpokeClusterName)
secretController := register.NewSecretController(
secretOption, csrOption, o.driver, registration.GenerateStatusUpdater(
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
o.agentOptions.SpokeClusterName),
recorder,
controllerName,
)
if err != nil {
return fmt.Errorf("failed to create client cert for hub controller: %w", err)
}
o.agentOptions.SpokeClusterName), recorder, controllerName)
// create ManagedClusterLeaseController to keep the spoke cluster heartbeat
managedClusterLeaseController := lease.NewManagedClusterLeaseController(
@@ -428,11 +423,11 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
addOnRegistrationController = addon.NewAddOnRegistrationController(
o.agentOptions.SpokeClusterName,
o.agentOptions.AgentID,
kubeconfigData,
kubeconfig,
addOnClient,
managementKubeClient,
spokeKubeClient,
csrControl,
csrOption.CSRControl,
addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns(),
recorder,
)
@@ -445,7 +440,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
func(ctx context.Context) error {
logger.Info("Failed to connect to hub because of hubAcceptClient set to false, restart agent to reselect a new bootstrap kubeconfig")
o.registrationOption.reSelectChecker.shouldReSelect = true
o.reSelectChecker.shouldReSelect = true
return nil
},
recorder,
@@ -457,7 +452,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
o.registrationOption.HubConnectionTimeoutSeconds,
func(ctx context.Context) error {
logger.Info("Failed to connect to hub because of lease out-of-date, restart agent to reselect a new bootstrap kubeconfig")
o.registrationOption.reSelectChecker.shouldReSelect = true
o.reSelectChecker.shouldReSelect = true
return nil
},
recorder,
@@ -474,7 +469,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
go spokeClusterInformerFactory.Start(ctx.Done())
}
go clientCertForHubController.Run(ctx, 1)
go secretController.Run(ctx, 1)
go managedClusterLeaseController.Run(ctx, 1)
go managedClusterHealthCheckController.Run(ctx, 1)
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
@@ -483,9 +478,8 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
}
// start health checking of hub client certificate
if o.registrationOption.clientCertHealthChecker != nil {
tlsCertFile := path.Join(o.agentOptions.HubKubeconfigDir, clientcert.TLSCertFile)
go o.registrationOption.clientCertHealthChecker.start(ctx, tlsCertFile)
if o.hubKubeConfigChecker != nil {
o.hubKubeConfigChecker.setBootstrapped()
}
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.MultipleHubs) {
@@ -497,91 +491,11 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
return nil
}
// HasValidHubClientConfig returns ture if all the conditions below are met:
// 1. KubeconfigFile exists;
// 2. TLSKeyFile exists;
// 3. TLSCertFile exists;
// 4. Certificate in TLSCertFile is issued for the current cluster/agent;
// 5. Certificate in TLSCertFile is not expired;
// 6. Hub kubeconfig and bootstrap hub kubeconfig include the same server, proxyURL
// CA bundle and cluster name.
//
// Normally, KubeconfigFile/TLSKeyFile/TLSCertFile will be created once the bootstrap process
// completes. Changing the name of the cluster will make the existing hub kubeconfig invalid,
// because certificate in TLSCertFile is issued to a specific cluster/agent.
func (o *SpokeAgentConfig) HasValidHubClientConfig(ctx context.Context) (bool, error) {
logger := klog.FromContext(ctx)
if _, err := os.Stat(o.agentOptions.HubKubeconfigFile); os.IsNotExist(err) {
logger.V(4).Info("Kubeconfig file not found", "kubeconfigPath", o.agentOptions.HubKubeconfigFile)
func (o *SpokeAgentConfig) IsHubKubeConfigValid(ctx context.Context) (bool, error) {
if o.internalHubConfigValidFunc == nil {
return false, nil
}
keyPath := path.Join(o.agentOptions.HubKubeconfigDir, clientcert.TLSKeyFile)
if _, err := os.Stat(keyPath); os.IsNotExist(err) {
logger.V(4).Info("TLS key file not found", "keyPath", keyPath)
return false, nil
}
certPath := path.Join(o.agentOptions.HubKubeconfigDir, clientcert.TLSCertFile)
certData, err := os.ReadFile(path.Clean(certPath))
if err != nil {
logger.V(4).Info("Unable to load TLS cert file", "certPath", certPath)
return false, nil
}
// check if the tls certificate is issued for the current cluster/agent
clusterName, agentName, err := registration.GetClusterAgentNamesFromCertificate(certData)
if err != nil {
return false, nil
}
if clusterName != o.agentOptions.SpokeClusterName || agentName != o.agentOptions.AgentID {
logger.V(4).Info("Certificate in file is issued for different agent",
"certPath", certPath,
"issuedFor", fmt.Sprintf("%s:%s", clusterName, agentName),
"expectedFor", fmt.Sprintf("%s:%s", o.agentOptions.SpokeClusterName, o.agentOptions.AgentID))
return false, nil
}
if valid, err := clientcert.IsCertificateValid(logger, certData, nil); err != nil || !valid {
return false, err
}
return isHubKubeconfigValid(o.currentBootstrapKubeConfig, o.agentOptions.HubKubeconfigFile)
}
// The hub kubeconfig is valid when it shares the same value of the following with the
// bootstrap hub kubeconfig.
// 1. The hub server
// 2. The proxy url
// 3. The CA bundle
// 4. The current context cluster name
func isHubKubeconfigValid(
bootstrapKubeConfigFilePath, hubeKubeConfigFilePath string) (bool, error) {
bootstrapCtxCluster, bootstrapServer, bootstrapProxyURL, bootstrapCABndle, err := parseKubeconfig(
bootstrapKubeConfigFilePath)
if err != nil {
return false, err
}
ctxCluster, server, proxyURL, caBundle, err := parseKubeconfig(hubeKubeConfigFilePath)
switch {
case err != nil:
return false, err
case bootstrapServer != server,
bootstrapProxyURL != proxyURL,
!reflect.DeepEqual(bootstrapCABndle, caBundle),
// Here in addition to the server, proxyURL and CA bundle, we also need to compare the cluster name,
// because in some cases even the hub cluster API server serving certificate(kubeconfig ca bundle)
// is the same, but the signer certificate may be different(i.e the hub kubernetes cluster is rebuilt
// with a same serving certificate and url), so setting the cluster name in the bootstrap kubeconfig
// can help to distinguish the different clusters(signer certificate). And comparing the cluster name
// can help to avoid the situation that the hub kubeconfig is valid but not for the current cluster.
bootstrapCtxCluster != ctxCluster:
return false, nil
default:
return true, nil
}
return o.internalHubConfigValidFunc(ctx)
}
// getSpokeClusterCABundle returns the spoke cluster Kubernetes client CA data when SpokeExternalServerURLs is specified
@@ -598,23 +512,3 @@ func (o *SpokeAgentConfig) getSpokeClusterCABundle(kubeConfig *rest.Config) ([]b
}
return data, nil
}
// parseKubeconfig loads the kubeconfig file at filename and returns, for
// its current context: the context's cluster name, the cluster's server
// URL, the proxy URL and the CA bundle data. An error is returned when the
// file cannot be loaded or the current context/cluster entry is missing.
func parseKubeconfig(filename string) (string, string, string, []byte, error) {
	config, err := clientcmd.LoadFromFile(filename)
	if err != nil {
		return "", "", "", nil, err
	}
	// The current context must resolve to a named cluster entry; otherwise
	// the kubeconfig is unusable for building a hub client.
	context, ok := config.Contexts[config.CurrentContext]
	if !ok {
		return "", "", "", nil,
			fmt.Errorf("kubeconfig %q does not contain context: %s", filename, config.CurrentContext)
	}
	cluster, ok := config.Clusters[context.Cluster]
	if !ok {
		return "", "", "", nil, fmt.Errorf("kubeconfig %q does not contain cluster: %s", filename, context.Cluster)
	}
	return context.Cluster, cluster.Server, cluster.ProxyURL, cluster.CertificateAuthorityData, nil
}

View File

@@ -2,24 +2,19 @@ package spoke
import (
"bytes"
"context"
"os"
"path"
"reflect"
"testing"
"time"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
ocmfeature "open-cluster-management.io/api/feature"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
)
@@ -132,143 +127,6 @@ func TestValidate(t *testing.T) {
}
}
func TestHasValidHubClientConfig(t *testing.T) {
tempDir, err := os.MkdirTemp("", "testvalidhubclientconfig")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer os.RemoveAll(tempDir)
cert1 := testinghelpers.NewTestCert("system:open-cluster-management:cluster1:agent1", 60*time.Second)
cert2 := testinghelpers.NewTestCert("test", 60*time.Second)
kubeconfig := testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil)
cases := []struct {
name string
clusterName string
agentName string
kubeconfig []byte
bootstapKubeconfig []byte
tlsCert []byte
tlsKey []byte
isValid bool
}{
{
name: "no kubeconfig",
isValid: false,
},
{
name: "no tls key",
kubeconfig: kubeconfig,
isValid: false,
},
{
name: "no tls cert",
kubeconfig: kubeconfig,
tlsKey: cert1.Key,
isValid: false,
},
{
name: "cert is not issued for cluster1:agent1",
clusterName: "cluster1",
agentName: "agent1",
kubeconfig: kubeconfig,
tlsKey: cert2.Key,
tlsCert: cert2.Cert,
isValid: false,
},
{
name: "context cluster changes",
clusterName: "cluster1",
agentName: "agent1",
kubeconfig: kubeconfig,
bootstapKubeconfig: testinghelpers.NewKubeconfig("c2", "https://127.0.0.1:6001", "", nil, nil, nil),
tlsKey: cert1.Key,
tlsCert: cert1.Cert,
isValid: false,
},
{
name: "hub server url changes",
clusterName: "cluster1",
agentName: "agent1",
kubeconfig: kubeconfig,
bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.2:6001", "", nil, nil, nil),
tlsKey: cert1.Key,
tlsCert: cert1.Cert,
isValid: false,
},
{
name: "proxy url changes",
clusterName: "cluster1",
agentName: "agent1",
kubeconfig: kubeconfig,
bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "https://127.0.0.1:3129", nil, nil, nil),
tlsKey: cert1.Key,
tlsCert: cert1.Cert,
isValid: false,
},
{
name: "ca bundle changes",
clusterName: "cluster1",
agentName: "agent1",
kubeconfig: kubeconfig,
bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", []byte("test"), nil, nil),
tlsKey: cert1.Key,
tlsCert: cert1.Cert,
isValid: false,
},
{
name: "valid hub client config",
clusterName: "cluster1",
agentName: "agent1",
kubeconfig: kubeconfig,
bootstapKubeconfig: testinghelpers.NewKubeconfig("c1", "https://127.0.0.1:6001", "", nil, nil, nil),
tlsKey: cert1.Key,
tlsCert: cert1.Cert,
isValid: true,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
spokeAgentOptions := NewSpokeAgentOptions()
if c.kubeconfig != nil {
testinghelpers.WriteFile(path.Join(tempDir, "kubeconfig"), c.kubeconfig)
}
if c.tlsKey != nil {
testinghelpers.WriteFile(path.Join(tempDir, "tls.key"), c.tlsKey)
}
if c.tlsCert != nil {
testinghelpers.WriteFile(path.Join(tempDir, "tls.crt"), c.tlsCert)
}
if c.bootstapKubeconfig != nil {
bootstrapKubeconfigFile := path.Join(tempDir, "bootstrap-kubeconfig")
testinghelpers.WriteFile(bootstrapKubeconfigFile, c.bootstapKubeconfig)
spokeAgentOptions.BootstrapKubeconfig = bootstrapKubeconfigFile
}
agentOpts := &commonoptions.AgentOptions{
SpokeClusterName: c.clusterName,
AgentID: c.agentName,
HubKubeconfigDir: tempDir,
}
cfg := NewSpokeAgentConfig(agentOpts, spokeAgentOptions)
if err := agentOpts.Complete(); err != nil {
t.Fatal(err)
}
cfg.currentBootstrapKubeConfig = cfg.registrationOption.BootstrapKubeconfig
valid, err := cfg.HasValidHubClientConfig(context.TODO())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if c.isValid != valid {
t.Errorf("expect %t, but %t", c.isValid, valid)
}
})
}
}
func TestGetSpokeClusterCABundle(t *testing.T) {
tempDir, err := os.MkdirTemp("", "testgetspokeclustercabundle")
if err != nil {
@@ -332,74 +190,3 @@ func TestGetSpokeClusterCABundle(t *testing.T) {
})
}
}
func TestParseKubeconfig(t *testing.T) {
tempDir, err := os.MkdirTemp("", "testgetproxyurl")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
defer os.RemoveAll(tempDir)
server1 := "https://127.0.0.1:6443"
server2 := "https://api.cluster1.example.com:6443"
caData1 := []byte("fake-ca-data1")
caData2 := []byte("fake-ca-data2")
proxyURL := "https://127.0.0.1:3129"
kubeconfigWithoutProxy := clientcert.BuildKubeconfig(
"test-cluster", server1, caData1, "", "tls.crt", "tls.key")
kubeconfigWithProxy := clientcert.BuildKubeconfig(
"test-cluster", server2, caData2, proxyURL, "tls.crt", "tls.key")
cases := []struct {
name string
kubeconfig clientcmdapi.Config
expectedServer string
expectedCAData []byte
expectedProxyURL string
}{
{
name: "without proxy url",
kubeconfig: kubeconfigWithoutProxy,
expectedServer: server1,
expectedCAData: caData1,
expectedProxyURL: "",
},
{
name: "with proxy url",
kubeconfig: kubeconfigWithProxy,
expectedServer: server2,
expectedCAData: caData2,
expectedProxyURL: proxyURL,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
filename := path.Join(tempDir, "kubeconfig")
if err := clientcmd.WriteToFile(c.kubeconfig, filename); err != nil {
t.Errorf("unexpected error: %v", err)
}
ctxCluster, server, proxyURL, caData, err := parseKubeconfig(filename)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if ctxCluster != "test-cluster" {
t.Errorf("expect context cluster %s, but %s", "test-cluster", ctxCluster)
}
if c.expectedServer != server {
t.Errorf("expect server %s, but %s", c.expectedServer, server)
}
if c.expectedProxyURL != proxyURL {
t.Errorf("expect proxy url %s, but %s", c.expectedProxyURL, proxyURL)
}
if !reflect.DeepEqual(c.expectedCAData, caData) {
t.Errorf("expect ca data %v, but %v", c.expectedCAData, caData)
}
})
}
}

View File

@@ -6,6 +6,7 @@ import (
"github.com/openshift/library-go/pkg/controller/controllercmd"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/klog/v2"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
@@ -14,9 +15,8 @@ import (
)
type AgentConfig struct {
agentOption *commonoptions.AgentOptions
registrationOption *registration.SpokeAgentOptions
workOption *work.WorkloadAgentOptions
registrationConfig *registration.SpokeAgentConfig
workConfig *work.WorkAgentConfig
}
func NewAgentConfig(
@@ -24,31 +24,28 @@ func NewAgentConfig(
registrationOption *registration.SpokeAgentOptions,
workOption *work.WorkloadAgentOptions) *AgentConfig {
return &AgentConfig{
agentOption: agentOption,
registrationOption: registrationOption,
workOption: workOption,
registrationConfig: registration.NewSpokeAgentConfig(agentOption, registrationOption),
workConfig: work.NewWorkAgentConfig(agentOption, workOption),
}
}
func (a *AgentConfig) RunSpokeAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
registrationCfg := registration.NewSpokeAgentConfig(a.agentOption, a.registrationOption)
// start registration agent at first
go func() {
if err := registrationCfg.RunSpokeAgent(ctx, controllerContext); err != nil {
if err := a.registrationConfig.RunSpokeAgent(ctx, controllerContext); err != nil {
klog.Fatal(err)
}
}()
// wait for the hub client config ready.
klog.Info("Waiting for hub client config and managed cluster to be ready")
if err := wait.PollUntilContextCancel(ctx, 1*time.Second, true, registrationCfg.HasValidHubClientConfig); err != nil {
if err := wait.PollUntilContextCancel(ctx, 1*time.Second, true, a.registrationConfig.IsHubKubeConfigValid); err != nil {
return err
}
workCfg := work.NewWorkAgentConfig(a.agentOption, a.workOption)
// start work agent
go func() {
if err := workCfg.RunWorkloadAgent(ctx, controllerContext); err != nil {
if err := a.workConfig.RunWorkloadAgent(ctx, controllerContext); err != nil {
klog.Fatal(err)
}
}()
@@ -56,3 +53,7 @@ func (a *AgentConfig) RunSpokeAgent(ctx context.Context, controllerContext *cont
<-ctx.Done()
return nil
}
// HealthCheckers exposes the health checkers of the embedded registration
// agent config so callers can wire them into the controller command's
// health endpoint.
//
// Receiver renamed from "o" to "a" for consistency with the other
// *AgentConfig methods (e.g. RunSpokeAgent).
func (a *AgentConfig) HealthCheckers() []healthz.HealthChecker {
	return a.registrationConfig.HealthCheckers()
}

View File

@@ -18,6 +18,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/pointer"
workapiv1 "open-cluster-management.io/api/work/v1"
)
@@ -102,7 +103,7 @@ func (c *UpdateApply) applyUnstructured(
existingOwners := existing.GetOwnerReferences()
existingLabels := existing.GetLabels()
existingAnnotations := existing.GetAnnotations()
modified := resourcemerge.BoolPtr(false)
modified := pointer.Bool(false)
resourcemerge.MergeMap(modified, &existingLabels, required.GetLabels())
resourcemerge.MergeMap(modified, &existingAnnotations, required.GetAnnotations())

View File

@@ -197,7 +197,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
}
func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Codec[*workv1.ManifestWork] {
codecs := []generic.Codec[*workv1.ManifestWork]{}
var codecs []generic.Codec[*workv1.ManifestWork]
for _, name := range codecNames {
if name == manifestBundleCodecName {
codecs = append(codecs, codec.NewManifestBundleCodec())

View File

@@ -22,8 +22,9 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
)
var _ = ginkgo.Describe("Loopback registration [development]", func() {
@@ -307,13 +308,13 @@ var _ = ginkgo.Describe("Loopback registration [development]", func() {
if err != nil {
return false
}
if _, ok := secret.Data[clientcert.TLSKeyFile]; !ok {
if _, ok := secret.Data[csr.TLSKeyFile]; !ok {
return false
}
if _, ok := secret.Data[clientcert.TLSCertFile]; !ok {
if _, ok := secret.Data[csr.TLSCertFile]; !ok {
return false
}
if _, ok := secret.Data[clientcert.KubeconfigFile]; !ok {
if _, ok := secret.Data[register.KubeconfigFile]; !ok {
return false
}
return true
@@ -326,7 +327,7 @@ var _ = ginkgo.Describe("Loopback registration [development]", func() {
return err
}
if !meta.IsStatusConditionTrue(found.Status.Conditions, clientcert.ClusterCertificateRotatedCondition) {
if !meta.IsStatusConditionTrue(found.Status.Conditions, csr.ClusterCertificateRotatedCondition) {
return fmt.Errorf("Client cert condition is not correct")
}

View File

@@ -21,7 +21,8 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
"open-cluster-management.io/ocm/pkg/registration/spoke"
"open-cluster-management.io/ocm/test/integration/util"
)
@@ -62,19 +63,19 @@ var _ = ginkgo.Describe("Addon Registration", func() {
assertSuccessClusterBootstrap := func() {
// the spoke cluster and csr should be created after bootstrap
ginkgo.By("Check existence of ManagedCluster & CSR")
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
if _, err := util.GetManagedCluster(clusterClient, managedClusterName); err != nil {
return false
return err
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
return nil
}, eventuallyTimeout, eventuallyInterval).Should(gomega.Succeed())
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
if _, err := util.FindUnapprovedSpokeCSR(kubeClient, managedClusterName); err != nil {
return false
return err
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
return nil
}, eventuallyTimeout, eventuallyInterval).Should(gomega.Succeed())
// the spoke cluster should has finalizer that is added by hub controller
gomega.Eventually(func() bool {
@@ -157,13 +158,13 @@ var _ = ginkgo.Describe("Addon Registration", func() {
if err != nil {
return false
}
if _, ok := secret.Data[clientcert.TLSKeyFile]; !ok {
if _, ok := secret.Data[csr.TLSKeyFile]; !ok {
return false
}
if _, ok := secret.Data[clientcert.TLSCertFile]; !ok {
if _, ok := secret.Data[csr.TLSCertFile]; !ok {
return false
}
kubeconfigData, ok := secret.Data[clientcert.KubeconfigFile]
kubeconfigData, ok := secret.Data[register.KubeconfigFile]
if signerName == certificates.KubeAPIServerClientSignerName {
if !ok {
@@ -208,7 +209,7 @@ var _ = ginkgo.Describe("Addon Registration", func() {
if err != nil {
return false
}
return meta.IsStatusConditionTrue(addon.Status.Conditions, clientcert.ClusterCertificateRotatedCondition)
return meta.IsStatusConditionTrue(addon.Status.Conditions, csr.ClusterCertificateRotatedCondition)
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
}
@@ -349,13 +350,17 @@ var _ = ginkgo.Describe("Addon Registration", func() {
assertSuccessCSRApproval()
ginkgo.By("Wait for addon namespace")
gomega.Consistently(func() bool {
gomega.Consistently(func() error {
csrs, err := util.FindAddOnCSRs(kubeClient, managedClusterName, addOnName)
if err != nil {
return false
return err
}
return len(csrs) == 1
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
if len(csrs) != 1 {
return fmt.Errorf("the number of CSRs is not correct, got %d", len(csrs))
}
return nil
}, eventuallyTimeout, eventuallyInterval).Should(gomega.Succeed())
ginkgo.By("Create addon namespace")
// create addon namespace

View File

@@ -27,8 +27,8 @@ import (
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/hub"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/spoke"
"open-cluster-management.io/ocm/pkg/registration/spoke/addon"
"open-cluster-management.io/ocm/pkg/registration/spoke/registration"
@@ -78,15 +78,15 @@ var CRDPaths = []string{
}
func runAgent(name string, opt *spoke.SpokeAgentOptions, commOption *commonoptions.AgentOptions, cfg *rest.Config) context.CancelFunc {
agentConfig := spoke.NewSpokeAgentConfig(commOption, opt)
ctx, cancel := context.WithCancel(context.Background())
runAgentWithContext(ctx, name, opt, commOption, cfg)
runAgentWithContext(ctx, name, agentConfig, cfg)
return cancel
}
func runAgentWithContext(ctx context.Context, name string, opt *spoke.SpokeAgentOptions, commOption *commonoptions.AgentOptions, cfg *rest.Config) {
func runAgentWithContext(ctx context.Context, name string, agentConfig *spoke.SpokeAgentConfig, cfg *rest.Config) {
go func() {
config := spoke.NewSpokeAgentConfig(commOption, opt)
err := config.RunSpokeAgent(ctx, &controllercmd.ControllerContext{
err := agentConfig.RunSpokeAgent(ctx, &controllercmd.ControllerContext{
KubeConfig: cfg,
EventRecorder: util.NewIntegrationTestEventRecorder(name),
})
@@ -111,7 +111,7 @@ var _ = ginkgo.BeforeSuite(func() {
// crank up the sync speed
transport.CertCallbackRefreshDuration = 5 * time.Second
clientcert.ControllerResyncInterval = 5 * time.Second
register.ControllerResyncInterval = 5 * time.Second
registration.CreatingControllerSyncInterval = 1 * time.Second
// crank up the addon lease sync and udpate speed

View File

@@ -29,8 +29,9 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/hub"
"open-cluster-management.io/ocm/pkg/registration/register"
"open-cluster-management.io/ocm/pkg/registration/register/csr"
"open-cluster-management.io/ocm/pkg/registration/spoke"
"open-cluster-management.io/ocm/test/integration/util"
)
@@ -214,13 +215,13 @@ var _ = ginkgo.Describe("Rebootstrap", func() {
if err != nil {
return false
}
if _, ok := secret.Data[clientcert.TLSKeyFile]; !ok {
if _, ok := secret.Data[csr.TLSKeyFile]; !ok {
return false
}
if _, ok := secret.Data[clientcert.TLSCertFile]; !ok {
if _, ok := secret.Data[csr.TLSCertFile]; !ok {
return false
}
_, ok := secret.Data[clientcert.KubeconfigFile]
_, ok := secret.Data[register.KubeconfigFile]
if !ok && signerName == certificates.KubeAPIServerClientSignerName {
return false
}
@@ -297,20 +298,22 @@ var _ = ginkgo.Describe("Rebootstrap", func() {
}
startAgent := func(ctx context.Context, managedClusterName, hubKubeconfigDir string,
agentOptions *spoke.SpokeAgentOptions) (context.Context, context.CancelFunc) {
agentOptions *spoke.SpokeAgentOptions) (context.Context, context.CancelFunc, *spoke.SpokeAgentConfig) {
ginkgo.By("run registration agent")
commOptions := commonoptions.NewAgentOptions()
commOptions.HubKubeconfigDir = hubKubeconfigDir
commOptions.SpokeClusterName = managedClusterName
agentCtx, stopAgent := context.WithCancel(ctx)
runAgentWithContext(agentCtx, "rebootstrap-test", agentOptions, commOptions, spokeCfg)
agentConfig := spoke.NewSpokeAgentConfig(commOptions, agentOptions)
runAgentWithContext(agentCtx, "rebootstrap-test", agentConfig, spokeCfg)
return agentCtx, stopAgent
return agentCtx, stopAgent, agentConfig
}
ginkgo.JustBeforeEach(func() {
var spokeCtx, agentCtx context.Context
var agentConfig *spoke.SpokeAgentConfig
var stopAgent context.CancelFunc
// ensure there is no remaining bootstrap-hub-kubeconfig secret
@@ -334,7 +337,7 @@ var _ = ginkgo.Describe("Rebootstrap", func() {
agentOptions.HubKubeconfigSecret = hubKubeconfigSecret
spokeCtx, stopSpoke = context.WithCancel(context.Background())
agentCtx, stopAgent = startAgent(spokeCtx, managedClusterName, hubKubeconfigDir, agentOptions)
agentCtx, stopAgent, agentConfig = startAgent(spokeCtx, managedClusterName, hubKubeconfigDir, agentOptions)
// simulate k8s scheduler to perform heath check and restart the agent if it is down/unhealth
go func() {
@@ -345,7 +348,7 @@ var _ = ginkgo.Describe("Rebootstrap", func() {
case <-ticker.C:
// health check
fmt.Println("[agent-scheduler] - start health checking...")
for _, healthchecker := range agentOptions.GetHealthCheckers() {
for _, healthchecker := range agentConfig.HealthCheckers() {
if err := healthchecker.Check(nil); err != nil {
fmt.Printf("[agent-scheduler] - stop agent because it is not health: %v\n", err)
stopAgent()
@@ -360,7 +363,7 @@ var _ = ginkgo.Describe("Rebootstrap", func() {
case <-agentCtx.Done():
// restart agent
fmt.Println("[agent-scheduler] - restart agent...")
agentCtx, stopAgent = startAgent(spokeCtx, managedClusterName, hubKubeconfigDir, agentOptions)
agentCtx, stopAgent, _ = startAgent(spokeCtx, managedClusterName, hubKubeconfigDir, agentOptions)
case <-spokeCtx.Done():
// exit
fmt.Println("[agent-scheduler] - shutting down...")
@@ -525,7 +528,7 @@ var _ = ginkgo.Describe("Rebootstrap", func() {
if err != nil {
return false
}
data, ok := secret.Data[clientcert.TLSCertFile]
data, ok := secret.Data[csr.TLSCertFile]
if !ok {
return false
}

View File

@@ -95,12 +95,12 @@ var _ = ginkgo.Describe("Agent Recovery", func() {
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// the hub kubeconfig secret should be filled after the csr is approved
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
if _, err := util.GetFilledHubKubeConfigSecret(kubeClient, testNamespace, hubKubeconfigSecret); err != nil {
return false
return err
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
return nil
}, eventuallyTimeout, eventuallyInterval).Should(gomega.Succeed())
// the spoke cluster should have joined condition finally
gomega.Eventually(func() error {

View File

@@ -78,8 +78,8 @@ var _ = Describe("switch-hub", Ordered, func() {
agentOptions.HubConnectionTimeoutSeconds = 10
return agentOptions
},
func(ctx context.Context, stopAgent context.CancelFunc, agentOptions *spoke.SpokeAgentOptions) {
startAgentHealthChecker(ctx, stopAgent, agentOptions.GetHealthCheckers())
func(ctx context.Context, stopAgent context.CancelFunc, agentConfig *spoke.SpokeAgentConfig) {
startAgentHealthChecker(ctx, stopAgent, agentConfig.HealthCheckers())
})
approveAndAcceptManagedCluster(managedClusterName, hub1.kubeClient, hub1.clusterClient, hub1.authn, 10*time.Minute)
@@ -217,16 +217,17 @@ func startNewHub(ctx context.Context, hubName string) *mockHub {
}
func startAgent(ctx context.Context, managedClusterName, hubKubeconfigDir string,
agentOptions *spoke.SpokeAgentOptions) (context.Context, context.CancelFunc) {
agentOptions *spoke.SpokeAgentOptions) (context.Context, context.CancelFunc, *spoke.SpokeAgentConfig) {
ginkgo.By("run registration agent")
commOptions := commonoptions.NewAgentOptions()
commOptions.HubKubeconfigDir = hubKubeconfigDir
commOptions.SpokeClusterName = managedClusterName
agentCtx, stopAgent := context.WithCancel(ctx)
runAgentWithContext(agentCtx, "switch-hub", agentOptions, commOptions, spokeCfg)
agentConfig := spoke.NewSpokeAgentConfig(commOptions, agentOptions)
runAgentWithContext(agentCtx, "switch-hub", agentConfig, spokeCfg)
return agentCtx, stopAgent
return agentCtx, stopAgent, agentConfig
}
func approveAndAcceptManagedCluster(managedClusterName string,
@@ -324,12 +325,12 @@ func assertManagedClusterSuccessfullyJoined(testNamespace, managedClusterName, h
func startAutoRestartAgent(ctx context.Context,
managedClusterName, hubKubeconfigDir string,
getNewAgentOptions func() *spoke.SpokeAgentOptions,
watchStop func(ctx context.Context, stopAgent context.CancelFunc, agentOptions *spoke.SpokeAgentOptions),
watchStop func(ctx context.Context, stopAgent context.CancelFunc, agentConfig *spoke.SpokeAgentConfig),
) {
fmt.Println("[auto-restart-agent] - start agent...")
newAgentOptions := getNewAgentOptions()
agentCtx, stopAgent := startAgent(ctx, managedClusterName, hubKubeconfigDir, newAgentOptions)
go watchStop(ctx, stopAgent, newAgentOptions)
agentCtx, stopAgent, agentConfig := startAgent(ctx, managedClusterName, hubKubeconfigDir, newAgentOptions)
go watchStop(ctx, stopAgent, agentConfig)
for {
select {
case <-agentCtx.Done():
@@ -338,8 +339,8 @@ func startAutoRestartAgent(ctx context.Context,
fmt.Println("[auto-restart-agent] - restart agent...")
newAgentOptions := getNewAgentOptions()
agentCtx, stopAgent = startAgent(ctx, managedClusterName, hubKubeconfigDir, newAgentOptions)
go watchStop(ctx, stopAgent, newAgentOptions)
agentCtx, stopAgent, agentConfig = startAgent(ctx, managedClusterName, hubKubeconfigDir, newAgentOptions)
go watchStop(ctx, stopAgent, agentConfig)
case <-ctx.Done():
// exit
fmt.Println("[auto-restart-agent] - shutting down...")