diff --git a/pkg/controllers/scheduling/schedule.go b/pkg/controllers/scheduling/schedule.go index e48cf4ad2..242738dd7 100644 --- a/pkg/controllers/scheduling/schedule.go +++ b/pkg/controllers/scheduling/schedule.go @@ -6,9 +6,12 @@ import ( "reflect" "sort" + errorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" clusterclient "github.com/open-cluster-management/api/client/cluster/clientset/versioned" clusterlisterv1alpha1 "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1alpha1" @@ -16,6 +19,10 @@ import ( clusterapiv1alpha1 "github.com/open-cluster-management/api/cluster/v1alpha1" ) +const ( + maxNumOfClusterDecisions = 100 +) + type scheduleFunc func( ctx context.Context, placement *clusterapiv1alpha1.Placement, @@ -86,7 +93,7 @@ func selectClusters(placement *clusterapiv1alpha1.Placement, clusters []*cluster } // bind updates the cluster decisions in the status of the placementdecisions with the given -// cluster decision slice. New placementdecision will be created if no one exists. +// cluster decision slice. New placementdecisions will be created if no one exists. func bind( ctx context.Context, placement *clusterapiv1alpha1.Placement, @@ -94,7 +101,44 @@ func bind( clusterClient clusterclient.Interface, placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, ) error { - // query placementdecisions with label selector + // sort clusterdecisions by cluster name + sort.SliceStable(clusterDecisions, func(i, j int) bool { + return clusterDecisions[i].ClusterName < clusterDecisions[j].ClusterName + }) + + // split the cluster decisions into slices, the size of each slice cannot exceed + // maxNumOfClusterDecisions. + decisionSlices := [][]clusterapiv1alpha1.ClusterDecision{} + remainingDecisions := clusterDecisions + for index := 0; len(remainingDecisions) > 0; index++ { + var decisionSlice []clusterapiv1alpha1.ClusterDecision + switch { + case len(remainingDecisions) > maxNumOfClusterDecisions: + decisionSlice = remainingDecisions[0:maxNumOfClusterDecisions] + remainingDecisions = remainingDecisions[maxNumOfClusterDecisions:] + default: + decisionSlice = remainingDecisions + remainingDecisions = nil + } + decisionSlices = append(decisionSlices, decisionSlice) + } + + // bind cluster decision slices to placementdecisions. + errs := []error{} + placementDecisionNames := sets.NewString() + for index, decisionSlice := range decisionSlices { + placementDecisionName := fmt.Sprintf("%s-decision-%d", placement.Name, index+1) + placementDecisionNames.Insert(placementDecisionName) + err := createOrUpdatePlacementDecision(ctx, placement, placementDecisionName, decisionSlice, clusterClient, placementDecisionLister) + if err != nil { + errs = append(errs, err) + } + } + if len(errs) != 0 { + return errorhelpers.NewMultiLineAggregate(errs) + } + + // query all placementdecisions of the placement requirement, err := labels.NewRequirement(placementLabel, selection.Equals, []string{placement.Name}) if err != nil { return err @@ -105,45 +149,69 @@ func bind( return err } - // TODO: support multiple placementdecisions for a placement - var placementDecision *clusterapiv1alpha1.PlacementDecision + // delete redundant placementdecisions + errs = []error{} + for _, placementDecision := range placementDecisions { + if placementDecisionNames.Has(placementDecision.Name) { + continue + } + err := clusterClient.ClusterV1alpha1().PlacementDecisions(placementDecision.Namespace).Delete(ctx, placementDecision.Name, metav1.DeleteOptions{}) + if errors.IsNotFound(err) { + continue + } + if err != nil { + errs = append(errs, err) + } + } + return errorhelpers.NewMultiLineAggregate(errs) +} + +// createOrUpdatePlacementDecision creates a new PlacementDecision if it does not exist and +// then updates the status with the given ClusterDecision slice if necessary +func createOrUpdatePlacementDecision( + ctx context.Context, + placement *clusterapiv1alpha1.Placement, + placementDecisionName string, + clusterDecisions []clusterapiv1alpha1.ClusterDecision, + clusterClient clusterclient.Interface, + placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister, +) error { + if len(clusterDecisions) > maxNumOfClusterDecisions { + return fmt.Errorf("the number of clusterdecisions %q exceeds the max limitation %q", len(clusterDecisions), maxNumOfClusterDecisions) + } + + placementDecision, err := placementDecisionLister.PlacementDecisions(placement.Namespace).Get(placementDecisionName) switch { - case len(placementDecisions) > 0: - placementDecision = placementDecisions[0] - default: - // create a placementdecision if not exists + case errors.IsNotFound(err): + // create the placementdecision if not exists owner := metav1.NewControllerRef(placement, clusterapiv1alpha1.GroupVersion.WithKind("Placement")) placementDecision = &clusterapiv1alpha1.PlacementDecision{ ObjectMeta: metav1.ObjectMeta{ - GenerateName: fmt.Sprintf("%s-", placement.Name), - Namespace: placement.Namespace, + Name: placementDecisionName, + Namespace: placement.Namespace, Labels: map[string]string{ placementLabel: placement.Name, }, OwnerReferences: []metav1.OwnerReference{*owner}, }, } + var err error placementDecision, err = clusterClient.ClusterV1alpha1().PlacementDecisions(placement.Namespace).Create(ctx, placementDecision, metav1.CreateOptions{}) if err != nil { return err } + case err != nil: + return err } - // sort by cluster name - sort.SliceStable(clusterDecisions, func(i, j int) bool { - return clusterDecisions[i].ClusterName < clusterDecisions[j].ClusterName - }) - - // update the status of the placementdecision if necessary + // update the status of the placementdecision if decisions change if reflect.DeepEqual(placementDecision.Status.Decisions, clusterDecisions) { return nil } + newPlacementDecision := placementDecision.DeepCopy() newPlacementDecision.Status.Decisions = clusterDecisions _, err = clusterClient.ClusterV1alpha1().PlacementDecisions(newPlacementDecision.Namespace). UpdateStatus(ctx, newPlacementDecision, metav1.UpdateOptions{}) - if err != nil { - return err - } - return nil + return err } diff --git a/pkg/controllers/scheduling/schedule_test.go b/pkg/controllers/scheduling/schedule_test.go index fa5b149d5..3cc73849d 100644 --- a/pkg/controllers/scheduling/schedule_test.go +++ b/pkg/controllers/scheduling/schedule_test.go @@ -2,10 +2,13 @@ package scheduling import ( "context" + "fmt" + "sort" "strings" "testing" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" clienttesting "k8s.io/client-go/testing" @@ -19,7 +22,6 @@ func TestSchedule(t *testing.T) { clusterSetName := "clusterSets" placementNamespace := "ns1" placementName := "placement1" - placementDecisionName := "placement1-decision1" cases := []struct { name string @@ -84,7 +86,7 @@ func TestSchedule(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewClusterSet(clusterSetName), testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), - testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName). + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). WithLabel(placementLabel, placementName). WithDecisions("cluster1", "cluster2").Build(), }, @@ -103,8 +105,9 @@ func TestSchedule(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewClusterSet(clusterSetName), testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), - testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName). - WithLabel(placementLabel, placementName).WithDecisions("cluster1").Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster1").Build(), }, clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), @@ -131,8 +134,9 @@ func TestSchedule(t *testing.T) { initObjs: []runtime.Object{ testinghelpers.NewClusterSet(clusterSetName), testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName), - testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName). - WithLabel(placementLabel, placementName).WithDecisions("cluster1").Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions("cluster1").Build(), }, clusters: []*clusterapiv1.ManagedCluster{ testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(), @@ -171,6 +175,152 @@ func TestSchedule(t *testing.T) { } } +func TestBind(t *testing.T) { + placementNamespace := "ns1" + placementName := "placement1" + + cases := []struct { + name string + initObjs []runtime.Object + clusterDecisions []clusterapiv1alpha1.ClusterDecision + validateActions func(t *testing.T, actions []clienttesting.Action) + }{ + { + name: "create single placementdecision", + clusterDecisions: newClusterDecisions(10), + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testinghelpers.AssertActions(t, actions, "create", "update") + actual := actions[1].(clienttesting.UpdateActionImpl).Object + placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) + if !ok { + t.Errorf("expected PlacementDecision was updated") + } + assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10, false)...) + }, + }, + { + name: "create multiple placementdecisions", + clusterDecisions: newClusterDecisions(101), + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testinghelpers.AssertActions(t, actions, "create", "update", "create", "update") + selectedClusters := newSelectedClusters(101, true) + actual := actions[1].(clienttesting.UpdateActionImpl).Object + placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) + if !ok { + t.Errorf("expected PlacementDecision was updated") + } + assertClustersSelected(t, placementDecision.Status.Decisions, selectedClusters[0:100]...) + + actual = actions[3].(clienttesting.UpdateActionImpl).Object + placementDecision, ok = actual.(*clusterapiv1alpha1.PlacementDecision) + if !ok { + t.Errorf("expected PlacementDecision was updated") + } + assertClustersSelected(t, placementDecision.Status.Decisions, selectedClusters[100:]...) + }, + }, + { + name: "no change", + clusterDecisions: newClusterDecisions(128), + initObjs: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[100:]...).Build(), + }, + validateActions: testinghelpers.AssertNoActions, + }, + { + name: "update one of placementdecisions", + clusterDecisions: newClusterDecisions(128), + initObjs: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + }, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testinghelpers.AssertActions(t, actions, "create", "update") + selectedClusters := newSelectedClusters(128, true) + actual := actions[1].(clienttesting.UpdateActionImpl).Object + placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) + if !ok { + t.Errorf("expected PlacementDecision was updated") + } + assertClustersSelected(t, placementDecision.Status.Decisions, selectedClusters[100:]...) + }, + }, + { + name: "delete redundant placementdecisions", + clusterDecisions: newClusterDecisions(10), + initObjs: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[100:]...).Build(), + }, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testinghelpers.AssertActions(t, actions, "update", "delete") + actual := actions[0].(clienttesting.UpdateActionImpl).Object + placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision) + if !ok { + t.Errorf("expected PlacementDecision was updated") + } + assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10, false)...) + }, + }, + { + name: "delete all placementdecisions", + clusterDecisions: newClusterDecisions(0), + initObjs: []runtime.Object{ + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[:100]...).Build(), + testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)). + WithLabel(placementLabel, placementName). + WithDecisions(newSelectedClusters(128, true)[100:]...).Build(), + }, + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testinghelpers.AssertActions(t, actions, "delete", "delete") + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.initObjs...) + + // GenerateName is not working for fake clent, set the name with random suffix + clusterClient.PrependReactor( + "create", + "placementdecisions", + func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) { + createAction := action.(clienttesting.CreateActionImpl) + pd := createAction.Object.(*clusterapiv1alpha1.PlacementDecision) + pd.Name = fmt.Sprintf("%s%s", pd.GenerateName, rand.String(5)) + return false, pd, nil + }, + ) + + clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...) + err := bind( + context.TODO(), + testinghelpers.NewPlacement(placementNamespace, placementName).Build(), + c.clusterDecisions, + clusterClient, + clusterInformerFactory.Cluster().V1alpha1().PlacementDecisions().Lister(), + ) + if err != nil { + t.Errorf("unexpected err: %v", err) + } + c.validateActions(t, clusterClient.Actions()) + }) + } +} + func assertClustersSelected(t *testing.T, decisons []clusterapiv1alpha1.ClusterDecision, clusterNames ...string) { names := sets.NewString(clusterNames...) for _, decision := range decisons { @@ -183,3 +333,30 @@ func assertClustersSelected(t *testing.T, decisons []clusterapiv1alpha1.ClusterD t.Errorf("expected clusters selected: %s", strings.Join(names.UnsortedList(), ",")) } } + +func newClusterDecisions(num int) (decisions []clusterapiv1alpha1.ClusterDecision) { + for i := 0; i < num; i++ { + decisions = append(decisions, clusterapiv1alpha1.ClusterDecision{ + ClusterName: fmt.Sprintf("cluster%d", i+1), + }) + } + return decisions +} + +func newSelectedClusters(num int, sortWithName bool) (clusters []string) { + for i := 0; i < num; i++ { + clusters = append(clusters, fmt.Sprintf("cluster%d", i+1)) + } + if !sortWithName { + return clusters + } + // sort cluster by name + sort.SliceStable(clusters, func(i, j int) bool { + return clusters[i] < clusters[j] + }) + return clusters +} + +func placementDecisionName(placementName string, index int) string { + return fmt.Sprintf("%s-decision-%d", placementName, index) +} diff --git a/test/integration/placement_test.go b/test/integration/placement_test.go index 55757f04c..fe02718a7 100644 --- a/test/integration/placement_test.go +++ b/test/integration/placement_test.go @@ -19,8 +19,9 @@ import ( ) const ( - clusterSetLabel = "cluster.open-cluster-management.io/clusterset" - placementLabel = "cluster.open-cluster-management.io/placement" + clusterSetLabel = "cluster.open-cluster-management.io/clusterset" + placementLabel = "cluster.open-cluster-management.io/placement" + maxNumOfClusterDecisions = 100 ) var _ = ginkgo.Describe("Placement", func() { @@ -98,6 +99,10 @@ var _ = ginkgo.Describe("Placement", func() { assertNumberOfDecisions := func(placementName string, desiredNOD int) { ginkgo.By("Check the number of decisions in placementdecisions") + desiredNOPD := desiredNOD / maxNumOfClusterDecisions + if desiredNOD%maxNumOfClusterDecisions != 0 { + desiredNOPD++ + } gomega.Eventually(func() bool { pdl, err := clusterClient.ClusterV1alpha1().PlacementDecisions(namespace).List(context.Background(), metav1.ListOptions{ LabelSelector: placementLabel + "=" + placementName, @@ -105,7 +110,7 @@ var _ = ginkgo.Describe("Placement", func() { if err != nil { return false } - if len(pdl.Items) == 0 { + if len(pdl.Items) != desiredNOPD { return false } actualNOD := 0 @@ -341,6 +346,16 @@ var _ = ginkgo.Describe("Placement", func() { assertNumberOfDecisions(placementName, nod) assertPlacementStatus(placementName, nod, false) }) + + ginkgo.It("Should create multiple placementdecisions once scheduled", func() { + assertBindingClusterSet(clusterSet1Name) + assertCreatingClusters(clusterSet1Name, 101) + assertCreatingPlacement(placementName, nil, 101) + + nod := 101 + assertNumberOfDecisions(placementName, nod) + assertPlacementStatus(placementName, nod, true) + }) }) })