feat(pkg/descheduler): create profiles outside the descheduling cycle

This commit is contained in:
Jan Chaloupka
2026-02-04 19:10:29 +01:00
parent fc863ff58d
commit b214c14793
3 changed files with 219 additions and 57 deletions

View File

@@ -114,6 +114,7 @@ type descheduler struct {
metricsCollector *metricscollector.MetricsCollector
inClusterPromClientCtrl *inClusterPromClientController
secretBasedPromClientCtrl *secretBasedPromClientController
profileRunners []profileRunner
}
type (
@@ -275,6 +276,35 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
}
var profileRunners []profileRunner
for idx, profile := range deschedulerPolicy.Profiles {
var promClientGetter func() promapi.Client
if desch.inClusterPromClientCtrl != nil {
promClientGetter = desch.inClusterPromClientCtrl.prometheusClient
} else if desch.secretBasedPromClientCtrl != nil {
promClientGetter = desch.secretBasedPromClientCtrl.prometheusClient
}
currProfile, err := frameworkprofile.NewProfile(
ctx,
profile,
pluginregistry.PluginRegistry,
frameworkprofile.WithClientSet(desch.client),
frameworkprofile.WithSharedInformerFactory(desch.sharedInformerFactory),
frameworkprofile.WithPodEvictor(desch.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(desch.getPodsAssignedToNode),
frameworkprofile.WithMetricsCollector(desch.metricsCollector),
frameworkprofile.WithPrometheusClient(promClientGetter),
// Generate a unique instance ID using just the index to avoid long IDs
// when profile names are very long
frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
)
if err != nil {
return nil, fmt.Errorf("unable to create %q profile: %v", profile.Name, err)
}
profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
}
desch.profileRunners = profileRunners
return desch, nil
}
@@ -466,40 +496,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
return // gracefully skip this cycle instead of aborting
}
var profileRunners []profileRunner
for idx, profile := range d.deschedulerPolicy.Profiles {
var promClient promapi.Client
if d.inClusterPromClientCtrl != nil && d.secretBasedPromClientCtrl != nil {
klog.Error(fmt.Errorf("At most one of inClusterPromClientCtrl and secretBasedPromClientCtrl can be set, got both"))
continue
}
if d.inClusterPromClientCtrl != nil {
promClient = d.inClusterPromClientCtrl.prometheusClient()
} else if d.secretBasedPromClientCtrl != nil {
promClient = d.secretBasedPromClientCtrl.prometheusClient()
}
currProfile, err := frameworkprofile.NewProfile(
ctx,
profile,
pluginregistry.PluginRegistry,
frameworkprofile.WithClientSet(d.client),
frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
frameworkprofile.WithPodEvictor(d.podEvictor),
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
frameworkprofile.WithMetricsCollector(d.metricsCollector),
frameworkprofile.WithPrometheusClient(promClient),
// Generate a unique instance ID using just the index to avoid long IDs
// when profile names are very long
frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
)
if err != nil {
klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
continue
}
profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
}
for _, profileR := range profileRunners {
for _, profileR := range d.profileRunners {
// First deschedule
status := profileR.descheduleEPs(ctx, nodes)
if status != nil && status.Err != nil {
@@ -509,7 +506,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
}
}
for _, profileR := range profileRunners {
for _, profileR := range d.profileRunners {
// Balance Later
status := profileR.balanceEPs(ctx, nodes)
if status != nil && status.Err != nil {

View File

@@ -1429,14 +1429,11 @@ func TestEvictedPodRestorationInDryRun(t *testing.T) {
}
// verifyAllPrometheusClientsEqual checks that all Prometheus client variables are equal to the expected value
func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPluginHandle, fromDescheduler promapi.Client) {
func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromDescheduler promapi.Client) {
t.Helper()
if fromReactor != expected {
t.Fatalf("Prometheus client from reactor: expected %v, got %v", expected, fromReactor)
}
if fromPluginHandle != expected {
t.Fatalf("Prometheus client from plugin handle: expected %v, got %v", expected, fromPluginHandle)
}
if fromDescheduler != expected {
t.Fatalf("Prometheus client from descheduler: expected %v, got %v", expected, fromDescheduler)
}
@@ -1445,6 +1442,8 @@ func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPl
// TestPluginPrometheusClientAccess tests that the Prometheus client is accessible through the plugin handle
func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
// klog.InitFlags(nil)
// flag.Set("v", "4")
testCases := []struct {
name string
dryRun bool
@@ -1526,6 +1525,14 @@ func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
_, descheduler, runFnc, fakeClient := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
if !newInvoked {
t.Fatalf("Expected plugin New to be invoked")
}
newInvoked = false
if prometheusClientFromPluginNewHandle != nil {
t.Fatalf("Expected nil prometheus client from plugin New's handle, got %v", prometheusClientFromPluginNewHandle)
}
// Test cycles with different Prometheus client values
cycles := []struct {
name string
@@ -1607,9 +1614,7 @@ func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
}
}
newInvoked = false
reactorInvoked = false
prometheusClientFromPluginNewHandle = nil
prometheusClientFromReactor = nil
if err := runFnc(ctx); err != nil {
@@ -1618,15 +1623,15 @@ func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.secretBasedPromClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.secretBasedPromClientCtrl.prometheusClient())
if !newInvoked {
t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
if newInvoked {
t.Fatalf("Expected plugin New not to be invoked during cycle %d", i+1)
}
if !reactorInvoked {
t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
}
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.secretBasedPromClientCtrl.prometheusClient())
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, descheduler.secretBasedPromClientCtrl.prometheusClient())
}
})
}
@@ -1716,6 +1721,14 @@ func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {
_, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
if !newInvoked {
t.Fatalf("Expected plugin New to be invoked")
}
newInvoked = false
if prometheusClientFromPluginNewHandle != nil {
t.Fatalf("Expected nil prometheus client from plugin New's handle, got %v", prometheusClientFromPluginNewHandle)
}
// Test cycles with different Prometheus client values
cycles := []struct {
name string
@@ -1764,9 +1777,7 @@ func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {
}
descheduler.inClusterPromClientCtrl.mu.Unlock()
newInvoked = false
reactorInvoked = false
prometheusClientFromPluginNewHandle = nil
prometheusClientFromReactor = nil
if err := runFnc(ctx); err != nil {
@@ -1775,15 +1786,15 @@ func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.inClusterPromClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.inClusterPromClientCtrl.prometheusClient())
if !newInvoked {
t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
if newInvoked {
t.Fatalf("Expected plugin New not to be invoked during cycle %d", i+1)
}
if !reactorInvoked {
t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
}
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.inClusterPromClientCtrl.prometheusClient())
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, descheduler.inClusterPromClientCtrl.prometheusClient())
}
})
}
@@ -2651,3 +2662,153 @@ func TestReconcileInClusterSAToken(t *testing.T) {
})
}
}
// TestPluginInformerRegistration tests that plugin-specific informers are registered during newDescheduler
// (i.e. at construction time), and that subsequent descheduling cycles reuse the already-built profiles
// instead of re-invoking each plugin's New — the behavior introduced by this commit.
func TestPluginInformerRegistration(t *testing.T) {
// Run the same scenario with eviction both enabled and in dry-run mode; informer
// registration should be identical in both cases.
testCases := []struct {
name string
dryRun bool
}{
{
name: "dry run disabled",
dryRun: false,
},
{
name: "dry run enabled",
dryRun: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
// NOTE(review): presumably resets the shared plugin registry so registrations
// from other tests don't leak in — confirm initPluginRegistry's contract.
initPluginRegistry()
// Define the custom informers that should be registered by the plugin
customInformers := []schema.GroupVersionResource{
{Group: "apps", Version: "v1", Resource: "daemonsets"},
{Group: "apps", Version: "v1", Resource: "replicasets"},
{Group: "apps", Version: "v1", Resource: "statefulsets"},
}
// callbackInvoked flips to true when the fake plugin's New constructor runs;
// it is the signal that profile construction (and informer registration) happened.
callbackInvoked := false
fakePlugin := &fakeplugin.FakePlugin{
PluginName: "TestPluginWithInformers",
}
// reactorInvoked flips to true when the plugin's Deschedule extension point fires,
// proving the descheduling cycle actually exercised the profile runner.
reactorInvoked := false
fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
if _, ok := action.(fakeplugin.DescheduleAction); ok {
reactorInvoked = true
return true, false, nil
}
return false, false, nil
})
// Register our mock plugin using NewPluginFncFromFakeWithReactor
pluginregistry.Register(
fakePlugin.PluginName,
fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
callbackInvoked = true
// Registering informers from inside the plugin's New simulates a plugin that
// needs extra resource informers; ForResource both creates and caches them.
for _, gvr := range customInformers {
_, err := action.Handle().SharedInformerFactory().ForResource(gvr)
if err != nil {
t.Fatalf("Failed to register informer for %s: %v", gvr.Resource, err)
}
t.Logf("Informer for %v registered inside of plugin's New", gvr)
}
}),
&fakeplugin.FakePlugin{},
&fakeplugin.FakePluginArgs{},
fakeplugin.ValidateFakePluginArgs,
fakeplugin.SetDefaults_FakePluginArgs,
pluginregistry.PluginRegistry,
)
// A minimal single-profile policy enabling only the fake plugin at the
// Deschedule extension point.
deschedulerPolicy := &api.DeschedulerPolicy{
Profiles: []api.DeschedulerProfile{
{
Name: "test-profile",
PluginConfigs: []api.PluginConfig{
{
Name: fakePlugin.PluginName,
Args: &fakeplugin.FakePluginArgs{},
},
},
Plugins: api.Plugins{
Deschedule: api.PluginSet{
Enabled: []string{fakePlugin.PluginName},
},
},
},
},
}
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
// initDescheduler is expected to call newDescheduler, which now builds profiles
// (invoking plugin New) up front rather than per cycle.
_, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
if !callbackInvoked {
t.Fatal("Expected plugin initialization callback to be invoked")
}
// Verify that custom informers were registered in the SharedInformerFactory
for _, gvr := range customInformers {
informer, err := descheduler.sharedInformerFactory.ForResource(gvr)
if err != nil {
t.Errorf("Expected %s informer to be registered in SharedInformerFactory, got error: %v", gvr.Resource, err)
continue
}
if informer.Informer() == nil {
t.Errorf("Expected %s informer to be registered in SharedInformerFactory", gvr.Resource)
continue
}
// A second ForResource call must return the cached informer instance,
// not construct a new one — compare identity, not just non-nil.
var informer2 informers.GenericInformer
informer2, err = descheduler.sharedInformerFactory.ForResource(gvr)
if err != nil {
t.Errorf("Expected %s informer to be cached in factory, got error: %v", gvr.Resource, err)
continue
}
if informer.Informer() != informer2.Informer() {
t.Errorf("Expected %s informer to be cached in factory", gvr.Resource)
}
t.Logf("Found %v informer after initializing the descheduler", gvr)
}
// Verify profileRunners were created
if len(descheduler.profileRunners) == 0 {
t.Fatal("Expected profileRunners to be created, got empty slice")
}
if len(descheduler.profileRunners) != 1 {
t.Fatalf("Expected 1 profileRunner, got %d", len(descheduler.profileRunners))
}
// Verify profile name
if descheduler.profileRunners[0].name != "test-profile" {
t.Errorf("Expected profile name to be 'test-profile', got '%s'", descheduler.profileRunners[0].name)
}
// Reset the flag: a descheduling cycle must NOT re-run plugin New now that
// profiles are constructed once in newDescheduler.
callbackInvoked = false
t.Logf("Running a descheduling cycle, no informer registration is expected")
if err := runFnc(ctx); err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
if !reactorInvoked {
t.Fatalf("Expected reactorInvoked to be set")
}
t.Logf("Deschedule reactor invoked")
if callbackInvoked {
t.Fatal("Unexpected plugin initialization callback")
}
t.Logf("Plugin initialization callback not invoked as expected")
})
}
}

View File

@@ -69,7 +69,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev
// handleImpl implements the framework handle which gets passed to plugins
type handleImpl struct {
clientSet clientset.Interface
prometheusClient promapi.Client
prometheusClientGetter func() promapi.Client
metricsCollector *metricscollector.MetricsCollector
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
sharedInformerFactory informers.SharedInformerFactory
@@ -92,7 +92,11 @@ func (hi *handleImpl) ClientSet() clientset.Interface {
}
func (hi *handleImpl) PrometheusClient() promapi.Client {
return hi.prometheusClient
if hi.prometheusClientGetter == nil {
klog.V(3).Info("prometheusClientGetter is nil, returning nil Prometheus client")
return nil
}
return hi.prometheusClientGetter()
}
func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector {
@@ -156,7 +160,7 @@ type Option func(*handleImplOpts)
type handleImplOpts struct {
clientSet clientset.Interface
prometheusClient promapi.Client
prometheusClientGetter func() promapi.Client
sharedInformerFactory informers.SharedInformerFactory
getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
podEvictor *evictions.PodEvictor
@@ -171,10 +175,10 @@ func WithClientSet(clientSet clientset.Interface) Option {
}
}
// WithPrometheusClient sets Prometheus client for the scheduling frameworkImpl.
func WithPrometheusClient(prometheusClient promapi.Client) Option {
// WithPrometheusClient sets Prometheus client getter for the scheduling frameworkImpl.
func WithPrometheusClient(prometheusClientGetter func() promapi.Client) Option {
return func(o *handleImplOpts) {
o.prometheusClient = prometheusClient
o.prometheusClientGetter = prometheusClientGetter
}
}
@@ -309,8 +313,8 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre
profileName: config.Name,
podEvictor: hOpts.podEvictor,
},
metricsCollector: hOpts.metricsCollector,
prometheusClient: hOpts.prometheusClient,
metricsCollector: hOpts.metricsCollector,
prometheusClientGetter: hOpts.prometheusClientGetter,
}
// Collect all unique plugin names across all extension points