mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-22 00:54:00 +00:00
487 lines
18 KiB
Go
487 lines
18 KiB
Go
package scheduling
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"reflect"
|
|
"sort"
|
|
"strings"
|
|
|
|
"github.com/openshift/library-go/pkg/controller/factory"
|
|
"github.com/openshift/library-go/pkg/operator/events"
|
|
errorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
|
|
corev1 "k8s.io/api/core/v1"
|
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
|
"k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/selection"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
cache "k8s.io/client-go/tools/cache"
|
|
kevents "k8s.io/client-go/tools/events"
|
|
"k8s.io/klog/v2"
|
|
clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned"
|
|
clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
|
|
clusterinformerv1alpha1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1"
|
|
clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
|
|
clusterlisterv1alpha1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1"
|
|
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
|
|
clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
|
|
)
|
|
|
|
const (
	// schedulingControllerName names both the sync context and the controller.
	schedulingControllerName = "SchedulingController"
	// maxNumOfClusterDecisions caps how many cluster decisions are written to
	// a single PlacementDecision.
	maxNumOfClusterDecisions = 100
	// clusterSetLabel is the label key used to select ManagedClusters by the
	// clusterset they belong to.
	clusterSetLabel = "cluster.open-cluster-management.io/clusterset"
	// placementLabel is the label key set on a PlacementDecision; its value is
	// the name of the Placement the decision was created for.
	placementLabel = "cluster.open-cluster-management.io/placement"
)
|
|
|
|
// enqueuePlacementFunc requests reconciliation of the placement identified by
// the given namespace and name.
type enqueuePlacementFunc func(namespace string, name string)
|
|
|
|
// schedulingController schedules cluster decisions for Placements
|
|
type schedulingController struct {
|
|
clusterClient clusterclient.Interface
|
|
clusterLister clusterlisterv1.ManagedClusterLister
|
|
clusterSetLister clusterlisterv1alpha1.ManagedClusterSetLister
|
|
clusterSetBindingLister clusterlisterv1alpha1.ManagedClusterSetBindingLister
|
|
placementLister clusterlisterv1alpha1.PlacementLister
|
|
placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister
|
|
enqueuePlacementFunc enqueuePlacementFunc
|
|
scheduler Scheduler
|
|
recorder kevents.EventRecorder
|
|
}
|
|
|
|
// NewDecisionSchedulingController return an instance of schedulingController
|
|
func NewSchedulingController(
|
|
clusterClient clusterclient.Interface,
|
|
clusterInformer clusterinformerv1.ManagedClusterInformer,
|
|
clusterSetInformer clusterinformerv1alpha1.ManagedClusterSetInformer,
|
|
clusterSetBindingInformer clusterinformerv1alpha1.ManagedClusterSetBindingInformer,
|
|
placementInformer clusterinformerv1alpha1.PlacementInformer,
|
|
placementDecisionInformer clusterinformerv1alpha1.PlacementDecisionInformer,
|
|
scheduler Scheduler,
|
|
recorder events.Recorder, krecorder kevents.EventRecorder,
|
|
) factory.Controller {
|
|
syncCtx := factory.NewSyncContext(schedulingControllerName, recorder)
|
|
enqueuePlacementFunc := func(namespace, name string) {
|
|
syncCtx.Queue().Add(fmt.Sprintf("%s/%s", namespace, name))
|
|
}
|
|
|
|
// build controller
|
|
c := &schedulingController{
|
|
clusterClient: clusterClient,
|
|
clusterLister: clusterInformer.Lister(),
|
|
clusterSetLister: clusterSetInformer.Lister(),
|
|
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
|
|
placementLister: placementInformer.Lister(),
|
|
placementDecisionLister: placementDecisionInformer.Lister(),
|
|
enqueuePlacementFunc: enqueuePlacementFunc,
|
|
recorder: krecorder,
|
|
scheduler: scheduler,
|
|
}
|
|
|
|
// setup event handler for cluster informer.
|
|
// Once a cluster changes, clusterEventHandler enqueues all placements which are
|
|
// impacted potentially for further reconciliation. It might not function before the
|
|
// informers/listers of clusterset/clustersetbinding/placement are synced during
|
|
// controller booting. But that should not cause any problem because all existing
|
|
// placements will be enqueued by the controller anyway when booting.
|
|
clusterInformer.Informer().AddEventHandler(&clusterEventHandler{
|
|
clusterSetLister: clusterSetInformer.Lister(),
|
|
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
|
|
placementLister: placementInformer.Lister(),
|
|
enqueuePlacementFunc: enqueuePlacementFunc,
|
|
})
|
|
|
|
// setup event handler for clusterset informer
|
|
// Once a clusterset changes, clusterSetEventHandler enqueues all placements which are
|
|
// impacted potentially for further reconciliation. It might not function before the
|
|
// informers/listers of clustersetbinding/placement are synced during controller
|
|
// booting. But that should not cause any problem because all existing placements will
|
|
// be enqueued by the controller anyway when booting.
|
|
clusterSetInformer.Informer().AddEventHandler(&clusterSetEventHandler{
|
|
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
|
|
placementLister: placementInformer.Lister(),
|
|
enqueuePlacementFunc: enqueuePlacementFunc,
|
|
})
|
|
|
|
// setup event handler for clustersetbinding informer
|
|
// Once a clustersetbinding changes, clusterSetBindingEventHandler enqueues all placements
|
|
// which are impacted potentially for further reconciliation. It might not function before
|
|
// the informers/listers of clusterset/placement are synced during controller booting. But
|
|
// that should not cause any problem because all existing placements will be enqueued by
|
|
// the controller anyway when booting.
|
|
clusterSetBindingInformer.Informer().AddEventHandler(&clusterSetBindingEventHandler{
|
|
clusterSetLister: clusterSetInformer.Lister(),
|
|
placementLister: placementInformer.Lister(),
|
|
enqueuePlacementFunc: enqueuePlacementFunc,
|
|
})
|
|
|
|
return factory.New().
|
|
WithSyncContext(syncCtx).
|
|
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
|
|
key, _ := cache.MetaNamespaceKeyFunc(obj)
|
|
return key
|
|
}, placementInformer.Informer()).
|
|
WithFilteredEventsInformersQueueKeyFunc(func(obj runtime.Object) string {
|
|
accessor, _ := meta.Accessor(obj)
|
|
labels := accessor.GetLabels()
|
|
placementName := labels[placementLabel]
|
|
return fmt.Sprintf("%s/%s", accessor.GetNamespace(), placementName)
|
|
}, func(obj interface{}) bool {
|
|
accessor, err := meta.Accessor(obj)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
labels := accessor.GetLabels()
|
|
if _, ok := labels[placementLabel]; ok {
|
|
return true
|
|
}
|
|
return false
|
|
}, placementDecisionInformer.Informer()).
|
|
WithBareInformers(clusterInformer.Informer(), clusterSetInformer.Informer(), clusterSetBindingInformer.Informer()).
|
|
WithSync(c.sync).
|
|
ToController(schedulingControllerName, recorder)
|
|
}
|
|
|
|
func (c *schedulingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
|
queueKey := syncCtx.QueueKey()
|
|
namespace, name, err := cache.SplitMetaNamespaceKey(queueKey)
|
|
if err != nil {
|
|
// ignore placement whose key is not in format: namespace/name
|
|
utilruntime.HandleError(err)
|
|
return nil
|
|
}
|
|
|
|
klog.V(4).Infof("Reconciling placement %q", queueKey)
|
|
placement, err := c.placementLister.Placements(namespace).Get(name)
|
|
if errors.IsNotFound(err) {
|
|
// no work if placement is deleted
|
|
return nil
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// no work if placement is deleting
|
|
if !placement.DeletionTimestamp.IsZero() {
|
|
return nil
|
|
}
|
|
|
|
// get all valid clustersetbindings in the placement namespace
|
|
bindings, err := c.getValidManagedClusterSetBindings(placement.Namespace)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// get eligible clustersets for the placement
|
|
clusterSetNames := c.getEligibleClusterSets(placement, bindings)
|
|
|
|
// get available clusters for the placement
|
|
clusters, err := c.getAvailableClusters(clusterSetNames)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// schedule placement with scheduler
|
|
scheduleResult, err := c.scheduler.Schedule(ctx, placement, clusters)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = c.bind(ctx, placement, scheduleResult.Decisions(), scheduleResult.PrioritizerScores())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// update placement status if necessary to signal no bindings
|
|
return c.updateStatus(ctx, placement, clusterSetNames, len(bindings), len(clusters), scheduleResult)
|
|
}
|
|
|
|
// getManagedClusterSetBindings returns all bindings found in the placement namespace.
|
|
func (c *schedulingController) getValidManagedClusterSetBindings(placementNamespace string) ([]*clusterapiv1alpha1.ManagedClusterSetBinding, error) {
|
|
// get all clusterset bindings under the placement namespace
|
|
bindings, err := c.clusterSetBindingLister.ManagedClusterSetBindings(placementNamespace).List(labels.Everything())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(bindings) == 0 {
|
|
bindings = nil
|
|
}
|
|
|
|
validBindings := []*clusterapiv1alpha1.ManagedClusterSetBinding{}
|
|
for _, binding := range bindings {
|
|
// ignore clustersetbinding refers to a non-existent clusterset
|
|
_, err := c.clusterSetLister.Get(binding.Name)
|
|
if errors.IsNotFound(err) {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
validBindings = append(validBindings, binding)
|
|
}
|
|
|
|
return validBindings, nil
|
|
}
|
|
|
|
// getEligibleClusterSets returns the names of clusterset that eligible for the placement
|
|
func (c *schedulingController) getEligibleClusterSets(placement *clusterapiv1alpha1.Placement, bindings []*clusterapiv1alpha1.ManagedClusterSetBinding) []string {
|
|
// filter out invaid clustersetbindings
|
|
clusterSetNames := sets.NewString()
|
|
for _, binding := range bindings {
|
|
clusterSetNames.Insert(binding.Name)
|
|
}
|
|
|
|
// get intersection of clustesets bound to placement namespace and clustesets specified
|
|
// in placement spec
|
|
if len(placement.Spec.ClusterSets) != 0 {
|
|
clusterSetNames = clusterSetNames.Intersection(sets.NewString(placement.Spec.ClusterSets...))
|
|
}
|
|
|
|
return clusterSetNames.List()
|
|
}
|
|
|
|
// getAvailableClusters returns available clusters for the given placement. The clusters must
|
|
// 1) Be from clustersets bound to the placement namespace;
|
|
// 2) Belong to one of particular clustersets if .spec.clusterSets is specified;
|
|
func (c *schedulingController) getAvailableClusters(clusterSetNames []string) ([]*clusterapiv1.ManagedCluster, error) {
|
|
if len(clusterSetNames) == 0 {
|
|
return nil, nil
|
|
}
|
|
// list clusters from those clustersets
|
|
requirement, err := labels.NewRequirement(clusterSetLabel, selection.In, clusterSetNames)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
labelSelector := labels.NewSelector().Add(*requirement)
|
|
return c.clusterLister.List(labelSelector)
|
|
}
|
|
|
|
// updateStatus updates the status of the placement according to intermediate scheduling data.
|
|
func (c *schedulingController) updateStatus(
|
|
ctx context.Context,
|
|
placement *clusterapiv1alpha1.Placement,
|
|
eligibleClusterSetNames []string,
|
|
numOfBindings,
|
|
numOfAvailableClusters int,
|
|
scheduleResult ScheduleResult,
|
|
) error {
|
|
newPlacement := placement.DeepCopy()
|
|
newPlacement.Status.NumberOfSelectedClusters = int32(len(scheduleResult.Decisions()))
|
|
|
|
satisfiedCondition := newSatisfiedCondition(
|
|
placement.Spec.ClusterSets,
|
|
eligibleClusterSetNames,
|
|
numOfBindings,
|
|
numOfAvailableClusters,
|
|
len(scheduleResult.Decisions()),
|
|
scheduleResult.NumOfUnscheduled(),
|
|
)
|
|
|
|
meta.SetStatusCondition(&newPlacement.Status.Conditions, satisfiedCondition)
|
|
if reflect.DeepEqual(newPlacement.Status, placement.Status) {
|
|
return nil
|
|
}
|
|
_, err := c.clusterClient.ClusterV1alpha1().Placements(newPlacement.Namespace).UpdateStatus(ctx, newPlacement, metav1.UpdateOptions{})
|
|
return err
|
|
}
|
|
|
|
// newSatisfiedCondition returns a new condition with type PlacementConditionSatisfied
|
|
func newSatisfiedCondition(
|
|
clusterSetsInSpec []string,
|
|
eligibleClusterSets []string,
|
|
numOfBindings,
|
|
numOfAvailableClusters,
|
|
numOfFeasibleClusters,
|
|
numOfUnscheduledDecisions int,
|
|
) metav1.Condition {
|
|
condition := metav1.Condition{
|
|
Type: clusterapiv1alpha1.PlacementConditionSatisfied,
|
|
}
|
|
switch {
|
|
case numOfBindings == 0:
|
|
condition.Status = metav1.ConditionFalse
|
|
condition.Reason = "NoManagedClusterSetBindings"
|
|
condition.Message = "No valid ManagedClusterSetBindings found in placement namespace"
|
|
case len(eligibleClusterSets) == 0:
|
|
condition.Status = metav1.ConditionFalse
|
|
condition.Reason = "NoIntersection"
|
|
condition.Message = fmt.Sprintf("None of ManagedClusterSets [%s] is bound to placement namespace", strings.Join(clusterSetsInSpec, ","))
|
|
case numOfAvailableClusters == 0:
|
|
condition.Status = metav1.ConditionFalse
|
|
condition.Reason = "AllManagedClusterSetsEmpty"
|
|
condition.Message = fmt.Sprintf("All ManagedClusterSets [%s] have no member ManagedCluster", strings.Join(eligibleClusterSets, ","))
|
|
case numOfFeasibleClusters == 0:
|
|
condition.Status = metav1.ConditionFalse
|
|
condition.Reason = "NoManagedClusterMatched"
|
|
condition.Message = "No ManagedCluster matches any of the cluster predicate"
|
|
case numOfUnscheduledDecisions == 0:
|
|
condition.Status = metav1.ConditionTrue
|
|
condition.Reason = "AllDecisionsScheduled"
|
|
condition.Message = "All cluster decisions scheduled"
|
|
default:
|
|
condition.Status = metav1.ConditionFalse
|
|
condition.Reason = "NotAllDecisionsScheduled"
|
|
condition.Message = fmt.Sprintf("%d cluster decisions unscheduled", numOfUnscheduledDecisions)
|
|
}
|
|
return condition
|
|
}
|
|
|
|
// bind updates the cluster decisions in the status of the placementdecisions with the given
|
|
// cluster decision slice. New placementdecisions will be created if no one exists.
|
|
func (c *schedulingController) bind(
|
|
ctx context.Context,
|
|
placement *clusterapiv1alpha1.Placement,
|
|
clusterDecisions []clusterapiv1alpha1.ClusterDecision,
|
|
clusterScores PrioritizerScore,
|
|
) error {
|
|
// sort clusterdecisions by cluster name
|
|
sort.SliceStable(clusterDecisions, func(i, j int) bool {
|
|
return clusterDecisions[i].ClusterName < clusterDecisions[j].ClusterName
|
|
})
|
|
|
|
// split the cluster decisions into slices, the size of each slice cannot exceed
|
|
// maxNumOfClusterDecisions.
|
|
decisionSlices := [][]clusterapiv1alpha1.ClusterDecision{}
|
|
remainingDecisions := clusterDecisions
|
|
for index := 0; len(remainingDecisions) > 0; index++ {
|
|
var decisionSlice []clusterapiv1alpha1.ClusterDecision
|
|
switch {
|
|
case len(remainingDecisions) > maxNumOfClusterDecisions:
|
|
decisionSlice = remainingDecisions[0:maxNumOfClusterDecisions]
|
|
remainingDecisions = remainingDecisions[maxNumOfClusterDecisions:]
|
|
default:
|
|
decisionSlice = remainingDecisions
|
|
remainingDecisions = nil
|
|
}
|
|
decisionSlices = append(decisionSlices, decisionSlice)
|
|
}
|
|
|
|
// bind cluster decision slices to placementdecisions.
|
|
errs := []error{}
|
|
|
|
placementDecisionNames := sets.NewString()
|
|
for index, decisionSlice := range decisionSlices {
|
|
placementDecisionName := fmt.Sprintf("%s-decision-%d", placement.Name, index+1)
|
|
placementDecisionNames.Insert(placementDecisionName)
|
|
err := c.createOrUpdatePlacementDecision(
|
|
ctx, placement, placementDecisionName, decisionSlice, clusterScores)
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
if len(errs) != 0 {
|
|
return errorhelpers.NewMultiLineAggregate(errs)
|
|
}
|
|
|
|
// query all placementdecisions of the placement
|
|
requirement, err := labels.NewRequirement(placementLabel, selection.Equals, []string{placement.Name})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
labelSelector := labels.NewSelector().Add(*requirement)
|
|
placementDecisions, err := c.placementDecisionLister.PlacementDecisions(placement.Namespace).List(labelSelector)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// delete redundant placementdecisions
|
|
errs = []error{}
|
|
for _, placementDecision := range placementDecisions {
|
|
if placementDecisionNames.Has(placementDecision.Name) {
|
|
continue
|
|
}
|
|
err := c.clusterClient.ClusterV1alpha1().PlacementDecisions(
|
|
placementDecision.Namespace).Delete(ctx, placementDecision.Name, metav1.DeleteOptions{})
|
|
if errors.IsNotFound(err) {
|
|
continue
|
|
}
|
|
if err != nil {
|
|
errs = append(errs, err)
|
|
}
|
|
c.recorder.Eventf(
|
|
placement, placementDecision, corev1.EventTypeNormal,
|
|
"DecisionDelete", "DecisionDeleted",
|
|
"Decision %s is deleted with placement %s in namespace %s", placementDecision.Name, placement.Name, placement.Namespace)
|
|
}
|
|
return errorhelpers.NewMultiLineAggregate(errs)
|
|
}
|
|
|
|
// createOrUpdatePlacementDecision creates a new PlacementDecision if it does not exist and
|
|
// then updates the status with the given ClusterDecision slice if necessary
|
|
func (c *schedulingController) createOrUpdatePlacementDecision(
|
|
ctx context.Context,
|
|
placement *clusterapiv1alpha1.Placement,
|
|
placementDecisionName string,
|
|
clusterDecisions []clusterapiv1alpha1.ClusterDecision,
|
|
clusterScores PrioritizerScore,
|
|
) error {
|
|
if len(clusterDecisions) > maxNumOfClusterDecisions {
|
|
return fmt.Errorf("the number of clusterdecisions %q exceeds the max limitation %q", len(clusterDecisions), maxNumOfClusterDecisions)
|
|
}
|
|
|
|
placementDecision, err := c.placementDecisionLister.PlacementDecisions(placement.Namespace).Get(placementDecisionName)
|
|
switch {
|
|
case errors.IsNotFound(err):
|
|
// create the placementdecision if not exists
|
|
owner := metav1.NewControllerRef(placement, clusterapiv1alpha1.GroupVersion.WithKind("Placement"))
|
|
placementDecision = &clusterapiv1alpha1.PlacementDecision{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: placementDecisionName,
|
|
Namespace: placement.Namespace,
|
|
Labels: map[string]string{
|
|
placementLabel: placement.Name,
|
|
},
|
|
OwnerReferences: []metav1.OwnerReference{*owner},
|
|
},
|
|
}
|
|
var err error
|
|
placementDecision, err = c.clusterClient.ClusterV1alpha1().PlacementDecisions(
|
|
placement.Namespace).Create(ctx, placementDecision, metav1.CreateOptions{})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
c.recorder.Eventf(
|
|
placement, placementDecision, corev1.EventTypeNormal,
|
|
"DecisionCreate", "DecisionCreated",
|
|
"Decision %s is created with placement %s in namespace %s", placementDecision.Name, placement.Name, placement.Namespace)
|
|
case err != nil:
|
|
return err
|
|
}
|
|
|
|
// update the status of the placementdecision if decisions change
|
|
if apiequality.Semantic.DeepEqual(placementDecision.Status.Decisions, clusterDecisions) {
|
|
return nil
|
|
}
|
|
|
|
newPlacementDecision := placementDecision.DeepCopy()
|
|
newPlacementDecision.Status.Decisions = clusterDecisions
|
|
newPlacementDecision, err = c.clusterClient.ClusterV1alpha1().PlacementDecisions(newPlacementDecision.Namespace).
|
|
UpdateStatus(ctx, newPlacementDecision, metav1.UpdateOptions{})
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c.recorder.Eventf(
|
|
placement, placementDecision, corev1.EventTypeNormal,
|
|
"DecisionUpdate", "DecisionUpdated",
|
|
"Decision %s is updated with placement %s in namespace %s", placementDecision.Name, placement.Name, placement.Namespace)
|
|
|
|
// update the event with prioritizer score.
|
|
scoreStr := ""
|
|
for k, v := range clusterScores {
|
|
scoreStr += fmt.Sprintf("%s:%d ", k, v)
|
|
}
|
|
c.recorder.Eventf(
|
|
placement, placementDecision, corev1.EventTypeNormal,
|
|
"ScoreUpdate", "ScoreUpdated",
|
|
scoreStr)
|
|
|
|
return nil
|
|
}
|