spoke agent spoke cluster controller

This commit is contained in:
liuwei
2020-04-23 19:40:38 +08:00
parent 15a0c82c2c
commit eb601547e7
4 changed files with 603 additions and 48 deletions

View File

@@ -9,19 +9,20 @@ import (
"path"
"time"
clusterv1client "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
clusterv1informers "github.com/open-cluster-management/api/client/cluster/informers/externalversions"
"github.com/open-cluster-management/registration/pkg/spoke/hubclientcert"
"github.com/open-cluster-management/registration/pkg/spoke/spokecluster"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilrand "k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
)
@@ -41,6 +42,7 @@ type SpokeAgentOptions struct {
BootstrapKubeconfig string
HubKubeconfigSecret string
HubKubeconfigDir string
SpokeServerUrl string
}
// NewSpokeAgentOptions returns a SpokeAgentOptions
@@ -72,16 +74,12 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
klog.Infof("Cluster name is %q and agent name is %q", o.ClusterName, o.AgentName)
// create kube client for spoke cluster
// create kube client and shared informer factory for spoke cluster
spokeKubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
// create an informer for secrets on spoke cluster
spokeSecretInformer := informers.NewSharedInformerFactory(spokeKubeClient,
10*time.Minute).Core().V1().Secrets()
go spokeSecretInformer.Informer().Run(ctx.Done())
spokeKubeInformerFactory := informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute)
// check if there already exists a valid client config for hub
ok, err := o.hasValidHubClientConfig()
@@ -96,21 +94,37 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
// informer cache'
var stopBootstrap context.CancelFunc
if !ok {
// load bootstrap clent config
// create bootstrap client and shared informer factory from bootstrap hub kube config
bootstrapClientConfig, err := clientcmd.BuildConfigFromFlags("", o.BootstrapKubeconfig)
if err != nil {
return fmt.Errorf("unable to load bootstrap kubeconfig from file %q: %w", o.BootstrapKubeconfig, err)
}
bootstrapClient, err := kubernetes.NewForConfig(bootstrapClientConfig)
if err != nil {
return err
}
bootstrapInformerFactory := informers.NewSharedInformerFactory(bootstrapClient, 10*time.Minute)
// create and start a ClientCertForHubController for spoke agent bootstrap
hubCSRInformer, clientCertForHubController, err := o.buildClientCertForHubController(o.ClusterName, o.AgentName,
bootstrapClientConfig, spokeKubeClient, spokeSecretInformer, controllerContext.EventRecorder, "BoostrapClientCertForHubController")
// create a ClientCertForHubController for spoke agent bootstrap
clientCertForHubController, err := hubclientcert.NewClientCertForHubController(
o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret,
restclient.AnonymousClientConfig(bootstrapClientConfig),
spokeKubeClient.CoreV1(),
bootstrapClient.CertificatesV1beta1().CertificateSigningRequests(),
bootstrapInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
spokeKubeInformerFactory.Core().V1().Secrets(),
controllerContext.EventRecorder,
"BoostrapClientCertForHubController",
)
if err != nil {
return err
}
var bootstrapCtx context.Context
bootstrapCtx, stopBootstrap = context.WithCancel(ctx)
go hubCSRInformer.Run(bootstrapCtx.Done())
go bootstrapInformerFactory.Start(bootstrapCtx.Done())
go spokeKubeInformerFactory.Start(bootstrapCtx.Done())
go clientCertForHubController.Run(bootstrapCtx, 1)
}
@@ -129,21 +143,73 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
stopBootstrap()
}
kubeconfigPath := path.Join(o.HubKubeconfigDir, hubclientcert.KubeconfigFile)
hubClientConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfigPath)
if err != nil {
return err
}
controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
// build and start another ClientCertForHubController for client certificate rotation
hubCSRInformer, clientCertForHubController, err := o.buildClientCertForHubController(o.ClusterName, o.AgentName,
hubClientConfig, spokeKubeClient, spokeSecretInformer, controllerContext.EventRecorder, "")
// create hub clients and shared informer factories from hub kube config
hubClientConfig, err := clientcmd.BuildConfigFromFlags("", path.Join(o.HubKubeconfigDir, hubclientcert.KubeconfigFile))
if err != nil {
return err
}
go hubCSRInformer.Run(ctx.Done())
hubKubeClient, err := kubernetes.NewForConfig(hubClientConfig)
if err != nil {
return err
}
hubClusterClient, err := clusterv1client.NewForConfig(hubClientConfig)
if err != nil {
return err
}
hubKubeInformerFactory := informers.NewSharedInformerFactory(hubKubeClient, 10*time.Minute)
hubClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(hubClusterClient, 10*time.Minute)
controllerContext.EventRecorder.Event("HubClientConfigReady", "Client config for hub is ready.")
// create another ClientCertForHubController for client certificate rotation
clientCertForHubController, err := hubclientcert.NewClientCertForHubController(
o.ClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret,
restclient.AnonymousClientConfig(hubClientConfig),
spokeKubeClient.CoreV1(),
hubKubeClient.CertificatesV1beta1().CertificateSigningRequests(),
hubKubeInformerFactory.Certificates().V1beta1().CertificateSigningRequests(),
spokeKubeInformerFactory.Core().V1().Secrets(),
controllerContext.EventRecorder,
"",
)
if err != nil {
return err
}
// create SpokeClusterController to reconcile instances of SpokeCluster on the spoke cluster
caBundle := controllerContext.KubeConfig.CAData
if caBundle == nil && controllerContext.KubeConfig.CAFile != "" {
data, err := ioutil.ReadFile(controllerContext.KubeConfig.CAFile)
if err != nil {
return fmt.Errorf("unable to load CA data from file %q: %w", controllerContext.KubeConfig.CAFile, err)
}
caBundle = data
}
spokeServerURL, err := o.getSpokeServerURL(spokeKubeClient.CoreV1())
if err != nil {
return err
}
spokeClusterController := spokecluster.NewSpokeClusterController(
o.ClusterName, spokeServerURL,
caBundle,
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().SpokeClusters(),
spokeKubeClient.Discovery(),
spokeKubeInformerFactory.Core().V1().Nodes(),
controllerContext.EventRecorder,
)
go hubKubeInformerFactory.Start(ctx.Done())
go hubClusterInformerFactory.Start(ctx.Done())
go spokeKubeInformerFactory.Start(ctx.Done())
go clientCertForHubController.Run(ctx, 1)
go spokeClusterController.Run(ctx, 1)
<-ctx.Done()
return nil
@@ -159,6 +225,8 @@ func (o *SpokeAgentOptions) AddFlags(fs *pflag.FlagSet) {
"The name of secret in component namespace storing kubeconfig for hub.")
fs.StringVar(&o.HubKubeconfigDir, "hub-kubeconfig-dir", o.HubKubeconfigDir,
"The mount path of hub-kubeconfig-secret in the container.")
fs.StringVar(&o.SpokeServerUrl, "spoke-server-url", o.SpokeServerUrl,
"A reachable URL of spoke cluster api server for hub cluster.")
}
// Validate verifies the inputs.
@@ -204,29 +272,6 @@ func generateAgentName() string {
return utilrand.String(spokeAgentNameLength)
}
// buildClientCertForHubController creates and returns a csr informer and a ClientCertForHubController
func (o *SpokeAgentOptions) buildClientCertForHubController(clusterName, agentName string,
initialHubClientConfig *restclient.Config, spokeKubeClient kubernetes.Interface, spokeSecretInformer corev1informers.SecretInformer,
recorder events.Recorder, controllerNameOverride string) (cache.SharedIndexInformer, factory.Controller, error) {
initialHubKubeClient, err := kubernetes.NewForConfig(initialHubClientConfig)
if err != nil {
return nil, nil, err
}
// create an informer for csrs on hub cluster
hubCSRInformer := informers.NewSharedInformerFactory(initialHubKubeClient,
10*time.Minute).Certificates().V1beta1().CertificateSigningRequests()
clientCertForHubController, err := hubclientcert.NewClientCertForHubController(clusterName, agentName,
o.ComponentNamespace, o.HubKubeconfigSecret, restclient.AnonymousClientConfig(initialHubClientConfig),
spokeKubeClient.CoreV1(), initialHubKubeClient.CertificatesV1beta1().CertificateSigningRequests(),
hubCSRInformer, spokeSecretInformer, recorder, controllerNameOverride)
if err != nil {
return nil, nil, err
}
return hubCSRInformer.Informer(), clientCertForHubController, nil
}
// hasValidHubClientConfig returns ture if the conditions below are met:
// 1. KubeconfigFile exists
// 2. TLSKeyFile exists
@@ -291,3 +336,32 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) {
return clusterName, agentName
}
// getSpokeServerURL returns the api server url of spoke cluster
func (o *SpokeAgentOptions) getSpokeServerURL(spokeCoreClient corev1client.CoreV1Interface) (string, error) {
// use user specified spoke server url if exists
if o.SpokeServerUrl != "" {
return o.SpokeServerUrl, nil
}
// otherwise try to get it from the default/kubernetes endpoints
klog.Infof("Try to get spoke server URL from default/kubernetes endpoints")
kubeEndpoints, err := spokeCoreClient.Endpoints("default").Get(context.TODO(), "kubernetes", metav1.GetOptions{})
if err != nil {
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: %w", err)
}
if len(kubeEndpoints.Subsets) == 0 {
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no subsets")
}
if len(kubeEndpoints.Subsets[0].Addresses) == 0 {
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no addresses")
}
if len(kubeEndpoints.Subsets[0].Ports) == 0 {
return "", fmt.Errorf("unable to get server URL from default/kubernetes endpoint: no ports")
}
return fmt.Sprintf("%s:%d", kubeEndpoints.Subsets[0].Addresses[0].IP, kubeEndpoints.Subsets[0].Ports[0].Port), nil
}

View File

@@ -0,0 +1,195 @@
package spokecluster
import (
"context"
"fmt"
"time"
"github.com/open-cluster-management/registration/pkg/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
clusterv1informer "github.com/open-cluster-management/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1"
clusterv1 "github.com/open-cluster-management/api/cluster/v1"
discovery "k8s.io/client-go/discovery"
)
// spokeClusterController reconciles instances of SpokeCluster on the spoke cluster.
type spokeClusterController struct {
clusterName string
spokeServerURL string
spokeCABundle []byte
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.SpokeClusterLister
spokeDiscoveryClient discovery.DiscoveryInterface
spokeNodeLister corev1lister.NodeLister
}
// NewSpokeClusterController creates a new spoke cluster controller on the spoke cluster.
func NewSpokeClusterController(
clusterName, spokeServerURL string,
spokeCABundle []byte,
hubClusterClient clientset.Interface,
hubSpokeClusterInformer clusterv1informer.SpokeClusterInformer,
spokeDiscoveryClient discovery.DiscoveryInterface,
spokeNodeInformer corev1informers.NodeInformer,
recorder events.Recorder) factory.Controller {
c := &spokeClusterController{
clusterName: clusterName,
spokeServerURL: spokeServerURL,
spokeCABundle: spokeCABundle,
hubClusterClient: hubClusterClient,
hubClusterLister: hubSpokeClusterInformer.Lister(),
spokeDiscoveryClient: spokeDiscoveryClient,
spokeNodeLister: spokeNodeInformer.Lister(),
}
return factory.New().
WithInformers(hubSpokeClusterInformer.Informer(), spokeNodeInformer.Informer()).
WithSync(c.sync).
ResyncEvery(5*time.Minute).
ToController("SpokeClusterController", recorder)
}
func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
spokeCluster, err := c.hubClusterLister.Get(c.clusterName)
if errors.IsNotFound(err) {
return c.createSpokeCluster(ctx, syncCtx)
}
if err != nil {
return fmt.Errorf("unable to get spoke cluster with name %q from hub: %w", c.clusterName, err)
}
// current spoke cluster is not accepted, do nothing.
acceptedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionHubAccepted)
if !helpers.IsConditionTrue(acceptedCondition) {
klog.V(4).Infof("Spoke cluster %q is not accepted by hub yet", c.clusterName)
return nil
}
// current spoke cluster is accepted, update its status if necessary.
capacity, allocatable, err := c.getClusterResources()
if err != nil {
return fmt.Errorf("unable to get capacity and allocatable of spoke cluster %q: %w", c.clusterName, err)
}
spokeVersion, err := c.getSpokeVersion()
if err != nil {
return fmt.Errorf("unable to get server version of spoke cluster %q: %w", c.clusterName, err)
}
updateStatusFuncs := []helpers.UpdateSpokeClusterStatusFunc{}
joinedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionJoined)
joined := helpers.IsConditionTrue(joinedCondition)
// current spoke cluster did not join the hub cluster, join it.
if !joined {
updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateSpokeClusterConditionFn(clusterv1.StatusCondition{
Type: clusterv1.SpokeClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "SpokeClusterJoined",
Message: "Spoke cluster joined",
}))
}
updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.SpokeClusterStatus{
Capacity: capacity,
Allocatable: allocatable,
Version: *spokeVersion,
}))
_, updated, err := helpers.UpdateSpokeClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...)
if err != nil {
return fmt.Errorf("unable to update status of spoke cluster %q: %w", c.clusterName, err)
}
if updated {
if !joined {
syncCtx.Recorder().Eventf("SpokeClusterJoined", "Spoke cluster %q joined hub", c.clusterName)
}
klog.V(4).Infof("The status of spoke cluster %q has been updated", c.clusterName)
}
return nil
}
func (c *spokeClusterController) createSpokeCluster(ctx context.Context, syncCtx factory.SyncContext) error {
spokeCluster := &clusterv1.SpokeCluster{
ObjectMeta: metav1.ObjectMeta{
Name: c.clusterName,
},
Spec: clusterv1.SpokeClusterSpec{
SpokeClientConfig: clusterv1.ClientConfig{
URL: c.spokeServerURL,
CABundle: c.spokeCABundle,
},
},
}
if _, err := c.hubClusterClient.ClusterV1().SpokeClusters().Create(ctx, spokeCluster, metav1.CreateOptions{}); err != nil {
return fmt.Errorf("unable to create spoke cluster with name %q on hub: %w", c.clusterName, err)
}
syncCtx.Recorder().Eventf("SpokeClusterCreated", "Spoke cluster %q created on hub", c.clusterName)
return nil
}
func (c *spokeClusterController) getSpokeVersion() (*clusterv1.SpokeVersion, error) {
serverVersion, err := c.spokeDiscoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return &clusterv1.SpokeVersion{Kubernetes: serverVersion.String()}, nil
}
func (c *spokeClusterController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) {
nodes, err := c.spokeNodeLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
cpuCapacity := *resource.NewQuantity(int64(0), resource.DecimalSI)
memoryCapacity := *resource.NewQuantity(int64(0), resource.BinarySI)
cpuAllocatable := *resource.NewQuantity(int64(0), resource.DecimalSI)
memoryAllocatable := *resource.NewQuantity(int64(0), resource.BinarySI)
for _, node := range nodes {
cpuCapacity.Add(*node.Status.Capacity.Cpu())
memoryCapacity.Add(*node.Status.Capacity.Memory())
cpuAllocatable.Add(*node.Status.Allocatable.Cpu())
memoryAllocatable.Add(*node.Status.Allocatable.Memory())
}
return clusterv1.ResourceList{
clusterv1.ResourceCPU: cpuCapacity,
clusterv1.ResourceMemory: formatQuatityToMi(memoryCapacity),
},
clusterv1.ResourceList{
clusterv1.ResourceCPU: cpuAllocatable,
clusterv1.ResourceMemory: formatQuatityToMi(memoryAllocatable),
}, nil
}
func formatQuatityToMi(q resource.Quantity) resource.Quantity {
raw, _ := q.AsInt64()
raw /= (1024 * 1024)
rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw))
if err != nil {
return q
}
return rq
}
func updateClusterResourcesFn(status clusterv1.SpokeClusterStatus) helpers.UpdateSpokeClusterStatusFunc {
return func(oldStatus *clusterv1.SpokeClusterStatus) error {
oldStatus.Capacity = status.Capacity
oldStatus.Allocatable = status.Allocatable
oldStatus.Version = status.Version
return nil
}
}

View File

@@ -0,0 +1,284 @@
package spokecluster
import (
"context"
"reflect"
"testing"
"time"
clusterfake "github.com/open-cluster-management/api/client/cluster/clientset/versioned/fake"
clusterinformers "github.com/open-cluster-management/api/client/cluster/informers/externalversions"
clusterv1 "github.com/open-cluster-management/api/cluster/v1"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
kubeversion "k8s.io/client-go/pkg/version"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/util/workqueue"
)
const testSpokeClusterName = "testspokecluster"
func TestSyncSpokeCluster(t *testing.T) {
cases := []struct {
name string
startingObjects []runtime.Object
nodes []runtime.Object
validateActions func(t *testing.T, actions []clienttesting.Action)
expectedErr string
}{
{
name: "sync no spoke cluster",
startingObjects: []runtime.Object{},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Errorf("expected 1 call but got: %#v", actions)
}
assertAction(t, actions[0], "create")
assertSpokeCluster(t, actions[0].(clienttesting.CreateActionImpl).Object, testSpokeClusterName)
},
},
{
name: "sync an unaccepted spoke cluster",
startingObjects: []runtime.Object{newSpokeCluster([]clusterv1.StatusCondition{})},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("expected 0 call but got: %#v", actions)
}
},
},
{
name: "sync an accepted spoke cluster",
startingObjects: []runtime.Object{newAcceptedSpokeCluster()},
nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Errorf("expected 2 call but got: %#v", actions)
}
assertAction(t, actions[1], "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue)
assertStatusVersion(t, actual, kubeversion.Get())
assertStatusResource(t, actual, newResourceList(32, 64), newResourceList(16, 32))
},
},
{
name: "sync a joined spoke cluster without status change",
startingObjects: []runtime.Object{newJoinedSpokeCluster(newResourceList(32, 64), newResourceList(16, 32))},
nodes: []runtime.Object{newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32))},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Errorf("expected 1 call but got: %#v", actions)
}
assertAction(t, actions[0], "get")
},
},
{
name: "sync a joined spoke cluster with status change",
startingObjects: []runtime.Object{newJoinedSpokeCluster(newResourceList(32, 64), newResourceList(16, 32))},
nodes: []runtime.Object{
newNode("testnode1", newResourceList(32, 64), newResourceList(16, 32)),
newNode("testnode2", newResourceList(32, 64), newResourceList(16, 32)),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Errorf("expected 1 call but got: %#v", actions)
}
assertAction(t, actions[1], "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
assertCondition(t, actual, clusterv1.SpokeClusterConditionJoined, metav1.ConditionTrue)
assertStatusVersion(t, actual, kubeversion.Get())
assertStatusResource(t, actual, newResourceList(64, 128), newResourceList(32, 64))
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
clusterStore := clusterInformerFactory.Cluster().V1().SpokeClusters().Informer().GetStore()
for _, cluster := range c.startingObjects {
clusterStore.Add(cluster)
}
kubeClient := kubefake.NewSimpleClientset(c.nodes...)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
nodeStore := kubeInformerFactory.Core().V1().Nodes().Informer().GetStore()
for _, node := range c.nodes {
nodeStore.Add(node)
}
ctrl := spokeClusterController{
clusterName: testSpokeClusterName,
spokeServerURL: "https://127.0.0.1:32768",
spokeCABundle: []byte("testspokeclusterca"),
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().SpokeClusters().Lister(),
spokeDiscoveryClient: kubeClient.Discovery(),
spokeNodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(),
}
syncErr := ctrl.sync(context.TODO(), newFakeSyncContext(t))
if len(c.expectedErr) > 0 && syncErr == nil {
t.Errorf("expected %q error", c.expectedErr)
return
}
if len(c.expectedErr) > 0 && syncErr != nil && syncErr.Error() != c.expectedErr {
t.Errorf("expected %q error, got %q", c.expectedErr, syncErr.Error())
return
}
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)
}
c.validateActions(t, clusterClient.Actions())
})
}
}
func assertAction(t *testing.T, actual clienttesting.Action, expected string) {
if actual.GetVerb() != expected {
t.Errorf("expected %s action but got: %#v", expected, actual)
}
}
func assertSpokeCluster(t *testing.T, actual runtime.Object, expectedName string) {
spokeCluster, ok := actual.(*clusterv1.SpokeCluster)
if !ok {
t.Errorf("expected spoke cluster but got: %#v", actual)
}
if spokeCluster.Name != expectedName {
t.Errorf("expected %s but got: %#v", expectedName, spokeCluster.Name)
}
}
func assertCondition(t *testing.T, actual runtime.Object, expectedCondition string, expectedStatus metav1.ConditionStatus) {
spokeCluster := actual.(*clusterv1.SpokeCluster)
conditions := spokeCluster.Status.Conditions
if len(conditions) != 2 {
t.Errorf("expected 2 condition but got: %#v", conditions)
}
condition := conditions[1]
if condition.Type != expectedCondition {
t.Errorf("expected %s but got: %s", expectedCondition, condition.Type)
}
if condition.Status != expectedStatus {
t.Errorf("expected %s but got: %s", expectedStatus, condition.Status)
}
}
func assertStatusVersion(t *testing.T, actual runtime.Object, expected version.Info) {
spokeCluster := actual.(*clusterv1.SpokeCluster)
if !reflect.DeepEqual(spokeCluster.Status.Version, clusterv1.SpokeVersion{
Kubernetes: expected.GitVersion,
}) {
t.Errorf("expected %s but got: %#v", expected, spokeCluster.Status.Version)
}
}
func assertStatusResource(t *testing.T, actual runtime.Object, expectedCapacity, expectedAllocatable corev1.ResourceList) {
spokeCluster := actual.(*clusterv1.SpokeCluster)
if !reflect.DeepEqual(spokeCluster.Status.Capacity["cpu"], expectedCapacity["cpu"]) {
t.Errorf("expected %#v but got: %#v", expectedCapacity, spokeCluster.Status.Capacity)
}
if !reflect.DeepEqual(spokeCluster.Status.Capacity["memory"], expectedCapacity["memory"]) {
t.Errorf("expected %#v but got: %#v", expectedCapacity, spokeCluster.Status.Capacity)
}
if !reflect.DeepEqual(spokeCluster.Status.Allocatable["cpu"], expectedAllocatable["cpu"]) {
t.Errorf("expected %#v but got: %#v", expectedAllocatable, spokeCluster.Status.Allocatable)
}
if !reflect.DeepEqual(spokeCluster.Status.Allocatable["memory"], expectedAllocatable["memory"]) {
t.Errorf("expected %#v but got: %#v", expectedAllocatable, spokeCluster.Status.Allocatable)
}
}
func newSpokeCluster(conditions []clusterv1.StatusCondition) *clusterv1.SpokeCluster {
return &clusterv1.SpokeCluster{
ObjectMeta: metav1.ObjectMeta{
Name: testSpokeClusterName,
},
Status: clusterv1.SpokeClusterStatus{
Conditions: conditions,
},
}
}
func newAcceptedSpokeCluster() *clusterv1.SpokeCluster {
return newSpokeCluster([]clusterv1.StatusCondition{
{
Type: clusterv1.SpokeClusterConditionHubAccepted,
Status: metav1.ConditionTrue,
Reason: "HubClusterAdminAccepted",
Message: "Accepted by hub cluster admin",
},
})
}
func newJoinedSpokeCluster(capacity, allocatable corev1.ResourceList) *clusterv1.SpokeCluster {
spokeCluster := newSpokeCluster([]clusterv1.StatusCondition{
{
Type: clusterv1.SpokeClusterConditionHubAccepted,
Status: metav1.ConditionTrue,
Reason: "HubClusterAdminAccepted",
Message: "Accepted by hub cluster admin",
},
{
Type: clusterv1.SpokeClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "SpokeClusterJoined",
Message: "Spoke cluster joined",
},
})
spokeCluster.Status.Capacity = clusterv1.ResourceList{
"cpu": capacity.Cpu().DeepCopy(),
"memory": capacity.Memory().DeepCopy(),
}
spokeCluster.Status.Allocatable = clusterv1.ResourceList{
"cpu": allocatable.Cpu().DeepCopy(),
"memory": allocatable.Memory().DeepCopy(),
}
spokeCluster.Status.Version = clusterv1.SpokeVersion{
Kubernetes: kubeversion.Get().GitVersion,
}
return spokeCluster
}
func newResourceList(cpu, mem int) corev1.ResourceList {
return corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(int64(cpu), resource.DecimalExponent),
corev1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*mem), resource.BinarySI),
}
}
func newNode(name string, capacity, allocatable corev1.ResourceList) *corev1.Node {
return &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Status: corev1.NodeStatus{
Capacity: capacity,
Allocatable: allocatable,
},
}
}
type fakeSyncContext struct {
recorder events.Recorder
}
func newFakeSyncContext(t *testing.T) *fakeSyncContext {
return &fakeSyncContext{
recorder: eventstesting.NewTestingEventRecorder(t),
}
}
func (f fakeSyncContext) Queue() workqueue.RateLimitingInterface { return nil }
func (f fakeSyncContext) QueueKey() string { return "" }
func (f fakeSyncContext) Recorder() events.Recorder { return f.recorder }

View File

@@ -0,0 +1,2 @@
// package spokecluster contains the spoke cluster side reconciler for the SpokeCluster resource.
package spokecluster