mirror of
https://github.com/kubernetes-sigs/descheduler.git
synced 2026-05-06 01:08:24 +00:00
refactor(pkg/descheduler): move prometheus client specific code under a dedicated promClientController
The underlying implementation is the same. Only moving the code under a separate controller that can be unit test independently of the descheduler type implementation.
This commit is contained in:
@@ -79,23 +79,39 @@ type profileRunner struct {
|
||||
}
|
||||
|
||||
type descheduler struct {
|
||||
rs *options.DeschedulerServer
|
||||
client clientset.Interface
|
||||
kubeClientSandbox *kubeClientSandbox
|
||||
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
|
||||
sharedInformerFactory informers.SharedInformerFactory
|
||||
namespacedSecretsLister corev1listers.SecretNamespaceLister
|
||||
deschedulerPolicy *api.DeschedulerPolicy
|
||||
eventRecorder events.EventRecorder
|
||||
podEvictor *evictions.PodEvictor
|
||||
metricsCollector *metricscollector.MetricsCollector
|
||||
prometheusClient promapi.Client
|
||||
rs *options.DeschedulerServer
|
||||
client clientset.Interface
|
||||
kubeClientSandbox *kubeClientSandbox
|
||||
getPodsAssignedToNode podutil.GetPodsAssignedToNodeFunc
|
||||
sharedInformerFactory informers.SharedInformerFactory
|
||||
deschedulerPolicy *api.DeschedulerPolicy
|
||||
eventRecorder events.EventRecorder
|
||||
podEvictor *evictions.PodEvictor
|
||||
metricsCollector *metricscollector.MetricsCollector
|
||||
promClientCtrl *promClientController
|
||||
}
|
||||
|
||||
type promClientController struct {
|
||||
promClient promapi.Client
|
||||
previousPrometheusClientTransport *http.Transport
|
||||
queue workqueue.RateLimitingInterface
|
||||
currentPrometheusAuthToken string
|
||||
namespacedSecretsLister corev1listers.SecretNamespaceLister
|
||||
metricsProviders map[api.MetricsSource]*api.MetricsProvider
|
||||
}
|
||||
|
||||
func newPromClientController(prometheusClient promapi.Client, metricsProviders map[api.MetricsSource]*api.MetricsProvider) *promClientController {
|
||||
return &promClientController{
|
||||
promClient: prometheusClient,
|
||||
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
|
||||
metricsProviders: metricsProviders,
|
||||
}
|
||||
}
|
||||
|
||||
func (d *promClientController) prometheusClient() promapi.Client {
|
||||
return d.promClient
|
||||
}
|
||||
|
||||
func nodeSelectorFromPolicy(deschedulerPolicy *api.DeschedulerPolicy) (labels.Selector, error) {
|
||||
nodeSelector := labels.Everything()
|
||||
if deschedulerPolicy.NodeSelector != nil {
|
||||
@@ -122,7 +138,10 @@ func metricsProviderListToMap(providersList []api.MetricsProvider) map[api.Metri
|
||||
|
||||
// setupPrometheusProvider sets up the prometheus provider on the descheduler if configured
|
||||
func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory informers.SharedInformerFactory) error {
|
||||
prometheusProvider := d.metricsProviders[api.PrometheusMetrics]
|
||||
if d.promClientCtrl == nil {
|
||||
return nil
|
||||
}
|
||||
prometheusProvider := d.promClientCtrl.metricsProviders[api.PrometheusMetrics]
|
||||
if prometheusProvider != nil && prometheusProvider.Prometheus != nil && prometheusProvider.Prometheus.AuthToken != nil {
|
||||
authTokenSecret := prometheusProvider.Prometheus.AuthToken.SecretReference
|
||||
if authTokenSecret == nil || authTokenSecret.Namespace == "" {
|
||||
@@ -131,8 +150,8 @@ func setupPrometheusProvider(d *descheduler, namespacedSharedInformerFactory inf
|
||||
if namespacedSharedInformerFactory == nil {
|
||||
return fmt.Errorf("namespacedSharedInformerFactory not configured")
|
||||
}
|
||||
namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(d.eventHandler())
|
||||
d.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
|
||||
namespacedSharedInformerFactory.Core().V1().Secrets().Informer().AddEventHandler(d.promClientCtrl.eventHandler())
|
||||
d.promClientCtrl.namespacedSecretsLister = namespacedSharedInformerFactory.Core().V1().Secrets().Lister().Secrets(authTokenSecret.Namespace)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -177,9 +196,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
|
||||
deschedulerPolicy: deschedulerPolicy,
|
||||
eventRecorder: eventRecorder,
|
||||
podEvictor: podEvictor,
|
||||
prometheusClient: rs.PrometheusClient,
|
||||
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{Name: "descheduler"}),
|
||||
metricsProviders: metricsProviderListToMap(deschedulerPolicy.MetricsProviders),
|
||||
promClientCtrl: newPromClientController(rs.PrometheusClient, metricsProviderListToMap(deschedulerPolicy.MetricsProviders)),
|
||||
}
|
||||
|
||||
nodeSelector, err := nodeSelectorFromPolicy(deschedulerPolicy)
|
||||
@@ -198,7 +215,7 @@ func newDescheduler(ctx context.Context, rs *options.DeschedulerServer, deschedu
|
||||
return desch, nil
|
||||
}
|
||||
|
||||
func (d *descheduler) reconcileInClusterSAToken() error {
|
||||
func (d *promClientController) reconcileInClusterSAToken() error {
|
||||
// Read the sa token and assume it has the sufficient permissions to authenticate
|
||||
cfg, err := rest.InClusterConfig()
|
||||
if err == nil {
|
||||
@@ -208,7 +225,7 @@ func (d *descheduler) reconcileInClusterSAToken() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create a prometheus client: %v", err)
|
||||
}
|
||||
d.prometheusClient = prometheusClient
|
||||
d.promClient = prometheusClient
|
||||
if d.previousPrometheusClientTransport != nil {
|
||||
d.previousPrometheusClientTransport.CloseIdleConnections()
|
||||
}
|
||||
@@ -223,7 +240,7 @@ func (d *descheduler) reconcileInClusterSAToken() error {
|
||||
return fmt.Errorf("unexpected error when reading in cluster config: %v", err)
|
||||
}
|
||||
|
||||
func (d *descheduler) runAuthenticationSecretReconciler(ctx context.Context) {
|
||||
func (d *promClientController) runAuthenticationSecretReconciler(ctx context.Context) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer d.queue.ShutDown()
|
||||
|
||||
@@ -235,12 +252,12 @@ func (d *descheduler) runAuthenticationSecretReconciler(ctx context.Context) {
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (d *descheduler) runAuthenticationSecretReconcilerWorker(ctx context.Context) {
|
||||
func (d *promClientController) runAuthenticationSecretReconcilerWorker(ctx context.Context) {
|
||||
for d.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
func (d *descheduler) processNextWorkItem(ctx context.Context) bool {
|
||||
func (d *promClientController) processNextWorkItem(ctx context.Context) bool {
|
||||
dsKey, quit := d.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
@@ -259,7 +276,7 @@ func (d *descheduler) processNextWorkItem(ctx context.Context) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (d *descheduler) sync() error {
|
||||
func (d *promClientController) sync() error {
|
||||
prometheusConfig := d.metricsProviders[api.PrometheusMetrics].Prometheus
|
||||
if prometheusConfig == nil || prometheusConfig.AuthToken == nil || prometheusConfig.AuthToken.SecretReference == nil {
|
||||
return fmt.Errorf("prometheus metrics source configuration is missing authentication token secret")
|
||||
@@ -275,7 +292,7 @@ func (d *descheduler) sync() error {
|
||||
d.previousPrometheusClientTransport.CloseIdleConnections()
|
||||
}
|
||||
d.previousPrometheusClientTransport = nil
|
||||
d.prometheusClient = nil
|
||||
d.promClient = nil
|
||||
}
|
||||
return fmt.Errorf("unable to get %v/%v secret", ns, name)
|
||||
}
|
||||
@@ -292,7 +309,7 @@ func (d *descheduler) sync() error {
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to create a prometheus client: %v", err)
|
||||
}
|
||||
d.prometheusClient = prometheusClient
|
||||
d.promClient = prometheusClient
|
||||
if d.previousPrometheusClientTransport != nil {
|
||||
d.previousPrometheusClientTransport.CloseIdleConnections()
|
||||
}
|
||||
@@ -301,7 +318,7 @@ func (d *descheduler) sync() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *descheduler) eventHandler() cache.ResourceEventHandler {
|
||||
func (d *promClientController) eventHandler() cache.ResourceEventHandler {
|
||||
return cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { d.queue.Add(workQueueKey) },
|
||||
UpdateFunc: func(old, new interface{}) { d.queue.Add(workQueueKey) },
|
||||
@@ -370,6 +387,10 @@ func (d *descheduler) runProfiles(ctx context.Context) {
|
||||
|
||||
var profileRunners []profileRunner
|
||||
for idx, profile := range d.deschedulerPolicy.Profiles {
|
||||
var promClient promapi.Client
|
||||
if d.promClientCtrl != nil {
|
||||
promClient = d.promClientCtrl.prometheusClient()
|
||||
}
|
||||
currProfile, err := frameworkprofile.NewProfile(
|
||||
ctx,
|
||||
profile,
|
||||
@@ -379,7 +400,7 @@ func (d *descheduler) runProfiles(ctx context.Context) {
|
||||
frameworkprofile.WithPodEvictor(d.podEvictor),
|
||||
frameworkprofile.WithGetPodsAssignedToNodeFnc(d.getPodsAssignedToNode),
|
||||
frameworkprofile.WithMetricsCollector(d.metricsCollector),
|
||||
frameworkprofile.WithPrometheusClient(d.prometheusClient),
|
||||
frameworkprofile.WithPrometheusClient(promClient),
|
||||
// Generate a unique instance ID using just the index to avoid long IDs
|
||||
// when profile names are very long
|
||||
frameworkprofile.WithProfileInstanceID(fmt.Sprintf("%d", idx)),
|
||||
@@ -618,13 +639,13 @@ func RunDeschedulerStrategies(ctx context.Context, rs *options.DeschedulerServer
|
||||
}
|
||||
|
||||
if metricProviderTokenReconciliation == secretReconciliation {
|
||||
go descheduler.runAuthenticationSecretReconciler(ctx)
|
||||
go descheduler.promClientCtrl.runAuthenticationSecretReconciler(ctx)
|
||||
}
|
||||
|
||||
wait.NonSlidingUntil(func() {
|
||||
if metricProviderTokenReconciliation == inClusterReconciliation {
|
||||
// Read the sa token and assume it has the sufficient permissions to authenticate
|
||||
if err := descheduler.reconcileInClusterSAToken(); err != nil {
|
||||
if err := descheduler.promClientCtrl.reconcileInClusterSAToken(); err != nil {
|
||||
klog.ErrorS(err, "unable to reconcile an in cluster SA token")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -1544,8 +1544,8 @@ func TestPluginPrometheusClientAccess(t *testing.T) {
|
||||
t.Logf("Cycle %d: %s", i+1, cycle.name)
|
||||
|
||||
// Set the descheduler's Prometheus client
|
||||
t.Logf("Setting descheduler.prometheusClient from %v to %v", descheduler.prometheusClient, cycle.client)
|
||||
descheduler.prometheusClient = cycle.client
|
||||
t.Logf("Setting descheduler.promClientCtrl.promClient from %v to %v", descheduler.promClientCtrl.promClient, cycle.client)
|
||||
descheduler.promClientCtrl.promClient = cycle.client
|
||||
|
||||
newInvoked = false
|
||||
reactorInvoked = false
|
||||
@@ -1554,7 +1554,7 @@ func TestPluginPrometheusClientAccess(t *testing.T) {
|
||||
|
||||
descheduler.runProfiles(ctx)
|
||||
|
||||
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.prometheusClient=%v", i+1, prometheusClientFromReactor, descheduler.prometheusClient)
|
||||
t.Logf("After cycle %d: prometheusClientFromReactor=%v, descheduler.promClientCtrl.promClient=%v", i+1, prometheusClientFromReactor, descheduler.promClientCtrl.promClient)
|
||||
|
||||
if !newInvoked {
|
||||
t.Fatalf("Expected plugin new to be invoked during cycle %d", i+1)
|
||||
@@ -1564,7 +1564,7 @@ func TestPluginPrometheusClientAccess(t *testing.T) {
|
||||
t.Fatalf("Expected deschedule reactor to be invoked during cycle %d", i+1)
|
||||
}
|
||||
|
||||
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.prometheusClient)
|
||||
verifyAllPrometheusClientsEqual(t, cycle.client, prometheusClientFromReactor, prometheusClientFromPluginNewHandle, descheduler.promClientCtrl.promClient)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user