From 4004e354fa3f5dc447a81664496969f97cc9577b Mon Sep 17 00:00:00 2001 From: Jian Qiu Date: Fri, 9 Jul 2021 16:10:51 +0800 Subject: [PATCH] Use plugin to support steady/balance Signed-off-by: Jian Qiu --- pkg/controllers/manager.go | 15 +- pkg/controllers/scheduling/schedule.go | 219 +++++++++++++++--- pkg/controllers/scheduling/schedule_test.go | 134 ++++++++--- .../scheduling/scheduling_controller.go | 33 ++- .../scheduling/scheduling_controller_test.go | 24 +- pkg/helpers/testing/helpers.go | 29 +++ pkg/plugins/balance/balance.go | 75 ++++++ pkg/plugins/balance/balance_test.go | 91 ++++++++ pkg/plugins/interface.go | 61 +++++ .../predicate}/predicate.go | 32 ++- .../predicate}/predicate_test.go | 69 +++--- pkg/plugins/steady/steady.go | 74 ++++++ pkg/plugins/steady/steady_test.go | 78 +++++++ 13 files changed, 818 insertions(+), 116 deletions(-) create mode 100644 pkg/plugins/balance/balance.go create mode 100644 pkg/plugins/balance/balance_test.go create mode 100644 pkg/plugins/interface.go rename pkg/{controllers/scheduling => plugins/predicate}/predicate.go (77%) rename pkg/{controllers/scheduling => plugins/predicate}/predicate_test.go (72%) create mode 100644 pkg/plugins/steady/steady.go create mode 100644 pkg/plugins/steady/steady_test.go diff --git a/pkg/controllers/manager.go b/pkg/controllers/manager.go index 0c62e86b9..c3dbe9cea 100644 --- a/pkg/controllers/manager.go +++ b/pkg/controllers/manager.go @@ -5,8 +5,10 @@ import ( "time" "github.com/openshift/library-go/pkg/controller/controllercmd" - + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/events" clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" + clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme" clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions" scheduling "open-cluster-management.io/placement/pkg/controllers/scheduling" ) @@ -17,8 +19,18 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. if err != nil { return err } + + kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig) + if err != nil { + return err + } + clusterInformers := clusterinformers.NewSharedInformerFactory(clusterClient, 10*time.Minute) + broadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: kubeClient.EventsV1()}) + + broadcaster.StartRecordingToSink(ctx.Done()) + schedulingController := scheduling.NewSchedulingController( clusterClient, clusterInformers.Cluster().V1().ManagedClusters(), @@ -27,6 +39,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd. clusterInformers.Cluster().V1alpha1().Placements(), clusterInformers.Cluster().V1alpha1().PlacementDecisions(), controllerContext.EventRecorder, + broadcaster.NewRecorder(clusterscheme.Scheme, "placementController"), ) go clusterInformers.Start(ctx.Done()) diff --git a/pkg/controllers/scheduling/schedule.go b/pkg/controllers/scheduling/schedule.go index 33cae512f..ed4fb0a13 100644 --- a/pkg/controllers/scheduling/schedule.go +++ b/pkg/controllers/scheduling/schedule.go @@ -3,33 +3,35 @@ package scheduling import ( "context" "fmt" - "reflect" "sort" errorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" "k8s.io/apimachinery/pkg/util/sets" - clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" - clusterlisterv1alpha1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1" clusterapiv1 "open-cluster-management.io/api/cluster/v1" clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + "open-cluster-management.io/placement/pkg/plugins" + "open-cluster-management.io/placement/pkg/plugins/balance" + "open-cluster-management.io/placement/pkg/plugins/predicate" + "open-cluster-management.io/placement/pkg/plugins/steady" ) const ( maxNumOfClusterDecisions = 100 ) -type scheduleFunc func( - ctx context.Context, - placement *clusterapiv1alpha1.Placement, - clusters []*clusterapiv1.ManagedCluster, - clusterClient clusterclient.Interface, - placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, -) (*scheduleResult, error) +type Scheduler interface { + schedule( + ctx context.Context, + placement *clusterapiv1alpha1.Placement, + clusters []*clusterapiv1.ManagedCluster, + ) (*scheduleResult, error) +} type scheduleResult struct { feasibleClusters int @@ -37,35 +39,116 @@ type scheduleResult struct { unscheduledDecisions int } -func schedule( +type pluginScore struct { + sumScore int64 + scores map[string]int64 +} + +func newPluginScore() *pluginScore { + return &pluginScore{ + sumScore: 0, + scores: map[string]int64{}, + } +} + +func (p *pluginScore) add(pluginName string, score int64) { + p.sumScore = p.sumScore + score + p.scores[pluginName] = score +} + +func (p *pluginScore) sum() int64 { + return p.sumScore +} + +func (p *pluginScore) string() string { + output := "" + + for name, score := range p.scores { + output = fmt.Sprintf("%splugin: %s, score: %d; ", output, name, score) + } + + return output +} + +type pluginScheduler struct { + filters []plugins.Filter + prioritizers []plugins.Prioritizer + clientWrapper plugins.Handle +} + +func newPluginScheduler(handle plugins.Handle) *pluginScheduler { + return &pluginScheduler{ + filters: []plugins.Filter{ + predicate.New(handle), + }, + prioritizers: []plugins.Prioritizer{ + steady.New(handle), + balance.New(handle), + }, + clientWrapper: handle, + } +} + +func (s *pluginScheduler) schedule( ctx context.Context, placement *clusterapiv1alpha1.Placement, clusters []*clusterapiv1.ManagedCluster, - clusterClient clusterclient.Interface, - placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, ) (*scheduleResult, error) { - // filter clusters with cluster predicates - feasibleClusters, err := matchWithClusterPredicates(placement.Spec.Predicates, clusters) - if err != nil { - return nil, err + var err error + filtered := clusters + + // filter clusters + for _, f := range s.filters { + filtered, err = f.Filter(ctx, placement, filtered) + + if err != nil { + return nil, err + } } + // score clusters + // Score the cluster + scoreSum := map[string]*pluginScore{} + for _, cluster := range filtered { + scoreSum[cluster.Name] = newPluginScore() + } + for _, p := range s.prioritizers { + score, err := p.Score(ctx, placement, filtered) + if err != nil { + return nil, err + } + + // TODO we currently weigh each prioritizer as equal. We should consider + // importance factor for each priotizer when caculating the final score. + // Since currently balance plugin has a score range of +/- 100 while the score range of + // balacne is 0/100, the balance plugin will trigger the reschedule for rebalancing when + // a cluster's decision count is larger than average. + for name, val := range score { + scoreSum[name].add(p.Name(), val) + } + } + + // Sort cluster by score + sort.SliceStable(filtered, func(i, j int) bool { + return scoreSum[clusters[i].Name].sum() > scoreSum[clusters[j].Name].sum() + }) + // select clusters and generate cluster decisions // TODO: sort the feasible clusters and make sure the selection stable - decisions := selectClusters(placement, feasibleClusters) + decisions := selectClusters(placement, filtered) scheduled, unscheduled := len(decisions), 0 if placement.Spec.NumberOfClusters != nil { unscheduled = int(*placement.Spec.NumberOfClusters) - scheduled } // bind the cluster decisions into placementdecisions - err = bind(ctx, placement, decisions, clusterClient, placementDecisionLister) + err = s.bind(ctx, placement, decisions, scoreSum) if err != nil { return nil, err } return &scheduleResult{ - feasibleClusters: len(feasibleClusters), + feasibleClusters: len(filtered), scheduledDecisions: scheduled, unscheduledDecisions: unscheduled, }, nil @@ -96,12 +179,11 @@ func selectClusters(placement *clusterapiv1alpha1.Placement, clusters []*cluster // bind updates the cluster decisions in the status of the placementdecisions with the given // cluster decision slice. New placementdecisions will be created if no one exists. -func bind( +func (s *pluginScheduler) bind( ctx context.Context, placement *clusterapiv1alpha1.Placement, clusterDecisions []clusterapiv1alpha1.ClusterDecision, - clusterClient clusterclient.Interface, - placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, + score map[string]*pluginScore, ) error { // sort clusterdecisions by cluster name sort.SliceStable(clusterDecisions, func(i, j int) bool { @@ -127,11 +209,13 @@ func bind( // bind cluster decision slices to placementdecisions. errs := []error{} + placementDecisionNames := sets.NewString() for index, decisionSlice := range decisionSlices { placementDecisionName := fmt.Sprintf("%s-decision-%d", placement.Name, index+1) placementDecisionNames.Insert(placementDecisionName) - err := createOrUpdatePlacementDecision(ctx, placement, placementDecisionName, decisionSlice, clusterClient, placementDecisionLister) + err := s.createOrUpdatePlacementDecision( + ctx, placement, placementDecisionName, decisionSlice, score) if err != nil { errs = append(errs, err) } @@ -146,7 +230,7 @@ func bind( return err } labelSelector := labels.NewSelector().Add(*requirement) - placementDecisions, err := placementDecisionLister.PlacementDecisions(placement.Namespace).List(labelSelector) + placementDecisions, err := s.clientWrapper.DecisionLister().PlacementDecisions(placement.Namespace).List(labelSelector) if err != nil { return err } @@ -157,32 +241,36 @@ func bind( if placementDecisionNames.Has(placementDecision.Name) { continue } - err := clusterClient.ClusterV1alpha1().PlacementDecisions(placementDecision.Namespace).Delete(ctx, placementDecision.Name, metav1.DeleteOptions{}) + err := s.clientWrapper.ClusterClient().ClusterV1alpha1().PlacementDecisions( + placementDecision.Namespace).Delete(ctx, placementDecision.Name, metav1.DeleteOptions{}) if errors.IsNotFound(err) { continue } if err != nil { errs = append(errs, err) } + s.clientWrapper.EventRecorder().Eventf( + placement, placementDecision, corev1.EventTypeNormal, + "DecisionDelete", "DecisionDeleted", + "Decision %s is deleted with placement %s in namespace %s", placementDecision.Name, placement.Name, placement.Namespace) } return errorhelpers.NewMultiLineAggregate(errs) } // createOrUpdatePlacementDecision creates a new PlacementDecision if it does not exist and // then updates the status with the given ClusterDecision slice if necessary -func createOrUpdatePlacementDecision( +func (s *pluginScheduler) createOrUpdatePlacementDecision( ctx context.Context, placement *clusterapiv1alpha1.Placement, placementDecisionName string, clusterDecisions []clusterapiv1alpha1.ClusterDecision, - clusterClient clusterclient.Interface, - placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, + scores map[string]*pluginScore, ) error { if len(clusterDecisions) > maxNumOfClusterDecisions { return fmt.Errorf("the number of clusterdecisions %q exceeds the max limitation %q", len(clusterDecisions), maxNumOfClusterDecisions) } - placementDecision, err := placementDecisionLister.PlacementDecisions(placement.Namespace).Get(placementDecisionName) + placementDecision, err := s.clientWrapper.DecisionLister().PlacementDecisions(placement.Namespace).Get(placementDecisionName) switch { case errors.IsNotFound(err): // create the placementdecision if not exists @@ -198,22 +286,85 @@ func createOrUpdatePlacementDecision( }, } var err error - placementDecision, err = clusterClient.ClusterV1alpha1().PlacementDecisions(placement.Namespace).Create(ctx, placementDecision, metav1.CreateOptions{}) + placementDecision, err = s.clientWrapper.ClusterClient().ClusterV1alpha1().PlacementDecisions( + placement.Namespace).Create(ctx, placementDecision, metav1.CreateOptions{}) if err != nil { return err } + s.clientWrapper.EventRecorder().Eventf( + placement, placementDecision, corev1.EventTypeNormal, + "DecisionCreate", "DecisionCreated", + "Decision %s is created with placement %s in namespace %s", placementDecision.Name, placement.Name, placement.Namespace) case err != nil: return err } // update the status of the placementdecision if decisions change - if reflect.DeepEqual(placementDecision.Status.Decisions, clusterDecisions) { + added, deleted, updated := s.compareDecision(placementDecision.Status.Decisions, clusterDecisions) + if !updated { return nil } newPlacementDecision := placementDecision.DeepCopy() newPlacementDecision.Status.Decisions = clusterDecisions - _, err = clusterClient.ClusterV1alpha1().PlacementDecisions(newPlacementDecision.Namespace). + newPlacementDecision, err = s.clientWrapper.ClusterClient().ClusterV1alpha1().PlacementDecisions(newPlacementDecision.Namespace). UpdateStatus(ctx, newPlacementDecision, metav1.UpdateOptions{}) - return err + + if err != nil { + return err + } + + for clusterName := range added { + score, ok := scores[clusterName] + if !ok { + continue + } + s.clientWrapper.EventRecorder().Eventf( + placement, newPlacementDecision, corev1.EventTypeNormal, + "DecisionUpdate", "DecisionUpdated", + "cluster %s is added into placementDecision %s in namespace %s with score %s ", + clusterName, placementDecision.Name, placement.Namespace, score.string()) + } + + for clusterName := range deleted { + score, ok := scores[clusterName] + if !ok { + continue + } + s.clientWrapper.EventRecorder().Eventf( + placement, newPlacementDecision, corev1.EventTypeNormal, + "DecisionUpdate", "DecisionUpdated", + "cluster %s is removed from placementDecision %s in namespace %s with score %s ", + clusterName, placementDecision.Name, placement.Namespace, score.string()) + } + + return nil +} + +// compareDecision compare the existing decision with desired decision. It outputs a result on why +// a decision is chosen, and whether the decision results should be updated. +func (s *pluginScheduler) compareDecision( + existingDecisions, desiredDecisions []clusterapiv1alpha1.ClusterDecision) (sets.String, sets.String, bool) { + + existing := sets.NewString() + + desired := sets.NewString() + + for _, d := range existingDecisions { + existing.Insert(d.ClusterName) + } + + for _, d := range desiredDecisions { + desired.Insert(d.ClusterName) + } + + if existing.Equal(desired) { + return nil, nil, false + } + + added := desired.Difference(existing) + + deleted := existing.Difference(desired) + + return added, deleted, true } diff --git a/pkg/controllers/scheduling/schedule_test.go b/pkg/controllers/scheduling/schedule_test.go index 7ce0fae30..a0bb0314b 100644 --- a/pkg/controllers/scheduling/schedule_test.go +++ b/pkg/controllers/scheduling/schedule_test.go @@ -28,6 +28,7 @@ func TestSchedule(t *testing.T) { placement *clusterapiv1alpha1.Placement initObjs []runtime.Object clusters []*clusterapiv1.ManagedCluster + decisions []runtime.Object scheduleResult scheduleResult validateActions func(t *testing.T, actions []clienttesting.Action) }{ @@ -38,6 +39,7 @@ func TestSchedule(t *testing.T) { testinghelpers.NewClusterSet(clusterSetName), testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), }, + decisions: []runtime.Object{}, clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), }, @@ -63,6 +65,7 @@ func TestSchedule(t *testing.T) { testinghelpers.NewClusterSet(clusterSetName), testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), }, + decisions: []runtime.Object{}, clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), }, @@ -92,12 +95,18 @@ func TestSchedule(t *testing.T) { WithLabel(placementLabel, placementName). WithDecisions("cluster1", "cluster2").Build(), }, + decisions: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster1", "cluster2").Build(), + }, clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), testinghelpers.NewManagedCluster("cluster2").WithLabel(clusterSetLabel, clusterSetName).Build(), + testinghelpers.NewManagedCluster("cluster3").WithLabel(clusterSetLabel, clusterSetName).Build(), }, scheduleResult: scheduleResult{ - feasibleClusters: 2, + feasibleClusters: 3, scheduledDecisions: 2, }, validateActions: testinghelpers.AssertNoActions, @@ -116,6 +125,11 @@ func TestSchedule(t *testing.T) { testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), testinghelpers.NewManagedCluster("cluster2").WithLabel(clusterSetLabel, clusterSetName).Build(), }, + decisions: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster1").Build(), + }, scheduleResult: scheduleResult{ feasibleClusters: 2, scheduledDecisions: 2, @@ -142,6 +156,11 @@ func TestSchedule(t *testing.T) { WithLabel(placementLabel, placementName). WithDecisions("cluster1").Build(), }, + decisions: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster1").Build(), + }, clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), }, @@ -152,19 +171,84 @@ func TestSchedule(t *testing.T) { }, validateActions: testinghelpers.AssertNoActions, }, + { + name: "schedule to cluster with least decisions", + placement: testinghelpers.NewPlacement(placementNamespace, placementName).WithNOC(1).Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet(clusterSetName), + testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName("others", 1)). + WithDecisions("cluster1", "cluster2").Build(), + }, + decisions: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName("others", 1)). + WithDecisions("cluster1", "cluster2").Build(), + }, + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), + testinghelpers.NewManagedCluster("cluster2").WithLabel(clusterSetLabel, clusterSetName).Build(), + testinghelpers.NewManagedCluster("cluster3").WithLabel(clusterSetLabel, clusterSetName).Build(), + }, + scheduleResult: scheduleResult{ + feasibleClusters: 3, + scheduledDecisions: 1, + }, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + // check if PlacementDecision has been updated + testinghelpers.AssertActions(t, actions, "create", "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object + placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) + if !ok { + t.Errorf("expected PlacementDecision was updated") + } + assertClustersSelected(t, placementDecision.Status.Decisions, "cluster3") + }, + }, + { + name: "do not schedule to other cluster even with least decisions", + placement: testinghelpers.NewPlacement(placementNamespace, placementName).WithNOC(1).Build(), + initObjs: []runtime.Object{ + testinghelpers.NewClusterSet(clusterSetName), + testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName("others", 1)). + WithDecisions("cluster3", "cluster2").Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName("others", 2)). + WithDecisions("cluster2", "cluster1").Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster3").Build(), + }, + decisions: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName("others", 1)). + WithDecisions("cluster3", "cluster2").Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName("others", 2)). + WithDecisions("cluster2", "cluster1").Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster3").Build(), + }, + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), + testinghelpers.NewManagedCluster("cluster2").WithLabel(clusterSetLabel, clusterSetName).Build(), + testinghelpers.NewManagedCluster("cluster3").WithLabel(clusterSetLabel, clusterSetName).Build(), + }, + scheduleResult: scheduleResult{ + feasibleClusters: 3, + scheduledDecisions: 1, + }, + validateActions: testinghelpers.AssertNoActions, + }, } for _, c := range cases { t.Run(c.name, func(t *testing.T) { c.initObjs = append(c.initObjs, c.placement) clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - result, err := schedule( + s := newPluginScheduler(testinghelpers.NewFakePluginHandle(t, clusterClient, c.initObjs...)) + result, err := s.schedule( context.TODO(), c.placement, c.clusters, - clusterClient, - clusterInformerFactory.Cluster().V1alpha1().PlacementDecisions().Lister(), ) if err != nil { t.Errorf("unexpected err: %v", err) @@ -203,7 +287,7 @@ func TestBind(t *testing.T) { if !ok { t.Errorf("expected PlacementDecision was updated") } - assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10, false)...) + assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10)...) }, }, { @@ -211,7 +295,7 @@ func TestBind(t *testing.T) { clusterDecisions: newClusterDecisions(101), validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertActions(t, actions, "create", "update", "create", "update") - selectedClusters := newSelectedClusters(101, true) + selectedClusters := newSelectedClusters(101) actual := actions[1].(clienttesting.UpdateActionImpl).Object placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) if !ok { @@ -233,10 +317,10 @@ func TestBind(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + WithDecisions(newSelectedClusters(128)[:100]...).Build(), testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[100:]...).Build(), + WithDecisions(newSelectedClusters(128)[100:]...).Build(), }, validateActions: testinghelpers.AssertNoActions, }, @@ -246,11 +330,11 @@ func TestBind(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + WithDecisions(newSelectedClusters(128)[:100]...).Build(), }, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertActions(t, actions, "create", "update") - selectedClusters := newSelectedClusters(128, true) + selectedClusters := newSelectedClusters(128) actual := actions[1].(clienttesting.UpdateActionImpl).Object placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) if !ok { @@ -265,10 +349,10 @@ func TestBind(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + WithDecisions(newSelectedClusters(128)[:100]...).Build(), testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[100:]...).Build(), + WithDecisions(newSelectedClusters(128)[100:]...).Build(), }, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertActions(t, actions, "update", "delete") @@ -277,7 +361,7 @@ func TestBind(t *testing.T) { if !ok { t.Errorf("expected PlacementDecision was updated") } - assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10, false)...) + assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10)...) }, }, { @@ -286,10 +370,10 @@ func TestBind(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + WithDecisions(newSelectedClusters(128)[:100]...).Build(), testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)). WithLabel(placementLabel, placementName). - WithDecisions(newSelectedClusters(128, true)[100:]...).Build(), + WithDecisions(newSelectedClusters(128)[100:]...).Build(), }, validateActions: func(t *testing.T, actions []clienttesting.Action) { testinghelpers.AssertActions(t, actions, "delete", "delete") @@ -313,13 +397,13 @@ func TestBind(t *testing.T) { }, ) - clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) - err := bind( + s := newPluginScheduler(testinghelpers.NewFakePluginHandle(t, clusterClient, c.initObjs...)) + + err := s.bind( context.TODO(), testinghelpers.NewPlacement(placementNamespace, placementName).Build(), c.clusterDecisions, - clusterClient, - clusterInformerFactory.Cluster().V1alpha1().PlacementDecisions().Lister(), + map[string]*pluginScore{}, ) if err != nil { t.Errorf("unexpected err: %v", err) @@ -338,7 +422,7 @@ func assertClustersSelected(t *testing.T, decisons []clusterapiv1alpha1.ClusterD } if names.Len() != 0 { - t.Errorf("expected clusters selected: %s", strings.Join(names.UnsortedList(), ",")) + t.Errorf("expected clusters selected: %s, but got %v", strings.Join(names.UnsortedList(), ","), decisons) } } @@ -351,17 +435,15 @@ func newClusterDecisions(num int) (decisions []clusterapiv1alpha1.ClusterDecisio return decisions } -func newSelectedClusters(num int, sortWithName bool) (clusters []string) { +func newSelectedClusters(num int) (clusters []string) { for i := 0; i < num; i++ { clusters = append(clusters, fmt.Sprintf("cluster%d", i+1)) } - if !sortWithName { - return clusters - } - // sort cluster by name + sort.SliceStable(clusters, func(i, j int) bool { return clusters[i] < clusters[j] }) + return clusters } diff --git a/pkg/controllers/scheduling/scheduling_controller.go b/pkg/controllers/scheduling/scheduling_controller.go index 1880ac51c..b8b6ab1a8 100644 --- a/pkg/controllers/scheduling/scheduling_controller.go +++ b/pkg/controllers/scheduling/scheduling_controller.go @@ -17,6 +17,7 @@ import ( utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/sets" cache "k8s.io/client-go/tools/cache" + kevents "k8s.io/client-go/tools/events" "k8s.io/klog/v2" clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" @@ -45,7 +46,13 @@ type schedulingController struct { placementLister clusterlisterv1alpha1.PlacementLister placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister enqueuePlacementFunc enqueuePlacementFunc - scheduleFunc scheduleFunc + scheduler Scheduler +} + +type schedulerHandler struct { + recorder kevents.EventRecorder + placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister + clusterClient clusterclient.Interface } // NewDecisionSchedulingController return an instance of schedulingController @@ -56,13 +63,19 @@ func NewSchedulingController( clusterSetBindingInformer clusterinformerv1alpha1.ManagedClusterSetBindingInformer, placementInformer clusterinformerv1alpha1.PlacementInformer, placementDecisionInformer clusterinformerv1alpha1.PlacementDecisionInformer, - recorder events.Recorder, + recorder events.Recorder, krecorder kevents.EventRecorder, ) factory.Controller { syncCtx := factory.NewSyncContext(schedulingControllerName, recorder) enqueuePlacementFunc := func(namespace, name string) { syncCtx.Queue().Add(fmt.Sprintf("%s/%s", namespace, name)) } + sHandler := &schedulerHandler{ + recorder: krecorder, + placementDecisionLister: placementDecisionInformer.Lister(), + clusterClient: clusterClient, + } + // build controller c := schedulingController{ clusterClient: clusterClient, @@ -71,7 +84,7 @@ func NewSchedulingController( clusterSetBindingLister: clusterSetBindingInformer.Lister(), placementLister: placementInformer.Lister(), placementDecisionLister: placementDecisionInformer.Lister(), - scheduleFunc: schedule, + scheduler: newPluginScheduler(sHandler), enqueuePlacementFunc: enqueuePlacementFunc, } @@ -179,7 +192,7 @@ func (c *schedulingController) sync(ctx context.Context, syncCtx factory.SyncCon } // schedule placement with scheduler - scheduleResult, err := c.scheduleFunc(ctx, placement, clusters, c.clusterClient, c.placementDecisionLister) + scheduleResult, err := c.scheduler.schedule(ctx, placement, clusters) if err != nil { return err } @@ -317,3 +330,15 @@ func newSatisfiedCondition( } return condition } + +func (s *schedulerHandler) EventRecorder() kevents.EventRecorder { + return s.recorder +} + +func (s *schedulerHandler) DecisionLister() clusterlisterv1alpha1.PlacementDecisionLister { + return s.placementDecisionLister +} + +func (s *schedulerHandler) ClusterClient() clusterclient.Interface { + return s.clusterClient +} diff --git a/pkg/controllers/scheduling/scheduling_controller_test.go b/pkg/controllers/scheduling/scheduling_controller_test.go index 3489336cf..d3971843d 100644 --- a/pkg/controllers/scheduling/scheduling_controller_test.go +++ b/pkg/controllers/scheduling/scheduling_controller_test.go @@ -10,14 +10,23 @@ import ( "k8s.io/apimachinery/pkg/util/sets" clienttesting "k8s.io/client-go/testing" - clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" - clusterlisterv1alpha1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1" clusterapiv1 "open-cluster-management.io/api/cluster/v1" clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing" ) +type testScheduler struct { + result *scheduleResult +} + +func (s *testScheduler) schedule(ctx context.Context, + placement *clusterapiv1alpha1.Placement, + clusters []*clusterapiv1.ManagedCluster, +) (*scheduleResult, error) { + return s.result, nil +} + func TestSchedulingController_sync(t *testing.T) { placementNamespace := "ns1" placementName := "placement1" @@ -144,6 +153,7 @@ func TestSchedulingController_sync(t *testing.T) { c.initObjs = append(c.initObjs, c.placement) clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + s := &testScheduler{result: c.scheduleResult} ctrl := schedulingController{ clusterClient: clusterClient, @@ -152,15 +162,7 @@ func TestSchedulingController_sync(t *testing.T) { clusterSetBindingLister: clusterInformerFactory.Cluster().V1alpha1().ManagedClusterSetBindings().Lister(), placementLister: clusterInformerFactory.Cluster().V1alpha1().Placements().Lister(), placementDecisionLister: clusterInformerFactory.Cluster().V1alpha1().PlacementDecisions().Lister(), - scheduleFunc: func( - ctx context.Context, - placement *clusterapiv1alpha1.Placement, - clusters []*clusterapiv1.ManagedCluster, - clusterClient clusterclient.Interface, - placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, - ) (*scheduleResult, error) { - return c.scheduleResult, nil - }, + scheduler: s, } sysCtx := testinghelpers.NewFakeSyncContext(t, c.placement.Namespace+"/"+c.placement.Name) diff --git a/pkg/helpers/testing/helpers.go b/pkg/helpers/testing/helpers.go index c63dceb3e..19bd0e723 100644 --- a/pkg/helpers/testing/helpers.go +++ b/pkg/helpers/testing/helpers.go @@ -6,8 +6,13 @@ import ( "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/events/eventstesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" clienttesting "k8s.io/client-go/testing" + kevents "k8s.io/client-go/tools/events" "k8s.io/client-go/util/workqueue" + clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" + clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" + clusterlisterv1alpha1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1" ) type FakeSyncContext struct { @@ -28,6 +33,30 @@ func NewFakeSyncContext(t *testing.T, queueKey string) *FakeSyncContext { } } +type FakePluginHandle struct { + recorder kevents.EventRecorder + lister clusterlisterv1alpha1.PlacementDecisionLister + client clusterclient.Interface +} + +func (f *FakePluginHandle) EventRecorder() kevents.EventRecorder { return f.recorder } +func (f *FakePluginHandle) DecisionLister() clusterlisterv1alpha1.PlacementDecisionLister { + return f.lister +} +func (f *FakePluginHandle) ClusterClient() clusterclient.Interface { + return f.client +} + +func NewFakePluginHandle( + t *testing.T, client *clusterfake.Clientset, objects ...runtime.Object) *FakePluginHandle { + informers := NewClusterInformerFactory(client, objects...) + return &FakePluginHandle{ + recorder: kevents.NewFakeRecorder(100), + client: client, + lister: informers.Cluster().V1alpha1().PlacementDecisions().Lister(), + } +} + // AssertActions asserts the actual actions have the expected action verb func AssertActions(t *testing.T, actualActions []clienttesting.Action, expectedVerbs ...string) { if len(actualActions) != len(expectedVerbs) { diff --git a/pkg/plugins/balance/balance.go b/pkg/plugins/balance/balance.go new file mode 100644 index 000000000..bd9f9cd99 --- /dev/null +++ b/pkg/plugins/balance/balance.go @@ -0,0 +1,75 @@ +package balance + +import ( + "context" + + "k8s.io/apimachinery/pkg/labels" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + "open-cluster-management.io/placement/pkg/plugins" +) + +const ( + placementLabel = "cluster.open-cluster-management.io/placement" + description = ` + Balance prioritizer balance the number of decisions among the clusters. The cluster + with the highest number of decison is given the lowest score, while the empty cluster is given + the highest score. + ` +) + +var _ plugins.Prioritizer = &Balance{} + +type Balance struct { + handle plugins.Handle +} + +func New(handle plugins.Handle) *Balance { + return &Balance{handle: handle} +} + +func (b *Balance) Name() string { + return "balance" +} + +func (b *Balance) Description() string { + return description +} + +func (b *Balance) Score(ctx context.Context, placement *clusterapiv1alpha1.Placement, clusters []*clusterapiv1.ManagedCluster) (map[string]int64, error) { + scores := map[string]int64{} + for _, cluster := range clusters { + scores[cluster.Name] = plugins.MaxClusterScore + } + + decisions, err := b.handle.DecisionLister().List(labels.Everything()) + if err != nil { + return nil, err + } + + var maxCount int64 + decisionCount := map[string]int64{} + for _, decision := range decisions { + // Do not count the decision that is being scheduled. + if decision.Labels[placementLabel] == placement.Name && decision.Namespace == placement.Namespace { + continue + } + for _, d := range decision.Status.Decisions { + decisionCount[d.ClusterName] = decisionCount[d.ClusterName] + 1 + if decisionCount[d.ClusterName] > maxCount { + maxCount = decisionCount[d.ClusterName] + } + } + } + + for clusterName := range scores { + if count, ok := decisionCount[clusterName]; ok { + usage := float64(count) / float64(maxCount) + + // Negate the usage and substracted by 0.5, then we double it and muliply by maxCount, + // which normalize the score to value between 100 and -100 + scores[clusterName] = 2 * int64(float64(plugins.MaxClusterScore)*(0.5-usage)) + } + } + return scores, nil +} diff --git a/pkg/plugins/balance/balance_test.go b/pkg/plugins/balance/balance_test.go new file mode 100644 index 000000000..8be6365af --- /dev/null +++ b/pkg/plugins/balance/balance_test.go @@ -0,0 +1,91 @@ +package balance + +import ( + "context" + "testing" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/runtime" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing" +) + +func TestScoreClusterWithSteady(t *testing.T) { + cases := []struct { + name string + placement *clusterapiv1alpha1.Placement + clusters []*clusterapiv1.ManagedCluster + existingDecisions []runtime.Object + expectedScores map[string]int64 + }{ + { + name: "no decisions", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{}, + expectedScores: map[string]int64{"cluster1": 100, "cluster2": 100, "cluster3": 100}, + }, + { + name: "one decision belongs to current placement", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{ + testinghelpers.NewPlacementDecision("test", "test1").WithLabel(placementLabel, "test").WithDecisions("cluster1").Build(), + }, + expectedScores: map[string]int64{"cluster1": 100, "cluster2": 100, "cluster3": 100}, + }, + { + name: "one decision not belong to current placement", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{ + testinghelpers.NewPlacementDecision("test", "test1").WithLabel(placementLabel, "test1").WithDecisions("cluster1").Build(), + }, + expectedScores: map[string]int64{"cluster1": -100, "cluster2": 100, "cluster3": 100}, + }, + { + name: "multiple decisions", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{ + testinghelpers.NewPlacementDecision("test", "test1").WithLabel(placementLabel, "test1").WithDecisions("cluster1", "cluster2").Build(), + testinghelpers.NewPlacementDecision("test", "test2").WithLabel(placementLabel, "test2").WithDecisions("cluster1", "cluster3").Build(), + }, + expectedScores: map[string]int64{"cluster1": -100, "cluster2": 0, "cluster3": 0}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + steady := &Balance{ + handle: testinghelpers.NewFakePluginHandle(t, nil, c.existingDecisions...), + } + + scores, err := steady.Score(context.TODO(), c.placement, c.clusters) + if err != nil { + t.Errorf("Expect no error, but got %v", err) + } + + if !apiequality.Semantic.DeepEqual(scores, c.expectedScores) { + t.Errorf("Expect score %v, but got %v", c.expectedScores, scores) + } + }) + } +} diff --git a/pkg/plugins/interface.go b/pkg/plugins/interface.go new file mode 100644 index 000000000..2ca563935 --- /dev/null +++ b/pkg/plugins/interface.go @@ -0,0 +1,61 @@ +package plugins + +import ( + "context" + "math" + + "k8s.io/client-go/tools/events" + clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned" + clusterlisterv1alpha1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" +) + +const ( + // MaxClusterScore is the maximum score a Prioritizer plugin is expected to return. + MaxClusterScore int64 = 100 + + // MinClusterScore is the minimum score a Prioritizer plugin is expected to return. + MinClusterScore int64 = -100 + + // MaxTotalScore is the maximum total score. + MaxTotalScore int64 = math.MaxInt64 +) + +// Plugin is the parent type for all the scheduling plugins. +type Plugin interface { + Name() string + // Set is to set the placement for the current scheduling. + Description() string +} + +// Fitler defines a filter plugin that filter unsatisfied cluster. +type Filter interface { + Plugin + + // Filter returns a list of clusters satisfying the certain condition. + Filter(ctx context.Context, placement *clusterapiv1alpha1.Placement, clusters []*clusterapiv1.ManagedCluster) ([]*clusterapiv1.ManagedCluster, error) +} + +// Prioritizer defines a prioritizer plugin that score each cluster. The score is normalized +// as a floating betwween 0 and 1. +type Prioritizer interface { + Plugin + + // Score gives the score to a list of the clusters, it returns a map with the key as + // the cluster name. + Score(ctx context.Context, placement *clusterapiv1alpha1.Placement, clusters []*clusterapiv1.ManagedCluster) (map[string]int64, error) +} + +// Handle provides data and some tools that plugins can use. It is +// passed to the plugin factories at the time of plugin initialization. +type Handle interface { + // ListDecisionsInPlacment lists all decisions + DecisionLister() clusterlisterv1alpha1.PlacementDecisionLister + + // ClusterClient returns the cluster client + ClusterClient() clusterclient.Interface + + // EventRecorder returns an event recorder. + EventRecorder() events.EventRecorder +} diff --git a/pkg/controllers/scheduling/predicate.go b/pkg/plugins/predicate/predicate.go similarity index 77% rename from pkg/controllers/scheduling/predicate.go rename to pkg/plugins/predicate/predicate.go index 8a9b77faf..da6e5c436 100644 --- a/pkg/controllers/scheduling/predicate.go +++ b/pkg/plugins/predicate/predicate.go @@ -1,21 +1,43 @@ -package scheduling +package predicate import ( + "context" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" clusterapiv1 "open-cluster-management.io/api/cluster/v1" clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + "open-cluster-management.io/placement/pkg/plugins" ) +var _ plugins.Filter = &Predicate{} + +const description = "Predicate filter filters the clusters based on predicate defined in placement" + +type Predicate struct{} + type predicateSelector struct { labelSelector labels.Selector claimSelector labels.Selector } -// matchWithClusterPredicates returns a slice of clusters. Each of them must match at least one of the predicates. -func matchWithClusterPredicates(predicates []clusterapiv1alpha1.ClusterPredicate, clusters []*clusterapiv1.ManagedCluster) ([]*clusterapiv1.ManagedCluster, error) { - if len(predicates) == 0 { +func New(handle plugins.Handle) *Predicate { + return &Predicate{} +} + +func (p *Predicate) Name() string { + return "predicate" +} + +func (p *Predicate) Description() string { + return description +} + +func (p *Predicate) Filter( + ctx context.Context, placement *clusterapiv1alpha1.Placement, clusters []*clusterapiv1.ManagedCluster) ([]*clusterapiv1.ManagedCluster, error) { + + if len(placement.Spec.Predicates) == 0 { return clusters, nil } if len(clusters) == 0 { @@ -24,7 +46,7 @@ func matchWithClusterPredicates(predicates []clusterapiv1alpha1.ClusterPredicate // prebuild label/claim selectors for each predicate predicateSelectors := []predicateSelector{} - for _, predicate := range predicates { + for _, predicate := range placement.Spec.Predicates { // build label selector labelSelector, err := convertLabelSelector(predicate.RequiredClusterSelector.LabelSelector) if err != nil { diff --git a/pkg/controllers/scheduling/predicate_test.go b/pkg/plugins/predicate/predicate_test.go similarity index 72% rename from pkg/controllers/scheduling/predicate_test.go rename to pkg/plugins/predicate/predicate_test.go index daafeeacf..c54fac84e 100644 --- a/pkg/controllers/scheduling/predicate_test.go +++ b/pkg/plugins/predicate/predicate_test.go @@ -1,6 +1,7 @@ -package scheduling +package predicate import ( + "context" "strings" "testing" @@ -15,19 +16,17 @@ import ( func TestMatchWithClusterPredicates(t *testing.T) { cases := []struct { name string - predicates []clusterapiv1alpha1.ClusterPredicate + placement *clusterapiv1alpha1.Placement clusters []*clusterapiv1.ManagedCluster expectedClusterNames []string }{ { name: "match with label", - predicates: []clusterapiv1alpha1.ClusterPredicate{ - testinghelpers.NewClusterPredicate(&metav1.LabelSelector{ - MatchLabels: map[string]string{ - "cloud": "Amazon", - }, - }, nil), - }, + placement: testinghelpers.NewPlacement("test", "test").AddPredicate(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "cloud": "Amazon", + }, + }, nil).Build(), clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel("cloud", "Amazon").Build(), testinghelpers.NewManagedCluster("cluster2").WithLabel("cloud", "Google").Build(), @@ -36,18 +35,17 @@ func TestMatchWithClusterPredicates(t *testing.T) { }, { name: "match with claim", - predicates: []clusterapiv1alpha1.ClusterPredicate{ - testinghelpers.NewClusterPredicate(nil, - &clusterapiv1alpha1.ClusterClaimSelector{ - MatchExpressions: []metav1.LabelSelectorRequirement{ - { - Key: "cloud", - Operator: metav1.LabelSelectorOpIn, - Values: []string{"Amazon"}, - }, + placement: testinghelpers.NewPlacement("test", "test").AddPredicate( + nil, + &clusterapiv1alpha1.ClusterClaimSelector{ + MatchExpressions: []metav1.LabelSelectorRequirement{ + { + Key: "cloud", + Operator: metav1.LabelSelectorOpIn, + Values: []string{"Amazon"}, }, - }), - }, + }, + }).Build(), clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithClaim("cloud", "Amazon").Build(), testinghelpers.NewManagedCluster("cluster2").WithClaim("cloud", "Google").Build(), @@ -56,12 +54,13 @@ func TestMatchWithClusterPredicates(t *testing.T) { }, { name: "match with both label and claim", - predicates: []clusterapiv1alpha1.ClusterPredicate{ - testinghelpers.NewClusterPredicate(&metav1.LabelSelector{ + placement: testinghelpers.NewPlacement("test", "test").AddPredicate( + &metav1.LabelSelector{ MatchLabels: map[string]string{ "cloud": "Amazon", }, - }, &clusterapiv1alpha1.ClusterClaimSelector{ + }, + &clusterapiv1alpha1.ClusterClaimSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { Key: "region", @@ -69,8 +68,8 @@ func TestMatchWithClusterPredicates(t *testing.T) { Values: []string{"us-east-1"}, }, }, - }), - }, + }, + ).Build(), clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1"). WithLabel("cloud", "Amazon"). @@ -83,13 +82,12 @@ func TestMatchWithClusterPredicates(t *testing.T) { }, { name: "match with multiple predicates", - predicates: []clusterapiv1alpha1.ClusterPredicate{ - testinghelpers.NewClusterPredicate(&metav1.LabelSelector{ - MatchLabels: map[string]string{ - "cloud": "Amazon", - }, - }, nil), - testinghelpers.NewClusterPredicate(nil, &clusterapiv1alpha1.ClusterClaimSelector{ + placement: testinghelpers.NewPlacement("test", "test").AddPredicate(&metav1.LabelSelector{ + MatchLabels: map[string]string{ + "cloud": "Amazon", + }, + }, nil).AddPredicate( + nil, &clusterapiv1alpha1.ClusterClaimSelector{ MatchExpressions: []metav1.LabelSelectorRequirement{ { Key: "region", @@ -97,8 +95,8 @@ func TestMatchWithClusterPredicates(t *testing.T) { Values: []string{"us-east-1"}, }, }, - }), - }, + }, + ).Build(), clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel("cloud", "Amazon").Build(), testinghelpers.NewManagedCluster("cluster2").WithClaim("region", "us-east-1").Build(), @@ -110,7 +108,8 @@ func TestMatchWithClusterPredicates(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { - clusters, err := matchWithClusterPredicates(c.predicates, c.clusters) + p := &Predicate{} + clusters, err := p.Filter(context.TODO(), c.placement, c.clusters) if err != nil { t.Errorf("unexpected err: %v", err) } diff --git a/pkg/plugins/steady/steady.go b/pkg/plugins/steady/steady.go new file mode 100644 index 000000000..1ca8f0e25 --- /dev/null +++ b/pkg/plugins/steady/steady.go @@ -0,0 +1,74 @@ +package steady + +import ( + "context" + + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + "open-cluster-management.io/placement/pkg/plugins" +) + +const ( + placementLabel = "cluster.open-cluster-management.io/placement" + description = ` + Steady prioritizer ensure the existing decision is stabilized. The clusters that existing decisions + choose are given the highest score while the clusters with no existing decisions are given the lowest + score. + ` +) + +var _ plugins.Prioritizer = &Steady{} + +type Steady struct { + handle plugins.Handle +} + +func New(handle plugins.Handle) *Steady { + return &Steady{handle: handle} +} + +func (s *Steady) Name() string { + return "steady" +} + +func (s *Steady) Description() string { + return description +} + +func (s *Steady) Score( + ctx context.Context, placement *clusterapiv1alpha1.Placement, clusters []*clusterapiv1.ManagedCluster) (map[string]int64, error) { + // query placementdecisions with label selector + scores := map[string]int64{} + requirement, err := labels.NewRequirement(placementLabel, selection.Equals, []string{placement.Name}) + + if err != nil { + return nil, err + } + + labelSelector := labels.NewSelector().Add(*requirement) + decisions, err := s.handle.DecisionLister().PlacementDecisions(placement.Namespace).List(labelSelector) + + if err != nil { + return nil, err + } + + existingDecisions := sets.String{} + for _, decision := range decisions { + for _, d := range decision.Status.Decisions { + existingDecisions.Insert(d.ClusterName) + } + } + + for _, cluster := range clusters { + if existingDecisions.Has(cluster.Name) { + scores[cluster.Name] = plugins.MaxClusterScore + } else { + scores[cluster.Name] = 0 + } + } + + return scores, nil +} diff --git a/pkg/plugins/steady/steady_test.go b/pkg/plugins/steady/steady_test.go new file mode 100644 index 000000000..4874debb8 --- /dev/null +++ b/pkg/plugins/steady/steady_test.go @@ -0,0 +1,78 @@ +package steady + +import ( + "context" + "testing" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/runtime" + clusterapiv1 "open-cluster-management.io/api/cluster/v1" + clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing" +) + +func TestScoreClusterWithSteady(t *testing.T) { + cases := []struct { + name string + placement *clusterapiv1alpha1.Placement + clusters []*clusterapiv1.ManagedCluster + existingDecisions []runtime.Object + expectedScores map[string]int64 + }{ + { + name: "no decisions", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{}, + expectedScores: map[string]int64{"cluster1": 0, "cluster2": 0, "cluster3": 0}, + }, + { + name: "one decisions", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{ + testinghelpers.NewPlacementDecision("test", "test1").WithLabel(placementLabel, "test").WithDecisions("cluster1").Build(), + }, + expectedScores: map[string]int64{"cluster1": 100, "cluster2": 0, "cluster3": 0}, + }, + { + name: "one decisions", + placement: testinghelpers.NewPlacement("test", "test").Build(), + clusters: []*clusterapiv1.ManagedCluster{ + testinghelpers.NewManagedCluster("cluster1").Build(), + testinghelpers.NewManagedCluster("cluster2").Build(), + testinghelpers.NewManagedCluster("cluster3").Build(), + }, + existingDecisions: []runtime.Object{ + testinghelpers.NewPlacementDecision("test", "test1").WithLabel(placementLabel, "test").WithDecisions("cluster1").Build(), + testinghelpers.NewPlacementDecision("test", "test2").WithLabel(placementLabel, "test").WithDecisions("cluster3").Build(), + }, + expectedScores: map[string]int64{"cluster1": 100, "cluster2": 0, "cluster3": 100}, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + steady := &Steady{ + handle: testinghelpers.NewFakePluginHandle(t, nil, c.existingDecisions...), + } + + scores, err := steady.Score(context.TODO(), c.placement, c.clusters) + if err != nil { + t.Errorf("Expect no error, but got %v", err) + } + + if !apiequality.Semantic.DeepEqual(scores, c.expectedScores) { + t.Errorf("Expect score %v, but got %v", c.expectedScores, scores) + } + }) + } +}