capsule/controllers/resourcepools/claim_controller.go
c8377d51f1 feat: improve resourcepool monitoring (#1488) (Oliver Bähler, 2025-06-03)

// Copyright 2020-2023 Project Capsule Authors.
// SPDX-License-Identifier: Apache-2.0

package resourcepools

import (
	"context"
	"fmt"

	"github.com/go-logr/logr"
	corev1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/retry"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	capsulev1beta2 "github.com/projectcapsule/capsule/api/v1beta2"
	"github.com/projectcapsule/capsule/pkg/api"
	"github.com/projectcapsule/capsule/pkg/meta"
	"github.com/projectcapsule/capsule/pkg/metrics"
)
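
// resourceClaimController reconciles ResourcePoolClaim resources and records
// per-claim metrics through the injected ClaimRecorder.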
type resourceClaimController struct {
	client.Client

	metrics  *metrics.ClaimRecorder
	log      logr.Logger
	recorder record.EventRecorder
}
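
// SetupWithManager registers the controller with the manager. Besides the
// primary watch on ResourcePoolClaims, it watches ResourcePools and maps
// their status changes back to still-unassigned claims, so a claim that
// could not be bound earlier is retried once its pool becomes available.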
func (r *resourceClaimController) SetupWithManager(mgr ctrl.Manager) error {
	return ctrl.NewControllerManagedBy(mgr).
		For(&capsulev1beta2.ResourcePoolClaim{}).
		Watches(
			&capsulev1beta2.ResourcePool{},
			handler.EnqueueRequestsFromMapFunc(r.claimsWithoutPoolFromNamespaces),
			builder.WithPredicates(predicate.ResourceVersionChangedPredicate{}),
		).
		Complete(r)
}
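
// Reconcile fetches the claim and drives the pool assignment. The claim's
// condition is recorded as a metric on every pass, and metrics for deleted
// claims are removed so that no stale series linger.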
func (r resourceClaimController) Reconcile(ctx context.Context, request ctrl.Request) (ctrl.Result, error) {
	log := r.log.WithValues("Request.Name", request.Name)

	instance := &capsulev1beta2.ResourcePoolClaim{}
	if err := r.Get(ctx, request.NamespacedName, instance); err != nil {
		if apierrors.IsNotFound(err) {
			log.V(5).Info("Request object not found, could have been deleted after reconcile request")
			// The claim is gone; drop its metric so no stale series remains.
			r.metrics.DeleteClaimMetric(request.Name, request.Namespace)

			return ctrl.Result{}, nil
		}

		log.Error(err, "Error reading the object")

		return ctrl.Result{}, err
	}

	// Ensure the claim is assigned to its pool.
	err := r.reconcile(ctx, log, instance)

	// Emit the condition metric in any case, including reconcile failures.
	r.metrics.RecordClaimCondition(instance)

	return ctrl.Result{}, err
}

// claimsWithoutPoolFromNamespaces triggers claims from a namespace that are
// not yet allocated whenever a ResourcePool updates its status.
func (r *resourceClaimController) claimsWithoutPoolFromNamespaces(ctx context.Context, obj client.Object) []reconcile.Request {
	pool, ok := obj.(*capsulev1beta2.ResourcePool)
	if !ok {
		return nil
	}

	var requests []reconcile.Request

	for _, ns := range pool.Status.Namespaces {
		claimList := &capsulev1beta2.ResourcePoolClaimList{}
		if err := r.List(ctx, claimList, client.InNamespace(ns)); err != nil {
			r.log.Error(err, "Failed to list claims in namespace", "namespace", ns)

			continue
		}

		for _, claim := range claimList.Items {
			// An empty UID means the claim has not been bound to a pool yet.
			if claim.Status.Pool.UID == "" {
				requests = append(requests, reconcile.Request{
					NamespacedName: types.NamespacedName{
						Namespace: claim.Namespace,
						Name:      claim.Name,
					},
				})
			}
		}
	}

	return requests
}

// reconcile assigns the claim to a ResourcePool; everything else is handled
// by the ResourcePool controller.
func (r resourceClaimController) reconcile(
	ctx context.Context,
	log logr.Logger,
	claim *capsulev1beta2.ResourcePoolClaim,
) error {
	pool, err := r.evaluateResourcePool(ctx, claim)
	if err != nil {
		// The pool cannot be used; clear any previous binding and surface
		// the failure through the claim's condition.
		claim.Status.Pool = api.StatusNameUID{}

		cond := meta.NewAssignedCondition(claim)
		cond.Status = metav1.ConditionFalse
		cond.Reason = meta.FailedReason
		cond.Message = err.Error()

		return updateStatusAndEmitEvent(
			ctx,
			r.Client,
			r.recorder,
			claim,
			cond,
		)
	}

	return r.allocateResourcePool(ctx, log, claim, pool)
}

// evaluateResourcePool verifies that a pool can be allocated: the claim must
// reference an existing pool that is not being deleted, the pool must serve
// the claim's namespace, and every claimed resource must exist in the pool.
func (r resourceClaimController) evaluateResourcePool(
	ctx context.Context,
	claim *capsulev1beta2.ResourcePoolClaim,
) (*capsulev1beta2.ResourcePool, error) {
	poolName := claim.Spec.Pool
	if poolName == "" {
		return nil, fmt.Errorf("no pool reference was defined")
	}

	pool := &capsulev1beta2.ResourcePool{}
	if err := r.Get(ctx, client.ObjectKey{Name: poolName}, pool); err != nil {
		return nil, err
	}

	// A pool that is being deleted must not accept new claims.
	if !pool.DeletionTimestamp.IsZero() {
		return nil, fmt.Errorf(
			"resourcepool not available",
		)
	}

	// The pool must serve the namespace the claim lives in.
	allowed := false

	for _, ns := range pool.Status.Namespaces {
		if ns == claim.GetNamespace() {
			allowed = true

			break
		}
	}

	if !allowed {
		return nil, fmt.Errorf(
			"resourcepool not available",
		)
	}

	// Validate that the requested resources can be allocated in the first
	// place: every claimed resource must be part of the pool's hard limits.
	for resourceName := range claim.Spec.ResourceClaims {
		if _, exists := pool.Status.Allocation.Hard[resourceName]; !exists {
			return nil, fmt.Errorf(
				"resource %s is not available in pool %s",
				resourceName,
				pool.Name,
			)
		}
	}

	return pool, nil
}
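
// allocateResourcePool binds the claim to the given pool: it ensures a loose
// owner reference from the claim to the pool, then records the pool's name
// and UID together with a succeeded Assigned condition in the claim's status.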
func (r resourceClaimController) allocateResourcePool(
	ctx context.Context,
	log logr.Logger,
	cl *capsulev1beta2.ResourcePoolClaim,
	pool *capsulev1beta2.ResourcePool,
) error {
	allocate := api.StatusNameUID{
		Name: api.Name(pool.GetName()),
		UID:  pool.GetUID(),
	}

	// Ensure the claim carries a loose owner reference to its pool.
	if !meta.HasLooseOwnerReference(cl, pool) {
		log.V(5).Info("adding ownerreference for", "pool", pool.Name)

		patch := client.MergeFrom(cl.DeepCopy())

		if err := meta.SetLooseOwnerReference(cl, pool, r.Scheme()); err != nil {
			return err
		}

		if err := r.Patch(ctx, cl, patch); err != nil {
			return err
		}
	}

	// Nothing to do when the claim is already bound to this pool.
	if cl.Status.Pool.Name == allocate.Name &&
		cl.Status.Pool.UID == allocate.UID {
		return nil
	}

	cond := meta.NewAssignedCondition(cl)
	cond.Status = metav1.ConditionTrue
	cond.Reason = meta.SucceededReason

	// Set the claimed pool and the condition in the status.
	cl.Status = capsulev1beta2.ResourcePoolClaimStatus{
		Pool:      allocate,
		Condition: cond,
	}

	// Update the status in a separate call.
	return r.Client.Status().Update(ctx, cl)
}

// updateStatusAndEmitEvent updates the status of a claim and emits an event
// if the status changed.
func updateStatusAndEmitEvent(
	ctx context.Context,
	c client.Client,
	recorder record.EventRecorder,
	claim *capsulev1beta2.ResourcePoolClaim,
	condition metav1.Condition,
) error {
	// Skip the update (and the event) when nothing changed.
	if claim.Status.Condition.Type == condition.Type &&
		claim.Status.Condition.Status == condition.Status &&
		claim.Status.Condition.Reason == condition.Reason &&
		claim.Status.Condition.Message == condition.Message {
		return nil
	}

	// Refetch before every attempt so the update is applied against the
	// latest resource version, retrying on conflicts.
	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
		current := &capsulev1beta2.ResourcePoolClaim{}
		if err := c.Get(ctx, client.ObjectKeyFromObject(claim), current); err != nil {
			return fmt.Errorf("failed to refetch instance before update: %w", err)
		}

		current.Status.Condition = condition

		return c.Status().Update(ctx, current)
	})
	if err != nil {
		return err
	}

	// Mirror the persisted condition on the in-memory object.
	claim.Status.Condition = condition

	eventType := corev1.EventTypeNormal
	if claim.Status.Condition.Status == metav1.ConditionFalse {
		eventType = corev1.EventTypeWarning
	}

	recorder.AnnotatedEventf(
		claim,
		map[string]string{
			"Status": string(claim.Status.Condition.Status),
			"Type":   claim.Status.Condition.Type,
		},
		eventType,
		claim.Status.Condition.Reason,
		claim.Status.Condition.Message,
	)

	return nil
}