From 7bb4b6864dee695026d2d23abf09bd65289095cf Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Fri, 5 Nov 2021 17:00:21 +0800 Subject: [PATCH] exclude the cordoned nodes when calc node allocatable (#174) Signed-off-by: liuwei --- .../managedcluster/healthcheck_controller.go | 106 ---------- .../managedcluster/joining_controller.go | 130 ++---------- .../managedcluster/joining_controller_test.go | 119 ----------- pkg/spoke/managedcluster/status_controller.go | 199 ++++++++++++++++++ ...ller_test.go => status_controller_test.go} | 98 ++++++++- pkg/spoke/spokeagent.go | 7 +- ...est.go => managedcluster_deletion_test.go} | 0 test/integration/spokecluster_status_test.go | 158 ++++++++++++++ test/integration/util/util.go | 46 ++++ 9 files changed, 518 insertions(+), 345 deletions(-) delete mode 100644 pkg/spoke/managedcluster/healthcheck_controller.go create mode 100644 pkg/spoke/managedcluster/status_controller.go rename pkg/spoke/managedcluster/{healthcheck_controller_test.go => status_controller_test.go} (57%) rename test/integration/{managedcluster_deletiong_test.go => managedcluster_deletion_test.go} (100%) create mode 100644 test/integration/spokecluster_status_test.go diff --git a/pkg/spoke/managedcluster/healthcheck_controller.go b/pkg/spoke/managedcluster/healthcheck_controller.go deleted file mode 100644 index dd86e60f5..000000000 --- a/pkg/spoke/managedcluster/healthcheck_controller.go +++ /dev/null @@ -1,106 +0,0 @@ -package managedcluster - -import ( - "context" - "fmt" - "net/http" - "time" - - clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" - clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" - clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" - clusterv1 "open-cluster-management.io/api/cluster/v1" - "open-cluster-management.io/registration/pkg/helpers" - - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - discovery "k8s.io/client-go/discovery" -) - -// managedClusterHealthCheckController checks the kube-apiserver health on managed cluster to determine it whether is available. -type managedClusterHealthCheckController struct { - clusterName string - hubClusterClient clientset.Interface - hubClusterLister clusterv1listers.ManagedClusterLister - managedClusterDiscoveryClient discovery.DiscoveryInterface -} - -// NewManagedClusterHealthCheckController creates a managed cluster health check controller on managed cluster. -func NewManagedClusterHealthCheckController( - clusterName string, - hubClusterClient clientset.Interface, - hubClusterInformer clusterv1informer.ManagedClusterInformer, - managedClusterDiscoveryClient discovery.DiscoveryInterface, - resyncInterval time.Duration, - recorder events.Recorder) factory.Controller { - c := &managedClusterHealthCheckController{ - clusterName: clusterName, - hubClusterClient: hubClusterClient, - hubClusterLister: hubClusterInformer.Lister(), - managedClusterDiscoveryClient: managedClusterDiscoveryClient, - } - - return factory.New(). - WithInformers(hubClusterInformer.Informer()). - WithSync(c.sync). - ResyncEvery(resyncInterval). - ToController("ManagedClusterHealthCheckController", recorder) -} - -// sync updates managed cluster available condition by checking kube-apiserver health on managed cluster. 
-func (c *managedClusterHealthCheckController) sync(ctx context.Context, syncCtx factory.SyncContext) error { - if _, err := c.hubClusterLister.Get(c.clusterName); err != nil { - return fmt.Errorf("unable to get managed cluster %q from hub: %w", c.clusterName, err) - } - - // check the kube-apiserver health on managed cluster. - condition := c.checkKubeAPIServerStatus(ctx) - conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(condition) - _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, conditionUpdateFn) - if err != nil { - return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err) - } - if updated { - syncCtx.Recorder().Eventf("ManagedClusterAvailableConditionUpdated", "update managed cluster %q available condition to %q, due to %q", - c.clusterName, condition.Status, condition.Message) - } - return nil -} - -// using readyz api to check the status of kube apiserver -func (c *managedClusterHealthCheckController) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition { - statusCode := 0 - condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable} - result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode) - if statusCode == http.StatusOK { - condition.Status = metav1.ConditionTrue - condition.Reason = "ManagedClusterAvailable" - condition.Message = "Managed cluster is available" - return condition - } - - // for backward compatible, the livez endpoint is supported from Kubernetes 1.16, so if the livez is not found or - // forbidden, the healthz endpoint will be used. - if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden { - result = c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode) - if statusCode == http.StatusOK { - condition.Status = metav1.ConditionTrue - condition.Reason = "ManagedClusterAvailable" - condition.Message = "Managed cluster is available" - return condition - } - } - - condition.Status = metav1.ConditionFalse - condition.Reason = "ManagedClusterKubeAPIServerUnavailable" - body, err := result.Raw() - if err == nil { - condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body)) - return condition - } - - condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err) - return condition -} diff --git a/pkg/spoke/managedcluster/joining_controller.go b/pkg/spoke/managedcluster/joining_controller.go index e0f158121..f2bf5a0d9 100644 --- a/pkg/spoke/managedcluster/joining_controller.go +++ b/pkg/spoke/managedcluster/joining_controller.go @@ -15,13 +15,7 @@ import ( "github.com/openshift/library-go/pkg/operator/events" "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - discovery "k8s.io/client-go/discovery" - corev1informers "k8s.io/client-go/informers/core/v1" - corev1lister "k8s.io/client-go/listers/core/v1" - "k8s.io/klog/v2" ) // managedClusterJoiningController add the joined condition to a ManagedCluster on the managed cluster after it is accepted by hub cluster admin. 
@@ -29,8 +23,6 @@ type managedClusterJoiningController struct { clusterName string hubClusterClient clientset.Interface hubClusterLister clusterv1listers.ManagedClusterLister - discoveryClient discovery.DiscoveryInterface - nodeLister corev1lister.NodeLister } // NewManagedClusterJoiningController creates a new managed cluster joining controller on the managed cluster. @@ -38,28 +30,22 @@ func NewManagedClusterJoiningController( clusterName string, hubClusterClient clientset.Interface, hubManagedClusterInformer clusterv1informer.ManagedClusterInformer, - discoveryClient discovery.DiscoveryInterface, - nodeInformer corev1informers.NodeInformer, recorder events.Recorder) factory.Controller { c := &managedClusterJoiningController{ clusterName: clusterName, hubClusterClient: hubClusterClient, hubClusterLister: hubManagedClusterInformer.Lister(), - discoveryClient: discoveryClient, - nodeLister: nodeInformer.Lister(), } return factory.New(). - // TODO need to have conditional node capacity recalculation based on number of nodes here and decoupling - // the node capacity calculation to another controller. - WithInformers(hubManagedClusterInformer.Informer(), nodeInformer.Informer()). + WithInformers(hubManagedClusterInformer.Informer()). WithSync(c.sync). ResyncEvery(5*time.Minute). - ToController("ManagedClusterController", recorder) + ToController("ManagedClusterJoiningController", recorder) } // sync maintains the managed cluster side status of a ManagedCluster, it maintains the ManagedClusterJoined condition according to -// the value of the ManagedClusterHubAccepted condition and ensures resource and version are up to date. +// the value of the ManagedClusterHubAccepted condition. func (c managedClusterJoiningController) sync(ctx context.Context, syncCtx factory.SyncContext) error { managedCluster, err := c.hubClusterLister.Get(c.clusterName) if err != nil { @@ -72,111 +58,29 @@ func (c managedClusterJoiningController) sync(ctx context.Context, syncCtx facto return nil } - // current managed cluster is accepted, update its status if necessary. - capacity, allocatable, err := c.getClusterResources() - if err != nil { - return fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", c.clusterName, err) - } - - clusterVersion, err := c.getClusterVersion() - if err != nil { - return fmt.Errorf("unable to get server version of managed cluster %q: %w", c.clusterName, err) - } - - updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{} - // current managed cluster did not join the hub cluster, join it. joined := meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) - if !joined { - updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateManagedClusterConditionFn(metav1.Condition{ + if joined { + // current managed cluster is joined, do nothing. + return nil + } + + // current managed cluster did not join the hub cluster, join it. 
+ _, updated, err := helpers.UpdateManagedClusterStatus( + ctx, + c.hubClusterClient, + c.clusterName, + helpers.UpdateManagedClusterConditionFn(metav1.Condition{ Type: clusterv1.ManagedClusterConditionJoined, Status: metav1.ConditionTrue, Reason: "ManagedClusterJoined", Message: "Managed cluster joined", - })) - } - - updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.ManagedClusterStatus{ - Capacity: capacity, - Allocatable: allocatable, - Version: *clusterVersion, - })) - - _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...) + }), + ) if err != nil { return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err) } if updated { - if !joined { - syncCtx.Recorder().Eventf("ManagedClusterJoined", "Managed cluster %q joined hub", c.clusterName) - } - klog.V(4).Infof("The status of managed cluster %q has been updated", c.clusterName) + syncCtx.Recorder().Eventf("ManagedClusterJoined", "Managed cluster %q joined hub", c.clusterName) } return nil } - -func (c *managedClusterJoiningController) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) { - serverVersion, err := c.discoveryClient.ServerVersion() - if err != nil { - return nil, err - } - return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil -} - -func (c *managedClusterJoiningController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) { - nodes, err := c.nodeLister.List(labels.Everything()) - if err != nil { - return nil, nil, err - } - - capacityList := make(map[clusterv1.ResourceName]resource.Quantity) - allocatableList := make(map[clusterv1.ResourceName]resource.Quantity) - - for _, node := range nodes { - for key, value := range node.Status.Capacity { - if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist { - capacity.Add(value) - capacityList[clusterv1.ResourceName(key)] = capacity - } else { - capacityList[clusterv1.ResourceName(key)] = value - } - } - - for key, value := range node.Status.Allocatable { - if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist { - allocatable.Add(value) - allocatableList[clusterv1.ResourceName(key)] = allocatable - } else { - allocatableList[clusterv1.ResourceName(key)] = value - } - } - } - - return capacityList, allocatableList, nil -} - -func formatQuantityToMi(q resource.Quantity) resource.Quantity { - raw, _ := q.AsInt64() - raw /= (1024 * 1024) - rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw)) - if err != nil { - return q - } - return rq -} - -func updateClusterResourcesFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc { - return func(oldStatus *clusterv1.ManagedClusterStatus) error { - // merge the old capacity to new capacity, if one old capacity entry does not exist in new capacity, - // we add it back to new capacity - for key, val := range oldStatus.Capacity { - if _, ok := status.Capacity[key]; !ok { - status.Capacity[key] = val - continue - } - } - oldStatus.Capacity = status.Capacity - oldStatus.Allocatable = status.Allocatable - oldStatus.Version = status.Version - return nil - } -} diff --git a/pkg/spoke/managedcluster/joining_controller_test.go b/pkg/spoke/managedcluster/joining_controller_test.go index 37f539276..4506d259f 100644 --- a/pkg/spoke/managedcluster/joining_controller_test.go +++ b/pkg/spoke/managedcluster/joining_controller_test.go @@ -10,13 +10,8 @@ import ( clusterv1 
"open-cluster-management.io/api/cluster/v1" testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - kubeinformers "k8s.io/client-go/informers" - kubefake "k8s.io/client-go/kubernetes/fake" - kubeversion "k8s.io/client-go/pkg/version" clienttesting "k8s.io/client-go/testing" ) @@ -24,7 +19,6 @@ func TestSyncManagedCluster(t *testing.T) { cases := []struct { name string startingObjects []runtime.Object - nodes []runtime.Object validateActions func(t *testing.T, actions []clienttesting.Action) expectedErr string }{ @@ -42,9 +36,6 @@ func TestSyncManagedCluster(t *testing.T) { { name: "sync an accepted managed cluster", startingObjects: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, - nodes: []runtime.Object{ - testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - }, validateActions: func(t *testing.T, actions []clienttesting.Action) { expectedCondition := metav1.Condition{ Type: clusterv1.ManagedClusterConditionJoined, @@ -52,110 +43,9 @@ func TestSyncManagedCluster(t *testing.T) { Reason: "ManagedClusterJoined", Message: "Managed cluster joined", } - expectedStatus := clusterv1.ManagedClusterStatus{ - Version: clusterv1.ManagedClusterVersion{ - Kubernetes: kubeversion.Get().GitVersion, - }, - Capacity: clusterv1.ResourceList{ - clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent), - clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI), - }, - Allocatable: clusterv1.ResourceList{ - clusterv1.ResourceCPU: *resource.NewQuantity(int64(16), resource.DecimalExponent), - clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*32), resource.BinarySI), - }, - } testinghelpers.AssertActions(t, actions, "get", "update") actual := actions[1].(clienttesting.UpdateActionImpl).Object testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition) - testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus) - }, - }, - { - name: "sync a joined managed cluster without status change", - startingObjects: []runtime.Object{ - testinghelpers.NewManagedClusterWithStatus(testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - }, - nodes: []runtime.Object{ - testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - }, - validateActions: func(t *testing.T, actions []clienttesting.Action) { - testinghelpers.AssertActions(t, actions, "get") - }, - }, - { - name: "sync a joined managed cluster with status change", - startingObjects: []runtime.Object{testinghelpers.NewJoinedManagedCluster()}, - nodes: []runtime.Object{ - testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - testinghelpers.NewNode("testnode2", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - }, - validateActions: func(t *testing.T, actions []clienttesting.Action) { - expectedCondition := metav1.Condition{ - Type: clusterv1.ManagedClusterConditionJoined, - Status: metav1.ConditionTrue, - Reason: "ManagedClusterJoined", - Message: "Managed cluster joined", - } - expectedStatus := clusterv1.ManagedClusterStatus{ - Version: clusterv1.ManagedClusterVersion{ 
- Kubernetes: kubeversion.Get().GitVersion, - }, - Capacity: clusterv1.ResourceList{ - clusterv1.ResourceCPU: *resource.NewQuantity(int64(64), resource.DecimalExponent), - clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*128), resource.BinarySI), - }, - Allocatable: clusterv1.ResourceList{ - clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent), - clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI), - }, - } - testinghelpers.AssertActions(t, actions, "get", "update") - actual := actions[1].(clienttesting.UpdateActionImpl).Object - testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition) - testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus) - }, - }, - { - name: "merge a joined managed cluster status", - startingObjects: []runtime.Object{ - testinghelpers.NewManagedClusterWithStatus( - corev1.ResourceList{ - "sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent), - "cores": *resource.NewQuantity(int64(128), resource.DecimalExponent), - }, - testinghelpers.NewResourceList(16, 32)), - }, - nodes: []runtime.Object{ - testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - testinghelpers.NewNode("testnode2", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), - }, - validateActions: func(t *testing.T, actions []clienttesting.Action) { - expectedCondition := metav1.Condition{ - Type: clusterv1.ManagedClusterConditionJoined, - Status: metav1.ConditionTrue, - Reason: "ManagedClusterJoined", - Message: "Managed cluster joined", - } - expectedStatus := clusterv1.ManagedClusterStatus{ - Version: clusterv1.ManagedClusterVersion{ - Kubernetes: kubeversion.Get().GitVersion, - }, - Capacity: clusterv1.ResourceList{ - "sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent), - "cores": *resource.NewQuantity(int64(128), resource.DecimalExponent), - clusterv1.ResourceCPU: *resource.NewQuantity(int64(64), resource.DecimalExponent), - clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*128), resource.BinarySI), - }, - Allocatable: clusterv1.ResourceList{ - clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent), - clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI), - }, - } - testinghelpers.AssertActions(t, actions, "get", "update") - actual := actions[1].(clienttesting.UpdateActionImpl).Object - testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition) - testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus) }, }, } @@ -169,19 +59,10 @@ func TestSyncManagedCluster(t *testing.T) { clusterStore.Add(cluster) } - kubeClient := kubefake.NewSimpleClientset(c.nodes...) 
- kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10) - nodeStore := kubeInformerFactory.Core().V1().Nodes().Informer().GetStore() - for _, node := range c.nodes { - nodeStore.Add(node) - } - ctrl := managedClusterJoiningController{ clusterName: testinghelpers.TestManagedClusterName, hubClusterClient: clusterClient, hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), - discoveryClient: kubeClient.Discovery(), - nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), } syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, "")) diff --git a/pkg/spoke/managedcluster/status_controller.go b/pkg/spoke/managedcluster/status_controller.go new file mode 100644 index 000000000..17309fbc6 --- /dev/null +++ b/pkg/spoke/managedcluster/status_controller.go @@ -0,0 +1,199 @@ +package managedcluster + +import ( + "context" + "fmt" + "net/http" + "time" + + clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" + clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" + clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" + clusterv1 "open-cluster-management.io/api/cluster/v1" + "open-cluster-management.io/registration/pkg/helpers" + + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + discovery "k8s.io/client-go/discovery" + corev1informers "k8s.io/client-go/informers/core/v1" + corev1lister "k8s.io/client-go/listers/core/v1" +) + +// managedClusterStatusController checks the kube-apiserver health on managed cluster to determine it whether is available +// and ensure that the managed cluster resources and version are up to date. +type managedClusterStatusController struct { + clusterName string + hubClusterClient clientset.Interface + hubClusterLister clusterv1listers.ManagedClusterLister + managedClusterDiscoveryClient discovery.DiscoveryInterface + nodeLister corev1lister.NodeLister +} + +// NewManagedClusterStatusController creates a managed cluster status controller on managed cluster. +func NewManagedClusterStatusController( + clusterName string, + hubClusterClient clientset.Interface, + hubClusterInformer clusterv1informer.ManagedClusterInformer, + managedClusterDiscoveryClient discovery.DiscoveryInterface, + nodeInformer corev1informers.NodeInformer, + resyncInterval time.Duration, + recorder events.Recorder) factory.Controller { + c := &managedClusterStatusController{ + clusterName: clusterName, + hubClusterClient: hubClusterClient, + hubClusterLister: hubClusterInformer.Lister(), + managedClusterDiscoveryClient: managedClusterDiscoveryClient, + nodeLister: nodeInformer.Lister(), + } + + return factory.New(). + WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer()). + WithSync(c.sync). + ResyncEvery(resyncInterval). + ToController("ManagedClusterStatusController", recorder) +} + +// sync updates managed cluster available condition by checking kube-apiserver health on managed cluster. +// if the kube-apiserver is health, it will ensure that managed cluster resources and version are up to date. 
+func (c *managedClusterStatusController) sync(ctx context.Context, syncCtx factory.SyncContext) error { + if _, err := c.hubClusterLister.Get(c.clusterName); err != nil { + return fmt.Errorf("unable to get managed cluster %q from hub: %w", c.clusterName, err) + } + + updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{} + + // check the kube-apiserver health on managed cluster. + condition := c.checkKubeAPIServerStatus(ctx) + + // the managed cluster kube-apiserver is health, update its version and resources if necessary. + if condition.Status == metav1.ConditionTrue { + clusterVersion, err := c.getClusterVersion() + if err != nil { + return fmt.Errorf("unable to get server version of managed cluster %q: %w", c.clusterName, err) + } + + capacity, allocatable, err := c.getClusterResources() + if err != nil { + return fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", c.clusterName, err) + } + + updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.ManagedClusterStatus{ + Capacity: capacity, + Allocatable: allocatable, + Version: *clusterVersion, + })) + } + + updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateManagedClusterConditionFn(condition)) + _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...) + if err != nil { + return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err) + } + if updated { + syncCtx.Recorder().Eventf("ManagedClusterStatusUpdated", "the status of managed cluster %q has been updated, available condition is %q, due to %q", + c.clusterName, condition.Status, condition.Message) + } + return nil +} + +// using readyz api to check the status of kube apiserver +func (c *managedClusterStatusController) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition { + statusCode := 0 + condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable} + result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode) + if statusCode == http.StatusOK { + condition.Status = metav1.ConditionTrue + condition.Reason = "ManagedClusterAvailable" + condition.Message = "Managed cluster is available" + return condition + } + + // for backward compatible, the livez endpoint is supported from Kubernetes 1.16, so if the livez is not found or + // forbidden, the healthz endpoint will be used. 
+ if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden { + result = c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode) + if statusCode == http.StatusOK { + condition.Status = metav1.ConditionTrue + condition.Reason = "ManagedClusterAvailable" + condition.Message = "Managed cluster is available" + return condition + } + } + + condition.Status = metav1.ConditionFalse + condition.Reason = "ManagedClusterKubeAPIServerUnavailable" + body, err := result.Raw() + if err == nil { + condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body)) + return condition + } + + condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err) + return condition +} + +func (c *managedClusterStatusController) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) { + serverVersion, err := c.managedClusterDiscoveryClient.ServerVersion() + if err != nil { + return nil, err + } + return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil +} + +func (c *managedClusterStatusController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) { + nodes, err := c.nodeLister.List(labels.Everything()) + if err != nil { + return nil, nil, err + } + + capacityList := make(map[clusterv1.ResourceName]resource.Quantity) + allocatableList := make(map[clusterv1.ResourceName]resource.Quantity) + + for _, node := range nodes { + for key, value := range node.Status.Capacity { + if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist { + capacity.Add(value) + capacityList[clusterv1.ResourceName(key)] = capacity + } else { + capacityList[clusterv1.ResourceName(key)] = value + } + } + + // the node is unschedulable, ignore its allocatable resources + if node.Spec.Unschedulable { + continue + } + + for key, value := range node.Status.Allocatable { + if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist { + allocatable.Add(value) + allocatableList[clusterv1.ResourceName(key)] = allocatable + } else { + allocatableList[clusterv1.ResourceName(key)] = value + } + } + } + + return capacityList, allocatableList, nil +} + +func updateClusterResourcesFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc { + return func(oldStatus *clusterv1.ManagedClusterStatus) error { + // merge the old capacity to new capacity, if one old capacity entry does not exist in new capacity, + // we add it back to new capacity + for key, val := range oldStatus.Capacity { + if _, ok := status.Capacity[key]; !ok { + status.Capacity[key] = val + continue + } + } + oldStatus.Capacity = status.Capacity + oldStatus.Allocatable = status.Allocatable + oldStatus.Version = status.Version + return nil + } +} diff --git a/pkg/spoke/managedcluster/healthcheck_controller_test.go b/pkg/spoke/managedcluster/status_controller_test.go similarity index 57% rename from pkg/spoke/managedcluster/healthcheck_controller_test.go rename to pkg/spoke/managedcluster/status_controller_test.go index 0d8558f0b..7ba3b7282 100644 --- a/pkg/spoke/managedcluster/healthcheck_controller_test.go +++ b/pkg/spoke/managedcluster/status_controller_test.go @@ -2,6 +2,7 @@ package managedcluster import ( "context" + "encoding/json" "net/http" "net/http/httptest" "testing" @@ -12,9 +13,14 @@ import ( clusterv1 "open-cluster-management.io/api/cluster/v1" testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing" + 
corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/version" discovery "k8s.io/client-go/discovery" + kubeinformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/rest" clienttesting "k8s.io/client-go/testing" ) @@ -31,6 +37,21 @@ func TestHealthCheck(t *testing.T) { w.WriteHeader(http.StatusOK) return } + + if req.URL.Path == "/version" { + output, err := json.Marshal(version.Info{ + GitVersion: "test-version", + }) + if err != nil { + t.Errorf("unexpected encoding error: %v", err) + return + } + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write(output) + return + } + w.WriteHeader(serverResponse.httpStatus) w.Write([]byte(serverResponse.responseMsg)) })) @@ -41,6 +62,7 @@ func TestHealthCheck(t *testing.T) { cases := []struct { name string clusters []runtime.Object + nodes []runtime.Object httpStatus int responseMsg string validateActions func(t *testing.T, actions []clienttesting.Action) @@ -70,8 +92,11 @@ func TestHealthCheck(t *testing.T) { }, }, { - name: "kube-apiserver is ok", - clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, + name: "kube-apiserver is ok", + clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, + nodes: []runtime.Object{ + testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), + }, httpStatus: http.StatusOK, validateActions: func(t *testing.T, actions []clienttesting.Action) { expectedCondition := metav1.Condition{ @@ -80,14 +105,29 @@ func TestHealthCheck(t *testing.T) { Reason: "ManagedClusterAvailable", Message: "Managed cluster is available", } + expectedStatus := clusterv1.ManagedClusterStatus{ + Version: clusterv1.ManagedClusterVersion{ + Kubernetes: "test-version", + }, + Capacity: clusterv1.ResourceList{ + clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent), + clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI), + }, + Allocatable: clusterv1.ResourceList{ + clusterv1.ResourceCPU: *resource.NewQuantity(int64(16), resource.DecimalExponent), + clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*32), resource.BinarySI), + }, + } testinghelpers.AssertActions(t, actions, "get", "update") actual := actions[1].(clienttesting.UpdateActionImpl).Object testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition) + testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus) }, }, { name: "there is no livez endpoint", clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, + nodes: []runtime.Object{}, httpStatus: http.StatusNotFound, validateActions: func(t *testing.T, actions []clienttesting.Action) { expectedCondition := metav1.Condition{ @@ -104,6 +144,7 @@ func TestHealthCheck(t *testing.T) { { name: "livez is forbidden", clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, + nodes: []runtime.Object{}, httpStatus: http.StatusForbidden, validateActions: func(t *testing.T, actions []clienttesting.Action) { expectedCondition := metav1.Condition{ @@ -117,6 +158,49 @@ func TestHealthCheck(t *testing.T) { testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition) }, }, + { + name: "merge 
managed cluster status", + clusters: []runtime.Object{ + testinghelpers.NewManagedClusterWithStatus( + corev1.ResourceList{ + "sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent), + "cores": *resource.NewQuantity(int64(128), resource.DecimalExponent), + }, + testinghelpers.NewResourceList(16, 32)), + }, + nodes: []runtime.Object{ + testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), + testinghelpers.NewNode("testnode2", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)), + }, + httpStatus: http.StatusOK, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + expectedCondition := metav1.Condition{ + Type: clusterv1.ManagedClusterConditionJoined, + Status: metav1.ConditionTrue, + Reason: "ManagedClusterJoined", + Message: "Managed cluster joined", + } + expectedStatus := clusterv1.ManagedClusterStatus{ + Version: clusterv1.ManagedClusterVersion{ + Kubernetes: "test-version", + }, + Capacity: clusterv1.ResourceList{ + "sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent), + "cores": *resource.NewQuantity(int64(128), resource.DecimalExponent), + clusterv1.ResourceCPU: *resource.NewQuantity(int64(64), resource.DecimalExponent), + clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*128), resource.BinarySI), + }, + Allocatable: clusterv1.ResourceList{ + clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent), + clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI), + }, + } + testinghelpers.AssertActions(t, actions, "get", "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object + testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition) + testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus) + }, + }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { @@ -127,14 +211,22 @@ func TestHealthCheck(t *testing.T) { clusterStore.Add(cluster) } + kubeClient := kubefake.NewSimpleClientset(c.nodes...) 
+ kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10) + nodeStore := kubeInformerFactory.Core().V1().Nodes().Informer().GetStore() + for _, node := range c.nodes { + nodeStore.Add(node) + } + serverResponse.httpStatus = c.httpStatus serverResponse.responseMsg = c.responseMsg - ctrl := &managedClusterHealthCheckController{ + ctrl := &managedClusterStatusController{ clusterName: testinghelpers.TestManagedClusterName, hubClusterClient: clusterClient, hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), managedClusterDiscoveryClient: discoveryClient, + nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), } syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, "")) testinghelpers.AssertError(t, syncErr, c.expectedErr) diff --git a/pkg/spoke/spokeagent.go b/pkg/spoke/spokeagent.go index 11acd7b97..2d79140a1 100644 --- a/pkg/spoke/spokeagent.go +++ b/pkg/spoke/spokeagent.go @@ -264,8 +264,6 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext o.ClusterName, hubClusterClient, hubClusterInformerFactory.Cluster().V1().ManagedClusters(), - spokeKubeClient.Discovery(), - spokeKubeInformerFactory.Core().V1().Nodes(), controllerContext.EventRecorder, ) @@ -277,12 +275,13 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext controllerContext.EventRecorder, ) - // create ManagedClusterHealthCheckController to check the spoke cluster health - managedClusterHealthCheckController := managedcluster.NewManagedClusterHealthCheckController( + // create NewManagedClusterStatusController to update the spoke cluster status + managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController( o.ClusterName, hubClusterClient, hubClusterInformerFactory.Cluster().V1().ManagedClusters(), spokeKubeClient.Discovery(), + spokeKubeInformerFactory.Core().V1().Nodes(), o.ClusterHealthCheckPeriod, controllerContext.EventRecorder, ) diff --git a/test/integration/managedcluster_deletiong_test.go b/test/integration/managedcluster_deletion_test.go similarity index 100% rename from test/integration/managedcluster_deletiong_test.go rename to test/integration/managedcluster_deletion_test.go diff --git a/test/integration/spokecluster_status_test.go b/test/integration/spokecluster_status_test.go new file mode 100644 index 000000000..1a9b1c5f8 --- /dev/null +++ b/test/integration/spokecluster_status_test.go @@ -0,0 +1,158 @@ +package integration_test + +import ( + "context" + "fmt" + "path" + "time" + + "github.com/onsi/ginkgo" + "github.com/onsi/gomega" + "k8s.io/apimachinery/pkg/api/meta" + + clusterv1 "open-cluster-management.io/api/cluster/v1" + "open-cluster-management.io/registration/pkg/spoke" + "open-cluster-management.io/registration/test/integration/util" + + "github.com/openshift/library-go/pkg/controller/controllercmd" +) + +var _ = ginkgo.Describe("Collecting Node Resource", func() { + ginkgo.It("managed cluster node resource should be collected successfully", func() { + var err error + + // create one node + capacity := util.NewResourceList(32, 64) + allocatable := util.NewResourceList(16, 32) + err = util.CreateNode(kubeClient, "node1", capacity, allocatable) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + managedClusterName := "resorucetest-managedcluster" + hubKubeconfigSecret := "resorucetest-hub-kubeconfig-secret" + hubKubeconfigDir := path.Join(util.TestDir, "resorucetest", "hub-kubeconfig") + + // run registration agent + go func() 
{ + agentOptions := spoke.SpokeAgentOptions{ + ClusterName: managedClusterName, + BootstrapKubeconfig: bootstrapKubeConfigFile, + HubKubeconfigSecret: hubKubeconfigSecret, + HubKubeconfigDir: hubKubeconfigDir, + ClusterHealthCheckPeriod: 1 * time.Minute, + } + err := agentOptions.RunSpokeAgent(context.Background(), &controllercmd.ControllerContext{ + KubeConfig: spokeCfg, + EventRecorder: util.NewIntegrationTestEventRecorder("resorucetest"), + }) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + }() + + // the spoke cluster and csr should be created after bootstrap + gomega.Eventually(func() bool { + if _, err := util.GetManagedCluster(clusterClient, managedClusterName); err != nil { + return false + } + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + + gomega.Eventually(func() bool { + if _, err := util.FindUnapprovedSpokeCSR(kubeClient, managedClusterName); err != nil { + return false + } + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + + // the spoke cluster should has finalizer that is added by hub controller + gomega.Eventually(func() bool { + spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName) + if err != nil { + return false + } + if len(spokeCluster.Finalizers) != 1 { + return false + } + + if spokeCluster.Finalizers[0] != "cluster.open-cluster-management.io/api-resource-cleanup" { + return false + } + + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + + // simulate hub cluster admin to accept the managedcluster and approve the csr + err = util.AcceptManagedCluster(clusterClient, managedClusterName) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + err = util.ApproveSpokeClusterCSR(kubeClient, managedClusterName, time.Hour*24) + gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // the managed cluster should have accepted condition after it is accepted + gomega.Eventually(func() bool { + spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName) + if err != nil { + return false + } + accpeted := meta.FindStatusCondition(spokeCluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) + if accpeted == nil { + return false + } + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + + // the hub kubeconfig secret should be filled after the csr is approved + gomega.Eventually(func() bool { + if _, err := util.GetFilledHubKubeConfigSecret(kubeClient, testNamespace, hubKubeconfigSecret); err != nil { + return false + } + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + + // the resource of spoke cluster should be updated finally + gomega.Eventually(func() bool { + spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName) + if err != nil { + return false + } + if !util.CmpResourceQuantity("cpu", capacity, spokeCluster.Status.Capacity) { + fmt.Printf("expected cpu capacity %#v but got: %#v\n", capacity["cpu"], spokeCluster.Status.Capacity["cpu"]) + return false + } + if !util.CmpResourceQuantity("memory", capacity, spokeCluster.Status.Capacity) { + return false + } + if !util.CmpResourceQuantity("cpu", allocatable, spokeCluster.Status.Allocatable) { + return false + } + if !util.CmpResourceQuantity("memory", allocatable, spokeCluster.Status.Allocatable) { + return false + } + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + + // cordon the node + err = util.CordonNode(kubeClient, "node1") + 
gomega.Expect(err).NotTo(gomega.HaveOccurred()) + + // the resource of spoke cluster should be updated finally + gomega.Eventually(func() bool { + spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName) + if err != nil { + return false + } + if !util.CmpResourceQuantity("cpu", capacity, spokeCluster.Status.Capacity) { + fmt.Printf("expected cpu capacity %#v but got: %#v\n", capacity["cpu"], spokeCluster.Status.Capacity["cpu"]) + return false + } + if !util.CmpResourceQuantity("memory", capacity, spokeCluster.Status.Capacity) { + return false + } + + // after cordoned the node, there should be no allocatable resource + if len(spokeCluster.Status.Allocatable) != 0 { + return false + } + + return true + }, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue()) + }) +}) diff --git a/test/integration/util/util.go b/test/integration/util/util.go index dcd8879bf..54612e2d5 100644 --- a/test/integration/util/util.go +++ b/test/integration/util/util.go @@ -26,6 +26,7 @@ import ( certificates "k8s.io/api/certificates/v1" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" @@ -516,6 +517,51 @@ func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeCluster return err } +func CreateNode(kubeClient kubernetes.Interface, name string, capacity, allocatable corev1.ResourceList) error { + node := &corev1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: corev1.NodeStatus{ + Capacity: capacity, + Allocatable: allocatable, + }, + } + _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}) + return err +} + +func CordonNode(kubeClient kubernetes.Interface, name string) error { + node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil { + return err + } + + node = node.DeepCopy() + node.Spec.Unschedulable = true + _, err = kubeClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{}) + return err +} + +func NewResourceList(cpu, mem int) corev1.ResourceList { + return corev1.ResourceList{ + corev1.ResourceCPU: *resource.NewQuantity(int64(cpu), resource.DecimalExponent), + corev1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*mem), resource.BinarySI), + } +} + +func CmpResourceQuantity(key string, nodeResourceList corev1.ResourceList, clusterResorceList clusterv1.ResourceList) bool { + nodeResource, ok := nodeResourceList[corev1.ResourceName(key)] + if !ok { + return false + } + clusterResouce, ok := clusterResorceList[clusterv1.ResourceName(key)] + if !ok { + return false + } + return nodeResource.Equal(clusterResouce) +} + func NewIntegrationTestEventRecorder(componet string) events.Recorder { return &IntegrationTestEventRecorder{component: componet} }
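
For illustration only (not part of the patch): a minimal, self-contained sketch of the aggregation behavior this change introduces, mirroring the new getClusterResources logic in status_controller.go. It is simplified to plain corev1.ResourceList instead of clusterv1.ResourceList, and the file, package, and helper names (sumClusterResources) are hypothetical, not identifiers from this repository. The key point: capacity is summed across all nodes, while allocatable skips nodes marked unschedulable (cordoned), which is also what the new integration test asserts after util.CordonNode is called.

// sketch_allocatable.go — illustrative sketch only; names are hypothetical.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// sumClusterResources aggregates node capacity across all nodes, but excludes
// cordoned (Unschedulable) nodes from the allocatable total.
func sumClusterResources(nodes []*corev1.Node) (capacity, allocatable corev1.ResourceList) {
	capacity = corev1.ResourceList{}
	allocatable = corev1.ResourceList{}

	// add accumulates src quantities into list, summing entries that already exist.
	add := func(list, src corev1.ResourceList) {
		for key, value := range src {
			if existing, ok := list[key]; ok {
				existing.Add(value)
				list[key] = existing
				continue
			}
			list[key] = value.DeepCopy()
		}
	}

	for _, node := range nodes {
		add(capacity, node.Status.Capacity)

		// A cordoned node still counts toward capacity, but its resources are
		// no longer reported as allocatable.
		if node.Spec.Unschedulable {
			continue
		}
		add(allocatable, node.Status.Allocatable)
	}
	return capacity, allocatable
}

func main() {
	ready := &corev1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: "node-a"},
		Status: corev1.NodeStatus{
			Capacity:    corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(32, resource.DecimalSI)},
			Allocatable: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(16, resource.DecimalSI)},
		},
	}
	cordoned := ready.DeepCopy()
	cordoned.Name = "node-b"
	cordoned.Spec.Unschedulable = true

	capacity, allocatable := sumClusterResources([]*corev1.Node{ready, cordoned})
	fmt.Println("capacity cpu:", capacity.Cpu().String())       // 64: both nodes counted
	fmt.Println("allocatable cpu:", allocatable.Cpu().String()) // 16: cordoned node excluded
}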