mirror of
https://github.com/projectcapsule/capsule.git
synced 2026-05-19 07:46:37 +00:00
Fixing the ResourceQuota update (#15)
* Fixing the ResourceQuota update * Using goroutines to parallelize and speed up ResourceQuotas processing
This commit is contained in:
@@ -18,6 +18,7 @@ import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-logr/logr"
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
@@ -169,6 +170,51 @@ func (r *ReconcileTenant) pruningResources(ns string, keys []string, obj runtime
|
||||
return nil
|
||||
}
|
||||
|
||||
// Serial ResourceQuota processing is expensive: using Go routines we can speed it up.
|
||||
// In case of multiple errors these are logged properly, returning a generic error since we have to repush back the
|
||||
// reconciliation loop.
|
||||
func (r *ReconcileTenant) resourceQuotasUpdate(resourceName corev1.ResourceName, qt resource.Quantity, list ...corev1.ResourceQuota) (err error) {
|
||||
ch := make(chan error, len(list))
|
||||
|
||||
wg := &sync.WaitGroup{}
|
||||
wg.Add(len(list))
|
||||
|
||||
f := func(rq corev1.ResourceQuota, wg *sync.WaitGroup, ch chan error) {
|
||||
defer wg.Done()
|
||||
ch <- retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
// Retrieving from the cache the actual ResourceQuota
|
||||
found := &corev1.ResourceQuota{}
|
||||
_ = r.client.Get(context.TODO(), types.NamespacedName{Namespace: rq.Namespace, Name: rq.Name}, found)
|
||||
// Ensuring annotation map is there to avoid uninitialized map error and
|
||||
// assigning the overall usage
|
||||
if found.Annotations == nil {
|
||||
found.Annotations = make(map[string]string)
|
||||
}
|
||||
found.Labels = rq.Labels
|
||||
found.Annotations[capsulev1alpha1.UsedQuotaFor(resourceName)] = qt.String()
|
||||
// Updating the Resource according to the qt.Cmp result
|
||||
found.Spec.Hard = rq.Spec.Hard
|
||||
return r.client.Update(context.TODO(), found, &client.UpdateOptions{})
|
||||
})
|
||||
}
|
||||
|
||||
for _, rq := range list {
|
||||
go f(rq, wg, ch)
|
||||
}
|
||||
wg.Wait()
|
||||
close(ch)
|
||||
|
||||
for e := range ch {
|
||||
if e != nil {
|
||||
// We had an error and we mark the whole transaction as failed
|
||||
// to process it another time acording to the Tenant controller back-off factor.
|
||||
err = fmt.Errorf("update of outer ResourceQuota items has failed")
|
||||
r.logger.Error(err, "Cannot update outer ResourceQuotas", "resourceName", resourceName.String())
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// We're relying on the ResourceQuota resource to represent the resource quota for the single Tenant rather than the
|
||||
// single Namespace, so abusing of this API although its Namespaced scope.
|
||||
// Since a Namespace could take-up all the available resource quota, the Namespace ResourceQuota will be a 1:1 mapping
|
||||
@@ -250,14 +296,24 @@ func (r *ReconcileTenant) syncResourceQuotas(tenant *capsulev1alpha1.Tenant) err
|
||||
r.logger.Info("Computed " + rn.String() + " quota for the whole Tenant is " + qt.String())
|
||||
|
||||
switch qt.Cmp(q.Hard[rn]) {
|
||||
case 0:
|
||||
// The Tenant is matching exactly the Quota:
|
||||
// falling through next case since we have to block further
|
||||
// resource allocations.
|
||||
fallthrough
|
||||
case 1:
|
||||
// The Tenant is OverQuota:
|
||||
// updating all the related ResourceQuota with the current
|
||||
// used Quota to block further creations.
|
||||
for i := range rql.Items {
|
||||
rql.Items[i].Spec.Hard[rn] = rql.Items[i].Status.Used[rn]
|
||||
if _, ok := rql.Items[i].Status.Used[rn]; ok {
|
||||
rql.Items[i].Spec.Hard[rn] = rql.Items[i].Status.Used[rn]
|
||||
} else {
|
||||
um := make(map[corev1.ResourceName]resource.Quantity)
|
||||
um[rn] = resource.Quantity{}
|
||||
rql.Items[i].Spec.Hard = um
|
||||
}
|
||||
}
|
||||
println("")
|
||||
default:
|
||||
// The Tenant is respecting the Hard quota:
|
||||
// restoring the default one for all the elements,
|
||||
@@ -267,21 +323,9 @@ func (r *ReconcileTenant) syncResourceQuotas(tenant *capsulev1alpha1.Tenant) err
|
||||
}
|
||||
target.Spec = q
|
||||
}
|
||||
|
||||
// Updating all outer join ResourceQuota adding the Used for the current Resource
|
||||
// TODO(prometherion): this is too expensive, should be performed via a recursion
|
||||
for _, oj := range rql.Items {
|
||||
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
|
||||
_ = r.client.Get(context.TODO(), types.NamespacedName{Namespace: oj.Namespace, Name: oj.Name}, &oj)
|
||||
if oj.Annotations == nil {
|
||||
oj.Annotations = make(map[string]string)
|
||||
}
|
||||
oj.Annotations[capsulev1alpha1.UsedQuotaFor(rn)] = qt.String()
|
||||
return r.client.Update(context.TODO(), &oj, &client.UpdateOptions{})
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if err := r.resourceQuotasUpdate(rn, qt, rql.Items...); err != nil {
|
||||
r.logger.Error(err, "cannot proceed with outer ResourceQuota")
|
||||
return err
|
||||
}
|
||||
}
|
||||
return controllerutil.SetControllerReference(tenant, target, r.scheme)
|
||||
|
||||
Reference in New Issue
Block a user