Merge pull request #13 from elgnay/multi-pds

support multiple placementdecisions
This commit is contained in:
OpenShift Merge Robot
2021-06-09 04:54:11 +02:00
committed by GitHub
3 changed files with 289 additions and 29 deletions

View File

@@ -6,9 +6,12 @@ import (
"reflect"
"sort"
errorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
clusterclient "github.com/open-cluster-management/api/client/cluster/clientset/versioned"
clusterlisterv1alpha1 "github.com/open-cluster-management/api/client/cluster/listers/cluster/v1alpha1"
@@ -16,6 +19,10 @@ import (
clusterapiv1alpha1 "github.com/open-cluster-management/api/cluster/v1alpha1"
)
const (
maxNumOfClusterDecisions = 100
)
type scheduleFunc func(
ctx context.Context,
placement *clusterapiv1alpha1.Placement,
@@ -86,7 +93,7 @@ func selectClusters(placement *clusterapiv1alpha1.Placement, clusters []*cluster
}
// bind updates the cluster decisions in the status of the placementdecisions with the given
// cluster decision slice. New placementdecision will be created if no one exists.
// cluster decision slice. New placementdecisions will be created if no one exists.
func bind(
ctx context.Context,
placement *clusterapiv1alpha1.Placement,
@@ -94,7 +101,44 @@ func bind(
clusterClient clusterclient.Interface,
placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister,
) error {
// query placementdecisions with label selector
// sort clusterdecisions by cluster name
sort.SliceStable(clusterDecisions, func(i, j int) bool {
return clusterDecisions[i].ClusterName < clusterDecisions[j].ClusterName
})
// split the cluster decisions into slices, the size of each slice cannot exceed
// maxNumOfClusterDecisions.
decisionSlices := [][]clusterapiv1alpha1.ClusterDecision{}
remainingDecisions := clusterDecisions
for index := 0; len(remainingDecisions) > 0; index++ {
var decisionSlice []clusterapiv1alpha1.ClusterDecision
switch {
case len(remainingDecisions) > maxNumOfClusterDecisions:
decisionSlice = remainingDecisions[0:maxNumOfClusterDecisions]
remainingDecisions = remainingDecisions[maxNumOfClusterDecisions:]
default:
decisionSlice = remainingDecisions
remainingDecisions = nil
}
decisionSlices = append(decisionSlices, decisionSlice)
}
// bind cluster decision slices to placementdecisions.
errs := []error{}
placementDecisionNames := sets.NewString()
for index, decisionSlice := range decisionSlices {
placementDecisionName := fmt.Sprintf("%s-decision-%d", placement.Name, index+1)
placementDecisionNames.Insert(placementDecisionName)
err := createOrUpdatePlacementDecision(ctx, placement, placementDecisionName, decisionSlice, clusterClient, placementDecisionLister)
if err != nil {
errs = append(errs, err)
}
}
if len(errs) != 0 {
return errorhelpers.NewMultiLineAggregate(errs)
}
// query all placementdecisions of the placement
requirement, err := labels.NewRequirement(placementLabel, selection.Equals, []string{placement.Name})
if err != nil {
return err
@@ -105,45 +149,69 @@ func bind(
return err
}
// TODO: support multiple placementdecisions for a placement
var placementDecision *clusterapiv1alpha1.PlacementDecision
// delete redundant placementdecisions
errs = []error{}
for _, placementDecision := range placementDecisions {
if placementDecisionNames.Has(placementDecision.Name) {
continue
}
err := clusterClient.ClusterV1alpha1().PlacementDecisions(placementDecision.Namespace).Delete(ctx, placementDecision.Name, metav1.DeleteOptions{})
if errors.IsNotFound(err) {
continue
}
if err != nil {
errs = append(errs, err)
}
}
return errorhelpers.NewMultiLineAggregate(errs)
}
// createOrUpdatePlacementDecision creates a new PlacementDecision if it does not exist and
// then updates the status with the given ClusterDecision slice if necessary
func createOrUpdatePlacementDecision(
ctx context.Context,
placement *clusterapiv1alpha1.Placement,
placementDecisionName string,
clusterDecisions []clusterapiv1alpha1.ClusterDecision,
clusterClient clusterclient.Interface,
placementDecisionLister clusterlisterv1alpha1.PlacementDecisionLister,
) error {
if len(clusterDecisions) > maxNumOfClusterDecisions {
return fmt.Errorf("the number of clusterdecisions %q exceeds the max limitation %q", len(clusterDecisions), maxNumOfClusterDecisions)
}
placementDecision, err := placementDecisionLister.PlacementDecisions(placement.Namespace).Get(placementDecisionName)
switch {
case len(placementDecisions) > 0:
placementDecision = placementDecisions[0]
default:
// create a placementdecision if not exists
case errors.IsNotFound(err):
// create the placementdecision if not exists
owner := metav1.NewControllerRef(placement, clusterapiv1alpha1.GroupVersion.WithKind("Placement"))
placementDecision = &clusterapiv1alpha1.PlacementDecision{
ObjectMeta: metav1.ObjectMeta{
GenerateName: fmt.Sprintf("%s-", placement.Name),
Namespace: placement.Namespace,
Name: placementDecisionName,
Namespace: placement.Namespace,
Labels: map[string]string{
placementLabel: placement.Name,
},
OwnerReferences: []metav1.OwnerReference{*owner},
},
}
var err error
placementDecision, err = clusterClient.ClusterV1alpha1().PlacementDecisions(placement.Namespace).Create(ctx, placementDecision, metav1.CreateOptions{})
if err != nil {
return err
}
case err != nil:
return err
}
// sort by cluster name
sort.SliceStable(clusterDecisions, func(i, j int) bool {
return clusterDecisions[i].ClusterName < clusterDecisions[j].ClusterName
})
// update the status of the placementdecision if necessary
// update the status of the placementdecision if decisions change
if reflect.DeepEqual(placementDecision.Status.Decisions, clusterDecisions) {
return nil
}
newPlacementDecision := placementDecision.DeepCopy()
newPlacementDecision.Status.Decisions = clusterDecisions
_, err = clusterClient.ClusterV1alpha1().PlacementDecisions(newPlacementDecision.Namespace).
UpdateStatus(ctx, newPlacementDecision, metav1.UpdateOptions{})
if err != nil {
return err
}
return nil
return err
}

View File

@@ -2,10 +2,13 @@ package scheduling
import (
"context"
"fmt"
"sort"
"strings"
"testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/sets"
clienttesting "k8s.io/client-go/testing"
@@ -19,7 +22,6 @@ func TestSchedule(t *testing.T) {
clusterSetName := "clusterSets"
placementNamespace := "ns1"
placementName := "placement1"
placementDecisionName := "placement1-decision1"
cases := []struct {
name string
@@ -84,7 +86,7 @@ func TestSchedule(t *testing.T) {
initObjs: []runtime.Object{
testinghelpers.NewClusterSet(clusterSetName),
testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName).
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions("cluster1", "cluster2").Build(),
},
@@ -103,8 +105,9 @@ func TestSchedule(t *testing.T) {
initObjs: []runtime.Object{
testinghelpers.NewClusterSet(clusterSetName),
testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName).
WithLabel(placementLabel, placementName).WithDecisions("cluster1").Build(),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions("cluster1").Build(),
},
clusters: []*clusterapiv1.ManagedCluster{
testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(),
@@ -131,8 +134,9 @@ func TestSchedule(t *testing.T) {
initObjs: []runtime.Object{
testinghelpers.NewClusterSet(clusterSetName),
testinghelpers.NewClusterSetBinding(placementNamespace, clusterSetName),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName).
WithLabel(placementLabel, placementName).WithDecisions("cluster1").Build(),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions("cluster1").Build(),
},
clusters: []*clusterapiv1.ManagedCluster{
testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, clusterSetName).Build(),
@@ -171,6 +175,152 @@ func TestSchedule(t *testing.T) {
}
}
func TestBind(t *testing.T) {
placementNamespace := "ns1"
placementName := "placement1"
cases := []struct {
name string
initObjs []runtime.Object
clusterDecisions []clusterapiv1alpha1.ClusterDecision
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "create single placementdecision",
clusterDecisions: newClusterDecisions(10),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "create", "update")
actual := actions[1].(clienttesting.UpdateActionImpl).Object
placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision)
if !ok {
t.Errorf("expected PlacementDecision was updated")
}
assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10, false)...)
},
},
{
name: "create multiple placementdecisions",
clusterDecisions: newClusterDecisions(101),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "create", "update", "create", "update")
selectedClusters := newSelectedClusters(101, true)
actual := actions[1].(clienttesting.UpdateActionImpl).Object
placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision)
if !ok {
t.Errorf("expected PlacementDecision was updated")
}
assertClustersSelected(t, placementDecision.Status.Decisions, selectedClusters[0:100]...)
actual = actions[3].(clienttesting.UpdateActionImpl).Object
placementDecision, ok = actual.(*clusterapiv1alpha1.PlacementDecision)
if !ok {
t.Errorf("expected PlacementDecision was updated")
}
assertClustersSelected(t, placementDecision.Status.Decisions, selectedClusters[100:]...)
},
},
{
name: "no change",
clusterDecisions: newClusterDecisions(128),
initObjs: []runtime.Object{
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[:100]...).Build(),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[100:]...).Build(),
},
validateActions: testinghelpers.AssertNoActions,
},
{
name: "update one of placementdecisions",
clusterDecisions: newClusterDecisions(128),
initObjs: []runtime.Object{
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[:100]...).Build(),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "create", "update")
selectedClusters := newSelectedClusters(128, true)
actual := actions[1].(clienttesting.UpdateActionImpl).Object
placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision)
if !ok {
t.Errorf("expected PlacementDecision was updated")
}
assertClustersSelected(t, placementDecision.Status.Decisions, selectedClusters[100:]...)
},
},
{
name: "delete redundant placementdecisions",
clusterDecisions: newClusterDecisions(10),
initObjs: []runtime.Object{
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[:100]...).Build(),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[100:]...).Build(),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "update", "delete")
actual := actions[0].(clienttesting.UpdateActionImpl).Object
placementDecision, ok := actual.(*clusterapiv1alpha1.PlacementDecision)
if !ok {
t.Errorf("expected PlacementDecision was updated")
}
assertClustersSelected(t, placementDecision.Status.Decisions, newSelectedClusters(10, false)...)
},
},
{
name: "delete all placementdecisions",
clusterDecisions: newClusterDecisions(0),
initObjs: []runtime.Object{
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 1)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[:100]...).Build(),
testinghelpers.NewPlacementDecision(placementNamespace, placementDecisionName(placementName, 2)).
WithLabel(placementLabel, placementName).
WithDecisions(newSelectedClusters(128, true)[100:]...).Build(),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testinghelpers.AssertActions(t, actions, "delete", "delete")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
// GenerateName is not working for fake clent, set the name with random suffix
clusterClient.PrependReactor(
"create",
"placementdecisions",
func(action clienttesting.Action) (handled bool, ret runtime.Object, err error) {
createAction := action.(clienttesting.CreateActionImpl)
pd := createAction.Object.(*clusterapiv1alpha1.PlacementDecision)
pd.Name = fmt.Sprintf("%s%s", pd.GenerateName, rand.String(5))
return false, pd, nil
},
)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
err := bind(
context.TODO(),
testinghelpers.NewPlacement(placementNamespace, placementName).Build(),
c.clusterDecisions,
clusterClient,
clusterInformerFactory.Cluster().V1alpha1().PlacementDecisions().Lister(),
)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
c.validateActions(t, clusterClient.Actions())
})
}
}
func assertClustersSelected(t *testing.T, decisons []clusterapiv1alpha1.ClusterDecision, clusterNames ...string) {
names := sets.NewString(clusterNames...)
for _, decision := range decisons {
@@ -183,3 +333,30 @@ func assertClustersSelected(t *testing.T, decisons []clusterapiv1alpha1.ClusterD
t.Errorf("expected clusters selected: %s", strings.Join(names.UnsortedList(), ","))
}
}
func newClusterDecisions(num int) (decisions []clusterapiv1alpha1.ClusterDecision) {
for i := 0; i < num; i++ {
decisions = append(decisions, clusterapiv1alpha1.ClusterDecision{
ClusterName: fmt.Sprintf("cluster%d", i+1),
})
}
return decisions
}
func newSelectedClusters(num int, sortWithName bool) (clusters []string) {
for i := 0; i < num; i++ {
clusters = append(clusters, fmt.Sprintf("cluster%d", i+1))
}
if !sortWithName {
return clusters
}
// sort cluster by name
sort.SliceStable(clusters, func(i, j int) bool {
return clusters[i] < clusters[j]
})
return clusters
}
func placementDecisionName(placementName string, index int) string {
return fmt.Sprintf("%s-decision-%d", placementName, index)
}

View File

@@ -19,8 +19,9 @@ import (
)
const (
clusterSetLabel = "cluster.open-cluster-management.io/clusterset"
placementLabel = "cluster.open-cluster-management.io/placement"
clusterSetLabel = "cluster.open-cluster-management.io/clusterset"
placementLabel = "cluster.open-cluster-management.io/placement"
maxNumOfClusterDecisions = 100
)
var _ = ginkgo.Describe("Placement", func() {
@@ -98,6 +99,10 @@ var _ = ginkgo.Describe("Placement", func() {
assertNumberOfDecisions := func(placementName string, desiredNOD int) {
ginkgo.By("Check the number of decisions in placementdecisions")
desiredNOPD := desiredNOD / maxNumOfClusterDecisions
if desiredNOD%maxNumOfClusterDecisions != 0 {
desiredNOPD++
}
gomega.Eventually(func() bool {
pdl, err := clusterClient.ClusterV1alpha1().PlacementDecisions(namespace).List(context.Background(), metav1.ListOptions{
LabelSelector: placementLabel + "=" + placementName,
@@ -105,7 +110,7 @@ var _ = ginkgo.Describe("Placement", func() {
if err != nil {
return false
}
if len(pdl.Items) == 0 {
if len(pdl.Items) != desiredNOPD {
return false
}
actualNOD := 0
@@ -341,6 +346,16 @@ var _ = ginkgo.Describe("Placement", func() {
assertNumberOfDecisions(placementName, nod)
assertPlacementStatus(placementName, nod, false)
})
ginkgo.It("Should create multiple placementdecisions once scheduled", func() {
assertBindingClusterSet(clusterSet1Name)
assertCreatingClusters(clusterSet1Name, 101)
assertCreatingPlacement(placementName, nil, 101)
nod := 101
assertNumberOfDecisions(placementName, nod)
assertPlacementStatus(placementName, nod, true)
})
})
})