From b214c1479389c7c2e15014e260c1f7d4fefb1fca Mon Sep 17 00:00:00 2001
From: Jan Chaloupka
Date: Wed, 4 Feb 2026 19:10:29 +0100
Subject: [PATCH] feat(pkg/descheduler): create profiles outside the descheduling cycle

---
 pkg/descheduler/descheduler.go      |  67 +++++-----
 pkg/descheduler/descheduler_test.go | 189 +++++++++++++++++++++++++---
 pkg/framework/profile/profile.go    |  20 +--
 3 files changed, 219 insertions(+), 57 deletions(-)

diff --git a/pkg/descheduler/descheduler.go b/pkg/descheduler/descheduler.go
index c00476f01..277365b44 100644
--- a/pkg/descheduler/descheduler.go
+++ b/pkg/descheduler/descheduler.go
@@ -114,6 +114,7 @@ type descheduler struct {
 	metricsCollector          *metricscollector.MetricsCollector
 	inClusterPromClientCtrl   *inClusterPromClientController
 	secretBasedPromClientCtrl *secretBasedPromClientController
+	profileRunners            []profileRunner
 }
 
 type (
@@ -275,6 +276,35 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
 		desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
 	}
 
+	var profileRunners []profileRunner
+	for idx, profile := range deschedulerPolicy.Profiles {
+		var promClientGetter func() promapi.Client
+		if desch.inClusterPromClientCtrl != nil {
+			promClientGetter = desch.inClusterPromClientCtrl.prometheusClient
+		} else if desch.secretBasedPromClientCtrl != nil {
+			promClientGetter = desch.secretBasedPromClientCtrl.prometheusClient
+		}
+		currProfile, err := frameworkprofile.NewProfile(
+			ctx,
+			profile,
+			pluginregistry.PluginRegistry,
+			frameworkprofile.WithClientSet(desch.client),
+			frameworkprofile.WithSharedInformerFactory(desch.sharedInformerFactory),
+			frameworkprofile.WithPodEvictor(desch.podEvictor),
+			frameworkprofile.WithGetPodsAssignedToNodeFnc(desch.getPodsAssignedToNode),
+			frameworkprofile.WithMetricsCollector(desch.metricsCollector),
+			frameworkprofile.WithPrometheusClient(promClientGetter),
+			// Generate a unique instance ID using just the index to avoid long IDs
+			// when profile names are very long
+			frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
+		)
+		if err != nil {
+			return nil, fmt.Errorf("unable to create %q profile: %v", profile.Name, err)
+		}
+		profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
+	}
+
+	desch.profileRunners = profileRunners
 	return desch, nil
 }
 
@@ -466,40 +496,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
 		return // gracefully skip this cycle instead of aborting
 	}
 
-	var profileRunners []profileRunner
-	for idx, profile := range d.deschedulerPolicy.Profiles {
-		var promClient promapi.Client
-		if d.inClusterPromClientCtrl != nil && d.secretBasedPromClientCtrl != nil {
-			klog.Error(fmt.Errorf("At most one of inClusterPromClientCtrl and secretBasedPromClientCtrl can be set, got both"))
-			continue
-		}
-		if d.inClusterPromClientCtrl != nil {
-			promClient = d.inClusterPromClientCtrl.prometheusClient()
-		} else if d.secretBasedPromClientCtrl != nil {
-			promClient = d.secretBasedPromClientCtrl.prometheusClient()
-		}
-		currProfile, err := frameworkprofile.NewProfile(
-			ctx,
-			profile,
-			pluginregistry.PluginRegistry,
-			frameworkprofile.WithClientSet(d.client),
-			frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
-			frameworkprofile.WithPodEvictor(d.podEvictor),
-			frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
-			frameworkprofile.WithMetricsCollector(d.metricsCollector),
-			frameworkprofile.WithPrometheusClient(promClient),
-			// Generate a unique instance ID using just the index to avoid long IDs
-			// when profile names are very long
-			frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
-		)
-		if err != nil {
-			klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
-			continue
-		}
-		profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
-	}
-
-	for _, profileR := range profileRunners {
+	for _, profileR := range d.profileRunners {
 		// First deschedule
 		status := profileR.descheduleEPs(ctx, nodes)
 		if status != nil && status.Err != nil {
@@ -509,7 +506,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
 		}
 	}
 
-	for _, profileR := range profileRunners {
+	for _, profileR := range d.profileRunners {
 		// Balance Later
 		status := profileR.balanceEPs(ctx, nodes)
 		if status != nil && status.Err != nil {
diff --git a/pkg/descheduler/descheduler_test.go b/pkg/descheduler/descheduler_test.go
index d950c32fd..7a7dc5889 100644
--- a/pkg/descheduler/descheduler_test.go
+++ b/pkg/descheduler/descheduler_test.go
@@ -1429,14 +1429,11 @@ func TestEvictedPodRestorationInDryRun(t *testing.T) {
 }
 
 // verifyAllPrometheusClientsEqual checks that all Prometheus client variables are equal to the expected value
-func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPluginHandle, fromDescheduler promapi.Client) {
+func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromDescheduler promapi.Client) {
 	t.Helper()
 	if fromReactor != expected {
 		t.Fatalf("Prometheus client from reactor: expected %v, got %v", expected, fromReactor)
 	}
-	if fromPluginHandle != expected {
-		t.Fatalf("Prometheus client from plugin handle: expected %v, got %v", expected, fromPluginHandle)
-	}
 	if fromDescheduler != expected {
 		t.Fatalf("Prometheus client from descheduler: expected %v, got %v", expected, fromDescheduler)
 	}
@@ -1445,6 +1442,8 @@ func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPl
 
 // TestPluginPrometheusClientAccess tests that the Prometheus client is accessible through the plugin handle
 func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
+	// klog.InitFlags(nil)
+	// flag.Set("v", "4")
 	testCases := []struct {
 		name   string
 		dryRun bool
@@ -1526,6 +1525,14 @@
 
 			_, descheduler, runFnc, fakeClient := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
 
+			if !newInvoked {
+				t.Fatalf("Expected plugin New to be invoked")
+			}
+			newInvoked = false
+			if prometheusClientFromPluginNewHandle != nil {
+				t.Fatalf("Expected nil prometheus client from plugin New's handle, got %v", prometheusClientFromPluginNewHandle)
+			}
+
 			// Test cycles with different Prometheus client values
 			cycles := []struct {
 				name   string
@@ -1607,9 +1614,7 @@
 					}
 				}
 
-				newInvoked = false
 				reactorInvoked = false
-				prometheusClientFromPluginNewHandle = nil
 				prometheusClientFromReactor = nil
 
 				if err := runFnc(ctx); err != nil {
@@ -1618,15 +1623,15 @@
 
 				t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.secretBasedPromClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.secretBasedPromClientCtrl.prometheusClient())
 
-				if !newInvoked {
-					t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
+				if newInvoked {
+					t.Fatalf("Expected plugin New not to be invoked during cycle %d", i+1)
 				}
 
 				if !reactorInvoked {
 					t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
 				}
 
-				verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.secretBasedPromClientCtrl.prometheusClient())
+				verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, descheduler.secretBasedPromClientCtrl.prometheusClient())
 			}
 		})
 	}
@@ -1716,6 +1721,14 @@
 
 			_, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
 
+			if !newInvoked {
+				t.Fatalf("Expected plugin New to be invoked")
+			}
+			newInvoked = false
+			if prometheusClientFromPluginNewHandle != nil {
+				t.Fatalf("Expected nil prometheus client from plugin New's handle, got %v", prometheusClientFromPluginNewHandle)
+			}
+
 			// Test cycles with different Prometheus client values
 			cycles := []struct {
 				name   string
@@ -1764,9 +1777,7 @@
 				}
 				descheduler.inClusterPromClientCtrl.mu.Unlock()
 
-				newInvoked = false
 				reactorInvoked = false
-				prometheusClientFromPluginNewHandle = nil
 				prometheusClientFromReactor = nil
 
 				if err := runFnc(ctx); err != nil {
@@ -1775,15 +1786,15 @@
 
 				t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.inClusterPromClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.inClusterPromClientCtrl.prometheusClient())
 
-				if !newInvoked {
-					t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
+				if newInvoked {
+					t.Fatalf("Expected plugin New not to be invoked during cycle %d", i+1)
 				}
 
 				if !reactorInvoked {
 					t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
 				}
 
-				verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.inClusterPromClientCtrl.prometheusClient())
+				verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, descheduler.inClusterPromClientCtrl.prometheusClient())
 			}
 		})
 	}
@@ -2651,3 +2662,153 @@ func TestReconcileInClusterSAToken(t *testing.T) {
 		})
 	}
 }
+
+// TestPluginInformerRegistration tests that plugin-specific informers are registered during newDescheduler
+func TestPluginInformerRegistration(t *testing.T) {
+	testCases := []struct {
+		name   string
+		dryRun bool
+	}{
+		{
+			name:   "dry run disabled",
+			dryRun: false,
+		},
+		{
+			name:   "dry run enabled",
+			dryRun: true,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			ctx := context.Background()
+
+			initPluginRegistry()
+
+			// Define the custom informers that should be registered by the plugin
+			customInformers := []schema.GroupVersionResource{
+				{Group: "apps", Version: "v1", Resource: "daemonsets"},
+				{Group: "apps", Version: "v1", Resource: "replicasets"},
+				{Group: "apps", Version: "v1", Resource: "statefulsets"},
+			}
+
+			callbackInvoked := false
+			fakePlugin := &fakeplugin.FakePlugin{
+				PluginName: "TestPluginWithInformers",
+			}
+
+			reactorInvoked := false
+			fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
+				if _, ok := action.(fakeplugin.DescheduleAction); ok {
+					reactorInvoked = true
+					return true, false, nil
+				}
+				return false, false, nil
+			})
+
+			// Register our mock plugin using NewPluginFncFromFakeWithReactor
+			pluginregistry.Register(
+				fakePlugin.PluginName,
+				fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
+					callbackInvoked = true
+					for _, gvr := range customInformers {
+						_, err := action.Handle().SharedInformerFactory().ForResource(gvr)
+						if err != nil {
+							t.Fatalf("Failed to register informer for %s: %v", gvr.Resource, err)
+						}
+						t.Logf("Informer for %v registered inside of plugin's New", gvr)
+					}
+				}),
+				&fakeplugin.FakePlugin{},
+				&fakeplugin.FakePluginArgs{},
+				fakeplugin.ValidateFakePluginArgs,
+				fakeplugin.SetDefaults_FakePluginArgs,
+				pluginregistry.PluginRegistry,
+			)
+
+			deschedulerPolicy := &api.DeschedulerPolicy{
+				Profiles: []api.DeschedulerProfile{
+					{
+						Name: "test-profile",
+						PluginConfigs: []api.PluginConfig{
+							{
+								Name: fakePlugin.PluginName,
+								Args: &fakeplugin.FakePluginArgs{},
+							},
+						},
+						Plugins: api.Plugins{
+							Deschedule: api.PluginSet{
+								Enabled: []string{fakePlugin.PluginName},
+							},
+						},
+					},
+				},
+			}
+
+			node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
+			node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
+
+			_, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
+
+			if !callbackInvoked {
+				t.Fatal("Expected plugin initialization callback to be invoked")
+			}
+
+			// Verify that custom informers were registered in the SharedInformerFactory
+			for _, gvr := range customInformers {
+				informer, err := descheduler.sharedInformerFactory.ForResource(gvr)
+				if err != nil {
+					t.Errorf("Expected %s informer to be registered in SharedInformerFactory, got error: %v", gvr.Resource, err)
+					continue
+				}
+
+				if informer.Informer() == nil {
+					t.Errorf("Expected %s informer to be registered in SharedInformerFactory", gvr.Resource)
+					continue
+				}
+
+				var informer2 informers.GenericInformer
+				informer2, err = descheduler.sharedInformerFactory.ForResource(gvr)
+				if err != nil {
+					t.Errorf("Expected %s informer to be cached in factory, got error: %v", gvr.Resource, err)
+					continue
+				}
+
+				if informer.Informer() != informer2.Informer() {
+					t.Errorf("Expected %s informer to be cached in factory", gvr.Resource)
+				}
+				t.Logf("Found %v informer after initializing the descheduler", gvr)
+			}
+
+			// Verify profileRunners were created
+			if len(descheduler.profileRunners) == 0 {
+				t.Fatal("Expected profileRunners to be created, got empty slice")
+			}
+
+			if len(descheduler.profileRunners) != 1 {
+				t.Fatalf("Expected 1 profileRunner, got %d", len(descheduler.profileRunners))
+			}
+
+			// Verify profile name
+			if descheduler.profileRunners[0].name != "test-profile" {
+				t.Errorf("Expected profile name to be 'test-profile', got '%s'", descheduler.profileRunners[0].name)
+			}
+
+			callbackInvoked = false
+			t.Logf("Running a descheduling cycle, no informer registration is expected")
+			if err := runFnc(ctx); err != nil {
+				t.Fatalf("Unable to run a descheduling loop: %v", err)
+			}
+
+			if !reactorInvoked {
+				t.Fatalf("Expected reactorInvoked to be set")
+			}
+			t.Logf("Deschedule reactor invoked")
+
+			if callbackInvoked {
+				t.Fatal("Unexpected plugin initialization callback")
+			}
+			t.Logf("Plugin initialization callback not invoked as expected")
+		})
+	}
+}
diff --git a/pkg/framework/profile/profile.go b/pkg/framework/profile/profile.go
index 3fff9057b..64996ae90 100644
--- a/pkg/framework/profile/profile.go
+++ b/pkg/framework/profile/profile.go
@@ -69,7 +69,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev
 // handleImpl implements the framework handle which gets passed to plugins
 type handleImpl struct {
 	clientSet                 clientset.Interface
-	prometheusClient          promapi.Client
+	prometheusClientGetter    func() promapi.Client
 	metricsCollector          *metricscollector.MetricsCollector
 	getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
 	sharedInformerFactory     informers.SharedInformerFactory
@@ -92,7 +92,11 @@ func (hi *handleImpl) ClientSet() clientset.Interface {
 }
 
 func (hi *handleImpl) PrometheusClient() promapi.Client {
-	return hi.prometheusClient
+	if hi.prometheusClientGetter == nil {
+		klog.V(3).Info("prometheusClientGetter is nil, returning nil Prometheus client")
+		return nil
+	}
+	return hi.prometheusClientGetter()
 }
 
 func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector {
@@ -156,7 +160,7 @@ type Option func(*handleImplOpts)
 
 type handleImplOpts struct {
 	clientSet                 clientset.Interface
-	prometheusClient          promapi.Client
+	prometheusClientGetter    func() promapi.Client
 	sharedInformerFactory     informers.SharedInformerFactory
 	getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
 	podEvictor                *evictions.PodEvictor
@@ -171,10 +175,10 @@ func WithClientSet(clientSet clientset.Interface) Option {
 	}
 }
 
-// WithPrometheusClient sets Prometheus client for the scheduling frameworkImpl.
-func WithPrometheusClient(prometheusClient promapi.Client) Option {
+// WithPrometheusClient sets Prometheus client getter for the scheduling frameworkImpl.
+func WithPrometheusClient(prometheusClientGetter func() promapi.Client) Option {
 	return func(o *handleImplOpts) {
-		o.prometheusClient = prometheusClient
+		o.prometheusClientGetter = prometheusClientGetter
 	}
 }
 
@@ -309,8 +313,8 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre
 			profileName: config.Name,
 			podEvictor:  hOpts.podEvictor,
 		},
-		metricsCollector: hOpts.metricsCollector,
-		prometheusClient: hOpts.prometheusClient,
+		metricsCollector:       hOpts.metricsCollector,
+		prometheusClientGetter: hOpts.prometheusClientGetter,
 	}
 
 	// Collect all unique plugin names across all extension points
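
Illustration (not part of the patch): profiles can be built once in newDescheduler because WithPrometheusClient now receives a func() promapi.Client getter rather than a concrete client, and handleImpl.PrometheusClient() resolves that getter on every call, so a client rotated later (for example after a secret-based token refresh) is still observed by long-lived profiles. Below is a minimal, self-contained Go sketch of the same indirection; the handle and clientHolder types are hypothetical stand-ins for illustration only, not descheduler code.

package main

import (
	"fmt"
	"sync"

	promapi "github.com/prometheus/client_golang/api"
)

// handle mirrors the getter indirection used by handleImpl in the patch:
// the Prometheus client is looked up on every call instead of being
// captured once at construction time.
type handle struct {
	prometheusClientGetter func() promapi.Client
}

func (h *handle) PrometheusClient() promapi.Client {
	if h.prometheusClientGetter == nil {
		return nil
	}
	return h.prometheusClientGetter()
}

// clientHolder stands in for the in-cluster/secret-based controllers,
// which may swap the client after the handle has been constructed.
type clientHolder struct {
	mu     sync.Mutex
	client promapi.Client
}

func (c *clientHolder) get() promapi.Client {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.client
}

func (c *clientHolder) set(client promapi.Client) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.client = client
}

func main() {
	holder := &clientHolder{}
	h := &handle{prometheusClientGetter: holder.get}

	// Built once, like a profile in newDescheduler: no client configured yet.
	fmt.Println(h.PrometheusClient() == nil) // true

	// A later rotation is visible through the same handle without rebuilding it.
	client, err := promapi.NewClient(promapi.Config{Address: "http://prometheus.example:9090"})
	if err != nil {
		panic(err)
	}
	holder.set(client)
	fmt.Println(h.PrometheusClient() == nil) // false
}

As in the patch, a nil getter (or a getter that returns nil) simply yields a nil client, so callers are expected to check for nil before issuing queries.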