Files
open-cluster-management/pkg/placement/controllers/scheduling/enqueue.go
Jian Zhu 7332a585c0 🌱 add a verify rule for golang files import order (#177)
* 🌱 add a verify rule for golang files import order

This PR uses the [gci tool](https://github.com/daixiang0/gci) to make all go files' import section with a specific order, it will organize import with group with order:
1. standard library modules
2. 3rd party modules
3. modules in OCM org, like the `open-cluster-management.io/api`
4. current project `open-cluster-management.io/ocm` modules

developers can use the `make fmt-imports` to format the import automatically and the `make verify-fmt-imports` to check for any violation.

Signed-off-by: zhujian <jiazhu@redhat.com>

* 🌱 format the go files import

Signed-off-by: zhujian <jiazhu@redhat.com>

---------

Signed-off-by: zhujian <jiazhu@redhat.com>
2023-06-12 10:23:04 -04:00

261 lines
8.1 KiB
Go

package scheduling
import (
"fmt"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterinformerv1beta1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta1"
clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2"
clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"
clusterapiv1 "open-cluster-management.io/api/cluster/v1"
clusterapiv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
)
const (
// anyClusterSet is an index value for indexPlacementByClusterSet for placement
// not setting any clusterset
anyClusterSet = "*"
placementsByClusterSetBinding = "placementsByClusterSet"
clustersetBindingsByClusterSet = "clustersetBindingsByClusterSet"
placementsByScore = "placementsByScore"
)
type enqueuer struct {
queue workqueue.RateLimitingInterface
enqueuePlacementFunc func(obj interface{}, queue workqueue.RateLimitingInterface)
clusterLister clusterlisterv1.ManagedClusterLister
clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister
placementIndexer cache.Indexer
clusterSetBindingIndexer cache.Indexer
}
func newEnqueuer(
queue workqueue.RateLimitingInterface,
clusterInformer clusterinformerv1.ManagedClusterInformer,
clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
clusterSetBindingInformer clusterinformerv1beta2.ManagedClusterSetBindingInformer) *enqueuer {
err := placementInformer.Informer().AddIndexers(cache.Indexers{
placementsByScore: indexPlacementsByScore,
placementsByClusterSetBinding: indexPlacementByClusterSetBinding,
})
if err != nil {
runtime.HandleError(err)
}
err = clusterSetBindingInformer.Informer().AddIndexers(cache.Indexers{
clustersetBindingsByClusterSet: indexClusterSetBindingByClusterSet,
})
if err != nil {
runtime.HandleError(err)
}
return &enqueuer{
queue: queue,
enqueuePlacementFunc: enqueuePlacement,
clusterLister: clusterInformer.Lister(),
clusterSetLister: clusterSetInformer.Lister(),
placementIndexer: placementInformer.Informer().GetIndexer(),
clusterSetBindingIndexer: clusterSetBindingInformer.Informer().GetIndexer(),
}
}
func enqueuePlacement(obj interface{}, queue workqueue.RateLimitingInterface) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
queue.Add(key)
}
func (e *enqueuer) enqueueClusterSetBinding(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
namespace, _, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return
}
// enqueue all placement that ref to the binding
objs, err := e.placementIndexer.ByIndex(placementsByClusterSetBinding, key)
if err != nil {
runtime.HandleError(err)
return
}
anyObjs, err := e.placementIndexer.ByIndex(placementsByClusterSetBinding, fmt.Sprintf("%s/%s", namespace, anyClusterSet))
if err != nil {
runtime.HandleError(err)
return
}
objs = append(objs, anyObjs...)
for _, o := range objs {
placement := o.(*clusterapiv1beta1.Placement)
klog.V(4).Infof("enqueue placement %s/%s, because of binding %s", placement.Namespace, placement.Name, key)
e.enqueuePlacementFunc(placement, e.queue)
}
}
func (e *enqueuer) enqueueClusterSet(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
objs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, key)
if err != nil {
runtime.HandleError(err)
return
}
for _, o := range objs {
clusterSetBinding := o.(*clusterapiv1beta2.ManagedClusterSetBinding)
klog.V(4).Infof("enqueue clustersetbinding %s/%s, because of clusterset %s", clusterSetBinding.Namespace, clusterSetBinding.Name, key)
e.enqueueClusterSetBinding(clusterSetBinding)
}
}
func (e *enqueuer) enqueueCluster(obj interface{}) {
cluster, ok := obj.(*clusterapiv1.ManagedCluster)
if !ok {
runtime.HandleError(fmt.Errorf("obj %T is not a ManagedCluster", obj))
return
}
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister)
if err != nil {
klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err)
return
}
for _, clusterSet := range clusterSets {
klog.V(4).Infof("enqueue clusterSet %s, because of cluster %s", clusterSet.Name, cluster.Name)
e.enqueueClusterSet(clusterSet)
}
}
func (e *enqueuer) enqueuePlacementScore(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err != nil {
runtime.HandleError(err)
return
}
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
runtime.HandleError(err)
return
}
objs, err := e.placementIndexer.ByIndex(placementsByScore, name)
if err != nil {
runtime.HandleError(err)
return
}
// filter the namespace of placement based on cluster. Find all related clustersetbinding
// to the cluster at first. Only enqueue placement when its namespace is in the valid namespaces
// of clustersetbindings.
filteredBindingNamespaces := sets.NewString()
cluster, err := e.clusterLister.Get(namespace)
if err != nil {
klog.V(4).Infof("Unable to get cluster %s: %w", namespace, err)
}
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister)
if err != nil {
klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err)
return
}
for _, clusterset := range clusterSets {
bindingObjs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, clusterset.Name)
if err != nil {
klog.V(4).Infof("Unable to get clusterSetBindings of clusterset %q: %w", clusterset.Name, err)
continue
}
for _, bindingObj := range bindingObjs {
binding := bindingObj.(*clusterapiv1beta2.ManagedClusterSetBinding)
filteredBindingNamespaces.Insert(binding.Namespace)
}
}
for _, o := range objs {
placement := o.(*clusterapiv1beta1.Placement)
if filteredBindingNamespaces.Has(placement.Namespace) {
klog.V(4).Infof("enqueue placement %s/%s, because of score %s", placement.Namespace, placement.Name, key)
e.enqueuePlacementFunc(placement, e.queue)
}
}
}
func indexPlacementByClusterSetBinding(obj interface{}) ([]string, error) {
placement, ok := obj.(*clusterapiv1beta1.Placement)
if !ok {
return []string{}, fmt.Errorf("obj %T is not a Placement", obj)
}
clustersets := placement.Spec.ClusterSets
if len(clustersets) == 0 {
return []string{fmt.Sprintf("%s/%s", placement.Namespace, anyClusterSet)}, nil
}
var bindings []string
for _, clusterset := range clustersets {
bindings = append(bindings, fmt.Sprintf("%s/%s", placement.Namespace, clusterset))
}
return bindings, nil
}
func indexPlacementsByScore(obj interface{}) ([]string, error) {
placement, ok := obj.(*clusterapiv1beta1.Placement)
if !ok {
return []string{}, fmt.Errorf("obj %T is not a Placement", obj)
}
var keys []string
for _, config := range placement.Spec.PrioritizerPolicy.Configurations {
if config.ScoreCoordinate == nil {
continue
}
if config.ScoreCoordinate.Type != clusterapiv1beta1.ScoreCoordinateTypeAddOn {
continue
}
if config.ScoreCoordinate.AddOn == nil {
continue
}
keys = append(keys, config.ScoreCoordinate.AddOn.ResourceName)
}
return keys, nil
}
func indexClusterSetBindingByClusterSet(obj interface{}) ([]string, error) {
binding, ok := obj.(*clusterapiv1beta2.ManagedClusterSetBinding)
if !ok {
return []string{}, fmt.Errorf("obj %T is not a ManagedClusterSetBinding", obj)
}
return []string{binding.Spec.ClusterSet}, nil
}