From 12b1ddfb52b9c4230bbe16b3fdfe49c09d1dcadd Mon Sep 17 00:00:00 2001 From: Jan Chaloupka Date: Mon, 26 Jan 2026 13:23:23 +0100 Subject: [PATCH] refactor(pkg/descheduler): move prometheus client specific code under a dedicated promClientController The underlying implementation is the same. Only moving the code under a separate controller that can be unit tested independently of the descheduler type implementation. --- pkg/descheduler/descheduler.go | 79 ++++++++++++++++++----------- pkg/descheduler/descheduler_test.go | 8 +-- 2 files changed, 54 insertions(+), 33 deletions(-) diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go index bc825e64b..67cf2234e 100644 --- a/pkg/descheduler/descheduler.go +++ b/pkg/descheduler/descheduler.go @@ -79,23 +79,39 @@ type profileRunner struct { } type descheduler struct { - rs *options.DeschedulerServer - client clientset.Interface - kubeClientSandbox *kubeClientSandbox - getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc - sharedInformerFactory informers.SharedInformerFactory - namespacedSecretsLister corev1listers.SecretNamespaceLister - deschedulerPolicy *api.DeschedulerPolicy - eventRecorder events.EventRecorder - podEvictor *evictions.PodEvictor - metricsCollector *metricscollector.MetricsCollector - prometheusClient promapi.Client + rs *options.DeschedulerServer + client clientset.Interface + kubeClientSandbox *kubeClientSandbox + getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc + sharedInformerFactory informers.SharedInformerFactory + deschedulerPolicy *api.DeschedulerPolicy + eventRecorder events.EventRecorder + podEvictor *evictions.PodEvictor + metricsCollector *metricscollector.MetricsCollector + promClientCtrl *promClientController +} + +type promClientController struct { + promClient promapi.Client previousPrometheusClientTransport *http.Transport queue workqueue.RateLimitingInterface currentPrometheusAuthToken string + namespacedSecretsLister corev1listers.SecretNamespaceLister 
metricsProviders map[api.MetricsSource]*api.MetricsProvider } +func newPromClientController(prometheusClient promapi.Client, metricsProviders map[api.MetricsSource]*api.MetricsProvider) *promClientController { + return &promClientController{ + promClient: prometheusClient, + queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}), + metricsProviders: metricsProviders, + } +} + +func (d *promClientController) prometheusClient() promapi.Client { + return d.promClient +} + func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) { nodeSelector := labels.Everything() if deschedulerPolicy.NodeSelector != nil { @@ -122,7 +138,10 @@ func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.Metri // setupPrometheusProvider sets up the prometheus provider on the descheduler if configured func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory informers.SharedInformerFactory) error { - prometheusProvider := d.metricsProviders[api.PrometheusMetrics] + if d.promClientCtrl == nil { + return nil + } + prometheusProvider := d.promClientCtrl.metricsProviders[api.PrometheusMetrics] if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil { authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference if authTokenSecret == nil || authTokenSecret.Namespace == "" { @@ -131,8 +150,8 @@ func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory inf if namespacedSharedInformerFactory == nil { return fmt.Errorf("namespacedSharedInformerFactory not configured") } - namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(d.eventHandler()) - d.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace) + 
namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(d.promClientCtrl.eventHandler()) + d.promClientCtrl.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace) } return nil } @@ -177,9 +196,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu deschedulerPolicy: deschedulerPolicy, eventRecorder: eventRecorder, podEvictor: podEvictor, - prometheusClient: rs.PrometheusClient, - queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}), - metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders), + promClientCtrl: newPromClientController(rs.PrometheusClient, metricsProviderListToMap(deschedulerPolicy.MetricsProviders)), } nodeSelector, err := nodeSelectorFromPolicy(deschedulerPolicy) @@ -198,7 +215,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu return desch, nil } -func (d *descheduler) reconcileInClusterSAToken() error { +func (d *promClientController) reconcileInClusterSAToken() error { // Read the sa token and assume it has the sufficient permissions to authenticate cfg, err := rest.InClusterConfig() if err == nil { @@ -208,7 +225,7 @@ func (d *descheduler) reconcileInClusterSAToken() error { if err != nil { return fmt.Errorf("unable to create a prometheus client: %v", err) } - d.prometheusClient = prometheusClient + d.promClient = prometheusClient if d.previousPrometheusClientTransport != nil { d.previousPrometheusClientTransport.CloseIdleConnections() } @@ -223,7 +240,7 @@ func (d *descheduler) reconcileInClusterSAToken() error { return fmt.Errorf("unexpected error when reading in cluster config: %v", err) } -func (d *descheduler) runAuthenticationSecretReconciler(ctx context.Context) { +func (d *promClientController) runAuthenticationSecretReconciler(ctx context.Context) { defer 
utilruntime.HandleCrash() defer d.queue.ShutDown() @@ -235,12 +252,12 @@ func (d *descheduler) runAuthenticationSecretReconciler(ctx context.Context) { <-ctx.Done() } -func (d *descheduler) runAuthenticationSecretReconcilerWorker(ctx context.Context) { +func (d *promClientController) runAuthenticationSecretReconcilerWorker(ctx context.Context) { for d.processNextWorkItem(ctx) { } } -func (d *descheduler) processNextWorkItem(ctx context.Context) bool { +func (d *promClientController) processNextWorkItem(ctx context.Context) bool { dsKey, quit := d.queue.Get() if quit { return false @@ -259,7 +276,7 @@ func (d *descheduler) processNextWorkItem(ctx context.Context) bool { return true } -func (d *descheduler) sync() error { +func (d *promClientController) sync() error { prometheusConfig := d.metricsProviders[api.PrometheusMetrics].Prometheus if prometheusConfig == nil || prometheusConfig.AuthToken == nil || prometheusConfig.AuthToken.SecretReference == nil { return fmt.Errorf("prometheus metrics source configuration is missing authentication token secret") @@ -275,7 +292,7 @@ func (d *descheduler) sync() error { d.previousPrometheusClientTransport.CloseIdleConnections() } d.previousPrometheusClientTransport = nil - d.prometheusClient = nil + d.promClient = nil } return fmt.Errorf("unable to get %v/%v secret", ns, name) } @@ -292,7 +309,7 @@ func (d *descheduler) sync() error { if err != nil { return fmt.Errorf("unable to create a prometheus client: %v", err) } - d.prometheusClient = prometheusClient + d.promClient = prometheusClient if d.previousPrometheusClientTransport != nil { d.previousPrometheusClientTransport.CloseIdleConnections() } @@ -301,7 +318,7 @@ func (d *descheduler) sync() error { return nil } -func (d *descheduler) eventHandler() cache.ResourceEventHandler { +func (d *promClientController) eventHandler() cache.ResourceEventHandler { return cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { d.queue.Add(workQueueKey) }, UpdateFunc: 
func(old, new interface{}) { d.queue.Add(workQueueKey) }, @@ -370,6 +387,10 @@ func (d *descheduler) runProfiles(ctx context.Context) { var profileRunners []profileRunner for idx, profile := range d.deschedulerPolicy.Profiles { + var promClient promapi.Client + if d.promClientCtrl != nil { + promClient = d.promClientCtrl.prometheusClient() + } currProfile, err := frameworkprofile.NewProfile( ctx, profile, @@ -379,7 +400,7 @@ func (d *descheduler) runProfiles(ctx context.Context) { frameworkprofile.WithPodEvictor(d.podEvictor), frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode), frameworkprofile.WithMetricsCollector(d.metricsCollector), - frameworkprofile.WithPrometheusClient(d.prometheusClient), + frameworkprofile.WithPrometheusClient(promClient), // Generate a unique instance ID using just the index to avoid long IDs // when profile names are very long frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)), @@ -618,13 +639,13 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer } if metricProviderTokenReconciliation == secretReconciliation { - go descheduler.runAuthenticationSecretReconciler(ctx) + go descheduler.promClientCtrl.runAuthenticationSecretReconciler(ctx) } wait.NonSlidingUntil(func() { if metricProviderTokenReconciliation == inClusterReconciliation { // Read the sa token and assume it has the sufficient permissions to authenticate - if err := descheduler.reconcileInClusterSAToken(); err != nil { + if err := descheduler.promClientCtrl.reconcileInClusterSAToken(); err != nil { klog.ErrorS(err, "unable to reconcile an in cluster SA token") return } diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go index b6f02f893..4e525f31d 100644 --- a/pkg/descheduler/descheduler_test.go +++ b/pkg/descheduler/descheduler_test.go @@ -1544,8 +1544,8 @@ func TestPluginPrometheusClientAccess(t *testing.T) { t.Logf("Cycle %d: %s", i+1, cycle.name) // Set the descheduler's 
Prometheus client - t.Logf("Setting descheduler.prometheusClient from %v to %v", descheduler.prometheusClient, cycle.client) - descheduler.prometheusClient = cycle.client + t.Logf("Setting descheduler.promClientCtrl.promClient from %v to %v", descheduler.promClientCtrl.promClient, cycle.client) + descheduler.promClientCtrl.promClient = cycle.client newInvoked = false reactorInvoked = false @@ -1554,7 +1554,7 @@ func TestPluginPrometheusClientAccess(t *testing.T) { descheduler.runProfiles(ctx) - t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.prometheusClient=%v", i+1, prometheusClientFromReactor, descheduler.prometheusClient) + t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.promClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.promClientCtrl.promClient) if !newInvoked { t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1) @@ -1564,7 +1564,7 @@ func TestPluginPrometheusClientAccess(t *testing.T) { t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1) } - verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.prometheusClient) + verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.promClientCtrl.promClient) } }) }