mirror of https://github.com/kubernetes-sigs/descheduler.git, synced 2026-05-09 02:36:47 +00:00
feat(pkg/descheduler): create profiles outside the descheduling cycle
@@ -114,6 +114,7 @@ type descheduler struct {
     metricsCollector *metricscollector.MetricsCollector
     inClusterPromClientCtrl *inClusterPromClientController
     secretBasedPromClientCtrl *secretBasedPromClientController
+    profileRunners []profileRunner
 }

 type (
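The profileRunner type behind the new field is not defined anywhere in this diff. From its call sites later in the commit (construction as profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins}, plus the profileR.name, profileR.descheduleEPs, and profileR.balanceEPs accesses), it plausibly has the shape below; the exact field types are inferred, not confirmed by the diff:

    // Sketch only: field names come from usage in this commit, the function
    // signatures are assumptions based on how the runners are invoked.
    type profileRunner struct {
        name          string
        descheduleEPs func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
        balanceEPs    func(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status
    }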
@@ -275,6 +276,35 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
         desch.metricsCollector = metricscollector.NewMetricsCollector(sharedInformerFactory.Core().V1().Nodes().Lister(), rs.MetricsClient, nodeSelector)
     }

+    var profileRunners []profileRunner
+    for idx, profile := range deschedulerPolicy.Profiles {
+        var promClientGetter func() promapi.Client
+        if desch.inClusterPromClientCtrl != nil {
+            promClientGetter = desch.inClusterPromClientCtrl.prometheusClient
+        } else if desch.secretBasedPromClientCtrl != nil {
+            promClientGetter = desch.secretBasedPromClientCtrl.prometheusClient
+        }
+        currProfile, err := frameworkprofile.NewProfile(
+            ctx,
+            profile,
+            pluginregistry.PluginRegistry,
+            frameworkprofile.WithClientSet(desch.client),
+            frameworkprofile.WithSharedInformerFactory(desch.sharedInformerFactory),
+            frameworkprofile.WithPodEvictor(desch.podEvictor),
+            frameworkprofile.WithGetPodsAssignedToNodeFnc(desch.getPodsAssignedToNode),
+            frameworkprofile.WithMetricsCollector(desch.metricsCollector),
+            frameworkprofile.WithPrometheusClient(promClientGetter),
+            // Generate a unique instance ID using just the index to avoid long IDs
+            // when profile names are very long
+            frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
+        )
+        if err != nil {
+            return nil, fmt.Errorf("unable to create %q profile: %v", profile.Name, err)
+        }
+        profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
+    }
+
+    desch.profileRunners = profileRunners
     return desch, nil
 }

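Note that promClientGetter is assigned a method value (desch.inClusterPromClientCtrl.prometheusClient, with no trailing parentheses), not the result of a call. The getter therefore re-reads the controller's current client on every invocation, which is what lets profiles built once at startup still observe a Prometheus client that is swapped later by the reconciliation loops. A minimal, self-contained sketch of that Go pattern (names illustrative, not from the repo):

    package main

    import "fmt"

    type clientController struct{ client string }

    func (c *clientController) prometheusClient() string { return c.client }

    func main() {
        ctrl := &clientController{client: "initial"}
        getter := ctrl.prometheusClient // method value bound to ctrl, evaluated at call time
        ctrl.client = "reconciled"      // e.g. updated when a token or secret changes
        fmt.Println(getter())           // prints "reconciled", not "initial"
    }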
@@ -466,40 +496,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
         return // gracefully skip this cycle instead of aborting
     }

-    var profileRunners []profileRunner
-    for idx, profile := range d.deschedulerPolicy.Profiles {
-        var promClient promapi.Client
-        if d.inClusterPromClientCtrl != nil && d.secretBasedPromClientCtrl != nil {
-            klog.Error(fmt.Errorf("At most one of inClusterPromClientCtrl and secretBasedPromClientCtrl can be set, got both"))
-            continue
-        }
-        if d.inClusterPromClientCtrl != nil {
-            promClient = d.inClusterPromClientCtrl.prometheusClient()
-        } else if d.secretBasedPromClientCtrl != nil {
-            promClient = d.secretBasedPromClientCtrl.prometheusClient()
-        }
-        currProfile, err := frameworkprofile.NewProfile(
-            ctx,
-            profile,
-            pluginregistry.PluginRegistry,
-            frameworkprofile.WithClientSet(d.client),
-            frameworkprofile.WithSharedInformerFactory(d.sharedInformerFactory),
-            frameworkprofile.WithPodEvictor(d.podEvictor),
-            frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
-            frameworkprofile.WithMetricsCollector(d.metricsCollector),
-            frameworkprofile.WithPrometheusClient(promClient),
-            // Generate a unique instance ID using just the index to avoid long IDs
-            // when profile names are very long
-            frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
-        )
-        if err != nil {
-            klog.ErrorS(err, "unable to create a profile", "profile", profile.Name)
-            continue
-        }
-        profileRunners = append(profileRunners, profileRunner{profile.Name, currProfile.RunDeschedulePlugins, currProfile.RunBalancePlugins})
-    }
-
-    for _, profileR := range profileRunners {
+    for _, profileR := range d.profileRunners {
         // First deschedule
         status := profileR.descheduleEPs(ctx, nodes)
         if status != nil && status.Err != nil {
@@ -509,7 +506,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
         }
     }

-    for _, profileR := range profileRunners {
+    for _, profileR := range d.profileRunners {
         // Balance Later
         status := profileR.balanceEPs(ctx, nodes)
         if status != nil && status.Err != nil {
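With profile construction moved into newDescheduler, the per-cycle work in runProfiles reduces to iterating the pre-built runners, as the surviving context lines above show. Schematically (condensed, error handling elided; not a verbatim excerpt):

    // Condensed shape of runProfiles after this commit.
    for _, profileR := range d.profileRunners {
        if status := profileR.descheduleEPs(ctx, nodes); status != nil && status.Err != nil {
            // log and continue with the next profile
        }
    }
    for _, profileR := range d.profileRunners {
        if status := profileR.balanceEPs(ctx, nodes); status != nil && status.Err != nil {
            // log and continue with the next profile
        }
    }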
@@ -1429,14 +1429,11 @@ func TestEvictedPodRestorationInDryRun(t *testing.T) {
 }

 // verifyAllPrometheusClientsEqual checks that all Prometheus client variables are equal to the expected value
-func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPluginHandle, fromDescheduler promapi.Client) {
+func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromDescheduler promapi.Client) {
     t.Helper()
     if fromReactor != expected {
         t.Fatalf("Prometheus client from reactor: expected %v, got %v", expected, fromReactor)
     }
-    if fromPluginHandle != expected {
-        t.Fatalf("Prometheus client from plugin handle: expected %v, got %v", expected, fromPluginHandle)
-    }
     if fromDescheduler != expected {
         t.Fatalf("Prometheus client from descheduler: expected %v, got %v", expected, fromDescheduler)
     }
@@ -1445,6 +1442,8 @@ func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPl

 // TestPluginPrometheusClientAccess tests that the Prometheus client is accessible through the plugin handle
 func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
+    // klog.InitFlags(nil)
+    // flag.Set("v", "4")
     testCases := []struct {
         name   string
         dryRun bool
@@ -1526,6 +1525,14 @@ func TestPluginPrometheusClientAccess_Secret(t *testing.T) {

             _, descheduler, runFnc, fakeClient := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)

+            if !newInvoked {
+                t.Fatalf("Expected plugin New to be invoked")
+            }
+            newInvoked = false
+            if prometheusClientFromPluginNewHandle != nil {
+                t.Fatalf("Expected nil prometheus client from plugin New's handle, got %v", prometheusClientFromPluginNewHandle)
+            }
+
             // Test cycles with different Prometheus client values
             cycles := []struct {
                 name string
@@ -1607,9 +1614,7 @@ func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
                     }
                 }

-                newInvoked = false
                 reactorInvoked = false
-                prometheusClientFromPluginNewHandle = nil
                 prometheusClientFromReactor = nil

                 if err := runFnc(ctx); err != nil {
@@ -1618,15 +1623,15 @@ func TestPluginPrometheusClientAccess_Secret(t *testing.T) {

                 t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.secretBasedPromClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.secretBasedPromClientCtrl.prometheusClient())

-                if !newInvoked {
-                    t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
+                if newInvoked {
+                    t.Fatalf("Expected plugin New not to be invoked during cycle %d", i+1)
                 }

                 if !reactorInvoked {
                     t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
                 }

-                verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.secretBasedPromClientCtrl.prometheusClient())
+                verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, descheduler.secretBasedPromClientCtrl.prometheusClient())
             }
         })
     }
@@ -1716,6 +1721,14 @@ func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {

             _, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)

+            if !newInvoked {
+                t.Fatalf("Expected plugin New to be invoked")
+            }
+            newInvoked = false
+            if prometheusClientFromPluginNewHandle != nil {
+                t.Fatalf("Expected nil prometheus client from plugin New's handle, got %v", prometheusClientFromPluginNewHandle)
+            }
+
             // Test cycles with different Prometheus client values
             cycles := []struct {
                 name string
@@ -1764,9 +1777,7 @@ func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {
                 }
                 descheduler.inClusterPromClientCtrl.mu.Unlock()

-                newInvoked = false
                 reactorInvoked = false
-                prometheusClientFromPluginNewHandle = nil
                 prometheusClientFromReactor = nil

                 if err := runFnc(ctx); err != nil {
@@ -1775,15 +1786,15 @@ func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {

                 t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.inClusterPromClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.inClusterPromClientCtrl.prometheusClient())

-                if !newInvoked {
-                    t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
+                if newInvoked {
+                    t.Fatalf("Expected plugin New not to be invoked during cycle %d", i+1)
                 }

                 if !reactorInvoked {
                     t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
                 }

-                verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.inClusterPromClientCtrl.prometheusClient())
+                verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, descheduler.inClusterPromClientCtrl.prometheusClient())
             }
         })
     }
@@ -2651,3 +2662,153 @@ func TestReconcileInClusterSAToken(t *testing.T) {
         })
     }
 }
+
+// TestPluginInformerRegistration tests that plugin-specific informers are registered during newDescheduler
+func TestPluginInformerRegistration(t *testing.T) {
+    testCases := []struct {
+        name   string
+        dryRun bool
+    }{
+        {
+            name:   "dry run disabled",
+            dryRun: false,
+        },
+        {
+            name:   "dry run enabled",
+            dryRun: true,
+        },
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            ctx := context.Background()
+
+            initPluginRegistry()
+
+            // Define the custom informers that should be registered by the plugin
+            customInformers := []schema.GroupVersionResource{
+                {Group: "apps", Version: "v1", Resource: "daemonsets"},
+                {Group: "apps", Version: "v1", Resource: "replicasets"},
+                {Group: "apps", Version: "v1", Resource: "statefulsets"},
+            }
+
+            callbackInvoked := false
+            fakePlugin := &fakeplugin.FakePlugin{
+                PluginName: "TestPluginWithInformers",
+            }
+
+            reactorInvoked := false
+            fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
+                if _, ok := action.(fakeplugin.DescheduleAction); ok {
+                    reactorInvoked = true
+                    return true, false, nil
+                }
+                return false, false, nil
+            })
+
+            // Register our mock plugin using NewPluginFncFromFakeWithReactor
+            pluginregistry.Register(
+                fakePlugin.PluginName,
+                fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
+                    callbackInvoked = true
+                    for _, gvr := range customInformers {
+                        _, err := action.Handle().SharedInformerFactory().ForResource(gvr)
+                        if err != nil {
+                            t.Fatalf("Failed to register informer for %s: %v", gvr.Resource, err)
+                        }
+                        t.Logf("Informer for %v registered inside of plugin's New", gvr)
+                    }
+                }),
+                &fakeplugin.FakePlugin{},
+                &fakeplugin.FakePluginArgs{},
+                fakeplugin.ValidateFakePluginArgs,
+                fakeplugin.SetDefaults_FakePluginArgs,
+                pluginregistry.PluginRegistry,
+            )
+
+            deschedulerPolicy := &api.DeschedulerPolicy{
+                Profiles: []api.DeschedulerProfile{
+                    {
+                        Name: "test-profile",
+                        PluginConfigs: []api.PluginConfig{
+                            {
+                                Name: fakePlugin.PluginName,
+                                Args: &fakeplugin.FakePluginArgs{},
+                            },
+                        },
+                        Plugins: api.Plugins{
+                            Deschedule: api.PluginSet{
+                                Enabled: []string{fakePlugin.PluginName},
+                            },
+                        },
+                    },
+                },
+            }
+
+            node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
+            node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
+
+            _, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
+
+            if !callbackInvoked {
+                t.Fatal("Expected plugin initialization callback to be invoked")
+            }
+
+            // Verify that custom informers were registered in the SharedInformerFactory
+            for _, gvr := range customInformers {
+                informer, err := descheduler.sharedInformerFactory.ForResource(gvr)
+                if err != nil {
+                    t.Errorf("Expected %s informer to be registered in SharedInformerFactory, got error: %v", gvr.Resource, err)
+                    continue
+                }
+
+                if informer.Informer() == nil {
+                    t.Errorf("Expected %s informer to be registered in SharedInformerFactory", gvr.Resource)
+                    continue
+                }
+
+                var informer2 informers.GenericInformer
+                informer2, err = descheduler.sharedInformerFactory.ForResource(gvr)
+                if err != nil {
+                    t.Errorf("Expected %s informer to be cached in factory, got error: %v", gvr.Resource, err)
+                    continue
+                }
+
+                if informer.Informer() != informer2.Informer() {
+                    t.Errorf("Expected %s informer to be cached in factory", gvr.Resource)
+                }
+                t.Logf("Found %v informer after initializing the descheduler", gvr)
+            }
+
+            // Verify profileRunners were created
+            if len(descheduler.profileRunners) == 0 {
+                t.Fatal("Expected profileRunners to be created, got empty slice")
+            }
+
+            if len(descheduler.profileRunners) != 1 {
+                t.Fatalf("Expected 1 profileRunner, got %d", len(descheduler.profileRunners))
+            }
+
+            // Verify profile name
+            if descheduler.profileRunners[0].name != "test-profile" {
+                t.Errorf("Expected profile name to be 'test-profile', got '%s'", descheduler.profileRunners[0].name)
+            }
+
+            callbackInvoked = false
+            t.Logf("Running a descheduling cycle, no informer registration is expected")
+            if err := runFnc(ctx); err != nil {
+                t.Fatalf("Unable to run a descheduling loop: %v", err)
+            }
+
+            if !reactorInvoked {
+                t.Fatalf("Expected reactorInvoked to be set")
+            }
+            t.Logf("Deschedule reactor invoked")
+
+            if callbackInvoked {
+                t.Fatal("Unexpected plugin initialization callback")
+            }
+            t.Logf("Plugin initialization callback not invoked as expected")
+        })
+    }
+}

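The cached-informer assertion above (informer.Informer() != informer2.Informer()) leans on client-go's SharedInformerFactory memoizing one informer per GroupVersionResource, so repeated ForResource calls return the same instance. A standalone illustration of that behavior against a fake clientset (not part of this commit):

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/runtime/schema"
        "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes/fake"
    )

    func main() {
        factory := informers.NewSharedInformerFactory(fake.NewSimpleClientset(), 0)
        gvr := schema.GroupVersionResource{Group: "apps", Version: "v1", Resource: "daemonsets"}

        first, _ := factory.ForResource(gvr)  // registers the informer on first use
        second, _ := factory.ForResource(gvr) // returns the memoized instance

        fmt.Println(first.Informer() == second.Informer()) // true: one shared informer per GVR
    }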
@@ -69,7 +69,7 @@ func (ei *evictorImpl) Evict(ctx context.Context, pod *v1.Pod, opts evictions.Ev
 // handleImpl implements the framework handle which gets passed to plugins
 type handleImpl struct {
     clientSet clientset.Interface
-    prometheusClient promapi.Client
+    prometheusClientGetter func() promapi.Client
     metricsCollector *metricscollector.MetricsCollector
     getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
     sharedInformerFactory informers.SharedInformerFactory
@@ -92,7 +92,11 @@ func (hi *handleImpl) ClientSet() clientset.Interface {
 }

 func (hi *handleImpl) PrometheusClient() promapi.Client {
-    return hi.prometheusClient
+    if hi.prometheusClientGetter == nil {
+        klog.V(3).Info("prometheusClientGetter is nil, returning nil Prometheus client")
+        return nil
+    }
+    return hi.prometheusClientGetter()
 }

 func (hi *handleImpl) MetricsCollector() *metricscollector.MetricsCollector {
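Since PrometheusClient() now delegates to the getter on every call, a plugin that queries the handle at evaluation time (rather than caching the value it saw in New) automatically picks up a client rotated between cycles. A hedged sketch of such a plugin-side access, with hypothetical names throughout:

    // Hypothetical plugin code; real descheduler plugins may differ.
    func (p *examplePlugin) Deschedule(ctx context.Context, nodes []*v1.Node) *frameworktypes.Status {
        promClient := p.handle.PrometheusClient() // nil until a controller resolves one
        if promClient == nil {
            return &frameworktypes.Status{Err: fmt.Errorf("prometheus client not available yet")}
        }
        // ... query Prometheus and decide on evictions ...
        return nil
    }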
@@ -156,7 +160,7 @@ type Option func(*handleImplOpts)

 type handleImplOpts struct {
     clientSet clientset.Interface
-    prometheusClient promapi.Client
+    prometheusClientGetter func() promapi.Client
     sharedInformerFactory informers.SharedInformerFactory
     getPodsAssignedToNodeFunc podutil.GetPodsAssignedToNodeFunc
     podEvictor *evictions.PodEvictor
@@ -171,10 +175,10 @@ func WithClientSet(clientSet clientset.Interface) Option {
     }
 }

-// WithPrometheusClient sets Prometheus client for the scheduling frameworkImpl.
-func WithPrometheusClient(prometheusClient promapi.Client) Option {
+// WithPrometheusClient sets Prometheus client getter for the scheduling frameworkImpl.
+func WithPrometheusClient(prometheusClientGetter func() promapi.Client) Option {
     return func(o *handleImplOpts) {
-        o.prometheusClient = prometheusClient
+        o.prometheusClientGetter = prometheusClientGetter
     }
 }

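Both call sites in this commit pass a controller method value, but the getter signature also admits a fixed client wrapped in a closure, which can be convenient in tests. For example (the second line is hypothetical usage; staticClient is a placeholder name, not from the diff):

    // From the newDescheduler hunk above: re-resolves the client on each call.
    frameworkprofile.WithPrometheusClient(desch.secretBasedPromClientCtrl.prometheusClient)

    // Hypothetical test usage: always returns the same static client.
    frameworkprofile.WithPrometheusClient(func() promapi.Client { return staticClient })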
@@ -309,8 +313,8 @@ func NewProfile(ctx context.Context, config api.DeschedulerProfile, reg pluginre
             profileName: config.Name,
             podEvictor:  hOpts.podEvictor,
         },
-        metricsCollector: hOpts.metricsCollector,
-        prometheusClient: hOpts.prometheusClient,
+        metricsCollector:       hOpts.metricsCollector,
+        prometheusClientGetter: hOpts.prometheusClientGetter,
     }

     // Collect all unique plugin names across all extension points