No loop on ResourceQuota outer updates and error handling improvements (#168)

* Avoiding loop on updating outer resource quota

* Using retryOnConflict on Tenant status update

* Using errgroup instead of bare go routines

* Testing Namespace Capsule default label presence
This commit is contained in:
Dario Tranchitella
2020-12-20 12:25:41 +01:00
committed by GitHub
parent 6e24aad094
commit 03eb6e633e
5 changed files with 141 additions and 95 deletions

View File

@@ -20,10 +20,10 @@ import (
"bytes"
"context"
"errors"
"sync"
"time"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/admissionregistration/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -50,12 +50,8 @@ func (r *CaReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
func (r CaReconciler) UpdateValidatingWebhookConfiguration(wg *sync.WaitGroup, ch chan error, caBundle []byte) {
defer wg.Done()
var err error
ch <- retry.RetryOnConflict(retry.DefaultBackoff, func() error {
func (r CaReconciler) UpdateValidatingWebhookConfiguration(caBundle []byte) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
vw := &v1.ValidatingWebhookConfiguration{}
err = r.Get(context.TODO(), types.NamespacedName{Name: "capsule-validating-webhook-configuration"}, vw)
if err != nil {
@@ -72,12 +68,8 @@ func (r CaReconciler) UpdateValidatingWebhookConfiguration(wg *sync.WaitGroup, c
})
}
func (r CaReconciler) UpdateMutatingWebhookConfiguration(wg *sync.WaitGroup, ch chan error, caBundle []byte) {
defer wg.Done()
var err error
ch <- retry.RetryOnConflict(retry.DefaultBackoff, func() error {
func (r CaReconciler) UpdateMutatingWebhookConfiguration(caBundle []byte) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
mw := &v1.MutatingWebhookConfiguration{}
err = r.Get(context.TODO(), types.NamespacedName{Name: "capsule-mutating-webhook-configuration"}, mw)
if err != nil {
@@ -139,20 +131,16 @@ func (r CaReconciler) Reconcile(ctx context.Context, request ctrl.Request) (ctrl
privateKeySecretKey: key.Bytes(),
}
wg := &sync.WaitGroup{}
wg.Add(2)
ch := make(chan error, 2)
group := errgroup.Group{}
group.Go(func() error {
return r.UpdateMutatingWebhookConfiguration(crt.Bytes())
})
group.Go(func() error {
return r.UpdateValidatingWebhookConfiguration(crt.Bytes())
})
go r.UpdateMutatingWebhookConfiguration(wg, ch, crt.Bytes())
go r.UpdateValidatingWebhookConfiguration(wg, ch, crt.Bytes())
wg.Wait()
close(ch)
for err = range ch {
if err != nil {
return reconcile.Result{}, err
}
if err := group.Wait(); err != nil {
return reconcile.Result{}, err
}
}

View File

@@ -22,10 +22,9 @@ import (
"hash/fnv"
"strconv"
"strings"
"sync"
"github.com/go-logr/logr"
"github.com/hashicorp/go-multierror"
"golang.org/x/sync/errgroup"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
rbacv1 "k8s.io/api/rbac/v1"
@@ -184,46 +183,43 @@ func (r *TenantReconciler) pruningResources(ns string, keys []string, obj client
// In case of multiple errors these are logged properly, returning a generic error since we have to repush back the
// reconciliation loop.
func (r *TenantReconciler) resourceQuotasUpdate(resourceName corev1.ResourceName, actual, limit resource.Quantity, list ...corev1.ResourceQuota) (err error) {
ch := make(chan error, len(list))
g := errgroup.Group{}
wg := &sync.WaitGroup{}
wg.Add(len(list))
f := func(rq corev1.ResourceQuota, wg *sync.WaitGroup, ch chan error) {
defer wg.Done()
ch <- retry.RetryOnConflict(retry.DefaultBackoff, func() error {
// Retrieving from the cache the actual ResourceQuota
for _, item := range list {
rq := item
g.Go(func() error {
found := &corev1.ResourceQuota{}
_ = r.Get(context.TODO(), types.NamespacedName{Namespace: rq.Namespace, Name: rq.Name}, found)
// Ensuring annotation map is there to avoid uninitialized map error and
// assigning the overall usage
if found.Annotations == nil {
found.Annotations = make(map[string]string)
if err := r.Get(context.TODO(), types.NamespacedName{Namespace: rq.Namespace, Name: rq.Name}, found); err != nil {
return err
}
found.Labels = rq.Labels
found.Annotations[capsulev1alpha1.UsedQuotaFor(resourceName)] = actual.String()
found.Annotations[capsulev1alpha1.HardQuotaFor(resourceName)] = limit.String()
// Updating the Resource according to the actual.Cmp result
found.Spec.Hard = rq.Spec.Hard
return r.Update(context.TODO(), found, &client.UpdateOptions{})
return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
_, err := controllerutil.CreateOrUpdate(context.TODO(), r.Client, found, func() error {
// Ensuring annotation map is there to avoid uninitialized map error and
// assigning the overall usage
if found.Annotations == nil {
found.Annotations = make(map[string]string)
}
found.Labels = rq.Labels
found.Annotations[capsulev1alpha1.UsedQuotaFor(resourceName)] = actual.String()
found.Annotations[capsulev1alpha1.HardQuotaFor(resourceName)] = limit.String()
// Updating the Resource according to the actual.Cmp result
found.Spec.Hard = rq.Spec.Hard
return nil
})
return err
})
})
}
for _, rq := range list {
go f(rq, wg, ch)
if err = g.Wait(); err != nil {
// We had an error and we mark the whole transaction as failed
// to process it another time according to the Tenant controller back-off factor.
r.Log.Error(err, "Cannot update outer ResourceQuotas", "resourceName", resourceName.String())
err = fmt.Errorf("update of outer ResourceQuota items has failed: %s", err.Error())
}
wg.Wait()
close(ch)
for e := range ch {
if e != nil {
// We had an error and we mark the whole transaction as failed
// to process it another time acording to the Tenant controller back-off factor.
r.Log.Error(e, "Cannot update outer ResourceQuotas", "resourceName", resourceName.String())
err = fmt.Errorf("update of outer ResourceQuota items has failed")
}
}
return
return err
}
// Additional Role Bindings can be used in many ways: applying Pod Security Policies or giving
@@ -465,20 +461,16 @@ func (r *TenantReconciler) syncLimitRanges(tenant *capsulev1alpha1.Tenant) error
return nil
}
func (r *TenantReconciler) syncNamespace(namespace string, tnt *capsulev1alpha1.Tenant, wg *sync.WaitGroup, channel chan error) {
defer wg.Done()
ns := &corev1.Namespace{}
if err := r.Client.Get(context.TODO(), types.NamespacedName{Name: namespace}, ns); err != nil {
channel <- err
}
channel <- retry.RetryOnConflict(retry.DefaultBackoff, func() error {
func (r *TenantReconciler) syncNamespace(namespace string, tnt *capsulev1alpha1.Tenant) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
ns := &corev1.Namespace{}
if err = r.Client.Get(context.TODO(), types.NamespacedName{Name: namespace}, ns); err != nil {
return
}
a := ns.GetAnnotations()
if a == nil {
a = make(map[string]string)
}
// resetting Capsule annotations
delete(a, capsulev1alpha1.AvailableIngressClassesAnnotation)
delete(a, capsulev1alpha1.AvailableIngressClassesRegexpAnnotation)
@@ -542,22 +534,18 @@ func (r *TenantReconciler) syncNamespace(namespace string, tnt *capsulev1alpha1.
// Ensuring all annotations are applied to each Namespace handled by the Tenant.
func (r *TenantReconciler) syncNamespaces(tenant *capsulev1alpha1.Tenant) (err error) {
ch := make(chan error, tenant.Status.Namespaces.Len())
group := errgroup.Group{}
wg := &sync.WaitGroup{}
wg.Add(tenant.Status.Namespaces.Len())
for _, ns := range tenant.Status.Namespaces {
go r.syncNamespace(ns, tenant, wg, ch)
for _, item := range tenant.Status.Namespaces {
namespace := item
group.Go(func() error {
return r.syncNamespace(namespace, tenant)
})
}
wg.Wait()
close(ch)
for e := range ch {
if e != nil {
err = multierror.Append(e, err)
}
if err = group.Wait(); err != nil {
r.Log.Error(err, "Cannot sync Namespaces")
err = fmt.Errorf("cannot sync Namespaces: %s", err.Error())
}
return
}
@@ -706,17 +694,19 @@ func (r *TenantReconciler) ensureNamespaceCount(tenant *capsulev1alpha1.Tenant)
})
}
func (r *TenantReconciler) collectNamespaces(tenant *capsulev1alpha1.Tenant) (err error) {
nl := &corev1.NamespaceList{}
err = r.Client.List(context.TODO(), nl, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(".metadata.ownerReferences[*].capsule", tenant.GetName()),
})
if err != nil {
func (r *TenantReconciler) collectNamespaces(tenant *capsulev1alpha1.Tenant) error {
return retry.RetryOnConflict(retry.DefaultBackoff, func() (err error) {
nl := &corev1.NamespaceList{}
err = r.Client.List(context.TODO(), nl, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(".metadata.ownerReferences[*].capsule", tenant.GetName()),
})
if err != nil {
return
}
_, err = controllerutil.CreateOrUpdate(context.TODO(), r.Client, tenant.DeepCopy(), func() error {
tenant.AssignNamespaces(nl.Items)
return r.Client.Status().Update(context.TODO(), tenant, &client.UpdateOptions{})
})
return
}
tenant.AssignNamespaces(nl.Items)
_, err = controllerutil.CreateOrUpdate(context.TODO(), r.Client, tenant.DeepCopy(), func() error {
return r.Client.Status().Update(context.TODO(), tenant, &client.UpdateOptions{})
})
return
}

View File

@@ -0,0 +1,67 @@
//+build e2e
/*
Copyright 2020 Clastix Labs.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package e2e
import (
"context"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"github.com/clastix/capsule/api/v1alpha1"
)
var _ = Describe("creating several Namespaces for a Tenant", func() {
tnt := &v1alpha1.Tenant{
ObjectMeta: metav1.ObjectMeta{
Name: "capsule-labels",
},
Spec: v1alpha1.TenantSpec{
Owner: v1alpha1.OwnerSpec{
Name: "charlie",
Kind: "User",
},
},
}
JustBeforeEach(func() {
EventuallyCreation(func() error {
return k8sClient.Create(context.TODO(), tnt.DeepCopy())
}).Should(Succeed())
})
JustAfterEach(func() {
Expect(k8sClient.Delete(context.TODO(), tnt)).Should(Succeed())
})
It("should contains the default Capsule label", func() {
namespaces := []*v1.Namespace{
NewNamespace("first-capsule-ns"),
NewNamespace("second-capsule-ns"),
NewNamespace("third-capsule-ns"),
}
for _, ns := range namespaces {
NamespaceCreation(ns, tnt, defaultTimeoutInterval).Should(Succeed())
TenantNamespaceList(tnt, defaultTimeoutInterval).Should(ContainElement(ns.GetName()))
Expect(k8sClient.Get(context.TODO(), types.NamespacedName{Name: ns.GetName()}, ns)).Should(Succeed())
Expect(ns.Labels).Should(HaveKeyWithValue("capsule.clastix.io/tenant", tnt.Name))
}
})
})

2
go.mod
View File

@@ -9,7 +9,7 @@ require (
github.com/onsi/gomega v1.10.2
github.com/stretchr/testify v1.5.1
go.uber.org/zap v1.15.0
gomodules.xyz/jsonpatch/v2 v2.1.0
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e
k8s.io/api v0.19.3
k8s.io/apimachinery v0.19.3
k8s.io/client-go v0.19.3

1
go.sum
View File

@@ -512,6 +512,7 @@ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190227155943-e225da77a7e6/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e h1:vcxGaoTs7kV8m5Np9uUNQin4BrLOthgV7252N8V+FwY=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=