Merge pull request #1819 from ingvagabund/bootstrap-descheduler

Deduplicate descheduler initialization code so unit tests exercise more of the production code
Kubernetes Prow Robot · 2026-02-04 16:33:55 +05:30 · committed by GitHub
2 changed files with 390 additions and 157 deletions
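
The change follows a simple pattern: move the shared setup into a single bootstrap function that returns both the initialized object and a "run one cycle" closure, then have RunDeschedulerStrategies and the unit tests call that same function. Below is a minimal, self-contained sketch of that pattern; the names server, engine, bootstrap, and runFunc are illustrative, not the descheduler's real identifiers, and the real bootstrapDescheduler additionally wires informer factories, the Prometheus provider, and the dry-run sandbox.

package main

import (
	"context"
	"fmt"
)

// Illustrative types only (not the descheduler's API).
type server struct {
	dryRun bool
}

type engine struct {
	cycles int
}

type runFunc func(context.Context) error

// bootstrap performs the shared initialization once and returns a closure
// that runs a single cycle. Production code and unit tests both call this,
// so the tests exercise the same wiring as the real binary.
func bootstrap(_ context.Context, s *server) (*engine, runFunc, error) {
	e := &engine{}
	run := func(ctx context.Context) error {
		if err := ctx.Err(); err != nil {
			return err
		}
		e.cycles++
		fmt.Printf("cycle %d (dryRun=%v)\n", e.cycles, s.dryRun)
		return nil
	}
	return e, run, nil
}

func main() {
	ctx := context.Background()
	_, run, err := bootstrap(ctx, &server{dryRun: true})
	if err != nil {
		panic(err)
	}
	// The production loop would call run repeatedly; a unit test can call
	// the very same closure directly and then inspect the engine's state.
	if err := run(ctx); err != nil {
		panic(err)
	}
}

In the diff below, bootstrapDescheduler plays the role of bootstrap: it returns a runFncType closure that replaces the direct runDeschedulerLoop calls in both RunDeschedulerStrategies and the test helpers.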

View File

@@ -547,7 +547,135 @@ const (
secretReconciliation
)
type runFncType func(context.Context) error
func bootstrapDescheduler(
ctx context.Context,
rs *options.DeschedulerServer,
deschedulerPolicy *api.DeschedulerPolicy,
evictionPolicyGroupVersion string,
metricProviderTokenReconciliation tokenReconciliation,
sharedInformerFactory, namespacedSharedInformerFactory informers.SharedInformerFactory,
eventRecorder events.EventRecorder,
) (*descheduler, runFncType, error) {
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, rs.Client, sharedInformerFactory, nil)
if err != nil {
return nil, nil, fmt.Errorf("failed to create new descheduler: %v", err)
}
// Setup Prometheus provider
if err := setupPrometheusProvider(descheduler, namespacedSharedInformerFactory); err != nil {
return nil, nil, fmt.Errorf("failed to setup Prometheus provider: %v", err)
}
// If in dry run mode, replace the descheduler with one using fake client/factory
if rs.DryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
return nil, nil, fmt.Errorf("failed to create kube client sandbox: %v", err)
}
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// TODO(ingvagabund): drop the previous queue
// TODO(ingvagabund): stop the previous pod evictor
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
return nil, nil, fmt.Errorf("failed to create dry run descheduler: %v", err)
}
// Setup Prometheus provider (with the real shared informer factory as the secret is only read)
if err := setupPrometheusProvider(descheduler, namespacedSharedInformerFactory); err != nil {
return nil, nil, fmt.Errorf("failed to setup Prometheus provider for the dry run descheduler: %v", err)
}
}
// deschedulerInitFnc is responsible for starting all informer factories, metrics providers
// and other components that need to start before the first descheduling cycle runs
deschedulerInitFnc := func(ctx context.Context) error {
// In dry run mode, start and sync the fake shared informer factory so it can mirror
// events from the real factory. Reliable propagation depends on both factories being
// fully synced (see WaitForCacheSync calls below), not solely on startup order.
if rs.DryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.Start(ctx.Done())
}
sharedInformerFactory.WaitForCacheSync(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done())
}
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if descheduler.metricsCollector != nil {
go func() {
klog.V(2).Infof("Starting metrics collector")
descheduler.metricsCollector.Run(ctx)
klog.V(2).Infof("Stopped metrics collector")
}()
klog.V(2).Infof("Waiting for metrics collector to sync")
if err := wait.PollWithContext(ctx, time.Second, time.Minute, func(context.Context) (done bool, err error) {
return descheduler.metricsCollector.HasSynced(), nil
}); err != nil {
return fmt.Errorf("unable to wait for metrics collector to sync: %v", err)
}
}
if metricProviderTokenReconciliation == secretReconciliation {
go descheduler.promClientCtrl.runAuthenticationSecretReconciler(ctx)
}
return nil
}
if err := deschedulerInitFnc(ctx); err != nil {
return nil, nil, err
}
runFnc := func(ctx context.Context) error {
if metricProviderTokenReconciliation == inClusterReconciliation {
// Read the SA token and assume it has sufficient permissions to authenticate
if err := descheduler.promClientCtrl.reconcileInClusterSAToken(); err != nil {
return fmt.Errorf("unable to reconcile an in cluster SA token: %v", err)
}
}
err = descheduler.runDeschedulerLoop(ctx)
if err != nil {
return fmt.Errorf("failed to run descheduler loop: %v", err)
}
return nil
}
return descheduler, runFnc, nil
}
func prometheusProviderToTokenReconciliation(prometheusProvider *api.MetricsProvider) tokenReconciliation {
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.URL != "" {
if prometheusProvider.Prometheus.AuthToken != nil {
// Will get reconciled
return secretReconciliation
} else {
// Use the SA token and assume it has sufficient permissions to authenticate
return inClusterReconciliation
}
}
return noReconciliation
}
func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer, deschedulerPolicy *api.DeschedulerPolicy, evictionPolicyGroupVersion string) error {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
var span trace.Span
ctx, span = tracing.Tracer().Start(ctx, "RunDeschedulerStrategies")
defer span.End()
@@ -564,111 +692,27 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
defer eventBroadcaster.Shutdown()
var namespacedSharedInformerFactory informers.SharedInformerFactory
metricProviderTokenReconciliation := noReconciliation
prometheusProvider := metricsProviderListToMap(deschedulerPolicy.MetricsProviders)[api.PrometheusMetrics]
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.URL != "" {
if prometheusProvider.Prometheus.AuthToken != nil {
// Will get reconciled
namespacedSharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields), informers.WithNamespace(prometheusProvider.Prometheus.AuthToken.SecretReference.Namespace))
metricProviderTokenReconciliation = secretReconciliation
} else {
// Use the SA token and assume it has sufficient permissions to authenticate
metricProviderTokenReconciliation = inClusterReconciliation
}
metricProviderTokenReconciliation := prometheusProviderToTokenReconciliation(prometheusProvider)
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields), informers.WithNamespace(prometheusProvider.Prometheus.AuthToken.SecretReference.Namespace))
}
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, rs.Client, sharedInformerFactory, nil)
_, runLoop, err := bootstrapDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, metricProviderTokenReconciliation, sharedInformerFactory, namespacedSharedInformerFactory, eventRecorder)
if err != nil {
span.AddEvent("Failed to create new descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
span.AddEvent("Failed to bootstrap a descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Setup Prometheus provider (only for real client case, not for dry run)
if err := setupPrometheusProvider(descheduler, namespacedSharedInformerFactory); err != nil {
span.AddEvent("Failed to setup Prometheus provider", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
// If in dry run mode, replace the descheduler with one using fake client/factory
if rs.DryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
span.AddEvent("Failed to create kube client sandbox", trace.WithAttributes(attribute.String("err", err.Error())))
return fmt.Errorf("failed to create kube client sandbox: %v", err)
}
klog.V(3).Infof("Building a cached client from the cluster for the dry run")
// TODO(ingvagabund): drop the previous queue
// TODO(ingvagabund): stop the previous pod evictor
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, deschedulerPolicy, evictionPolicyGroupVersion, eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
span.AddEvent("Failed to create dry run descheduler", trace.WithAttributes(attribute.String("err", err.Error())))
return err
}
}
// In dry run mode, start and sync the fake shared informer factory so it can mirror
// events from the real factory. Reliable propagation depends on both factories being
// fully synced (see WaitForCacheSync calls below), not solely on startup order.
if rs.DryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.Start(ctx.Done())
}
sharedInformerFactory.WaitForCacheSync(ctx.Done())
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory.WaitForCacheSync(ctx.Done())
}
descheduler.podEvictor.WaitForEventHandlersSync(ctx)
if descheduler.metricsCollector != nil {
go func() {
klog.V(2).Infof("Starting metrics collector")
descheduler.metricsCollector.Run(ctx)
klog.V(2).Infof("Stopped metrics collector")
}()
klog.V(2).Infof("Waiting for metrics collector to sync")
if err := wait.PollWithContext(ctx, time.Second, time.Minute, func(context.Context) (done bool, err error) {
return descheduler.metricsCollector.HasSynced(), nil
}); err != nil {
return fmt.Errorf("unable to wait for metrics collector to sync: %v", err)
}
}
if metricProviderTokenReconciliation == secretReconciliation {
go descheduler.promClientCtrl.runAuthenticationSecretReconciler(ctx)
}
wait.NonSlidingUntil(func() {
if metricProviderTokenReconciliation == inClusterReconciliation {
// Read the SA token and assume it has sufficient permissions to authenticate
if err := descheduler.promClientCtrl.reconcileInClusterSAToken(); err != nil {
klog.ErrorS(err, "unable to reconcile an in cluster SA token")
return
}
}
// A new context is created here intentionally to avoid nesting the spans via context.
sCtx, sSpan := tracing.Tracer().Start(ctx, "NonSlidingUntil")
defer sSpan.End()
err = descheduler.runDeschedulerLoop(sCtx)
if err != nil {
sSpan.AddEvent("Failed to run descheduler loop", trace.WithAttributes(attribute.String("err", err.Error())))
if err := runLoop(sCtx); err != nil {
sSpan.AddEvent("Descheduling loop failed", trace.WithAttributes(attribute.String("err", err.Error())))
klog.Error(err)
cancel()
return
}
// If there was no interval specified, send a signal to the stopChannel to end the wait.Until loop after 1 iteration

View File

@@ -202,7 +202,7 @@ func lowNodeUtilizationPolicy(thresholds, targetThresholds api.ResourceThreshold
}
}
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, dryRun bool, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, *fakeclientset.Clientset) {
func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate.FeatureGate, internalDeschedulerPolicy *api.DeschedulerPolicy, metricsClient metricsclient.Interface, dryRun bool, objects ...runtime.Object) (*options.DeschedulerServer, *descheduler, runFncType, *fakeclientset.Clientset) {
client := fakeclientset.NewSimpleClientset(objects...)
eventClient := fakeclientset.NewSimpleClientset(objects...)
@@ -219,44 +219,21 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
sharedInformerFactory := informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields))
eventBroadcaster, eventRecorder := utils.GetRecorderAndBroadcaster(ctx, client)
var namespacedSharedInformerFactory informers.SharedInformerFactory
prometheusProvider := metricsProviderListToMap(internalDeschedulerPolicy.MetricsProviders)[api.PrometheusMetrics]
metricProviderTokenReconciliation := prometheusProviderToTokenReconciliation(prometheusProvider)
if metricProviderTokenReconciliation == secretReconciliation {
namespacedSharedInformerFactory = informers.NewSharedInformerFactoryWithOptions(rs.Client, 0, informers.WithTransform(trimManagedFields), informers.WithNamespace(prometheusProvider.Prometheus.AuthToken.SecretReference.Namespace))
}
// Always create descheduler with real client/factory first to register all informers
descheduler, err := newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, rs.Client, sharedInformerFactory, nil)
descheduler, runFnc, err := bootstrapDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", metricProviderTokenReconciliation, sharedInformerFactory, namespacedSharedInformerFactory, eventRecorder)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Unable to create descheduler instance: %v", err)
t.Fatalf("Failed to bootstrap a descheduler: %v", err)
}
// Setup Prometheus provider (only for real client case, not for dry run)
if err := setupPrometheusProvider(descheduler, nil); err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Failed to setup Prometheus provider: %v", err)
}
// If in dry run mode, replace the descheduler with one using fake client/factory
if dryRun {
// Create sandbox with resources to mirror from real client
kubeClientSandbox, err := newDefaultKubeClientSandbox(rs.Client, sharedInformerFactory)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Failed to create kube client sandbox: %v", err)
}
// Replace descheduler with one using fake client/factory
descheduler, err = newDescheduler(ctx, rs, internalDeschedulerPolicy, "v1", eventRecorder, kubeClientSandbox.fakeClient(), kubeClientSandbox.fakeSharedInformerFactory(), kubeClientSandbox)
if err != nil {
eventBroadcaster.Shutdown()
t.Fatalf("Unable to create dry run descheduler instance: %v", err)
}
}
// Start the real shared informer factory after creating the descheduler
if dryRun {
descheduler.kubeClientSandbox.fakeSharedInformerFactory().Start(ctx.Done())
descheduler.kubeClientSandbox.fakeSharedInformerFactory().WaitForCacheSync(ctx.Done())
}
sharedInformerFactory.Start(ctx.Done())
sharedInformerFactory.WaitForCacheSync(ctx.Done())
if dryRun {
if err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 5*time.Second, true, func(ctx context.Context) (bool, error) {
for _, obj := range objects {
@@ -281,7 +258,7 @@ func initDescheduler(t *testing.T, ctx context.Context, featureGates featuregate
}
}
return rs, descheduler, client
return rs, descheduler, runFnc, client
}
func TestTaintsUpdated(t *testing.T) {
@@ -602,7 +579,7 @@ func TestPodEvictorReset(t *testing.T) {
internalDeschedulerPolicy := removePodsViolatingNodeTaintsPolicy()
ctxCancel, cancel := context.WithCancel(ctx)
_, descheduler, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, tc.dryRun, node1, node2, p1, p2)
_, descheduler, _, client := initDescheduler(t, ctxCancel, initFeatureGates(), internalDeschedulerPolicy, nil, tc.dryRun, node1, node2, p1, p2)
defer cancel()
var evictedPods []string
@@ -654,8 +631,8 @@ func checkTotals(t *testing.T, ctx context.Context, descheduler *descheduler, to
t.Logf("Total evictions: %v, total eviction requests: %v, total evictions and eviction requests: %v", totalEvicted, totalEvictionRequests, totalEvicted+totalEvictionRequests)
}
func runDeschedulingCycleAndCheckTotals(t *testing.T, ctx context.Context, nodes []*v1.Node, descheduler *descheduler, totalEvictionRequests, totalEvicted uint) {
err := descheduler.runDeschedulerLoop(ctx)
func runDeschedulingCycleAndCheckTotals(t *testing.T, ctx context.Context, nodes []*v1.Node, descheduler *descheduler, runFnc runFncType, totalEvictionRequests, totalEvicted uint) {
err := runFnc(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -695,19 +672,19 @@ func TestEvictionRequestsCache(t *testing.T) {
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
})
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, false, node1, node2, p1, p2, p3, p4)
_, descheduler, runFnc, client := initDescheduler(t, ctxCancel, featureGates, internalDeschedulerPolicy, nil, false, node1, node2, p1, p2, p3, p4)
defer cancel()
var evictedPods []string
client.PrependReactor("create", "pods", podEvictionReactionTestingFnc(&evictedPods, func(name string) bool { return name == "p1" || name == "p2" }, nil))
klog.Infof("2 evictions in background expected, 2 normal evictions")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 2, 2)
klog.Infof("Repeat the same as previously to confirm no more evictions in background are requested")
// Intentionally, no evicted pod is actually deleted so the test can run the descheduling cycle repeatedly
// without recreating the pods.
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 2, 2)
klog.Infof("Scenario: Eviction in background got initiated")
p2.Annotations[evictions.EvictionInProgressAnnotationKey] = ""
@@ -717,7 +694,7 @@ func TestEvictionRequestsCache(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("Repeat the same as previously to confirm no more evictions in background are requested")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 2, 2)
klog.Infof("Scenario: Another eviction in background got initiated")
p1.Annotations[evictions.EvictionInProgressAnnotationKey] = ""
@@ -727,7 +704,7 @@ func TestEvictionRequestsCache(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("Repeat the same as previously to confirm no more evictions in background are requested")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 2, 2)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 2, 2)
klog.Infof("Scenario: Eviction in background completed")
if err := client.CoreV1().Pods(p1.Namespace).Delete(context.TODO(), p1.Name, metav1.DeleteOptions{}); err != nil {
@@ -736,7 +713,7 @@ func TestEvictionRequestsCache(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("Check the number of evictions in background decreased")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 1, 2)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 1, 2)
klog.Infof("Scenario: A new pod without eviction in background added")
if _, err := client.CoreV1().Pods(p5.Namespace).Create(context.TODO(), p5, metav1.CreateOptions{}); err != nil {
@@ -745,7 +722,7 @@ func TestEvictionRequestsCache(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("Check the number of evictions increased after running a descheduling cycle")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 1, 3)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 1, 3)
klog.Infof("Scenario: Eviction in background canceled => eviction in progress annotation removed")
delete(p2.Annotations, evictions.EvictionInProgressAnnotationKey)
@@ -758,7 +735,7 @@ func TestEvictionRequestsCache(t *testing.T) {
checkTotals(t, ctx, descheduler, 0, 3)
klog.Infof("Scenario: Re-run the descheduling cycle to re-request eviction in background")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 1, 3)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 1, 3)
klog.Infof("Scenario: Eviction in background completed with a pod in completed state")
p2.Status.Phase = v1.PodSucceeded
@@ -768,7 +745,7 @@ func TestEvictionRequestsCache(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("Check the number of evictions in background decreased")
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, 0, 3)
runDeschedulingCycleAndCheckTotals(t, ctx, nodes, descheduler, runFnc, 0, 3)
}
func TestDeschedulingLimits(t *testing.T) {
@@ -831,7 +808,7 @@ func TestDeschedulingLimits(t *testing.T) {
featureGates.Add(map[featuregate.Feature]featuregate.FeatureSpec{
features.EvictionsInBackground: {Default: true, PreRelease: featuregate.Alpha},
})
_, descheduler, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, false, node1, node2)
_, descheduler, runFnc, client := initDescheduler(t, ctxCancel, featureGates, tc.policy, nil, false, node1, node2)
defer cancel()
var evictedPods []string
@@ -863,7 +840,7 @@ func TestDeschedulingLimits(t *testing.T) {
time.Sleep(100 * time.Millisecond)
klog.Infof("2 evictions in background expected, 2 normal evictions")
err := descheduler.runDeschedulerLoop(ctx)
err := runFnc(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -1029,7 +1006,7 @@ func TestNodeLabelSelectorBasedEviction(t *testing.T) {
}
ctxCancel, cancel := context.WithCancel(ctx)
_, deschedulerInstance, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, objects...)
_, deschedulerInstance, _, client := initDescheduler(t, ctxCancel, initFeatureGates(), policy, nil, tc.dryRun, objects...)
defer cancel()
// Verify all pods are created initially
@@ -1143,7 +1120,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
policy.MetricsProviders = []api.MetricsProvider{{Source: api.KubernetesMetrics}}
ctxCancel, cancel := context.WithCancel(ctx)
_, descheduler, _ := initDescheduler(
_, descheduler, runFnc, _ := initDescheduler(
t,
ctxCancel,
initFeatureGates(),
@@ -1157,7 +1134,7 @@ func TestLoadAwareDescheduling(t *testing.T) {
// after newDescheduler in RunDeschedulerStrategies.
descheduler.metricsCollector.Collect(ctx)
err := descheduler.runDeschedulerLoop(ctx)
err := runFnc(ctx)
if err != nil {
t.Fatalf("Unable to run a descheduling loop: %v", err)
}
@@ -1443,7 +1420,7 @@ func verifyAllPrometheusClientsEqual(t *testing.T, expected, fromReactor, fromPl
}
// TestPluginPrometheusClientAccess tests that the Prometheus client is accessible through the plugin handle
func TestPluginPrometheusClientAccess(t *testing.T) {
func TestPluginPrometheusClientAccess_Secret(t *testing.T) {
testCases := []struct {
name string
dryRun bool
@@ -1496,6 +1473,12 @@ func TestPluginPrometheusClientAccess(t *testing.T) {
)
deschedulerPolicy := &api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: newPrometheusConfig(),
},
},
Profiles: []api.DeschedulerProfile{
{
Name: "test-profile",
@@ -1517,28 +1500,51 @@ func TestPluginPrometheusClientAccess(t *testing.T) {
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
_, descheduler, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
_, descheduler, runFnc, fakeClient := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
// Test cycles with different Prometheus client values
cycles := []struct {
name string
client promapi.Client
name string
operation func() error
skipWaiting bool
client promapi.Client
token string
}{
{
name: "initial client",
name: "no secret initially",
operation: func() error { return nil },
skipWaiting: true,
client: nil,
token: "",
},
{
name: "add secret",
operation: func() error {
secret := newPrometheusAuthSecret(withToken("token-1"))
_, err := fakeClient.CoreV1().Secrets(secret.Namespace).Create(ctx, secret, metav1.CreateOptions{})
return err
},
client: &mockPrometheusClient{name: "new-init-client"},
token: "token-1",
},
{
name: "nil client",
client: nil,
},
{
name: "new client",
name: "update secret",
operation: func() error {
secret := newPrometheusAuthSecret(withToken("token-2"))
_, err := fakeClient.CoreV1().Secrets(secret.Namespace).Update(ctx, secret, metav1.UpdateOptions{})
return err
},
client: &mockPrometheusClient{name: "new-client"},
token: "token-2",
},
{
name: "another client",
client: &mockPrometheusClient{name: "another-client"},
name: "delete secret",
operation: func() error {
secret := newPrometheusAuthSecret(withToken("token-3"))
return fakeClient.CoreV1().Secrets(secret.Namespace).Delete(ctx, secret.Name, metav1.DeleteOptions{})
},
client: nil,
token: "",
},
}
@@ -1547,14 +1553,197 @@ func TestPluginPrometheusClientAccess(t *testing.T) {
// Set the descheduler's Prometheus client
t.Logf("Setting descheduler.promClientCtrl.promClient from %v to %v", descheduler.promClientCtrl.promClient, cycle.client)
descheduler.promClientCtrl.promClient = cycle.client
descheduler.promClientCtrl.createPrometheusClient = func(url, token string) (promapi.Client, *http.Transport, error) {
if token != cycle.token {
t.Fatalf("Expected token to be %q, got %q", cycle.token, token)
}
if url != prometheusURL {
t.Fatalf("Expected url to be %q, got %q", prometheusURL, url)
}
return cycle.client, &http.Transport{}, nil
}
if err := cycle.operation(); err != nil {
t.Fatalf("operation failed: %v", err)
}
if !cycle.skipWaiting {
err := wait.PollUntilContextTimeout(ctx, 50*time.Millisecond, 200*time.Millisecond, true, func(ctx context.Context) (bool, error) {
currentPromClient := descheduler.promClientCtrl.prometheusClient()
if currentPromClient != cycle.client {
t.Logf("Waiting for prometheus client to be set to %v, got %v instead, waiting", cycle.client, currentPromClient)
return false, nil
}
return true, nil
})
if err != nil {
t.Fatalf("Timed out waiting for expected conditions: %v", err)
}
}
newInvoked = false
reactorInvoked = false
prometheusClientFromPluginNewHandle = nil
prometheusClientFromReactor = nil
descheduler.runProfiles(ctx)
if err := runFnc(ctx); err != nil {
t.Fatalf("Unexpected error during running a descheduling cycle: %v", err)
}
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.promClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.promClientCtrl.promClient)
if !newInvoked {
t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
}
if !reactorInvoked {
t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
}
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.promClientCtrl.promClient)
}
})
}
}
func TestPluginPrometheusClientAccess_InCluster(t *testing.T) {
testCases := []struct {
name string
dryRun bool
}{
{
name: "dry run disabled",
dryRun: false,
},
{
name: "dry run enabled",
dryRun: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
initPluginRegistry()
newInvoked := false
reactorInvoked := false
var prometheusClientFromPluginNewHandle promapi.Client
var prometheusClientFromReactor promapi.Client
fakePlugin := &fakeplugin.FakePlugin{
PluginName: "TestPluginWithPrometheusClient",
}
fakePlugin.AddReactor(string(frameworktypes.DescheduleExtensionPoint), func(action fakeplugin.Action) (handled, filter bool, err error) {
if dAction, ok := action.(fakeplugin.DescheduleAction); ok {
reactorInvoked = true
prometheusClientFromReactor = dAction.Handle().PrometheusClient()
return true, false, nil
}
return false, false, nil
})
pluginregistry.Register(
fakePlugin.PluginName,
fakeplugin.NewPluginFncFromFakeWithReactor(fakePlugin, func(action fakeplugin.ActionImpl) {
newInvoked = true
prometheusClientFromPluginNewHandle = action.Handle().PrometheusClient()
}),
&fakeplugin.FakePlugin{},
&fakeplugin.FakePluginArgs{},
fakeplugin.ValidateFakePluginArgs,
fakeplugin.SetDefaults_FakePluginArgs,
pluginregistry.PluginRegistry,
)
deschedulerPolicy := &api.DeschedulerPolicy{
MetricsProviders: []api.MetricsProvider{
{
Source: api.PrometheusMetrics,
Prometheus: &api.Prometheus{
URL: prometheusURL,
},
},
},
Profiles: []api.DeschedulerProfile{
{
Name: "test-profile",
PluginConfigs: []api.PluginConfig{
{
Name: fakePlugin.PluginName,
Args: &fakeplugin.FakePluginArgs{},
},
},
Plugins: api.Plugins{
Deschedule: api.PluginSet{
Enabled: []string{fakePlugin.PluginName},
},
},
},
},
}
node1 := test.BuildTestNode("node1", 1000, 2000, 9, nil)
node2 := test.BuildTestNode("node2", 1000, 2000, 9, nil)
_, descheduler, runFnc, _ := initDescheduler(t, ctx, initFeatureGates(), deschedulerPolicy, nil, tc.dryRun, node1, node2)
// Test cycles with different Prometheus client values
cycles := []struct {
name string
client promapi.Client
token string
}{
{
name: "initial client",
client: &mockPrometheusClient{name: "new-init-client"},
token: "init-token",
},
{
name: "nil client",
client: nil,
token: "",
},
{
name: "new client",
client: &mockPrometheusClient{name: "new-client"},
token: "new-token",
},
{
name: "another client",
client: &mockPrometheusClient{name: "another-client"},
token: "another-token",
},
}
for i, cycle := range cycles {
t.Logf("Cycle %d: %s", i+1, cycle.name)
// Set the descheduler's Prometheus client
t.Logf("Setting descheduler.promClientCtrl.promClient from %v to %v", descheduler.promClientCtrl.promClient, cycle.client)
descheduler.promClientCtrl.inClusterConfig = func() (*rest.Config, error) {
return &rest.Config{BearerToken: cycle.token}, nil
}
descheduler.promClientCtrl.createPrometheusClient = func(url, token string) (promapi.Client, *http.Transport, error) {
if token != cycle.token {
t.Errorf("Expected token to be %q, got %q", cycle.token, token)
}
if url != prometheusURL {
t.Errorf("Expected url to be %q, got %q", prometheusURL, url)
}
return cycle.client, &http.Transport{}, nil
}
newInvoked = false
reactorInvoked = false
prometheusClientFromPluginNewHandle = nil
prometheusClientFromReactor = nil
if err := runFnc(ctx); err != nil {
t.Fatalf("Unexpected error during running a descheduling cycle: %v", err)
}
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.promClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.promClientCtrl.promClient)