diff --git a/pkg/spoke/spokeagent.go b/pkg/spoke/spokeagent.go index fbbb3cd65..0c93ab118 100644 --- a/pkg/spoke/spokeagent.go +++ b/pkg/spoke/spokeagent.go @@ -9,19 +9,20 @@ import ( "path" "time" + clusterv1client "github.com/open-cluster-management/api/client/cluster/clientset/versioned" + clusterv1informers "github.com/open-cluster-management/api/client/cluster/informers/externalversions" "github.com/open-cluster-management/registration/pkg/spoke/hubclientcert" + "github.com/open-cluster-management/registration/pkg/spoke/spokecluster" "github.com/openshift/library-go/pkg/controller/controllercmd" - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" "github.com/spf13/pflag" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilrand "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/informers" - corev1informers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/klog" ) @@ -41,6 +42,7 @@ type SpokeAgentOptions struct { BootstrapKubeconfig string HubKubeconfigSecret string HubKubeconfigDir string + SpokeServerUrl string } // NewSpokeAgentOptions returns a SpokeAgentOptions @@ -72,16 +74,12 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext klog.Infof("Cluster name is %q and agent name is %q", o.ClusterName, o.AgentName) - // create kube client for spoke cluster + // create kube client and shared informer factory for spoke cluster spokeKubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig) if err != nil { return err } - - // create an informer for secrets on spoke cluster - spokeSecretInformer := informers.NewSharedInformerFactory(spokeKubeClient, - 10*time.Minute).Core().V1().Secrets() - go spokeSecretInformer.Informer().Run(ctx.Done()) + spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute) // check if there already exists a valid client config for hub ok, err := o.hasValidHubClientConfig() @@ -96,21 +94,37 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext // informer cache' var stopBootstrap context.CancelFunc if !ok { - // load bootstrap clent config + // create bootstrap client and shared informer factory from bootstrap hub kube config bootstrapClientConfig, err := clientcmd.BuildConfigFromFlags("", o.BootstrapKubeconfig) if err != nil { return fmt.Errorf("unable to load bootstrap kubeconfig from file %q: %w", o.BootstrapKubeconfig, err) } + bootstrapClient, err := kubernetes.NewForConfig(bootstrapClientConfig) + if err != nil { + return err + } + bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapClient, 10*time.Minute) - // create and start a ClientCertForHubController for spoke agent bootstrap - hubCSRInformer, clientCertForHubController, err := o.buildClientCertForHubController(o.ClusterName, o.AgentName, - bootstrapClientConfig, spokeKubeClient, spokeSecretInformer, controllerContext.EventRecorder, "BoostrapClientCertForHubController") + // create a ClientCertForHubController for spoke agent bootstrap + clientCertForHubController, err := hubclientcert.NewClientCertForHubController( + o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret, + restclient.AnonymousClientConfig(bootstrapClientConfig), + spokeKubeClient.CoreV1(), + bootstrapClient.CertificatesV1beta1().CertificateSigningRequests(), + bootstrapInformerFactory.Certificates().V1beta1().CertificateSigningRequests(), + spokeKubeInformerFactory.Core().V1().Secrets(), + controllerContext.EventRecorder, + "BoostrapClientCertForHubController", + ) if err != nil { return err } var bootstrapCtx context.Context bootstrapCtx, stopBootstrap = context.WithCancel(ctx) - go hubCSRInformer.Run(bootstrapCtx.Done()) + + go bootstrapInformerFactory.Start(bootstrapCtx.Done()) + go spokeKubeInformerFactory.Start(bootstrapCtx.Done()) + go clientCertForHubController.Run(bootstrapCtx, 1) } @@ -129,21 +143,73 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext stopBootstrap() } - kubeconfigPath := path.Join(o.HubKubeconfigDir, hubclientcert.KubeconfigFile) - hubClientConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath) - if err != nil { - return err - } - controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.") - // build and start another ClientCertForHubController for client certificate rotation - hubCSRInformer, clientCertForHubController, err := o.buildClientCertForHubController(o.ClusterName, o.AgentName, - hubClientConfig, spokeKubeClient, spokeSecretInformer, controllerContext.EventRecorder, "") + // create hub clients and shared informer factories from hub kube config + hubClientConfig, err := clientcmd.BuildConfigFromFlags("", path.Join(o.HubKubeconfigDir, hubclientcert.KubeconfigFile)) if err != nil { return err } - go hubCSRInformer.Run(ctx.Done()) + hubKubeClient, err := kubernetes.NewForConfig(hubClientConfig) + if err != nil { + return err + } + + hubClusterClient, err := clusterv1client.NewForConfig(hubClientConfig) + if err != nil { + return err + } + + hubKubeInformerFactory := informers.NewSharedInformerFactory(hubKubeClient, 10*time.Minute) + hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(hubClusterClient, 10*time.Minute) + + controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.") + + // create another ClientCertForHubController for client certificate rotation + clientCertForHubController, err := hubclientcert.NewClientCertForHubController( + o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret, + restclient.AnonymousClientConfig(hubClientConfig), + spokeKubeClient.CoreV1(), + hubKubeClient.CertificatesV1beta1().CertificateSigningRequests(), + hubKubeInformerFactory.Certificates().V1beta1().CertificateSigningRequests(), + spokeKubeInformerFactory.Core().V1().Secrets(), + controllerContext.EventRecorder, + "", + ) + if err != nil { + return err + } + + // create SpokeClusterController to reconcile instances of SpokeCluster on the spoke cluster + caBundle := controllerContext.KubeConfig.CAData + if caBundle == nil && controllerContext.KubeConfig.CAFile != "" { + data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile) + if err != nil { + return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err) + } + caBundle = data + } + + spokeServerURL, err := o.getSpokeServerURL(spokeKubeClient.CoreV1()) + if err != nil { + return err + } + + spokeClusterController := spokecluster.NewSpokeClusterController( + o.ClusterName, spokeServerURL, + caBundle, + hubClusterClient, + hubClusterInformerFactory.Cluster().V1().SpokeClusters(), + spokeKubeClient.Discovery(), + spokeKubeInformerFactory.Core().V1().Nodes(), + controllerContext.EventRecorder, + ) + + go hubKubeInformerFactory.Start(ctx.Done()) + go hubClusterInformerFactory.Start(ctx.Done()) + go spokeKubeInformerFactory.Start(ctx.Done()) + go clientCertForHubController.Run(ctx, 1) + go spokeClusterController.Run(ctx, 1) <-ctx.Done() return nil @@ -159,6 +225,8 @@ func (o *SpokeAgentOptions) AddFlags(fs *pflag.FlagSet) { "The name of secret in component namespace storing kubeconfig for hub.") fs.StringVar(&o.HubKubeconfigDir, "hub-kubeconfig-dir", o.HubKubeconfigDir, "The mount path of hub-kubeconfig-secret in the container.") + fs.StringVar(&o.SpokeServerUrl, "spoke-server-url", o.SpokeServerUrl, + "A reachable URL of spoke cluster api server for hub cluster.") } // Validate verifies the inputs. @@ -204,29 +272,6 @@ func generateAgentName() string { return utilrand.String(spokeAgentNameLength) } -// buildClientCertForHubController creates and returns a csr informer and a ClientCertForHubController -func (o *SpokeAgentOptions) buildClientCertForHubController(clusterName, agentName string, - initialHubClientConfig *restclient.Config, spokeKubeClient kubernetes.Interface, spokeSecretInformer corev1informers.SecretInformer, - recorder events.Recorder, controllerNameOverride string) (cache.SharedIndexInformer, factory.Controller, error) { - initialHubKubeClient, err := kubernetes.NewForConfig(initialHubClientConfig) - if err != nil { - return nil, nil, err - } - - // create an informer for csrs on hub cluster - hubCSRInformer := informers.NewSharedInformerFactory(initialHubKubeClient, - 10*time.Minute).Certificates().V1beta1().CertificateSigningRequests() - - clientCertForHubController, err := hubclientcert.NewClientCertForHubController(clusterName, agentName, - o.ComponentNamespace, o.HubKubeconfigSecret, restclient.AnonymousClientConfig(initialHubClientConfig), - spokeKubeClient.CoreV1(), initialHubKubeClient.CertificatesV1beta1().CertificateSigningRequests(), - hubCSRInformer, spokeSecretInformer, recorder, controllerNameOverride) - if err != nil { - return nil, nil, err - } - return hubCSRInformer.Informer(), clientCertForHubController, nil -} - // hasValidHubClientConfig returns ture if the conditions below are met: // 1. KubeconfigFile exists // 2. TLSKeyFile exists @@ -291,3 +336,32 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) { return clusterName, agentName } + +// getSpokeServerURL returns the api server url of spoke cluster +func (o *SpokeAgentOptions) getSpokeServerURL(spokeCoreClient corev1client.CoreV1Interface) (string, error) { + // use user specified spoke server url if exists + if o.SpokeServerUrl != "" { + return o.SpokeServerUrl, nil + } + + // otherwise try to get it from the default/kubernetes endpoints + klog.Infof("Try to get spoke server URL from default/kubernetes endpoints") + kubeEndpoints, err := spokeCoreClient.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{}) + if err != nil { + return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: %w", err) + } + + if len(kubeEndpoints.Subsets) == 0 { + return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no subsets") + } + + if len(kubeEndpoints.Subsets[0].Addresses) == 0 { + return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no addresses") + } + + if len(kubeEndpoints.Subsets[0].Ports) == 0 { + return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no ports") + } + + return fmt.Sprintf("%s:%d", kubeEndpoints.Subsets[0].Addresses[0].IP, kubeEndpoints.Subsets[0].Ports[0].Port), nil +} diff --git a/pkg/spoke/spokecluster/controller.go b/pkg/spoke/spokecluster/controller.go new file mode 100644 index 000000000..3acb8ef9a --- /dev/null +++ b/pkg/spoke/spokecluster/controller.go @@ -0,0 +1,195 @@ +package spokecluster + +import ( + "context" + "fmt" + "time" + + "github.com/open-cluster-management/registration/pkg/helpers" + + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + corev1informers "k8s.io/client-go/informers/core/v1" + corev1lister "k8s.io/client-go/listers/core/v1" + "k8s.io/klog" + + clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned" + clusterv1informer "github.com/open-cluster-management/api/client/cluster/informers/externalversions/cluster/v1" + clusterv1listers "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1" + clusterv1 "github.com/open-cluster-management/api/cluster/v1" + discovery "k8s.io/client-go/discovery" +) + +// spokeClusterController reconciles instances of SpokeCluster on the spoke cluster. +type spokeClusterController struct { + clusterName string + spokeServerURL string + spokeCABundle []byte + hubClusterClient clientset.Interface + hubClusterLister clusterv1listers.SpokeClusterLister + spokeDiscoveryClient discovery.DiscoveryInterface + spokeNodeLister corev1lister.NodeLister +} + +// NewSpokeClusterController creates a new spoke cluster controller on the spoke cluster. +func NewSpokeClusterController( + clusterName, spokeServerURL string, + spokeCABundle []byte, + hubClusterClient clientset.Interface, + hubSpokeClusterInformer clusterv1informer.SpokeClusterInformer, + spokeDiscoveryClient discovery.DiscoveryInterface, + spokeNodeInformer corev1informers.NodeInformer, + recorder events.Recorder) factory.Controller { + c := &spokeClusterController{ + clusterName: clusterName, + spokeServerURL: spokeServerURL, + spokeCABundle: spokeCABundle, + hubClusterClient: hubClusterClient, + hubClusterLister: hubSpokeClusterInformer.Lister(), + spokeDiscoveryClient: spokeDiscoveryClient, + spokeNodeLister: spokeNodeInformer.Lister(), + } + return factory.New(). + WithInformers(hubSpokeClusterInformer.Informer(), spokeNodeInformer.Informer()). + WithSync(c.sync). + ResyncEvery(5*time.Minute). + ToController("SpokeClusterController", recorder) +} + +func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error { + spokeCluster, err := c.hubClusterLister.Get(c.clusterName) + if errors.IsNotFound(err) { + return c.createSpokeCluster(ctx, syncCtx) + } + if err != nil { + return fmt.Errorf("unable to get spoke cluster with name %q from hub: %w", c.clusterName, err) + } + + // current spoke cluster is not accepted, do nothing. + acceptedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionHubAccepted) + if !helpers.IsConditionTrue(acceptedCondition) { + klog.V(4).Infof("Spoke cluster %q is not accepted by hub yet", c.clusterName) + return nil + } + + // current spoke cluster is accepted, update its status if necessary. + capacity, allocatable, err := c.getClusterResources() + if err != nil { + return fmt.Errorf("unable to get capacity and allocatable of spoke cluster %q: %w", c.clusterName, err) + } + + spokeVersion, err := c.getSpokeVersion() + if err != nil { + return fmt.Errorf("unable to get server version of spoke cluster %q: %w", c.clusterName, err) + } + + updateStatusFuncs := []helpers.UpdateSpokeClusterStatusFunc{} + joinedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionJoined) + joined := helpers.IsConditionTrue(joinedCondition) + // current spoke cluster did not join the hub cluster, join it. + if !joined { + updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateSpokeClusterConditionFn(clusterv1.StatusCondition{ + Type: clusterv1.SpokeClusterConditionJoined, + Status: metav1.ConditionTrue, + Reason: "SpokeClusterJoined", + Message: "Spoke cluster joined", + })) + } + + updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.SpokeClusterStatus{ + Capacity: capacity, + Allocatable: allocatable, + Version: *spokeVersion, + })) + + _, updated, err := helpers.UpdateSpokeClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...) + if err != nil { + return fmt.Errorf("unable to update status of spoke cluster %q: %w", c.clusterName, err) + } + if updated { + if !joined { + syncCtx.Recorder().Eventf("SpokeClusterJoined", "Spoke cluster %q joined hub", c.clusterName) + } + klog.V(4).Infof("The status of spoke cluster %q has been updated", c.clusterName) + } + return nil +} + +func (c *spokeClusterController) createSpokeCluster(ctx context.Context, syncCtx factory.SyncContext) error { + spokeCluster := &clusterv1.SpokeCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: c.clusterName, + }, + Spec: clusterv1.SpokeClusterSpec{ + SpokeClientConfig: clusterv1.ClientConfig{ + URL: c.spokeServerURL, + CABundle: c.spokeCABundle, + }, + }, + } + + if _, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{}); err != nil { + return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err) + } + + syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName) + return nil +} + +func (c *spokeClusterController) getSpokeVersion() (*clusterv1.SpokeVersion, error) { + serverVersion, err := c.spokeDiscoveryClient.ServerVersion() + if err != nil { + return nil, err + } + return &clusterv1.SpokeVersion{Kubernetes: serverVersion.String()}, nil +} + +func (c *spokeClusterController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) { + nodes, err := c.spokeNodeLister.List(labels.Everything()) + if err != nil { + return nil, nil, err + } + + cpuCapacity := *resource.NewQuantity(int64(0), resource.DecimalSI) + memoryCapacity := *resource.NewQuantity(int64(0), resource.BinarySI) + cpuAllocatable := *resource.NewQuantity(int64(0), resource.DecimalSI) + memoryAllocatable := *resource.NewQuantity(int64(0), resource.BinarySI) + for _, node := range nodes { + cpuCapacity.Add(*node.Status.Capacity.Cpu()) + memoryCapacity.Add(*node.Status.Capacity.Memory()) + cpuAllocatable.Add(*node.Status.Allocatable.Cpu()) + memoryAllocatable.Add(*node.Status.Allocatable.Memory()) + } + + return clusterv1.ResourceList{ + clusterv1.ResourceCPU: cpuCapacity, + clusterv1.ResourceMemory: formatQuatityToMi(memoryCapacity), + }, + clusterv1.ResourceList{ + clusterv1.ResourceCPU: cpuAllocatable, + clusterv1.ResourceMemory: formatQuatityToMi(memoryAllocatable), + }, nil +} + +func formatQuatityToMi(q resource.Quantity) resource.Quantity { + raw, _ := q.AsInt64() + raw /= (1024 * 1024) + rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw)) + if err != nil { + return q + } + return rq +} + +func updateClusterResourcesFn(status clusterv1.SpokeClusterStatus) helpers.UpdateSpokeClusterStatusFunc { + return func(oldStatus *clusterv1.SpokeClusterStatus) error { + oldStatus.Capacity = status.Capacity + oldStatus.Allocatable = status.Allocatable + oldStatus.Version = status.Version + return nil + } +} diff --git a/pkg/spoke/spokecluster/controller_test.go b/pkg/spoke/spokecluster/controller_test.go new file mode 100644 index 000000000..8a6489b97 --- /dev/null +++ b/pkg/spoke/spokecluster/controller_test.go @@ -0,0 +1,284 @@ +package spokecluster + +import ( + "context" + "reflect" + "testing" + "time" + + clusterfake "github.com/open-cluster-management/api/client/cluster/clientset/versioned/fake" + clusterinformers "github.com/open-cluster-management/api/client/cluster/informers/externalversions" + clusterv1 "github.com/open-cluster-management/api/cluster/v1" + "github.com/openshift/library-go/pkg/operator/events" + "github.com/openshift/library-go/pkg/operator/events/eventstesting" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/version" + kubeinformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" + kubeversion "k8s.io/client-go/pkg/version" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/util/workqueue" +) + +const testSpokeClusterName = "testspokecluster" + +func TestSyncSpokeCluster(t *testing.T) { + cases := []struct { + name string + startingObjects []runtime.Object + nodes []runtime.Object + validateActions func(t *testing.T, actions []clienttesting.Action) + expectedErr string + }{ + { + name: "sync no spoke cluster", + startingObjects: []runtime.Object{}, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 1 { + t.Errorf("expected 1 call but got: %#v", actions) + } + assertAction(t, actions[0], "create") + assertSpokeCluster(t, actions[0].(clienttesting.CreateActionImpl).Object, testSpokeClusterName) + }, + }, + { + name: "sync an unaccepted spoke cluster", + startingObjects: []runtime.Object{newSpokeCluster([]clusterv1.StatusCondition{})}, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 0 { + t.Errorf("expected 0 call but got: %#v", actions) + } + }, + }, + { + name: "sync an accepted spoke cluster", + startingObjects: []runtime.Object{newAcceptedSpokeCluster()}, + nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))}, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 2 { + t.Errorf("expected 2 call but got: %#v", actions) + } + assertAction(t, actions[1], "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object + assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue) + assertStatusVersion(t, actual, kubeversion.Get()) + assertStatusResource(t, actual, newResourceList(32, 64), newResourceList(16, 32)) + }, + }, + { + name: "sync a joined spoke cluster without status change", + startingObjects: []runtime.Object{newJoinedSpokeCluster(newResourceList(32, 64), newResourceList(16, 32))}, + nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))}, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 1 { + t.Errorf("expected 1 call but got: %#v", actions) + } + assertAction(t, actions[0], "get") + }, + }, + { + name: "sync a joined spoke cluster with status change", + startingObjects: []runtime.Object{newJoinedSpokeCluster(newResourceList(32, 64), newResourceList(16, 32))}, + nodes: []runtime.Object{ + newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32)), + newNode("testnode2", newResourceList(32, 64), newResourceList(16, 32)), + }, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + if len(actions) != 2 { + t.Errorf("expected 1 call but got: %#v", actions) + } + assertAction(t, actions[1], "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object + assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue) + assertStatusVersion(t, actual, kubeversion.Get()) + assertStatusResource(t, actual, newResourceList(64, 128), newResourceList(32, 64)) + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...) + clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10) + clusterStore := clusterInformerFactory.Cluster().V1().SpokeClusters().Informer().GetStore() + for _, cluster := range c.startingObjects { + clusterStore.Add(cluster) + } + + kubeClient := kubefake.NewSimpleClientset(c.nodes...) + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10) + nodeStore := kubeInformerFactory.Core().V1().Nodes().Informer().GetStore() + for _, node := range c.nodes { + nodeStore.Add(node) + } + + ctrl := spokeClusterController{ + clusterName: testSpokeClusterName, + spokeServerURL: "https://127.0.0.1:32768", + spokeCABundle: []byte("testspokeclusterca"), + hubClusterClient: clusterClient, + hubClusterLister: clusterInformerFactory.Cluster().V1().SpokeClusters().Lister(), + spokeDiscoveryClient: kubeClient.Discovery(), + spokeNodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), + } + + syncErr := ctrl.sync(context.TODO(), newFakeSyncContext(t)) + if len(c.expectedErr) > 0 && syncErr == nil { + t.Errorf("expected %q error", c.expectedErr) + return + } + if len(c.expectedErr) > 0 && syncErr != nil && syncErr.Error() != c.expectedErr { + t.Errorf("expected %q error, got %q", c.expectedErr, syncErr.Error()) + return + } + if syncErr != nil { + t.Errorf("unexpected err: %v", syncErr) + } + + c.validateActions(t, clusterClient.Actions()) + }) + } +} + +func assertAction(t *testing.T, actual clienttesting.Action, expected string) { + if actual.GetVerb() != expected { + t.Errorf("expected %s action but got: %#v", expected, actual) + } +} + +func assertSpokeCluster(t *testing.T, actual runtime.Object, expectedName string) { + spokeCluster, ok := actual.(*clusterv1.SpokeCluster) + if !ok { + t.Errorf("expected spoke cluster but got: %#v", actual) + } + if spokeCluster.Name != expectedName { + t.Errorf("expected %s but got: %#v", expectedName, spokeCluster.Name) + } +} + +func assertCondition(t *testing.T, actual runtime.Object, expectedCondition string, expectedStatus metav1.ConditionStatus) { + spokeCluster := actual.(*clusterv1.SpokeCluster) + conditions := spokeCluster.Status.Conditions + if len(conditions) != 2 { + t.Errorf("expected 2 condition but got: %#v", conditions) + } + condition := conditions[1] + if condition.Type != expectedCondition { + t.Errorf("expected %s but got: %s", expectedCondition, condition.Type) + } + if condition.Status != expectedStatus { + t.Errorf("expected %s but got: %s", expectedStatus, condition.Status) + } +} + +func assertStatusVersion(t *testing.T, actual runtime.Object, expected version.Info) { + spokeCluster := actual.(*clusterv1.SpokeCluster) + if !reflect.DeepEqual(spokeCluster.Status.Version, clusterv1.SpokeVersion{ + Kubernetes: expected.GitVersion, + }) { + t.Errorf("expected %s but got: %#v", expected, spokeCluster.Status.Version) + } +} + +func assertStatusResource(t *testing.T, actual runtime.Object, expectedCapacity, expectedAllocatable corev1.ResourceList) { + spokeCluster := actual.(*clusterv1.SpokeCluster) + if !reflect.DeepEqual(spokeCluster.Status.Capacity["cpu"], expectedCapacity["cpu"]) { + t.Errorf("expected %#v but got: %#v", expectedCapacity, spokeCluster.Status.Capacity) + } + if !reflect.DeepEqual(spokeCluster.Status.Capacity["memory"], expectedCapacity["memory"]) { + t.Errorf("expected %#v but got: %#v", expectedCapacity, spokeCluster.Status.Capacity) + } + if !reflect.DeepEqual(spokeCluster.Status.Allocatable["cpu"], expectedAllocatable["cpu"]) { + t.Errorf("expected %#v but got: %#v", expectedAllocatable, spokeCluster.Status.Allocatable) + } + if !reflect.DeepEqual(spokeCluster.Status.Allocatable["memory"], expectedAllocatable["memory"]) { + t.Errorf("expected %#v but got: %#v", expectedAllocatable, spokeCluster.Status.Allocatable) + } +} + +func newSpokeCluster(conditions []clusterv1.StatusCondition) *clusterv1.SpokeCluster { + return &clusterv1.SpokeCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: testSpokeClusterName, + }, + Status: clusterv1.SpokeClusterStatus{ + Conditions: conditions, + }, + } +} + +func newAcceptedSpokeCluster() *clusterv1.SpokeCluster { + return newSpokeCluster([]clusterv1.StatusCondition{ + { + Type: clusterv1.SpokeClusterConditionHubAccepted, + Status: metav1.ConditionTrue, + Reason: "HubClusterAdminAccepted", + Message: "Accepted by hub cluster admin", + }, + }) +} + +func newJoinedSpokeCluster(capacity, allocatable corev1.ResourceList) *clusterv1.SpokeCluster { + spokeCluster := newSpokeCluster([]clusterv1.StatusCondition{ + { + Type: clusterv1.SpokeClusterConditionHubAccepted, + Status: metav1.ConditionTrue, + Reason: "HubClusterAdminAccepted", + Message: "Accepted by hub cluster admin", + }, + { + Type: clusterv1.SpokeClusterConditionJoined, + Status: metav1.ConditionTrue, + Reason: "SpokeClusterJoined", + Message: "Spoke cluster joined", + }, + }) + spokeCluster.Status.Capacity = clusterv1.ResourceList{ + "cpu": capacity.Cpu().DeepCopy(), + "memory": capacity.Memory().DeepCopy(), + } + spokeCluster.Status.Allocatable = clusterv1.ResourceList{ + "cpu": allocatable.Cpu().DeepCopy(), + "memory": allocatable.Memory().DeepCopy(), + } + spokeCluster.Status.Version = clusterv1.SpokeVersion{ + Kubernetes: kubeversion.Get().GitVersion, + } + return spokeCluster +} + +func newResourceList(cpu, mem int) corev1.ResourceList { + return corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(int64(cpu), resource.DecimalExponent), + corev1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*mem), resource.BinarySI), + } +} + +func newNode(name string, capacity, allocatable corev1.ResourceList) *corev1.Node { + return &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: corev1.NodeStatus{ + Capacity: capacity, + Allocatable: allocatable, + }, + } +} + +type fakeSyncContext struct { + recorder events.Recorder +} + +func newFakeSyncContext(t *testing.T) *fakeSyncContext { + return &fakeSyncContext{ + recorder: eventstesting.NewTestingEventRecorder(t), + } +} + +func (f fakeSyncContext) Queue() workqueue.RateLimitingInterface { return nil } +func (f fakeSyncContext) QueueKey() string { return "" } +func (f fakeSyncContext) Recorder() events.Recorder { return f.recorder } diff --git a/pkg/spoke/spokecluster/doc.go b/pkg/spoke/spokecluster/doc.go new file mode 100644 index 000000000..10f11073e --- /dev/null +++ b/pkg/spoke/spokecluster/doc.go @@ -0,0 +1,2 @@ +// package spokecluster contains the spoke cluster side reconciler for the SpokeCluster resource. +package spokecluster