diff --git a/pkg/hub/csr/controller.go b/pkg/hub/csr/controller.go index 3263e22ba..5f0a9925e 100644 --- a/pkg/hub/csr/controller.go +++ b/pkg/hub/csr/controller.go @@ -34,7 +34,7 @@ type csrApprovingController struct { func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer factory.Informer, recorder events.Recorder) factory.Controller { c := &csrApprovingController{ kubeClient: kubeClient, - eventRecorder: recorder.WithComponentSuffix("csr-controller"), + eventRecorder: recorder.WithComponentSuffix("csr-approving-controller"), } return factory.New(). WithInformersQueueKeyFunc(func(obj runtime.Object) string { @@ -42,7 +42,7 @@ func NewCSRApprovingController(kubeClient kubernetes.Interface, csrInformer fact return accessor.GetName() }, csrInformer). WithSync(c.sync). - ToController("CSRController", recorder) + ToController("CSRApprovingController", recorder) } func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncContext) error { @@ -74,6 +74,7 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC return err } if !allowed { + //TODO find a way to avoid looking at this CSR again. klog.V(4).Infof("Spoke cluster csr %q cannont be auto approved due to subject access review was not approved", csr.Name) return nil } @@ -81,7 +82,7 @@ func (c *csrApprovingController) sync(ctx context.Context, syncCtx factory.SyncC // Auto approve the spoke cluster csr csr.Status.Conditions = append(csr.Status.Conditions, certificatesv1beta1.CertificateSigningRequestCondition{ Type: certificatesv1beta1.CertificateApproved, - Reason: "AutoApproved", + Reason: "AutoApprovedByHubCSRApprovingController", Message: "Auto approving spoke cluster agent certificate after SubjectAccessReview.", }) _, err = c.kubeClient.CertificatesV1beta1().CertificateSigningRequests().UpdateApproval(ctx, csr, metav1.UpdateOptions{}) @@ -165,14 +166,13 @@ func isSpokeClusterClientCertRenewal(csr *certificatesv1beta1.CertificateSigning // Check whether a CSR is in terminal state func isCSRInTerminalState(status *certificatesv1beta1.CertificateSigningRequestStatus) bool { - approved, denied := false, false for _, c := range status.Conditions { if c.Type == certificatesv1beta1.CertificateApproved { - approved = true + return true } if c.Type == certificatesv1beta1.CertificateDenied { - denied = true + return true } } - return approved || denied + return false } diff --git a/pkg/hub/csr/controller_test.go b/pkg/hub/csr/controller_test.go index 2cb788b49..e38478e31 100644 --- a/pkg/hub/csr/controller_test.go +++ b/pkg/hub/csr/controller_test.go @@ -41,35 +41,35 @@ func TestSync(t *testing.T) { name: "sync a deleted csr", startingCSRs: []runtime.Object{}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - assertAction(t, actions, "get") + assertActions(t, actions, "get") }, }, { name: "sync a denied csr", startingCSRs: []runtime.Object{newDeniedCSR()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - assertAction(t, actions, "get") + assertActions(t, actions, "get") }, }, { name: "sync an approved csr", startingCSRs: []runtime.Object{newApprovedCSR()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - assertAction(t, actions, "get") + assertActions(t, actions, "get") }, }, { name: "sync an invalid csr", startingCSRs: []runtime.Object{newInvalidCSR()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - assertAction(t, actions, "get") + assertActions(t, actions, "get") }, }, { name: "deny an auto approving csr", startingCSRs: []runtime.Object{newRenewalCSR()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - assertAction(t, actions, "get", "create") + assertActions(t, actions, "get", "create") assertSubjectAccessReviewCreated(t, actions[1].(clienttesting.CreateActionImpl).Object) }, }, @@ -78,8 +78,8 @@ func TestSync(t *testing.T) { startingCSRs: []runtime.Object{newRenewalCSR()}, autoApprovingAllowed: true, validateActions: func(t *testing.T, actions []clienttesting.Action) { - assertAction(t, actions, "get", "create", "update") - assertCondition(t, actions[2].(clienttesting.UpdateActionImpl).Object, certificatesv1beta1.CertificateApproved, "AutoApproved") + assertActions(t, actions, "get", "create", "update") + assertCondition(t, actions[2].(clienttesting.UpdateActionImpl).Object, certificatesv1beta1.CertificateApproved, "AutoApprovedByHubCSRApprovingController") }, }, } @@ -249,7 +249,7 @@ func newApprovedCSR() *certificatesv1beta1.CertificateSigningRequest { return csr } -func assertAction(t *testing.T, actualActions []clienttesting.Action, expectedActions ...string) { +func assertActions(t *testing.T, actualActions []clienttesting.Action, expectedActions ...string) { if len(actualActions) != len(expectedActions) { t.Errorf("expected %d call but got: %#v", len(expectedActions), actualActions) } diff --git a/pkg/hub/manager.go b/pkg/hub/manager.go index 72dbda08e..3abb11d56 100644 --- a/pkg/hub/manager.go +++ b/pkg/hub/manager.go @@ -27,7 +27,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. } clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 10*time.Minute) - csrInformers := kubeinformers.NewSharedInformerFactory(kubeClient, 10*time.Minute) + kubeInfomers := kubeinformers.NewSharedInformerFactory(kubeClient, 10*time.Minute) spokeClusterController := spokecluster.NewSpokeClusterController( kubeClient, @@ -38,12 +38,12 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. csrController := csr.NewCSRApprovingController( kubeClient, - csrInformers.Certificates().V1beta1().CertificateSigningRequests().Informer(), + kubeInfomers.Certificates().V1beta1().CertificateSigningRequests().Informer(), controllerContext.EventRecorder, ) go clusterInformers.Start(ctx.Done()) - go csrInformers.Start(ctx.Done()) + go kubeInfomers.Start(ctx.Done()) go spokeClusterController.Run(ctx, 1) go csrController.Run(ctx, 1) diff --git a/pkg/spoke/hubclientcert/controller.go b/pkg/spoke/hubclientcert/controller.go index 2f5c40c6e..c57da1b70 100644 --- a/pkg/spoke/hubclientcert/controller.go +++ b/pkg/spoke/hubclientcert/controller.go @@ -65,7 +65,7 @@ type ClientCertForHubController struct { csrName string // keyData is the private key data used to created a csr // csrName and keyData store the internal state of the controller. They are set after controller creates a new csr - // and cleared once the csr is approved and processed by controller. There are 4 combination of thier values: + // and cleared once the csr is approved and processed by controller. There are 4 combination of their values: // 1. csrName empty, keyData empty: means we aren't trying to create a new client cert, our current one is valid // 2. csrName set, keyData empty: there was bug // 3. csrName set, keyData set: we are waiting for a new cert to be signed. @@ -74,10 +74,14 @@ type ClientCertForHubController struct { } // NewClientCertForHubController return a ClientCertForHubController -func NewClientCertForHubController(clusterName, agentName, hubKubeconfigSecretNamespace, - kubeconfigSecretName string, hubClientConfig *restclient.Config, spokeCoreClient corev1client.CoreV1Interface, - hubCSRClient csrclient.CertificateSigningRequestInterface, hubCSRInformer certificatesinformers.CertificateSigningRequestInformer, - spokeSecretInformer corev1informers.SecretInformer, recorder events.Recorder, controllerNameOverride string) (factory.Controller, error) { +func NewClientCertForHubController( + clusterName, agentName, hubKubeconfigSecretNamespace, kubeconfigSecretName string, + hubClientConfig *restclient.Config, + spokeCoreClient corev1client.CoreV1Interface, + hubCSRClient csrclient.CertificateSigningRequestInterface, + hubCSRInformer certificatesinformers.CertificateSigningRequestInformer, + spokeSecretInformer corev1informers.SecretInformer, + recorder events.Recorder, controllerName string) factory.Controller { c := &ClientCertForHubController{ clusterName: clusterName, agentName: agentName, @@ -90,16 +94,11 @@ func NewClientCertForHubController(clusterName, agentName, hubKubeconfigSecretNa spokeCoreClient: spokeCoreClient, } - controllerName := "ClientCertForHubController" - if controllerNameOverride != "" { - controllerName = controllerNameOverride - } - return factory.New(). WithInformers(hubCSRInformer.Informer(), spokeSecretInformer.Informer()). WithSync(c.sync). ResyncEvery(5*time.Minute). - ToController(controllerName, recorder), nil + ToController(controllerName, recorder) } func (c *ClientCertForHubController) sync(ctx context.Context, syncCtx factory.SyncContext) error { diff --git a/pkg/spoke/hubclientcert/controller_test.go b/pkg/spoke/hubclientcert/controller_test.go index 76b166951..ab6caba6d 100644 --- a/pkg/spoke/hubclientcert/controller_test.go +++ b/pkg/spoke/hubclientcert/controller_test.go @@ -158,7 +158,7 @@ func TestSyncWithoutHubKubeconfigSecret(t *testing.T) { t.Errorf("expected agent name %q but got: %s", agentName, string(secret.Data[AgentNameFile])) } - // checkc if there is new csr created + // check if there is new csr created csrs, err := fakeKubeClient.CertificatesV1beta1().CertificateSigningRequests().List(context.Background(), metav1.ListOptions{}) if err != nil { t.Errorf("unexpected error: %v", err) diff --git a/pkg/spoke/spokeagent.go b/pkg/spoke/spokeagent.go index 26a81cdc6..539031e7e 100644 --- a/pkg/spoke/spokeagent.go +++ b/pkg/spoke/spokeagent.go @@ -15,13 +15,11 @@ import ( "github.com/open-cluster-management/registration/pkg/spoke/spokecluster" "github.com/openshift/library-go/pkg/controller/controllercmd" "github.com/spf13/pflag" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" - corev1client "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" @@ -36,13 +34,13 @@ const ( // SpokeAgentOptions holds configuration for spoke cluster agent type SpokeAgentOptions struct { - ComponentNamespace string - ClusterName string - AgentName string - BootstrapKubeconfig string - HubKubeconfigSecret string - HubKubeconfigDir string - SpokeServerUrl string + ComponentNamespace string + ClusterName string + AgentName string + BootstrapKubeconfig string + HubKubeconfigSecret string + HubKubeconfigDir string + SpokeExternalServerUrl string } // NewSpokeAgentOptions returns a SpokeAgentOptions @@ -110,26 +108,52 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext if err != nil { return fmt.Errorf("unable to load bootstrap kubeconfig from file %q: %w", o.BootstrapKubeconfig, err) } - bootstrapClient, err := kubernetes.NewForConfig(bootstrapClientConfig) + bootstrapKubeClient, err := kubernetes.NewForConfig(bootstrapClientConfig) if err != nil { return err } - bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapClient, 10*time.Minute) + bootstrapClusterClient, err := clusterv1client.NewForConfig(bootstrapClientConfig) + if err != nil { + return err + } + + bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapKubeClient, 10*time.Minute) // create a ClientCertForHubController for spoke agent bootstrap - clientCertForHubController, err := hubclientcert.NewClientCertForHubController( + clientCertForHubController := hubclientcert.NewClientCertForHubController( o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret, restclient.AnonymousClientConfig(bootstrapClientConfig), spokeKubeClient.CoreV1(), - bootstrapClient.CertificatesV1beta1().CertificateSigningRequests(), + bootstrapKubeClient.CertificatesV1beta1().CertificateSigningRequests(), bootstrapInformerFactory.Certificates().V1beta1().CertificateSigningRequests(), spokeKubeInformerFactory.Core().V1().Secrets(), controllerContext.EventRecorder, - "BoostrapClientCertForHubController", + "BootstrapClientCertForHubController", ) - if err != nil { - return err + + // create a SpokeClusterCreatingControlle to create a spoke cluster on hub cluster + hasSpokeCluster := false + + caBundle := controllerContext.KubeConfig.CAData + if caBundle == nil && controllerContext.KubeConfig.CAFile != "" { + data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile) + if err != nil { + return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err) + } + caBundle = data } + + // TODO there is a corner case, if the hub kubeconfig is ready, but the spoke cluster did not create, and the agent is + // restarted, the agent will not be able to create spoke cluster again due to the bootstrap kubeconfig has been switched + // to hub kubeconfig, in this case, the hub cluster admin need to create the spoke cluster on hub cluster manually + spokeClusterCreatingController := spokecluster.NewSpokeClusterCreatingController( + o.ClusterName, o.SpokeExternalServerUrl, + caBundle, + bootstrapClusterClient, + func(isCreated bool) { hasSpokeCluster = isCreated }, + controllerContext.EventRecorder, + ) + var bootstrapCtx context.Context bootstrapCtx, stopBootstrap = context.WithCancel(ctx) @@ -137,12 +161,16 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext go spokeKubeInformerFactory.Start(bootstrapCtx.Done()) go clientCertForHubController.Run(bootstrapCtx, 1) + go spokeClusterCreatingController.Run(bootstrapCtx, 1) + + // wait for the spoke cluster is created + wait.PollImmediateInfinite(1*time.Second, func() (bool, error) { return hasSpokeCluster, nil }) } // wait for the client config for hub is ready klog.Info("Waiting for client config for hub to be ready") - err = wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig) - if err != nil { + if err := wait.PollImmediateInfinite(1*time.Second, o.hasValidHubClientConfig); err != nil { + // TODO we need run the bootstrap CSR forever too to re-establish the client-cert if we ever lose it. if stopBootstrap != nil { stopBootstrap() } @@ -176,7 +204,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.") // create another ClientCertForHubController for client certificate rotation - clientCertForHubController, err := hubclientcert.NewClientCertForHubController( + clientCertForHubController := hubclientcert.NewClientCertForHubController( o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret, restclient.AnonymousClientConfig(hubClientConfig), spokeKubeClient.CoreV1(), @@ -184,30 +212,12 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext hubKubeInformerFactory.Certificates().V1beta1().CertificateSigningRequests(), spokeKubeInformerFactory.Core().V1().Secrets(), controllerContext.EventRecorder, - "", + "ClientCertForHubController", ) - if err != nil { - return err - } // create SpokeClusterController to reconcile instances of SpokeCluster on the spoke cluster - caBundle := controllerContext.KubeConfig.CAData - if caBundle == nil && controllerContext.KubeConfig.CAFile != "" { - data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile) - if err != nil { - return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err) - } - caBundle = data - } - - spokeServerURL, err := o.getSpokeServerURL(spokeKubeClient.CoreV1()) - if err != nil { - return err - } - spokeClusterController := spokecluster.NewSpokeClusterController( - o.ClusterName, spokeServerURL, - caBundle, + o.ClusterName, hubClusterClient, hubClusterInformerFactory.Cluster().V1().SpokeClusters(), spokeKubeClient.Discovery(), @@ -236,7 +246,7 @@ func (o *SpokeAgentOptions) AddFlags(fs *pflag.FlagSet) { "The name of secret in component namespace storing kubeconfig for hub.") fs.StringVar(&o.HubKubeconfigDir, "hub-kubeconfig-dir", o.HubKubeconfigDir, "The mount path of hub-kubeconfig-secret in the container.") - fs.StringVar(&o.SpokeServerUrl, "spoke-server-url", o.SpokeServerUrl, + fs.StringVar(&o.SpokeExternalServerUrl, "spoke-external-server-url", o.SpokeExternalServerUrl, "A reachable URL of spoke cluster api server for hub cluster.") } @@ -254,6 +264,10 @@ func (o *SpokeAgentOptions) Validate() error { return errors.New("agent name is empty") } + if o.SpokeExternalServerUrl == "" { + return errors.New("spoke cluster external api server url is empty") + } + return nil } @@ -347,32 +361,3 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) { return clusterName, agentName } - -// getSpokeServerURL returns the api server url of spoke cluster -func (o *SpokeAgentOptions) getSpokeServerURL(spokeCoreClient corev1client.CoreV1Interface) (string, error) { - // use user specified spoke server url if exists - if o.SpokeServerUrl != "" { - return o.SpokeServerUrl, nil - } - - // otherwise try to get it from the default/kubernetes endpoints - klog.Infof("Try to get spoke server URL from default/kubernetes endpoints") - kubeEndpoints, err := spokeCoreClient.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) - if err != nil { - return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: %w", err) - } - - if len(kubeEndpoints.Subsets) == 0 { - return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no subsets") - } - - if len(kubeEndpoints.Subsets[0].Addresses) == 0 { - return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no addresses") - } - - if len(kubeEndpoints.Subsets[0].Ports) == 0 { - return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no ports") - } - - return fmt.Sprintf("%s:%d", kubeEndpoints.Subsets[0].Addresses[0].IP, kubeEndpoints.Subsets[0].Ports[0].Port), nil -} diff --git a/pkg/spoke/spokecluster/controller.go b/pkg/spoke/spokecluster/controller.go index 3acb8ef9a..4f63db1fc 100644 --- a/pkg/spoke/spokecluster/controller.go +++ b/pkg/spoke/spokecluster/controller.go @@ -9,7 +9,6 @@ import ( "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" @@ -27,8 +26,6 @@ import ( // spokeClusterController reconciles instances of SpokeCluster on the spoke cluster. type spokeClusterController struct { clusterName string - spokeServerURL string - spokeCABundle []byte hubClusterClient clientset.Interface hubClusterLister clusterv1listers.SpokeClusterLister spokeDiscoveryClient discovery.DiscoveryInterface @@ -37,8 +34,7 @@ type spokeClusterController struct { // NewSpokeClusterController creates a new spoke cluster controller on the spoke cluster. func NewSpokeClusterController( - clusterName, spokeServerURL string, - spokeCABundle []byte, + clusterName string, hubClusterClient clientset.Interface, hubSpokeClusterInformer clusterv1informer.SpokeClusterInformer, spokeDiscoveryClient discovery.DiscoveryInterface, @@ -46,25 +42,25 @@ func NewSpokeClusterController( recorder events.Recorder) factory.Controller { c := &spokeClusterController{ clusterName: clusterName, - spokeServerURL: spokeServerURL, - spokeCABundle: spokeCABundle, hubClusterClient: hubClusterClient, hubClusterLister: hubSpokeClusterInformer.Lister(), spokeDiscoveryClient: spokeDiscoveryClient, spokeNodeLister: spokeNodeInformer.Lister(), } + return factory.New(). + // TODO need to have conditional node capacity recalculation based on number of nodes here and decoupling + // the node capacity calculation to another controller. WithInformers(hubSpokeClusterInformer.Informer(), spokeNodeInformer.Informer()). WithSync(c.sync). ResyncEvery(5*time.Minute). ToController("SpokeClusterController", recorder) } +// sync maintains the spoke-side status of a SpokeCluster, it maintains the SpokeClusterJoined condition according to +// the value of the SpokeClusterHubAccepted condition and ensures resource and version are up to date. func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error { spokeCluster, err := c.hubClusterLister.Get(c.clusterName) - if errors.IsNotFound(err) { - return c.createSpokeCluster(ctx, syncCtx) - } if err != nil { return fmt.Errorf("unable to get spoke cluster with name %q from hub: %w", c.clusterName, err) } @@ -72,7 +68,7 @@ func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncCo // current spoke cluster is not accepted, do nothing. acceptedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionHubAccepted) if !helpers.IsConditionTrue(acceptedCondition) { - klog.V(4).Infof("Spoke cluster %q is not accepted by hub yet", c.clusterName) + syncCtx.Recorder().Eventf("SpokeClusterIsNotAccepted", "Spoke cluster %q is not accepted by hub yet", c.clusterName) return nil } @@ -119,27 +115,6 @@ func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncCo return nil } -func (c *spokeClusterController) createSpokeCluster(ctx context.Context, syncCtx factory.SyncContext) error { - spokeCluster := &clusterv1.SpokeCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: c.clusterName, - }, - Spec: clusterv1.SpokeClusterSpec{ - SpokeClientConfig: clusterv1.ClientConfig{ - URL: c.spokeServerURL, - CABundle: c.spokeCABundle, - }, - }, - } - - if _, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{}); err != nil { - return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err) - } - - syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName) - return nil -} - func (c *spokeClusterController) getSpokeVersion() (*clusterv1.SpokeVersion, error) { serverVersion, err := c.spokeDiscoveryClient.ServerVersion() if err != nil { @@ -167,15 +142,15 @@ func (c *spokeClusterController) getClusterResources() (capacity, allocatable cl return clusterv1.ResourceList{ clusterv1.ResourceCPU: cpuCapacity, - clusterv1.ResourceMemory: formatQuatityToMi(memoryCapacity), + clusterv1.ResourceMemory: formatQuantityToMi(memoryCapacity), }, clusterv1.ResourceList{ clusterv1.ResourceCPU: cpuAllocatable, - clusterv1.ResourceMemory: formatQuatityToMi(memoryAllocatable), + clusterv1.ResourceMemory: formatQuantityToMi(memoryAllocatable), }, nil } -func formatQuatityToMi(q resource.Quantity) resource.Quantity { +func formatQuantityToMi(q resource.Quantity) resource.Quantity { raw, _ := q.AsInt64() raw /= (1024 * 1024) rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw)) diff --git a/pkg/spoke/spokecluster/controller_test.go b/pkg/spoke/spokecluster/controller_test.go index 8a6489b97..6dd7f8bb9 100644 --- a/pkg/spoke/spokecluster/controller_test.go +++ b/pkg/spoke/spokecluster/controller_test.go @@ -37,12 +37,11 @@ func TestSyncSpokeCluster(t *testing.T) { name: "sync no spoke cluster", startingObjects: []runtime.Object{}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - if len(actions) != 1 { - t.Errorf("expected 1 call but got: %#v", actions) + if len(actions) != 0 { + t.Errorf("expected 0 call but got: %#v", actions) } - assertAction(t, actions[0], "create") - assertSpokeCluster(t, actions[0].(clienttesting.CreateActionImpl).Object, testSpokeClusterName) }, + expectedErr: "unable to get spoke cluster with name \"testspokecluster\" from hub: spokecluster.cluster.open-cluster-management.io \"testspokecluster\" not found", }, { name: "sync an unaccepted spoke cluster", @@ -58,10 +57,7 @@ func TestSyncSpokeCluster(t *testing.T) { startingObjects: []runtime.Object{newAcceptedSpokeCluster()}, nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - if len(actions) != 2 { - t.Errorf("expected 2 call but got: %#v", actions) - } - assertAction(t, actions[1], "update") + assertActions(t, actions, "get", "update") actual := actions[1].(clienttesting.UpdateActionImpl).Object assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue) assertStatusVersion(t, actual, kubeversion.Get()) @@ -73,10 +69,7 @@ func TestSyncSpokeCluster(t *testing.T) { startingObjects: []runtime.Object{newJoinedSpokeCluster(newResourceList(32, 64), newResourceList(16, 32))}, nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - if len(actions) != 1 { - t.Errorf("expected 1 call but got: %#v", actions) - } - assertAction(t, actions[0], "get") + assertActions(t, actions, "get") }, }, { @@ -87,10 +80,7 @@ func TestSyncSpokeCluster(t *testing.T) { newNode("testnode2", newResourceList(32, 64), newResourceList(16, 32)), }, validateActions: func(t *testing.T, actions []clienttesting.Action) { - if len(actions) != 2 { - t.Errorf("expected 1 call but got: %#v", actions) - } - assertAction(t, actions[1], "update") + assertActions(t, actions, "get", "update") actual := actions[1].(clienttesting.UpdateActionImpl).Object assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue) assertStatusVersion(t, actual, kubeversion.Get()) @@ -117,8 +107,6 @@ func TestSyncSpokeCluster(t *testing.T) { ctrl := spokeClusterController{ clusterName: testSpokeClusterName, - spokeServerURL: "https://127.0.0.1:32768", - spokeCABundle: []byte("testspokeclusterca"), hubClusterClient: clusterClient, hubClusterLister: clusterInformerFactory.Cluster().V1().SpokeClusters().Lister(), spokeDiscoveryClient: kubeClient.Discovery(), @@ -134,7 +122,7 @@ func TestSyncSpokeCluster(t *testing.T) { t.Errorf("expected %q error, got %q", c.expectedErr, syncErr.Error()) return } - if syncErr != nil { + if len(c.expectedErr) == 0 && syncErr != nil { t.Errorf("unexpected err: %v", syncErr) } @@ -143,9 +131,14 @@ func TestSyncSpokeCluster(t *testing.T) { } } -func assertAction(t *testing.T, actual clienttesting.Action, expected string) { - if actual.GetVerb() != expected { - t.Errorf("expected %s action but got: %#v", expected, actual) +func assertActions(t *testing.T, actualActions []clienttesting.Action, expectedActions ...string) { + if len(actualActions) != len(expectedActions) { + t.Errorf("expected %d call but got: %#v", len(expectedActions), actualActions) + } + for i, expected := range expectedActions { + if actualActions[i].GetVerb() != expected { + t.Errorf("expected %s action but got: %#v", expected, actualActions[i]) + } } } diff --git a/pkg/spoke/spokecluster/creating_controller.go b/pkg/spoke/spokecluster/creating_controller.go new file mode 100644 index 000000000..112b2ea43 --- /dev/null +++ b/pkg/spoke/spokecluster/creating_controller.go @@ -0,0 +1,77 @@ +package spokecluster + +import ( + "context" + "fmt" + "time" + + clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned" + clusterv1 "github.com/open-cluster-management/api/cluster/v1" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +// spokeClusterCreatingController creates a spoke cluster on hub cluster during the spoke agent bootstrap phase +type spokeClusterCreatingController struct { + isSpokeClusterExists bool + clusterName string + spokeExternalServerUrl string + spokeCABundle []byte + hasSpokeClusterFunc func(bool) + hubClusterClient clientset.Interface +} + +// NewSpokeClusterCreatingController creates a new spoke cluster creating controller on the spoke cluster. +func NewSpokeClusterCreatingController( + clusterName, spokeExternalServerUrl string, + spokeCABundle []byte, + hubClusterClient clientset.Interface, + hasSpokeClusterFunc func(bool), + recorder events.Recorder) factory.Controller { + c := &spokeClusterCreatingController{ + clusterName: clusterName, + spokeExternalServerUrl: spokeExternalServerUrl, + spokeCABundle: spokeCABundle, + hasSpokeClusterFunc: hasSpokeClusterFunc, + hubClusterClient: hubClusterClient, + } + return factory.New(). + WithSync(c.sync). + ResyncEvery(5*time.Minute). + ToController("SpokeClusterCreatingController", recorder) +} + +func (c *spokeClusterCreatingController) sync(ctx context.Context, syncCtx factory.SyncContext) error { + if c.isSpokeClusterExists { + return nil + } + + spokeCluster := &clusterv1.SpokeCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.clusterName, + }, + Spec: clusterv1.SpokeClusterSpec{ + SpokeClientConfig: clusterv1.ClientConfig{ + URL: c.spokeExternalServerUrl, + CABundle: c.spokeCABundle, + }, + }, + } + + _, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{}) + if errors.IsAlreadyExists(err) { + c.isSpokeClusterExists = true + c.hasSpokeClusterFunc(c.isSpokeClusterExists) + return nil + } + if err != nil { + return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err) + } + + c.isSpokeClusterExists = true + c.hasSpokeClusterFunc(c.isSpokeClusterExists) + syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName) + return nil +} diff --git a/pkg/spoke/spokecluster/creating_controller_test.go b/pkg/spoke/spokecluster/creating_controller_test.go new file mode 100644 index 000000000..abfa70dfa --- /dev/null +++ b/pkg/spoke/spokecluster/creating_controller_test.go @@ -0,0 +1,92 @@ +package spokecluster + +import ( + "bytes" + "context" + "testing" + + clusterfake "github.com/open-cluster-management/api/client/cluster/clientset/versioned/fake" + clusterv1 "github.com/open-cluster-management/api/cluster/v1" + "k8s.io/apimachinery/pkg/runtime" + clienttesting "k8s.io/client-go/testing" +) + +const testSpokeExternalServerUrl = "https://192.168.3.77:32769" + +func TestCreateSpokeCluster(t *testing.T) { + cases := []struct { + name string + startingObjects []runtime.Object + validateActions func(t *testing.T, actions []clienttesting.Action) + hasSpokeCluster bool + expectedErr string + }{ + { + name: "create a new cluster", + startingObjects: []runtime.Object{}, + hasSpokeCluster: true, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + assertActions(t, actions, "create") + actual := actions[0].(clienttesting.CreateActionImpl).Object + assertSpokeExternalServerUrl(t, actual, testSpokeExternalServerUrl) + assertSpokeCABundle(t, actual, []byte("testcabundle")) + }, + }, + { + name: "create an existed cluster", + startingObjects: []runtime.Object{newSpokeCluster([]clusterv1.StatusCondition{})}, + hasSpokeCluster: true, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + assertActions(t, actions, "create") + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + hasSpokeCluster := false + clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...) + ctrl := spokeClusterCreatingController{ + clusterName: testSpokeClusterName, + spokeExternalServerUrl: testSpokeExternalServerUrl, + spokeCABundle: []byte("testcabundle"), + hasSpokeClusterFunc: func(isCreated bool) { hasSpokeCluster = isCreated }, + hubClusterClient: clusterClient, + } + + syncErr := ctrl.sync(context.TODO(), newFakeSyncContext(t)) + if len(c.expectedErr) > 0 && syncErr == nil { + t.Errorf("expected %q error", c.expectedErr) + return + } + if len(c.expectedErr) > 0 && syncErr != nil && syncErr.Error() != c.expectedErr { + t.Errorf("expected %q error, got %q", c.expectedErr, syncErr.Error()) + return + } + if len(c.expectedErr) == 0 && syncErr != nil { + t.Errorf("unexpected err: %v", syncErr) + } + + if hasSpokeCluster != c.hasSpokeCluster { + t.Errorf("expected %t error, but failed", c.hasSpokeCluster) + return + } + + c.validateActions(t, clusterClient.Actions()) + }) + } +} + +func assertSpokeExternalServerUrl(t *testing.T, actual runtime.Object, expected string) { + spokeCluster := actual.(*clusterv1.SpokeCluster) + if spokeCluster.Spec.SpokeClientConfig.URL != expected { + t.Errorf("expected %q error, but got %q", expected, spokeCluster.Spec.SpokeClientConfig.URL) + } +} + +func assertSpokeCABundle(t *testing.T, actual runtime.Object, expected []byte) { + spokeCluster := actual.(*clusterv1.SpokeCluster) + if !bytes.Equal(spokeCluster.Spec.SpokeClientConfig.CABundle, expected) { + t.Errorf("expected %q error, but got %q", expected, spokeCluster.Spec.SpokeClientConfig.CABundle) + } +}