mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-02-14 18:09:57 +00:00
adding contextual logging in placement component (#254)
Signed-off-by: ntishchauhan0022 <nitishchauhan0022@gmail.com>
This commit is contained in:
@@ -68,6 +68,7 @@ func RunControllerManagerWithInformers(
|
||||
}
|
||||
|
||||
schedulingController := scheduling.NewSchedulingController(
|
||||
ctx,
|
||||
clusterClient,
|
||||
clusterInformers.Cluster().V1().ManagedClusters(),
|
||||
clusterInformers.Cluster().V1beta2().ManagedClusterSets(),
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
|
||||
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
|
||||
clusterapiv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
|
||||
@@ -96,11 +97,13 @@ func TestOnClusterChange(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
|
||||
clusterInformerFactory := newClusterInformerFactory(t, clusterClient, c.initObjs...)
|
||||
|
||||
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
|
||||
q := newEnqueuer(
|
||||
ctx,
|
||||
syncCtx.Queue(),
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
|
||||
@@ -258,11 +261,13 @@ func TestOnClusterUpdate(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
|
||||
clusterInformerFactory := newClusterInformerFactory(t, clusterClient, c.initObjs...)
|
||||
|
||||
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
|
||||
q := newEnqueuer(
|
||||
ctx,
|
||||
syncCtx.Queue(),
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
|
||||
@@ -360,11 +365,13 @@ func TestOnClusterDelete(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
|
||||
clusterInformerFactory := newClusterInformerFactory(t, clusterClient, c.initObjs...)
|
||||
|
||||
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
|
||||
q := newEnqueuer(
|
||||
ctx,
|
||||
syncCtx.Queue(),
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package scheduling
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
@@ -29,6 +30,7 @@ const (
|
||||
)
|
||||
|
||||
type enqueuer struct {
|
||||
logger klog.Logger
|
||||
queue workqueue.RateLimitingInterface
|
||||
enqueuePlacementFunc func(obj interface{}, queue workqueue.RateLimitingInterface)
|
||||
|
||||
@@ -39,6 +41,7 @@ type enqueuer struct {
|
||||
}
|
||||
|
||||
func newEnqueuer(
|
||||
ctx context.Context,
|
||||
queue workqueue.RateLimitingInterface,
|
||||
clusterInformer clusterinformerv1.ManagedClusterInformer,
|
||||
clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer,
|
||||
@@ -60,6 +63,7 @@ func newEnqueuer(
|
||||
}
|
||||
|
||||
return &enqueuer{
|
||||
logger: klog.FromContext(ctx),
|
||||
queue: queue,
|
||||
enqueuePlacementFunc: enqueuePlacement,
|
||||
clusterLister: clusterInformer.Lister(),
|
||||
@@ -108,7 +112,7 @@ func (e *enqueuer) enqueueClusterSetBinding(obj interface{}) {
|
||||
|
||||
for _, o := range objs {
|
||||
placement := o.(*clusterapiv1beta1.Placement)
|
||||
klog.V(4).Infof("enqueue placement %s/%s, because of binding %s", placement.Namespace, placement.Name, key)
|
||||
e.logger.V(4).Info("Enqueue placement because of binding", "placementNamespace", placement.Namespace, "placementName", placement.Name, "bindingKey", key)
|
||||
e.enqueuePlacementFunc(placement, e.queue)
|
||||
}
|
||||
}
|
||||
@@ -128,7 +132,7 @@ func (e *enqueuer) enqueueClusterSet(obj interface{}) {
|
||||
|
||||
for _, o := range objs {
|
||||
clusterSetBinding := o.(*clusterapiv1beta2.ManagedClusterSetBinding)
|
||||
klog.V(4).Infof("enqueue clustersetbinding %s/%s, because of clusterset %s", clusterSetBinding.Namespace, clusterSetBinding.Name, key)
|
||||
e.logger.V(4).Info("Enqueue clustersetbinding because of clusterset", "clusterSetBinding", klog.KObj(clusterSetBinding), "clustersetKey", key)
|
||||
e.enqueueClusterSetBinding(clusterSetBinding)
|
||||
}
|
||||
}
|
||||
@@ -142,12 +146,12 @@ func (e *enqueuer) enqueueCluster(obj interface{}) {
|
||||
|
||||
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err)
|
||||
e.logger.V(4).Error(err, "Unable to get clusterSets of cluster", "clusterName", cluster.GetName())
|
||||
return
|
||||
}
|
||||
|
||||
for _, clusterSet := range clusterSets {
|
||||
klog.V(4).Infof("enqueue clusterSet %s, because of cluster %s", clusterSet.Name, cluster.Name)
|
||||
e.logger.V(4).Info("Enqueue clusterSet because of cluster", "clusterSetName", clusterSet.Name, "clusterName", cluster.Name)
|
||||
e.enqueueClusterSet(clusterSet)
|
||||
}
|
||||
}
|
||||
@@ -177,19 +181,19 @@ func (e *enqueuer) enqueuePlacementScore(obj interface{}) {
|
||||
filteredBindingNamespaces := sets.NewString()
|
||||
cluster, err := e.clusterLister.Get(namespace)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Unable to get cluster %s: %w", namespace, err)
|
||||
e.logger.V(4).Error(err, "Unable to get cluster", "clusterNamespace", namespace)
|
||||
}
|
||||
|
||||
clusterSets, err := clusterapiv1beta2.GetClusterSetsOfCluster(cluster, e.clusterSetLister)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Unable to get clusterSets of cluster %q: %w", cluster.GetName(), err)
|
||||
e.logger.V(4).Error(err, "Unable to get clusterSets of cluster", "clusterName", cluster.GetName())
|
||||
return
|
||||
}
|
||||
|
||||
for _, clusterset := range clusterSets {
|
||||
bindingObjs, err := e.clusterSetBindingIndexer.ByIndex(clustersetBindingsByClusterSet, clusterset.Name)
|
||||
if err != nil {
|
||||
klog.V(4).Infof("Unable to get clusterSetBindings of clusterset %q: %w", clusterset.Name, err)
|
||||
e.logger.V(4).Error(err, "Unable to get clusterSetBindings of clusterset", "clustersetName", clusterset.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -202,7 +206,7 @@ func (e *enqueuer) enqueuePlacementScore(obj interface{}) {
|
||||
for _, o := range objs {
|
||||
placement := o.(*clusterapiv1beta1.Placement)
|
||||
if filteredBindingNamespaces.Has(placement.Namespace) {
|
||||
klog.V(4).Infof("enqueue placement %s/%s, because of score %s", placement.Namespace, placement.Name, key)
|
||||
e.logger.V(4).Info("Enqueue placement because of score", "placementNamespace", placement.Namespace, "placementName", placement.Name, "scoreKey", key)
|
||||
e.enqueuePlacementFunc(placement, e.queue)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
"k8s.io/klog/v2/ktesting"
|
||||
|
||||
clusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned"
|
||||
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
|
||||
@@ -184,11 +185,13 @@ func TestEnqueuePlacementsByClusterSet(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
|
||||
clusterInformerFactory := newClusterInformerFactory(t, clusterClient, c.initObjs...)
|
||||
|
||||
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
|
||||
q := newEnqueuer(
|
||||
ctx,
|
||||
syncCtx.Queue(),
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
|
||||
@@ -291,11 +294,13 @@ func TestEnqueuePlacementsByClusterSetBinding(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
|
||||
clusterInformerFactory := newClusterInformerFactory(t, clusterClient, c.initObjs...)
|
||||
|
||||
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
|
||||
q := newEnqueuer(
|
||||
ctx,
|
||||
syncCtx.Queue(),
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
|
||||
@@ -379,11 +384,13 @@ func TestEnqueuePlacementsByScore(t *testing.T) {
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
_, ctx := ktesting.NewTestContext(t)
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.initObjs...)
|
||||
clusterInformerFactory := newClusterInformerFactory(t, clusterClient, c.initObjs...)
|
||||
|
||||
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
|
||||
q := newEnqueuer(
|
||||
ctx,
|
||||
syncCtx.Queue(),
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
clusterInformerFactory.Cluster().V1beta2().ManagedClusterSets(),
|
||||
|
||||
@@ -171,6 +171,7 @@ func (s *pluginScheduler) Schedule(
|
||||
placement *clusterapiv1beta1.Placement,
|
||||
clusters []*clusterapiv1.ManagedCluster,
|
||||
) (ScheduleResult, *framework.Status) {
|
||||
logger := klog.FromContext(ctx)
|
||||
filtered := clusters
|
||||
finalStatus := framework.NewStatus("", framework.Success, "")
|
||||
|
||||
@@ -190,7 +191,7 @@ func (s *pluginScheduler) Schedule(
|
||||
case status.IsError():
|
||||
return results, status
|
||||
case status.Code() == framework.Warning:
|
||||
klog.Warningf("%v", status.Message())
|
||||
logger.Info("Warning status message", "message", status.Message())
|
||||
finalStatus = status
|
||||
}
|
||||
|
||||
@@ -207,7 +208,7 @@ func (s *pluginScheduler) Schedule(
|
||||
case status.IsError():
|
||||
return results, status
|
||||
case status.Code() == framework.Warning:
|
||||
klog.Warningf("%v", status.Message())
|
||||
logger.Info("Warning status message", "message", status.Message())
|
||||
finalStatus = status
|
||||
}
|
||||
|
||||
@@ -217,7 +218,7 @@ func (s *pluginScheduler) Schedule(
|
||||
case status.IsError():
|
||||
return results, status
|
||||
case status.Code() == framework.Warning:
|
||||
klog.Warningf("%v", status.Message())
|
||||
logger.Info("Warning status message", "message", status.Message())
|
||||
finalStatus = status
|
||||
}
|
||||
|
||||
@@ -235,7 +236,7 @@ func (s *pluginScheduler) Schedule(
|
||||
case status.IsError():
|
||||
return results, status
|
||||
case status.Code() == framework.Warning:
|
||||
klog.Warningf("%v", status.Message())
|
||||
logger.Info("Warning status message", "message", status.Message())
|
||||
finalStatus = status
|
||||
}
|
||||
|
||||
|
||||
@@ -72,6 +72,7 @@ type schedulingController struct {
|
||||
|
||||
// NewSchedulingController return an instance of schedulingController
|
||||
func NewSchedulingController(
|
||||
ctx context.Context,
|
||||
clusterClient clusterclient.Interface,
|
||||
clusterInformer clusterinformerv1.ManagedClusterInformer,
|
||||
clusterSetInformer clusterinformerv1beta2.ManagedClusterSetInformer,
|
||||
@@ -84,7 +85,7 @@ func NewSchedulingController(
|
||||
) factory.Controller {
|
||||
syncCtx := factory.NewSyncContext(schedulingControllerName, recorder)
|
||||
|
||||
enQueuer := newEnqueuer(syncCtx.Queue(), clusterInformer, clusterSetInformer, placementInformer, clusterSetBindingInformer)
|
||||
enQueuer := newEnqueuer(ctx, syncCtx.Queue(), clusterInformer, clusterSetInformer, placementInformer, clusterSetBindingInformer)
|
||||
|
||||
// build controller
|
||||
c := &schedulingController{
|
||||
@@ -175,8 +176,9 @@ func NewSchedulingController(
|
||||
}
|
||||
|
||||
func (c *schedulingController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
queueKey := syncCtx.QueueKey()
|
||||
klog.V(4).Infof("Reconciling placement %q", queueKey)
|
||||
logger.V(4).Info("Reconciling placement", "queueKey", queueKey)
|
||||
|
||||
placement, err := c.getPlacement(queueKey)
|
||||
if errors.IsNotFound(err) {
|
||||
@@ -207,6 +209,7 @@ func (c *schedulingController) getPlacement(queueKey string) (*clusterapiv1beta1
|
||||
}
|
||||
|
||||
func (c *schedulingController) syncPlacement(ctx context.Context, syncCtx factory.SyncContext, placement *clusterapiv1beta1.Placement) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
// no work if placement is deleting
|
||||
if !placement.DeletionTimestamp.IsZero() {
|
||||
return nil
|
||||
@@ -254,7 +257,7 @@ func (c *schedulingController) syncPlacement(ctx context.Context, syncCtx factor
|
||||
if syncCtx != nil && scheduleResult.RequeueAfter() != nil {
|
||||
key, _ := cache.MetaNamespaceKeyFunc(placement)
|
||||
t := scheduleResult.RequeueAfter()
|
||||
klog.V(4).Infof("Requeue placement %s after %t", key, t)
|
||||
logger.V(4).Info("Requeue placement after time", "placementKey", key, "time", t)
|
||||
syncCtx.Queue().AddAfter(key, *t)
|
||||
}
|
||||
|
||||
|
||||
@@ -82,7 +82,7 @@ func (c *AddOn) Score(ctx context.Context, placement *clusterapiv1beta1.Placemen
|
||||
// get AddOnPlacementScores CR with resourceName
|
||||
addOnScores, err := c.handle.ScoreLister().AddOnPlacementScores(namespace).Get(c.resourceName)
|
||||
if err != nil {
|
||||
klog.Warningf("Getting AddOnPlacementScores failed: %s", err)
|
||||
klog.FromContext(ctx).Info("Failed to get AddOnPlacementScores", "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
@@ -91,7 +91,7 @@ func (pl *TaintToleration) Filter(ctx context.Context, placement *clusterapiv1be
|
||||
func (pl *TaintToleration) RequeueAfter(ctx context.Context, placement *clusterapiv1beta1.Placement) (plugins.PluginRequeueResult, *framework.Status) {
|
||||
status := framework.NewStatus(pl.Name(), framework.Success, "")
|
||||
// get exist decisions clusters
|
||||
decisionClusterNames, decisionClusters := getDecisionClusters(pl.handle, placement)
|
||||
decisionClusterNames, decisionClusters := getDecisionClusters(klog.FromContext(ctx), pl.handle, placement)
|
||||
if decisionClusterNames == nil || decisionClusters == nil {
|
||||
return plugins.PluginRequeueResult{}, status
|
||||
}
|
||||
@@ -226,7 +226,7 @@ func getDecisionClusterNames(handle plugins.Handle, placement *clusterapiv1beta1
|
||||
return existingDecisions
|
||||
}
|
||||
|
||||
func getDecisionClusters(handle plugins.Handle, placement *clusterapiv1beta1.Placement) (sets.String, []*clusterapiv1.ManagedCluster) {
|
||||
func getDecisionClusters(logger klog.Logger, handle plugins.Handle, placement *clusterapiv1beta1.Placement) (sets.String, []*clusterapiv1.ManagedCluster) {
|
||||
// get existing decision cluster name
|
||||
decisionClusterNames := getDecisionClusterNames(handle, placement)
|
||||
|
||||
@@ -234,7 +234,7 @@ func getDecisionClusters(handle plugins.Handle, placement *clusterapiv1beta1.Pla
|
||||
decisionClusters := []*clusterapiv1.ManagedCluster{}
|
||||
for c := range decisionClusterNames {
|
||||
if managedCluser, err := handle.ClusterLister().Get(c); err != nil {
|
||||
klog.Warningf("Failed to get ManagedCluster: %s", err)
|
||||
logger.Info("Failed to get ManagedCluster", "error", err)
|
||||
} else {
|
||||
decisionClusters = append(decisionClusters, managedCluser)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user