Files
open-cluster-management/pkg/spoke/spokecluster/controller.go
2020-04-29 20:52:57 +08:00

171 lines
6.5 KiB
Go

package spokecluster
import (
"context"
"fmt"
"time"
"github.com/open-cluster-management/registration/pkg/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/klog"
clientset "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
clusterv1informer "github.com/open-cluster-management/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1"
clusterv1 "github.com/open-cluster-management/api/cluster/v1"
discovery "k8s.io/client-go/discovery"
)
// spokeClusterController reconciles instances of SpokeCluster on the spoke cluster.
type spokeClusterController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.SpokeClusterLister
spokeDiscoveryClient discovery.DiscoveryInterface
spokeNodeLister corev1lister.NodeLister
}
// NewSpokeClusterController creates a new spoke cluster controller on the spoke cluster.
func NewSpokeClusterController(
clusterName string,
hubClusterClient clientset.Interface,
hubSpokeClusterInformer clusterv1informer.SpokeClusterInformer,
spokeDiscoveryClient discovery.DiscoveryInterface,
spokeNodeInformer corev1informers.NodeInformer,
recorder events.Recorder) factory.Controller {
c := &spokeClusterController{
clusterName: clusterName,
hubClusterClient: hubClusterClient,
hubClusterLister: hubSpokeClusterInformer.Lister(),
spokeDiscoveryClient: spokeDiscoveryClient,
spokeNodeLister: spokeNodeInformer.Lister(),
}
return factory.New().
// TODO need to have conditional node capacity recalculation based on number of nodes here and decoupling
// the node capacity calculation to another controller.
WithInformers(hubSpokeClusterInformer.Informer(), spokeNodeInformer.Informer()).
WithSync(c.sync).
ResyncEvery(5*time.Minute).
ToController("SpokeClusterController", recorder)
}
// sync maintains the spoke-side status of a SpokeCluster, it maintains the SpokeClusterJoined condition according to
// the value of the SpokeClusterHubAccepted condition and ensures resource and version are up to date.
func (c spokeClusterController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
spokeCluster, err := c.hubClusterLister.Get(c.clusterName)
if err != nil {
return fmt.Errorf("unable to get spoke cluster with name %q from hub: %w", c.clusterName, err)
}
// current spoke cluster is not accepted, do nothing.
acceptedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionHubAccepted)
if !helpers.IsConditionTrue(acceptedCondition) {
syncCtx.Recorder().Eventf("SpokeClusterIsNotAccepted", "Spoke cluster %q is not accepted by hub yet", c.clusterName)
return nil
}
// current spoke cluster is accepted, update its status if necessary.
capacity, allocatable, err := c.getClusterResources()
if err != nil {
return fmt.Errorf("unable to get capacity and allocatable of spoke cluster %q: %w", c.clusterName, err)
}
spokeVersion, err := c.getSpokeVersion()
if err != nil {
return fmt.Errorf("unable to get server version of spoke cluster %q: %w", c.clusterName, err)
}
updateStatusFuncs := []helpers.UpdateSpokeClusterStatusFunc{}
joinedCondition := helpers.FindSpokeClusterCondition(spokeCluster.Status.Conditions, clusterv1.SpokeClusterConditionJoined)
joined := helpers.IsConditionTrue(joinedCondition)
// current spoke cluster did not join the hub cluster, join it.
if !joined {
updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateSpokeClusterConditionFn(clusterv1.StatusCondition{
Type: clusterv1.SpokeClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "SpokeClusterJoined",
Message: "Spoke cluster joined",
}))
}
updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.SpokeClusterStatus{
Capacity: capacity,
Allocatable: allocatable,
Version: *spokeVersion,
}))
_, updated, err := helpers.UpdateSpokeClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...)
if err != nil {
return fmt.Errorf("unable to update status of spoke cluster %q: %w", c.clusterName, err)
}
if updated {
if !joined {
syncCtx.Recorder().Eventf("SpokeClusterJoined", "Spoke cluster %q joined hub", c.clusterName)
}
klog.V(4).Infof("The status of spoke cluster %q has been updated", c.clusterName)
}
return nil
}
func (c *spokeClusterController) getSpokeVersion() (*clusterv1.SpokeVersion, error) {
serverVersion, err := c.spokeDiscoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return &clusterv1.SpokeVersion{Kubernetes: serverVersion.String()}, nil
}
func (c *spokeClusterController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) {
nodes, err := c.spokeNodeLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
cpuCapacity := *resource.NewQuantity(int64(0), resource.DecimalSI)
memoryCapacity := *resource.NewQuantity(int64(0), resource.BinarySI)
cpuAllocatable := *resource.NewQuantity(int64(0), resource.DecimalSI)
memoryAllocatable := *resource.NewQuantity(int64(0), resource.BinarySI)
for _, node := range nodes {
cpuCapacity.Add(*node.Status.Capacity.Cpu())
memoryCapacity.Add(*node.Status.Capacity.Memory())
cpuAllocatable.Add(*node.Status.Allocatable.Cpu())
memoryAllocatable.Add(*node.Status.Allocatable.Memory())
}
return clusterv1.ResourceList{
clusterv1.ResourceCPU: cpuCapacity,
clusterv1.ResourceMemory: formatQuantityToMi(memoryCapacity),
},
clusterv1.ResourceList{
clusterv1.ResourceCPU: cpuAllocatable,
clusterv1.ResourceMemory: formatQuantityToMi(memoryAllocatable),
}, nil
}
func formatQuantityToMi(q resource.Quantity) resource.Quantity {
raw, _ := q.AsInt64()
raw /= (1024 * 1024)
rq, err := resource.ParseQuantity(fmt.Sprintf("%dMi", raw))
if err != nil {
return q
}
return rq
}
func updateClusterResourcesFn(status clusterv1.SpokeClusterStatus) helpers.UpdateSpokeClusterStatusFunc {
return func(oldStatus *clusterv1.SpokeClusterStatus) error {
oldStatus.Capacity = status.Capacity
oldStatus.Allocatable = status.Allocatable
oldStatus.Version = status.Version
return nil
}
}