exclude the cordoned nodes when calc node allocatable (#174)

Signed-off-by: liuwei <liuweixa@redhat.com>
Author: Wei Liu
Date: 2021-11-05 17:00:21 +08:00
Committed by: GitHub
Parent: ef7c23eded
Commit: 7bb4b6864d
9 changed files with 518 additions and 345 deletions


@@ -1,106 +0,0 @@
package managedcluster
import (
"context"
"fmt"
"net/http"
"time"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/registration/pkg/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
discovery "k8s.io/client-go/discovery"
)
// managedClusterHealthCheckController checks the health of the kube-apiserver on the managed cluster to determine whether it is available.
type managedClusterHealthCheckController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.ManagedClusterLister
managedClusterDiscoveryClient discovery.DiscoveryInterface
}
// NewManagedClusterHealthCheckController creates a managed cluster health check controller on managed cluster.
func NewManagedClusterHealthCheckController(
clusterName string,
hubClusterClient clientset.Interface,
hubClusterInformer clusterv1informer.ManagedClusterInformer,
managedClusterDiscoveryClient discovery.DiscoveryInterface,
resyncInterval time.Duration,
recorder events.Recorder) factory.Controller {
c := &managedClusterHealthCheckController{
clusterName: clusterName,
hubClusterClient: hubClusterClient,
hubClusterLister: hubClusterInformer.Lister(),
managedClusterDiscoveryClient: managedClusterDiscoveryClient,
}
return factory.New().
WithInformers(hubClusterInformer.Informer()).
WithSync(c.sync).
ResyncEvery(resyncInterval).
ToController("ManagedClusterHealthCheckController", recorder)
}
// sync updates the managed cluster's available condition by checking the health of the kube-apiserver on the managed cluster.
func (c *managedClusterHealthCheckController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
if _, err := c.hubClusterLister.Get(c.clusterName); err != nil {
return fmt.Errorf("unable to get managed cluster %q from hub: %w", c.clusterName, err)
}
// check the kube-apiserver health on managed cluster.
condition := c.checkKubeAPIServerStatus(ctx)
conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(condition)
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, conditionUpdateFn)
if err != nil {
return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err)
}
if updated {
syncCtx.Recorder().Eventf("ManagedClusterAvailableConditionUpdated", "update managed cluster %q available condition to %q, due to %q",
c.clusterName, condition.Status, condition.Message)
}
return nil
}
// check the status of the kube-apiserver using the livez endpoint, falling back to healthz
func (c *managedClusterHealthCheckController) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition {
statusCode := 0
condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable}
result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
// for backward compatibility: the livez endpoint is only supported since Kubernetes 1.16, so if livez is not
// found or is forbidden, the healthz endpoint is used instead.
if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden {
result = c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
}
condition.Status = metav1.ConditionFalse
condition.Reason = "ManagedClusterKubeAPIServerUnavailable"
body, err := result.Raw()
if err == nil {
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body))
return condition
}
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err)
return condition
}
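For reference, a minimal standalone sketch (not part of this commit) of the probe pattern above: query /livez first, then fall back to /healthz when the endpoint is missing or forbidden. Building the discovery client from an in-cluster config is an assumption made for this example.

package main

import (
    "context"
    "fmt"
    "net/http"

    "k8s.io/client-go/discovery"
    "k8s.io/client-go/rest"
)

// probeAPIServer reports whether the kube-apiserver answers 200 OK on /livez,
// falling back to /healthz for servers (or RBAC setups) that do not expose /livez.
func probeAPIServer(ctx context.Context, dc discovery.DiscoveryInterface) bool {
    statusCode := 0
    dc.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode)
    if statusCode == http.StatusOK {
        return true
    }
    if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden {
        dc.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode)
    }
    return statusCode == http.StatusOK
}

func main() {
    cfg, err := rest.InClusterConfig() // assumption: the sketch runs inside a cluster
    if err != nil {
        panic(err)
    }
    dc, err := discovery.NewDiscoveryClientForConfig(cfg)
    if err != nil {
        panic(err)
    }
    fmt.Println("kube-apiserver healthy:", probeAPIServer(context.Background(), dc))
}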


@@ -15,13 +15,7 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
discovery "k8s.io/client-go/discovery"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/klog/v2"
)
// managedClusterJoiningController adds the joined condition to a ManagedCluster on the managed cluster after it is accepted by the hub cluster admin.
@@ -29,8 +23,6 @@ type managedClusterJoiningController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.ManagedClusterLister
discoveryClient discovery.DiscoveryInterface
nodeLister corev1lister.NodeLister
}
// NewManagedClusterJoiningController creates a new managed cluster joining controller on the managed cluster.
@@ -38,28 +30,22 @@ func NewManagedClusterJoiningController(
clusterName string,
hubClusterClient clientset.Interface,
hubManagedClusterInformer clusterv1informer.ManagedClusterInformer,
discoveryClient discovery.DiscoveryInterface,
nodeInformer corev1informers.NodeInformer,
recorder events.Recorder) factory.Controller {
c := &managedClusterJoiningController{
clusterName: clusterName,
hubClusterClient: hubClusterClient,
hubClusterLister: hubManagedClusterInformer.Lister(),
discoveryClient: discoveryClient,
nodeLister: nodeInformer.Lister(),
}
return factory.New().
// TODO: make node capacity recalculation conditional on the number of nodes here, and decouple
// the node capacity calculation into another controller.
WithInformers(hubManagedClusterInformer.Informer(), nodeInformer.Informer()).
WithInformers(hubManagedClusterInformer.Informer()).
WithSync(c.sync).
ResyncEvery(5*time.Minute).
ToController("ManagedClusterController", recorder)
ToController("ManagedClusterJoiningController", recorder)
}
// sync maintains the managed cluster side status of a ManagedCluster; it maintains the ManagedClusterJoined condition according to
// the value of the ManagedClusterHubAccepted condition and ensures resource and version are up to date.
// the value of the ManagedClusterHubAccepted condition.
func (c managedClusterJoiningController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
managedCluster, err := c.hubClusterLister.Get(c.clusterName)
if err != nil {
@@ -72,111 +58,29 @@ func (c managedClusterJoiningController) sync(ctx context.Context, syncCtx facto
return nil
}
// current managed cluster is accepted, update its status if necessary.
capacity, allocatable, err := c.getClusterResources()
if err != nil {
return fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", c.clusterName, err)
}
clusterVersion, err := c.getClusterVersion()
if err != nil {
return fmt.Errorf("unable to get server version of managed cluster %q: %w", c.clusterName, err)
}
updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{}
// the current managed cluster has not joined the hub cluster yet, join it.
joined := meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined)
if !joined {
updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateManagedClusterConditionFn(metav1.Condition{
if joined {
// current managed cluster is joined, do nothing.
return nil
}
// the current managed cluster has not joined the hub cluster yet, join it.
_, updated, err := helpers.UpdateManagedClusterStatus(
ctx,
c.hubClusterClient,
c.clusterName,
helpers.UpdateManagedClusterConditionFn(metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}))
}
updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.ManagedClusterStatus{
Capacity: capacity,
Allocatable: allocatable,
Version: *clusterVersion,
}))
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...)
}),
)
if err != nil {
return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err)
}
if updated {
if !joined {
syncCtx.Recorder().Eventf("ManagedClusterJoined", "Managed cluster %q joined hub", c.clusterName)
}
klog.V(4).Infof("The status of managed cluster %q has been updated", c.clusterName)
syncCtx.Recorder().Eventf("ManagedClusterJoined", "Managed cluster %q joined hub", c.clusterName)
}
return nil
}
func (c *managedClusterJoiningController) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) {
serverVersion, err := c.discoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil
}
func (c *managedClusterJoiningController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) {
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
capacityList := make(map[clusterv1.ResourceName]resource.Quantity)
allocatableList := make(map[clusterv1.ResourceName]resource.Quantity)
for _, node := range nodes {
for key, value := range node.Status.Capacity {
if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist {
capacity.Add(value)
capacityList[clusterv1.ResourceName(key)] = capacity
} else {
capacityList[clusterv1.ResourceName(key)] = value
}
}
for key, value := range node.Status.Allocatable {
if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist {
allocatable.Add(value)
allocatableList[clusterv1.ResourceName(key)] = allocatable
} else {
allocatableList[clusterv1.ResourceName(key)] = value
}
}
}
return capacityList, allocatableList, nil
}
func formatQuantityToMi(q resource.Quantity) resource.Quantity {
raw, _ := q.AsInt64()
raw /= (1024 * 1024)
rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw))
if err != nil {
return q
}
return rq
}
func updateClusterResourcesFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc {
return func(oldStatus *clusterv1.ManagedClusterStatus) error {
// merge the old capacity into the new capacity: if an old capacity entry does not exist in the
// new capacity, add it back to the new capacity
for key, val := range oldStatus.Capacity {
if _, ok := status.Capacity[key]; !ok {
status.Capacity[key] = val
continue
}
}
oldStatus.Capacity = status.Capacity
oldStatus.Allocatable = status.Allocatable
oldStatus.Version = status.Version
return nil
}
}
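To make the merge semantics of updateClusterResourcesFn above concrete: capacity keys that exist only in the old status (for example "sockets" and "cores" written by another component, as exercised in the tests below) survive the update, while keys present in both take the new value. A minimal sketch with hypothetical quantities:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
    clusterv1 "open-cluster-management.io/api/cluster/v1"
)

func main() {
    oldCapacity := clusterv1.ResourceList{
        "sockets":             *resource.NewQuantity(1200, resource.DecimalExponent),
        clusterv1.ResourceCPU: *resource.NewQuantity(32, resource.DecimalExponent),
    }
    newCapacity := clusterv1.ResourceList{
        clusterv1.ResourceCPU:    *resource.NewQuantity(64, resource.DecimalExponent),
        clusterv1.ResourceMemory: *resource.NewQuantity(1024*1024*128, resource.BinarySI),
    }
    // carry over old entries that are missing from the new capacity
    for key, val := range oldCapacity {
        if _, ok := newCapacity[key]; !ok {
            newCapacity[key] = val
        }
    }
    // prints sockets=1200 (kept), cpu=64 (replaced), memory=128Mi (added)
    for key := range newCapacity {
        q := newCapacity[key]
        fmt.Printf("%s=%s\n", key, q.String())
    }
}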


@@ -10,13 +10,8 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
kubeversion "k8s.io/client-go/pkg/version"
clienttesting "k8s.io/client-go/testing"
)
@@ -24,7 +19,6 @@ func TestSyncManagedCluster(t *testing.T) {
cases := []struct {
name string
startingObjects []runtime.Object
nodes []runtime.Object
validateActions func(t *testing.T, actions []clienttesting.Action)
expectedErr string
}{
@@ -42,9 +36,6 @@ func TestSyncManagedCluster(t *testing.T) {
{
name: "sync an accepted managed cluster",
startingObjects: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
nodes: []runtime.Object{
testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
@@ -52,110 +43,9 @@ func TestSyncManagedCluster(t *testing.T) {
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}
expectedStatus := clusterv1.ManagedClusterStatus{
Version: clusterv1.ManagedClusterVersion{
Kubernetes: kubeversion.Get().GitVersion,
},
Capacity: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI),
},
Allocatable: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(16), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*32), resource.BinarySI),
},
}
testinghelpers.AssertActions(t, actions, "get", "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition)
testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus)
},
},
{
name: "sync a joined managed cluster without status change",
startingObjects: []runtime.Object{
testinghelpers.NewManagedClusterWithStatus(testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
nodes: []runtime.Object{
testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "get")
},
},
{
name: "sync a joined managed cluster with status change",
startingObjects: []runtime.Object{testinghelpers.NewJoinedManagedCluster()},
nodes: []runtime.Object{
testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
testinghelpers.NewNode("testnode2", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}
expectedStatus := clusterv1.ManagedClusterStatus{
Version: clusterv1.ManagedClusterVersion{
Kubernetes: kubeversion.Get().GitVersion,
},
Capacity: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(64), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*128), resource.BinarySI),
},
Allocatable: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI),
},
}
testinghelpers.AssertActions(t, actions, "get", "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition)
testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus)
},
},
{
name: "merge a joined managed cluster status",
startingObjects: []runtime.Object{
testinghelpers.NewManagedClusterWithStatus(
corev1.ResourceList{
"sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent),
"cores": *resource.NewQuantity(int64(128), resource.DecimalExponent),
},
testinghelpers.NewResourceList(16, 32)),
},
nodes: []runtime.Object{
testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
testinghelpers.NewNode("testnode2", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}
expectedStatus := clusterv1.ManagedClusterStatus{
Version: clusterv1.ManagedClusterVersion{
Kubernetes: kubeversion.Get().GitVersion,
},
Capacity: clusterv1.ResourceList{
"sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent),
"cores": *resource.NewQuantity(int64(128), resource.DecimalExponent),
clusterv1.ResourceCPU: *resource.NewQuantity(int64(64), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*128), resource.BinarySI),
},
Allocatable: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI),
},
}
testinghelpers.AssertActions(t, actions, "get", "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition)
testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus)
},
},
}
@@ -169,19 +59,10 @@ func TestSyncManagedCluster(t *testing.T) {
clusterStore.Add(cluster)
}
kubeClient := kubefake.NewSimpleClientset(c.nodes...)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
nodeStore := kubeInformerFactory.Core().V1().Nodes().Informer().GetStore()
for _, node := range c.nodes {
nodeStore.Add(node)
}
ctrl := managedClusterJoiningController{
clusterName: testinghelpers.TestManagedClusterName,
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
discoveryClient: kubeClient.Discovery(),
nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(),
}
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, ""))


@@ -0,0 +1,199 @@
package managedcluster
import (
"context"
"fmt"
"net/http"
"time"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/registration/pkg/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
discovery "k8s.io/client-go/discovery"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
)
// managedClusterStatusController checks the health of the kube-apiserver on the managed cluster to determine whether it is
// available, and ensures that the managed cluster's resources and version are up to date.
type managedClusterStatusController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.ManagedClusterLister
managedClusterDiscoveryClient discovery.DiscoveryInterface
nodeLister corev1lister.NodeLister
}
// NewManagedClusterStatusController creates a managed cluster status controller on managed cluster.
func NewManagedClusterStatusController(
clusterName string,
hubClusterClient clientset.Interface,
hubClusterInformer clusterv1informer.ManagedClusterInformer,
managedClusterDiscoveryClient discovery.DiscoveryInterface,
nodeInformer corev1informers.NodeInformer,
resyncInterval time.Duration,
recorder events.Recorder) factory.Controller {
c := &managedClusterStatusController{
clusterName: clusterName,
hubClusterClient: hubClusterClient,
hubClusterLister: hubClusterInformer.Lister(),
managedClusterDiscoveryClient: managedClusterDiscoveryClient,
nodeLister: nodeInformer.Lister(),
}
return factory.New().
WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer()).
WithSync(c.sync).
ResyncEvery(resyncInterval).
ToController("ManagedClusterStatusController", recorder)
}
// sync updates the managed cluster's available condition by checking the health of the kube-apiserver on the managed cluster.
// if the kube-apiserver is healthy, it ensures that the managed cluster's resources and version are up to date.
func (c *managedClusterStatusController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
if _, err := c.hubClusterLister.Get(c.clusterName); err != nil {
return fmt.Errorf("unable to get managed cluster %q from hub: %w", c.clusterName, err)
}
updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{}
// check the kube-apiserver health on managed cluster.
condition := c.checkKubeAPIServerStatus(ctx)
// the managed cluster's kube-apiserver is healthy, so update its version and resources if necessary.
if condition.Status == metav1.ConditionTrue {
clusterVersion, err := c.getClusterVersion()
if err != nil {
return fmt.Errorf("unable to get server version of managed cluster %q: %w", c.clusterName, err)
}
capacity, allocatable, err := c.getClusterResources()
if err != nil {
return fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", c.clusterName, err)
}
updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.ManagedClusterStatus{
Capacity: capacity,
Allocatable: allocatable,
Version: *clusterVersion,
}))
}
updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateManagedClusterConditionFn(condition))
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...)
if err != nil {
return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err)
}
if updated {
syncCtx.Recorder().Eventf("ManagedClusterStatusUpdated", "the status of managed cluster %q has been updated, available condition is %q, due to %q",
c.clusterName, condition.Status, condition.Message)
}
return nil
}
// check the status of the kube-apiserver using the livez endpoint, falling back to healthz
func (c *managedClusterStatusController) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition {
statusCode := 0
condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable}
result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
// for backward compatibility: the livez endpoint is only supported since Kubernetes 1.16, so if livez is not
// found or is forbidden, the healthz endpoint is used instead.
if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden {
result = c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
}
condition.Status = metav1.ConditionFalse
condition.Reason = "ManagedClusterKubeAPIServerUnavailable"
body, err := result.Raw()
if err == nil {
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body))
return condition
}
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err)
return condition
}
func (c *managedClusterStatusController) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) {
serverVersion, err := c.managedClusterDiscoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil
}
func (c *managedClusterStatusController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) {
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
capacityList := make(map[clusterv1.ResourceName]resource.Quantity)
allocatableList := make(map[clusterv1.ResourceName]resource.Quantity)
for _, node := range nodes {
for key, value := range node.Status.Capacity {
if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist {
capacity.Add(value)
capacityList[clusterv1.ResourceName(key)] = capacity
} else {
capacityList[clusterv1.ResourceName(key)] = value
}
}
// the node is unschedulable (cordoned), so exclude its allocatable resources
if node.Spec.Unschedulable {
continue
}
for key, value := range node.Status.Allocatable {
if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist {
allocatable.Add(value)
allocatableList[clusterv1.ResourceName(key)] = allocatable
} else {
allocatableList[clusterv1.ResourceName(key)] = value
}
}
}
return capacityList, allocatableList, nil
}
func updateClusterResourcesFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc {
return func(oldStatus *clusterv1.ManagedClusterStatus) error {
// merge the old capacity into the new capacity: if an old capacity entry does not exist in the
// new capacity, add it back to the new capacity
for key, val := range oldStatus.Capacity {
if _, ok := status.Capacity[key]; !ok {
status.Capacity[key] = val
continue
}
}
oldStatus.Capacity = status.Capacity
oldStatus.Allocatable = status.Allocatable
oldStatus.Version = status.Version
return nil
}
}
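A minimal standalone sketch (with hypothetical node fixtures) of the aggregation rule this commit introduces: a cordoned node, i.e. one with spec.unschedulable set to true, still contributes to cluster capacity but is excluded from the allocatable sum:

package main

import (
    "fmt"

    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
)

func main() {
    nodes := []*corev1.Node{
        {
            Spec: corev1.NodeSpec{Unschedulable: false},
            Status: corev1.NodeStatus{
                Capacity:    corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(32, resource.DecimalSI)},
                Allocatable: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(16, resource.DecimalSI)},
            },
        },
        {
            // cordoned node: counted in capacity, skipped for allocatable
            Spec: corev1.NodeSpec{Unschedulable: true},
            Status: corev1.NodeStatus{
                Capacity:    corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(32, resource.DecimalSI)},
                Allocatable: corev1.ResourceList{corev1.ResourceCPU: *resource.NewQuantity(16, resource.DecimalSI)},
            },
        },
    }
    capacity, allocatable := resource.Quantity{}, resource.Quantity{}
    for _, node := range nodes {
        cpu := node.Status.Capacity[corev1.ResourceCPU]
        capacity.Add(cpu)
        if node.Spec.Unschedulable {
            continue
        }
        cpu = node.Status.Allocatable[corev1.ResourceCPU]
        allocatable.Add(cpu)
    }
    // prints: cpu capacity=64, cpu allocatable=16
    fmt.Printf("cpu capacity=%s, cpu allocatable=%s\n", capacity.String(), allocatable.String())
}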


@@ -2,6 +2,7 @@ package managedcluster
import (
"context"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
@@ -12,9 +13,14 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
testinghelpers "open-cluster-management.io/registration/pkg/helpers/testing"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/version"
discovery "k8s.io/client-go/discovery"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
clienttesting "k8s.io/client-go/testing"
)
@@ -31,6 +37,21 @@ func TestHealthCheck(t *testing.T) {
w.WriteHeader(http.StatusOK)
return
}
if req.URL.Path == "/version" {
output, err := json.Marshal(version.Info{
GitVersion: "test-version",
})
if err != nil {
t.Errorf("unexpected encoding error: %v", err)
return
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(output)
return
}
w.WriteHeader(serverResponse.httpStatus)
w.Write([]byte(serverResponse.responseMsg))
}))
@@ -41,6 +62,7 @@ func TestHealthCheck(t *testing.T) {
cases := []struct {
name string
clusters []runtime.Object
nodes []runtime.Object
httpStatus int
responseMsg string
validateActions func(t *testing.T, actions []clienttesting.Action)
@@ -70,8 +92,11 @@ func TestHealthCheck(t *testing.T) {
},
},
{
name: "kube-apiserver is ok",
clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
name: "kube-apiserver is ok",
clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
nodes: []runtime.Object{
testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
httpStatus: http.StatusOK,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
@@ -80,14 +105,29 @@ func TestHealthCheck(t *testing.T) {
Reason: "ManagedClusterAvailable",
Message: "Managed cluster is available",
}
expectedStatus := clusterv1.ManagedClusterStatus{
Version: clusterv1.ManagedClusterVersion{
Kubernetes: "test-version",
},
Capacity: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI),
},
Allocatable: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(16), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*32), resource.BinarySI),
},
}
testinghelpers.AssertActions(t, actions, "get", "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition)
testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus)
},
},
{
name: "there is no livez endpoint",
clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
nodes: []runtime.Object{},
httpStatus: http.StatusNotFound,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
@@ -104,6 +144,7 @@ func TestHealthCheck(t *testing.T) {
{
name: "livez is forbidden",
clusters: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
nodes: []runtime.Object{},
httpStatus: http.StatusForbidden,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
@@ -117,6 +158,49 @@ func TestHealthCheck(t *testing.T) {
testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition)
},
},
{
name: "merge managed cluster status",
clusters: []runtime.Object{
testinghelpers.NewManagedClusterWithStatus(
corev1.ResourceList{
"sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent),
"cores": *resource.NewQuantity(int64(128), resource.DecimalExponent),
},
testinghelpers.NewResourceList(16, 32)),
},
nodes: []runtime.Object{
testinghelpers.NewNode("testnode1", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
testinghelpers.NewNode("testnode2", testinghelpers.NewResourceList(32, 64), testinghelpers.NewResourceList(16, 32)),
},
httpStatus: http.StatusOK,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
expectedCondition := metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}
expectedStatus := clusterv1.ManagedClusterStatus{
Version: clusterv1.ManagedClusterVersion{
Kubernetes: "test-version",
},
Capacity: clusterv1.ResourceList{
"sockets": *resource.NewQuantity(int64(1200), resource.DecimalExponent),
"cores": *resource.NewQuantity(int64(128), resource.DecimalExponent),
clusterv1.ResourceCPU: *resource.NewQuantity(int64(64), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*128), resource.BinarySI),
},
Allocatable: clusterv1.ResourceList{
clusterv1.ResourceCPU: *resource.NewQuantity(int64(32), resource.DecimalExponent),
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI),
},
}
testinghelpers.AssertActions(t, actions, "get", "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
testinghelpers.AssertManagedClusterCondition(t, actual.(*clusterv1.ManagedCluster).Status.Conditions, expectedCondition)
testinghelpers.AssertManagedClusterStatus(t, actual.(*clusterv1.ManagedCluster).Status, expectedStatus)
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
@@ -127,14 +211,22 @@ func TestHealthCheck(t *testing.T) {
clusterStore.Add(cluster)
}
kubeClient := kubefake.NewSimpleClientset(c.nodes...)
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
nodeStore := kubeInformerFactory.Core().V1().Nodes().Informer().GetStore()
for _, node := range c.nodes {
nodeStore.Add(node)
}
serverResponse.httpStatus = c.httpStatus
serverResponse.responseMsg = c.responseMsg
ctrl := &managedClusterHealthCheckController{
ctrl := &managedClusterStatusController{
clusterName: testinghelpers.TestManagedClusterName,
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
managedClusterDiscoveryClient: discoveryClient,
nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(),
}
syncErr := ctrl.sync(context.TODO(), testinghelpers.NewFakeSyncContext(t, ""))
testinghelpers.AssertError(t, syncErr, c.expectedErr)


@@ -264,8 +264,6 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
o.ClusterName,
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
spokeKubeClient.Discovery(),
spokeKubeInformerFactory.Core().V1().Nodes(),
controllerContext.EventRecorder,
)
@@ -277,12 +275,13 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
controllerContext.EventRecorder,
)
// create ManagedClusterHealthCheckController to check the spoke cluster health
managedClusterHealthCheckController := managedcluster.NewManagedClusterHealthCheckController(
// create ManagedClusterStatusController to update the spoke cluster status
managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController(
o.ClusterName,
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
spokeKubeClient.Discovery(),
spokeKubeInformerFactory.Core().V1().Nodes(),
o.ClusterHealthCheckPeriod,
controllerContext.EventRecorder,
)


@@ -0,0 +1,158 @@
package integration_test
import (
"context"
"fmt"
"path"
"time"
"github.com/onsi/ginkgo"
"github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/meta"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/registration/pkg/spoke"
"open-cluster-management.io/registration/test/integration/util"
"github.com/openshift/library-go/pkg/controller/controllercmd"
)
var _ = ginkgo.Describe("Collecting Node Resource", func() {
ginkgo.It("managed cluster node resource should be collected successfully", func() {
var err error
// create one node
capacity := util.NewResourceList(32, 64)
allocatable := util.NewResourceList(16, 32)
err = util.CreateNode(kubeClient, "node1", capacity, allocatable)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
managedClusterName := "resourcetest-managedcluster"
hubKubeconfigSecret := "resourcetest-hub-kubeconfig-secret"
hubKubeconfigDir := path.Join(util.TestDir, "resourcetest", "hub-kubeconfig")
// run registration agent
go func() {
agentOptions := spoke.SpokeAgentOptions{
ClusterName: managedClusterName,
BootstrapKubeconfig: bootstrapKubeConfigFile,
HubKubeconfigSecret: hubKubeconfigSecret,
HubKubeconfigDir: hubKubeconfigDir,
ClusterHealthCheckPeriod: 1 * time.Minute,
}
err := agentOptions.RunSpokeAgent(context.Background(), &controllercmd.ControllerContext{
KubeConfig: spokeCfg,
EventRecorder: util.NewIntegrationTestEventRecorder("resourcetest"),
})
gomega.Expect(err).NotTo(gomega.HaveOccurred())
}()
// the spoke cluster and csr should be created after bootstrap
gomega.Eventually(func() bool {
if _, err := util.GetManagedCluster(clusterClient, managedClusterName); err != nil {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
gomega.Eventually(func() bool {
if _, err := util.FindUnapprovedSpokeCSR(kubeClient, managedClusterName); err != nil {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
// the spoke cluster should have the finalizer that is added by the hub controller
gomega.Eventually(func() bool {
spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return false
}
if len(spokeCluster.Finalizers) != 1 {
return false
}
if spokeCluster.Finalizers[0] != "cluster.open-cluster-management.io/api-resource-cleanup" {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
// simulate the hub cluster admin accepting the managedcluster and approving the csr
err = util.AcceptManagedCluster(clusterClient, managedClusterName)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
err = util.ApproveSpokeClusterCSR(kubeClient, managedClusterName, time.Hour*24)
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// the managed cluster should have the accepted condition after it is accepted
gomega.Eventually(func() bool {
spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return false
}
accepted := meta.FindStatusCondition(spokeCluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted)
if accepted == nil {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
// the hub kubeconfig secret should be filled after the csr is approved
gomega.Eventually(func() bool {
if _, err := util.GetFilledHubKubeConfigSecret(kubeClient, testNamespace, hubKubeconfigSecret); err != nil {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
// the resources of the spoke cluster should eventually be updated
gomega.Eventually(func() bool {
spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return false
}
if !util.CmpResourceQuantity("cpu", capacity, spokeCluster.Status.Capacity) {
fmt.Printf("expected cpu capacity %#v but got: %#v\n", capacity["cpu"], spokeCluster.Status.Capacity["cpu"])
return false
}
if !util.CmpResourceQuantity("memory", capacity, spokeCluster.Status.Capacity) {
return false
}
if !util.CmpResourceQuantity("cpu", allocatable, spokeCluster.Status.Allocatable) {
return false
}
if !util.CmpResourceQuantity("memory", allocatable, spokeCluster.Status.Allocatable) {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
// cordon the node
err = util.CordonNode(kubeClient, "node1")
gomega.Expect(err).NotTo(gomega.HaveOccurred())
// the resources of the spoke cluster should eventually be updated again
gomega.Eventually(func() bool {
spokeCluster, err := util.GetManagedCluster(clusterClient, managedClusterName)
if err != nil {
return false
}
if !util.CmpResourceQuantity("cpu", capacity, spokeCluster.Status.Capacity) {
fmt.Printf("expected cpu capacity %#v but got: %#v\n", capacity["cpu"], spokeCluster.Status.Capacity["cpu"])
return false
}
if !util.CmpResourceQuantity("memory", capacity, spokeCluster.Status.Capacity) {
return false
}
// after the node is cordoned, there should be no allocatable resources
if len(spokeCluster.Status.Allocatable) != 0 {
return false
}
return true
}, eventuallyTimeout, eventuallyInterval).Should(gomega.BeTrue())
})
})


@@ -26,6 +26,7 @@ import (
certificates "k8s.io/api/certificates/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
@@ -516,6 +517,51 @@ func AcceptManagedCluster(clusterClient clusterclientset.Interface, spokeCluster
return err
}
func CreateNode(kubeClient kubernetes.Interface, name string, capacity, allocatable corev1.ResourceList) error {
node := &corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Status: corev1.NodeStatus{
Capacity: capacity,
Allocatable: allocatable,
},
}
_, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{})
return err
}
func CordonNode(kubeClient kubernetes.Interface, name string) error {
node, err := kubeClient.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
node = node.DeepCopy()
node.Spec.Unschedulable = true
_, err = kubeClient.CoreV1().Nodes().Update(context.TODO(), node, metav1.UpdateOptions{})
return err
}
func NewResourceList(cpu, mem int) corev1.ResourceList {
return corev1.ResourceList{
corev1.ResourceCPU: *resource.NewQuantity(int64(cpu), resource.DecimalExponent),
corev1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*mem), resource.BinarySI),
}
}
func CmpResourceQuantity(key string, nodeResourceList corev1.ResourceList, clusterResourceList clusterv1.ResourceList) bool {
nodeResource, ok := nodeResourceList[corev1.ResourceName(key)]
if !ok {
return false
}
clusterResource, ok := clusterResourceList[clusterv1.ResourceName(key)]
if !ok {
return false
}
return nodeResource.Equal(clusterResource)
}
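A short usage sketch of the resource helpers above, with hypothetical values:

package main

import (
    "fmt"

    "k8s.io/apimachinery/pkg/api/resource"
    clusterv1 "open-cluster-management.io/api/cluster/v1"
    "open-cluster-management.io/registration/test/integration/util"
)

func main() {
    // a node with 32 CPUs and 64Mi of memory
    nodeList := util.NewResourceList(32, 64)
    clusterList := clusterv1.ResourceList{
        clusterv1.ResourceCPU:    *resource.NewQuantity(32, resource.DecimalExponent),
        clusterv1.ResourceMemory: *resource.NewQuantity(1024*1024*64, resource.BinarySI),
    }
    fmt.Println(util.CmpResourceQuantity("cpu", nodeList, clusterList))    // true
    fmt.Println(util.CmpResourceQuantity("memory", nodeList, clusterList)) // true
}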
func NewIntegrationTestEventRecorder(component string) events.Recorder {
return &IntegrationTestEventRecorder{component: component}
}