watch placementscore during scheduling (#89)

* watch placementscore during scheduling

refactor eventhandler using index

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Filter placement when enqueue score

Signed-off-by: Jian Qiu <jqiu@redhat.com>

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2022-11-29 09:55:53 +08:00
committed by GitHub
parent fafcfbfdbe
commit 5d33dd956c
11 changed files with 729 additions and 796 deletions

View File

@@ -62,17 +62,7 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
clusterInformers.Cluster().V1beta2().ManagedClusterSetBindings(),
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
scheduler,
controllerContext.EventRecorder, recorder,
)
schedulingControllerResync := scheduling.NewSchedulingControllerResync(
clusterClient,
clusterInformers.Cluster().V1().ManagedClusters(),
clusterInformers.Cluster().V1beta2().ManagedClusterSets(),
clusterInformers.Cluster().V1beta2().ManagedClusterSetBindings(),
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
clusterInformers.Cluster().V1alpha1().AddOnPlacementScores(),
scheduler,
controllerContext.EventRecorder, recorder,
)
@@ -80,7 +70,6 @@ func RunControllerManager(ctx context.Context, controllerContext *controllercmd.
go clusterInformers.Start(ctx.Done())
go schedulingController.Run(ctx, 1)
go schedulingControllerResync.Run(ctx, 1)
<-ctx.Done()
return nil

View File

@@ -4,26 +4,17 @@ import (
"fmt"
"reflect"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
cache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
)
type clusterEventHandler struct {
clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister
clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister
placementLister clusterlisterv1beta1.PlacementLister
enqueuePlacementFunc enqueuePlacementFunc
enqueuer *enqueuer
}
func (h *clusterEventHandler) OnAdd(obj interface{}) {
h.onChange(obj)
h.enqueuer.enqueueCluster(obj)
}
func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) {
@@ -31,7 +22,7 @@ func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) {
if !ok {
return
}
h.onChange(newObj)
h.enqueuer.enqueueCluster(newObj)
if oldObj == nil {
return
@@ -43,65 +34,17 @@ func (h *clusterEventHandler) OnUpdate(oldObj, newObj interface{}) {
// if the cluster labels changes, process the original clusterset
if !reflect.DeepEqual(newCluster.Labels, oldCluster.Labels) {
h.onChange(oldCluster)
h.enqueuer.enqueueCluster(oldCluster)
}
}
func (h *clusterEventHandler) OnDelete(obj interface{}) {
switch t := obj.(type) {
case *clusterapiv1.ManagedCluster:
h.onChange(obj)
h.enqueuer.enqueueCluster(obj)
case cache.DeletedFinalStateUnknown:
h.onChange(t.Obj)
h.enqueuer.enqueueCluster(t.Obj)
default:
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
}
}
func (h *clusterEventHandler) onChange(obj interface{}) {
cluster, ok := obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
clusterSetNames, err := h.getClusterSetNames(cluster)
if err != nil {
klog.V(4).Infof("Unable to get clusterset of cluster %q: %v", cluster.GetName(), err)
return
}
// skip cluster belongs to no clusterset
if len(clusterSetNames) == 0 {
return
}
for _, clusterSetName := range clusterSetNames {
// enqueue placements which might be impacted
err = enqueuePlacementsByClusterSet(
clusterSetName,
h.clusterSetBindingLister,
h.placementLister,
h.enqueuePlacementFunc,
)
if err != nil {
klog.Errorf("Unable to enqueue placements with access to clusterset %q: %v", clusterSetName, err)
}
}
}
// getClusterSetName returns the name of the clusterset the cluster belongs to. It also checks the existence
// of the clusterset.
func (h *clusterEventHandler) getClusterSetNames(cluster metav1.Object) ([]string, error) {
clusterSetNames := []string{}
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster.(*clusterapiv1.ManagedCluster), h.clusterSetLister)
if err != nil {
return clusterSetNames, err
}
for _, cs := range clusterSets {
clusterSetNames = append(clusterSetNames, cs.Name)
}
return clusterSetNames, nil
}

View File

@@ -1,7 +1,7 @@
package scheduling
import (
"fmt"
"k8s.io/client-go/util/workqueue"
"strings"
"testing"
@@ -93,19 +93,24 @@ func TestOnClusterChange(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
handler := &clusterEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
handler.onChange(c.obj)
q.enqueueCluster(c.obj)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
@@ -247,16 +252,24 @@ func TestOnClusterUpdate(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
handler := &clusterEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
enqueuer: q,
}
handler.OnUpdate(c.oldObj, c.newObj)
@@ -339,16 +352,24 @@ func TestOnClusterDelete(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
handler := &clusterEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
enqueuer: q,
}
handler.OnDelete(c.obj)

View File

@@ -1,97 +0,0 @@
package scheduling
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
cache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
errorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
)
type clusterSetEventHandler struct {
clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister
placementLister clusterlisterv1beta1.PlacementLister
enqueuePlacementFunc enqueuePlacementFunc
}
func (h *clusterSetEventHandler) OnAdd(obj interface{}) {
h.onChange(obj)
}
func (h *clusterSetEventHandler) OnUpdate(oldObj, newObj interface{}) {
// ignore Update event
}
func (h *clusterSetEventHandler) OnDelete(obj interface{}) {
var clusterSetName string
switch t := obj.(type) {
case *clusterapiv1beta2.ManagedClusterSet:
clusterSetName = t.Name
case cache.DeletedFinalStateUnknown:
clusterSet, ok := t.Obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object tombstone, invalid type"))
return
}
clusterSetName = clusterSet.GetName()
default:
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
err := enqueuePlacementsByClusterSet(clusterSetName, h.clusterSetBindingLister,
h.placementLister, h.enqueuePlacementFunc)
if err != nil {
klog.Errorf("Unable to enqueue placements by clusterset %q: %v", clusterSetName, err)
}
}
func (h *clusterSetEventHandler) onChange(obj interface{}) {
accessor, err := meta.Accessor(obj)
if err != nil {
utilruntime.HandleError(fmt.Errorf("error accessing metadata: %w", err))
return
}
clusterSetName := accessor.GetName()
err = enqueuePlacementsByClusterSet(clusterSetName, h.clusterSetBindingLister,
h.placementLister, h.enqueuePlacementFunc)
if err != nil {
klog.Errorf("Unable to enqueue placements by clusterset %q: %v", clusterSetName, err)
}
}
// enqueuePlacementsByClusterSet enqueues placements that might be impacted by the given clusterset into
// controller queue for further reconciliation
func enqueuePlacementsByClusterSet(
clusterSetName string,
clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister,
placementLister clusterlisterv1beta1.PlacementLister,
enqueuePlacementFunc enqueuePlacementFunc,
) error {
bindings, err := clusterSetBindingLister.List(labels.Everything())
if err != nil {
return err
}
errs := []error{}
for _, binding := range bindings {
if binding.Name != clusterSetName {
continue
}
if err := enqueuePlacementsByClusterSetBinding(binding.Namespace, binding.Name, placementLister, enqueuePlacementFunc); err != nil {
errs = append(errs, err)
}
}
return errorhelpers.NewMultiLineAggregate(errs)
}

View File

@@ -1,219 +0,0 @@
package scheduling
import (
"fmt"
"strings"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
cache "k8s.io/client-go/tools/cache"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing"
)
func TestEnqueuePlacementsByClusterSet(t *testing.T) {
cases := []struct {
name string
clusterSetName string
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "enqueue placements in a namespace",
clusterSetName: "clusterset1",
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewClusterSet("clusterset2").Build(),
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewClusterSetBinding("ns1", "clusterset2"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
queuedKeys := sets.NewString()
err := enqueuePlacementsByClusterSet(
c.clusterSetName,
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}
func TestOnClusterSetAdd(t *testing.T) {
cases := []struct {
name string
obj interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "invalid object type",
obj: "invalid object type",
},
{
name: "clusterset selector type is LegacyClusterSetLabel",
obj: testinghelpers.NewClusterSet("clusterset1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "clusterset selector type is LabelSelector",
obj: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{
SelectorType: clusterapiv1beta2.LabelSelector,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"cloud": "Amazon",
},
},
}).Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
queuedKeys := sets.NewString()
handler := &clusterSetEventHandler{
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
}
handler.OnAdd(c.obj)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}
func TestOnClusterSetDelete(t *testing.T) {
cases := []struct {
name string
obj interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "invalid object type",
obj: "invalid object type",
},
{
name: "clusterset selector type is LegacyClusterSetLabel",
obj: testinghelpers.NewClusterSet("clusterset1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "clusterset selector type is LabelSelector",
obj: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{
SelectorType: clusterapiv1beta2.LabelSelector,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"cloud": "Amazon",
},
},
}).Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone",
obj: cache.DeletedFinalStateUnknown{
Obj: testinghelpers.NewClusterSet("clusterset1").Build(),
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone with invalid object type",
obj: cache.DeletedFinalStateUnknown{
Obj: "invalid object type",
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
queuedKeys := sets.NewString()
handler := &clusterSetEventHandler{
clusterSetBindingLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
}
handler.OnDelete(c.obj)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}

View File

@@ -1,92 +0,0 @@
package scheduling
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
cache "k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
clusterlisterv1beta1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
)
type clusterSetBindingEventHandler struct {
clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister
placementLister clusterlisterv1beta1.PlacementLister
enqueuePlacementFunc enqueuePlacementFunc
}
func (h *clusterSetBindingEventHandler) OnAdd(obj interface{}) {
h.onChange(obj)
}
func (h *clusterSetBindingEventHandler) OnUpdate(oldObj, newObj interface{}) {
h.onChange(newObj)
}
func (h *clusterSetBindingEventHandler) OnDelete(obj interface{}) {
switch t := obj.(type) {
case *clusterapiv1beta2.ManagedClusterSetBinding:
h.onChange(obj)
case cache.DeletedFinalStateUnknown:
h.onChange(t.Obj)
default:
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
}
}
func (h *clusterSetBindingEventHandler) onChange(obj interface{}) {
clusterSetBinding, ok := obj.(metav1.Object)
if !ok {
utilruntime.HandleError(fmt.Errorf("error decoding object, invalid type"))
return
}
namespace := clusterSetBinding.GetNamespace()
clusterSetBindingName := clusterSetBinding.GetName()
_, err := h.clusterSetLister.Get(clusterSetBindingName)
// skip if the clusterset does not exist
if errors.IsNotFound(err) {
return
}
if err != nil {
klog.Errorf("Unable to get clusterset %q: %v", clusterSetBindingName, err)
return
}
err = enqueuePlacementsByClusterSetBinding(namespace, clusterSetBindingName, h.placementLister, h.enqueuePlacementFunc)
if err != nil {
klog.Errorf("Unable to enqueue placements by clustersetbinding %s/%s: %v", namespace, clusterSetBindingName, err)
}
}
// enqueuePlacementsByClusterSetBinding enqueues placements that might be impacted by a particular clustersetbinding
// into controller queue for further reconciliation
func enqueuePlacementsByClusterSetBinding(
namespace, clusterSetBindingName string,
placementLister clusterlisterv1beta1.PlacementLister,
enqueuePlacementFunc enqueuePlacementFunc,
) error {
placements, err := placementLister.Placements(namespace).List(labels.Everything())
if err != nil {
return err
}
for _, placement := range placements {
// ignore placement whose .spec.clusterSets is specified but does no include this
// particular clusterset.
clusterSets := sets.NewString(placement.Spec.ClusterSets...)
if clusterSets.Len() != 0 && !clusterSets.Has(clusterSetBindingName) {
continue
}
enqueuePlacementFunc(placement.Namespace, placement.Name)
}
return nil
}

View File

@@ -1,193 +0,0 @@
package scheduling
import (
"fmt"
"strings"
"testing"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
cache "k8s.io/client-go/tools/cache"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing"
)
func TestOnClusterSetBindingChange(t *testing.T) {
cases := []struct {
name string
obj interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "invalid resource type",
obj: "invalid resource type",
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns2", "placement2").Build(),
},
},
{
name: "clusterset does not exist",
obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns2", "placement2").Build(),
},
},
{
name: "on clustersetbinding change",
obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns2", "placement2").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
queuedKeys := sets.NewString()
handler := &clusterSetBindingEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
}
handler.onChange(c.obj)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}
func TestEnqueuePlacementsByClusterSetBinding(t *testing.T) {
cases := []struct {
name string
namespace string
clusterSetBindingName string
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "enqueue placements by clusterSetBinding",
namespace: "ns1",
clusterSetBindingName: "clusterset1",
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(),
testinghelpers.NewPlacement("ns2", "placement3").WithClusterSets("clusterset1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
queuedKeys := sets.NewString()
err := enqueuePlacementsByClusterSetBinding(
c.namespace,
c.clusterSetBindingName,
clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}
func TestOnClusterSetBindingDelete(t *testing.T) {
cases := []struct {
name string
obj interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "invalid object type",
obj: "invalid object type",
},
{
name: "clusterset",
obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone",
obj: cache.DeletedFinalStateUnknown{
Obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone with invalid object type",
obj: cache.DeletedFinalStateUnknown{
Obj: "invalid object type",
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
queuedKeys := sets.NewString()
handler := &clusterSetBindingEventHandler{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
placementLister: clusterInformerFactory.Cluster().V1beta1().Placements().Lister(),
enqueuePlacementFunc: func(namespace, name string) {
queuedKeys.Insert(fmt.Sprintf("%s/%s", namespace, name))
},
}
handler.OnDelete(c.obj)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}

View File

@@ -0,0 +1,252 @@
package scheduling
import (
"fmt"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterinformerv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1"
clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2"
clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
)
const (
// anyClusterSet is an index value for indexPlacementByClusterSet for placement
// not setting any clusterset
anyClusterSet = "*"
placementsByClusterSetBinding = "placementsByClusterSet"
clustersetBindingsByClusterSet = "clustersetBindingsByClusterSet"
placementsByScore = "placementsByScore"
)
type enqueuer struct {
queue workqueue.RateLimitingInterface
enqueuePlacementFunc func(obj interface{}, queue workqueue.RateLimitingInterface)
clusterLister clusterlisterv1.ManagedClusterLister
clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister
placementIndexer cache.Indexer
clusterSetBindingIndexer cache.Indexer
}
func newEnqueuer(
queue workqueue.RateLimitingInterface,
clusterInformer clusterinformerv1.ManagedClusterInformer,
clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer) *enqueuer {
placementInformer.Informer().AddIndexers(cache.Indexers{
placementsByScore: indexPlacementsByScore,
placementsByClusterSetBinding: indexPlacementByClusterSetBinding,
})
clusterSetBindingInformer.Informer().AddIndexers(cache.Indexers{
clustersetBindingsByClusterSet: indexClusterSetBindingByClusterSet,
})
return &enqueuer{
queue: queue,
enqueuePlacementFunc: enqueuePlacement,
clusterLister: clusterInformer.Lister(),
clusterSetLister: clusterSetInformer.Lister(),
placementIndexer: placementInformer.Informer().GetIndexer(),
clusterSetBindingIndexer: clusterSetBindingInformer.Informer().GetIndexer(),
}
}
func enqueuePlacement(obj interface{}, queue workqueue.RateLimitingInterface) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
queue.Add(key)
}
func (e *enqueuer) enqueueClusterSetBinding(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
namespace, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return
}
// enqueue all placement that ref to the binding
objs, err := e.placementIndexer.ByIndex(placementsByClusterSetBinding, key)
if err != nil {
runtime.HandleError(err)
return
}
anyObjs, err := e.placementIndexer.ByIndex(placementsByClusterSetBinding, fmt.Sprintf("%s/%s", namespace, anyClusterSet))
if err != nil {
runtime.HandleError(err)
return
}
objs = append(objs, anyObjs...)
for _, o := range objs {
placement := o.(*clusterapiv1beta1.Placement)
klog.V(4).Infof("enqueue placement %s/%s, because of binding %s", placement.Namespace, placement.Name, key)
e.enqueuePlacementFunc(placement, e.queue)
}
}
func (e *enqueuer) enqueueClusterSet(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
objs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, key)
if err != nil {
runtime.HandleError(err)
return
}
for _, o := range objs {
clusterSetBinding := o.(*clusterapiv1beta2.ManagedClusterSetBinding)
klog.V(4).Infof("enqueue clustersetbinding %s/%s, because of clusterset %s", clusterSetBinding.Namespace, clusterSetBinding.Name, key)
e.enqueueClusterSetBinding(clusterSetBinding)
}
}
func (e *enqueuer) enqueueCluster(obj interface{}) {
cluster, ok := obj.(*clusterapiv1.ManagedCluster)
if !ok {
runtime.HandleError(fmt.Errorf("obj %T is not a ManagedCluster", obj))
return
}
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister)
if err != nil {
klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err)
return
}
for _, clusterSet := range clusterSets {
klog.V(4).Infof("enqueue clusterSet %s, because of cluster %s", clusterSet.Name, cluster.Name)
e.enqueueClusterSet(clusterSet)
}
}
func (e *enqueuer) enqueuePlacementScore(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return
}
objs, err := e.placementIndexer.ByIndex(placementsByScore, name)
if err != nil {
runtime.HandleError(err)
return
}
// filter the namespace of placement based on cluster. Find all related clustersetbinding
// to the cluster at first. Only enqueue placement when its namespace is in the valid namespaces
// of clustersetbindings.
filteredBindingNamespaces := sets.NewString()
cluster, err := e.clusterLister.Get(namespace)
if err != nil {
klog.V(4).Infof("Unable to get cluster %s: %w", namespace, err)
}
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister)
if err != nil {
klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err)
return
}
for _, clusterset := range clusterSets {
bindingObjs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, clusterset.Name)
if err != nil {
klog.V(4).Infof("Unable to get clusterSetBindings of clusterset %q: %w", clusterset.Name, err)
continue
}
for _, bindingObj := range bindingObjs {
binding := bindingObj.(*clusterapiv1beta2.ManagedClusterSetBinding)
filteredBindingNamespaces.Insert(binding.Namespace)
}
}
for _, o := range objs {
placement := o.(*clusterapiv1beta1.Placement)
if filteredBindingNamespaces.Has(placement.Namespace) {
klog.V(4).Infof("enqueue placement %s/%s, because of score %s", placement.Namespace, placement.Name, key)
e.enqueuePlacementFunc(placement, e.queue)
}
}
}
func indexPlacementByClusterSetBinding(obj interface{}) ([]string, error) {
placement, ok := obj.(*clusterapiv1beta1.Placement)
if !ok {
return []string{}, fmt.Errorf("obj %T is not a Placement", obj)
}
clustersets := placement.Spec.ClusterSets
if len(clustersets) == 0 {
return []string{fmt.Sprintf("%s/%s", placement.Namespace, anyClusterSet)}, nil
}
var bindings []string
for _, clusterset := range clustersets {
bindings = append(bindings, fmt.Sprintf("%s/%s", placement.Namespace, clusterset))
}
return bindings, nil
}
func indexPlacementsByScore(obj interface{}) ([]string, error) {
placement, ok := obj.(*clusterapiv1beta1.Placement)
if !ok {
return []string{}, fmt.Errorf("obj %T is not a Placement", obj)
}
var keys []string
for _, config := range placement.Spec.PrioritizerPolicy.Configurations {
if config.ScoreCoordinate == nil {
continue
}
if config.ScoreCoordinate.Type != clusterapiv1beta1.ScoreCoordinateTypeAddOn {
continue
}
if config.ScoreCoordinate.AddOn == nil {
continue
}
keys = append(keys, config.ScoreCoordinate.AddOn.ResourceName)
}
return keys, nil
}
func indexClusterSetBindingByClusterSet(obj interface{}) ([]string, error) {
binding, ok := obj.(*clusterapiv1beta2.ManagedClusterSetBinding)
if !ok {
return []string{}, fmt.Errorf("obj %T is not a ManagedClusterSetBinding", obj)
}
return []string{binding.Spec.ClusterSet}, nil
}

View File

@@ -0,0 +1,393 @@
package scheduling
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
testinghelpers "open-cluster-management.io/placement/pkg/helpers/testing"
"strings"
"testing"
"time"
)
func newClusterInformerFactory(clusterClient clusterclient.Interface, objects ...runtime.Object) clusterinformers.SharedInformerFactory {
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
clusterInformerFactory.Cluster().V1beta1().Placements().Informer().AddIndexers(cache.Indexers{
placementsByScore: indexPlacementsByScore,
placementsByClusterSetBinding: indexPlacementByClusterSetBinding,
})
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Informer().AddIndexers(cache.Indexers{
clustersetBindingsByClusterSet: indexClusterSetBindingByClusterSet,
})
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
clusterSetStore := clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Informer().GetStore()
clusterSetBindingStore := clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings().Informer().GetStore()
placementStore := clusterInformerFactory.Cluster().V1beta1().Placements().Informer().GetStore()
placementDecisionStore := clusterInformerFactory.Cluster().V1beta1().PlacementDecisions().Informer().GetStore()
addOnPlacementStore := clusterInformerFactory.Cluster().V1alpha1().AddOnPlacementScores().Informer().GetStore()
for _, obj := range objects {
switch obj.(type) {
case *clusterapiv1.ManagedCluster:
clusterStore.Add(obj)
case *clusterapiv1beta2.ManagedClusterSet:
clusterSetStore.Add(obj)
case *clusterapiv1beta2.ManagedClusterSetBinding:
clusterSetBindingStore.Add(obj)
case *clusterapiv1beta1.Placement:
placementStore.Add(obj)
case *clusterapiv1beta1.PlacementDecision:
placementDecisionStore.Add(obj)
case *clusterapiv1alpha1.AddOnPlacementScore:
addOnPlacementStore.Add(obj)
}
}
return clusterInformerFactory
}
func TestEnqueuePlacementsByClusterSet(t *testing.T) {
cases := []struct {
name string
clusterSet interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "enqueue placements in a namespace",
clusterSet: testinghelpers.NewClusterSet("clusterset1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewClusterSet("clusterset2").Build(),
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewClusterSetBinding("ns1", "clusterset2"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "invalid object type",
clusterSet: "invalid object type",
},
{
name: "clusterset selector type is LegacyClusterSetLabel",
clusterSet: testinghelpers.NewClusterSet("clusterset1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "clusterset selector type is LabelSelector",
clusterSet: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{
SelectorType: clusterapiv1beta2.LabelSelector,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"cloud": "Amazon",
},
},
}).Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "clusterset selector type is LegacyClusterSetLabel",
clusterSet: testinghelpers.NewClusterSet("clusterset1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "clusterset selector type is LabelSelector",
clusterSet: testinghelpers.NewClusterSet("clusterset1").WithClusterSelector(clusterapiv1beta2.ManagedClusterSelector{
SelectorType: clusterapiv1beta2.LabelSelector,
LabelSelector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"cloud": "Amazon",
},
},
}).Build(),
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone",
clusterSet: cache.DeletedFinalStateUnknown{
Key: "clusterset1",
Obj: testinghelpers.NewClusterSet("clusterset1").Build(),
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone with invalid object type",
clusterSet: cache.DeletedFinalStateUnknown{
Obj: "invalid object type",
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
q.enqueueClusterSet(c.clusterSet)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}
func TestEnqueuePlacementsByClusterSetBinding(t *testing.T) {
cases := []struct {
name string
namespace string
clusterSetBinding interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "enqueue placements by clusterSetBinding",
namespace: "ns1",
clusterSetBinding: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns1", "placement2").WithClusterSets("clusterset2").Build(),
testinghelpers.NewPlacement("ns2", "placement3").WithClusterSets("clusterset1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "invalid resource type",
clusterSetBinding: "invalid resource type",
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns2", "placement2").Build(),
},
},
{
name: "on clustersetbinding change",
clusterSetBinding: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
testinghelpers.NewPlacement("ns2", "placement2").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "clusterset",
clusterSetBinding: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone",
clusterSetBinding: cache.DeletedFinalStateUnknown{
Key: "ns1/clusterset1",
Obj: testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
queuedKeys: []string{
"ns1/placement1",
},
},
{
name: "tombstone with invalid object type",
clusterSetBinding: cache.DeletedFinalStateUnknown{
Obj: "invalid object type",
},
initObjs: []runtime.Object{
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewPlacement("ns1", "placement1").Build(),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
q.enqueueClusterSetBinding(c.clusterSetBinding)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}
func TestEnqueuePlacementsByScore(t *testing.T) {
cases := []struct {
name string
namespace string
score interface{}
initObjs []runtime.Object
queuedKeys []string
}{
{
name: "ensueue score",
score: testinghelpers.NewAddOnPlacementScore("cluster1", "score1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(),
testinghelpers.NewPlacement("ns2", "placement2").WithScoreCoordinateAddOn("score2", "cpu", 1).Build(),
testinghelpers.NewPlacement("ns3", "placement3").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(),
testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, "clusterset1").Build(),
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
testinghelpers.NewClusterSetBinding("ns3", "clusterset1"),
},
queuedKeys: []string{
"ns1/placement1",
"ns3/placement3",
},
},
{
name: "only enqueue score with filtered placement",
score: testinghelpers.NewAddOnPlacementScore("cluster1", "score1").Build(),
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(),
testinghelpers.NewPlacement("ns2", "placement2").WithScoreCoordinateAddOn("score2", "cpu", 1).Build(),
testinghelpers.NewPlacement("ns3", "placement3").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(),
testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, "clusterset1").Build(),
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewClusterSetBinding("ns1", "clusterset2"),
testinghelpers.NewClusterSetBinding("ns3", "clusterset1"),
},
queuedKeys: []string{
"ns3/placement3",
},
},
{
name: "tombstone",
score: cache.DeletedFinalStateUnknown{
Key: "cluster1/score1",
Obj: testinghelpers.NewAddOnPlacementScore("cluster1", "score1").Build(),
},
initObjs: []runtime.Object{
testinghelpers.NewPlacement("ns1", "placement1").WithScoreCoordinateAddOn("score1", "cpu", 1).Build(),
testinghelpers.NewManagedCluster("cluster1").WithLabel(clusterSetLabel, "clusterset1").Build(),
testinghelpers.NewClusterSet("clusterset1").Build(),
testinghelpers.NewClusterSetBinding("ns1", "clusterset1"),
},
queuedKeys: []string{
"ns1/placement1",
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
syncCtx := testinghelpers.NewFakeSyncContext(t, "fake")
q := newEnqueuer(
syncCtx.Queue(),
clusterInformerFactory.Cluster().V1().ManagedClusters(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
clusterInformerFactory.Cluster().V1beta1().Placements(),
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSetBindings(),
)
queuedKeys := sets.NewString()
fakeEnqueuePlacement := func(obj interface{}, queue workqueue.RateLimitingInterface) {
key, _ := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
queuedKeys.Insert(key)
}
q.enqueuePlacementFunc = fakeEnqueuePlacement
q.enqueuePlacementScore(c.score)
expectedQueuedKeys := sets.NewString(c.queuedKeys...)
if !queuedKeys.Equal(expectedQueuedKeys) {
t.Errorf("expected queued placements %q, but got %s", strings.Join(expectedQueuedKeys.List(), ","), strings.Join(queuedKeys.List(), ","))
}
})
}
}

View File

@@ -26,6 +26,7 @@ import (
"k8s.io/klog/v2"
clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterinformerv1alpha1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1"
clusterinformerv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1"
clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2"
@@ -60,7 +61,6 @@ type schedulingController struct {
clusterSetBindingLister clusterlisterv1beta2.ManagedClusterSetBindingLister
placementLister clusterlisterv1beta1.PlacementLister
placementDecisionLister clusterlisterv1beta1.PlacementDecisionLister
enqueuePlacementFunc enqueuePlacementFunc
scheduler Scheduler
recorder kevents.EventRecorder
}
@@ -73,13 +73,13 @@ func NewSchedulingController(
clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placementDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
placementScoreInformer clusterinformerv1alpha1.AddOnPlacementScoreInformer,
scheduler Scheduler,
recorder events.Recorder, krecorder kevents.EventRecorder,
) factory.Controller {
syncCtx := factory.NewSyncContext(schedulingControllerName, recorder)
enqueuePlacementFunc := func(namespace, name string) {
syncCtx.Queue().Add(fmt.Sprintf("%s/%s", namespace, name))
}
enQueuer := newEnqueuer(syncCtx.Queue(), clusterInformer, clusterSetInformer, placementInformer, clusterSetBindingInformer)
// build controller
c := &schedulingController{
@@ -89,7 +89,6 @@ func NewSchedulingController(
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
placementLister: placementInformer.Lister(),
placementDecisionLister: placementDecisionInformer.Lister(),
enqueuePlacementFunc: enqueuePlacementFunc,
recorder: krecorder,
scheduler: scheduler,
}
@@ -101,10 +100,7 @@ func NewSchedulingController(
// controller booting. But that should not cause any problem because all existing
// placements will be enqueued by the controller anyway when booting.
clusterInformer.Informer().AddEventHandler(&clusterEventHandler{
clusterSetLister: clusterSetInformer.Lister(),
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
placementLister: placementInformer.Lister(),
enqueuePlacementFunc: enqueuePlacementFunc,
enqueuer: enQueuer,
})
// setup event handler for clusterset informer
@@ -113,10 +109,12 @@ func NewSchedulingController(
// informers/listers of clustersetbinding/placement are synced during controller
// booting. But that should not cause any problem because all existing placements will
// be enqueued by the controller anyway when booting.
clusterSetInformer.Informer().AddEventHandler(&clusterSetEventHandler{
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
placementLister: placementInformer.Lister(),
enqueuePlacementFunc: enqueuePlacementFunc,
clusterSetInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: enQueuer.enqueueClusterSet,
UpdateFunc: func(oldObj, newObj interface{}) {
enQueuer.enqueueClusterSet(newObj)
},
DeleteFunc: enQueuer.enqueueClusterSet,
})
// setup event handler for clustersetbinding informer
@@ -125,10 +123,21 @@ func NewSchedulingController(
// the informers/listers of clusterset/placement are synced during controller booting. But
// that should not cause any problem because all existing placements will be enqueued by
// the controller anyway when booting.
clusterSetBindingInformer.Informer().AddEventHandler(&clusterSetBindingEventHandler{
clusterSetLister: clusterSetInformer.Lister(),
placementLister: placementInformer.Lister(),
enqueuePlacementFunc: enqueuePlacementFunc,
clusterSetBindingInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: enQueuer.enqueueClusterSetBinding,
UpdateFunc: func(oldObj, newObj interface{}) {
enQueuer.enqueueClusterSetBinding(newObj)
},
DeleteFunc: enQueuer.enqueueClusterSetBinding,
})
// setup event handler for placementscore informer
placementScoreInformer.Informer().AddEventHandler(&cache.ResourceEventHandlerFuncs{
AddFunc: enQueuer.enqueuePlacementScore,
UpdateFunc: func(oldObj, newObj interface{}) {
enQueuer.enqueuePlacementScore(newObj)
},
DeleteFunc: enQueuer.enqueuePlacementScore,
})
return factory.New().
@@ -153,84 +162,11 @@ func NewSchedulingController(
}
return false
}, placementDecisionInformer.Informer()).
WithBareInformers(clusterInformer.Informer(), clusterSetInformer.Informer(), clusterSetBindingInformer.Informer()).
WithBareInformers(clusterInformer.Informer(), clusterSetInformer.Informer(), clusterSetBindingInformer.Informer(), placementScoreInformer.Informer()).
WithSync(c.sync).
ToController(schedulingControllerName, recorder)
}
func NewSchedulingControllerResync(
clusterClient clusterclient.Interface,
clusterInformer clusterinformerv1.ManagedClusterInformer,
clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer,
clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placementDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer,
scheduler Scheduler,
recorder events.Recorder, krecorder kevents.EventRecorder,
) factory.Controller {
syncCtx := factory.NewSyncContext(schedulingControllerResyncName, recorder)
enqueuePlacementFunc := func(namespace, name string) {
syncCtx.Queue().Add(fmt.Sprintf("%s/%s", namespace, name))
}
// build controller
c := &schedulingController{
clusterClient: clusterClient,
clusterLister: clusterInformer.Lister(),
clusterSetLister: clusterSetInformer.Lister(),
clusterSetBindingLister: clusterSetBindingInformer.Lister(),
placementLister: placementInformer.Lister(),
placementDecisionLister: placementDecisionInformer.Lister(),
enqueuePlacementFunc: enqueuePlacementFunc,
recorder: krecorder,
scheduler: scheduler,
}
return factory.New().
WithSyncContext(syncCtx).
WithSync(c.resync).
ResyncEvery(ResyncInterval).
ToController(schedulingControllerResyncName, recorder)
}
// Resync the placement which depends on AddOnPlacementScore periodically
func (c *schedulingController) resync(ctx context.Context, syncCtx factory.SyncContext) error {
queueKey := syncCtx.QueueKey()
klog.V(4).Infof("Resync placement %q", queueKey)
if queueKey == "key" {
placements, err := c.placementLister.List(labels.Everything())
if err != nil {
return err
}
for _, placement := range placements {
for _, config := range placement.Spec.PrioritizerPolicy.Configurations {
if config.ScoreCoordinate != nil && config.ScoreCoordinate.Type == clusterapiv1beta1.ScoreCoordinateTypeAddOn {
key, _ := cache.MetaNamespaceKeyFunc(placement)
klog.V(4).Infof("Requeue placement %s", key)
syncCtx.Queue().Add(key)
break
}
}
}
return nil
} else {
placement, err := c.getPlacement(queueKey)
if errors.IsNotFound(err) {
// no work if placement is deleted
return nil
}
if err != nil {
return err
}
// Do not pass syncCtx to syncPlacement, since don't want to requeue the placement when resyncing the placement.
return c.syncPlacement(ctx, nil, placement)
}
}
func (c *schedulingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
queueKey := syncCtx.QueueKey()
klog.V(4).Infof("Reconciling placement %q", queueKey)

View File

@@ -201,7 +201,7 @@ func TestSchedulingController_sync(t *testing.T) {
t.Run(c.name, func(t *testing.T) {
c.initObjs = append(c.initObjs, c.placement)
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
s := &testScheduler{result: c.scheduleResult}
ctrl := schedulingController{
@@ -258,7 +258,7 @@ func TestGetValidManagedClusterSetBindings(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
ctrl := &schedulingController{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
@@ -333,7 +333,7 @@ func TestGetValidManagedClusterSets(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
ctrl := &schedulingController{
clusterSetLister: clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
@@ -461,7 +461,7 @@ func TestGetAvailableClusters(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
ctrl := &schedulingController{
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
@@ -754,7 +754,7 @@ func TestBind(t *testing.T) {
},
)
clusterInformerFactory := testinghelpers.NewClusterInformerFactory(clusterClient, c.initObjs...)
clusterInformerFactory := newClusterInformerFactory(clusterClient, c.initObjs...)
s := &testScheduler{}