feat: limiting amount of resources deployed in a tenant

This commit is contained in:
Dario Tranchitella
2021-12-16 16:46:23 +01:00
parent a179645f26
commit e53911942d
6 changed files with 359 additions and 8 deletions

View File

@@ -0,0 +1,47 @@
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package v1beta1
import (
"fmt"
"strconv"
)
const (
ResourceQuotaAnnotationPrefix = "quota.resources.capsule.clastix.io"
ResourceUsedAnnotationPrefix = "used.resources.capsule.clastix.io"
)
func UsedAnnotationForResource(kindGroup string) string {
return fmt.Sprintf("%s/%s", ResourceUsedAnnotationPrefix, kindGroup)
}
func LimitAnnotationForResource(kindGroup string) string {
return fmt.Sprintf("%s/%s", ResourceQuotaAnnotationPrefix, kindGroup)
}
func GetUsedResourceFromTenant(tenant Tenant, kindGroup string) (int64, error) {
usedStr, ok := tenant.GetAnnotations()[UsedAnnotationForResource(kindGroup)]
if !ok {
usedStr = "0"
}
used, _ := strconv.ParseInt(usedStr, 10, 10)
return used, nil
}
func GetLimitResourceFromTenant(tenant Tenant, kindGroup string) (int64, error) {
limitStr, ok := tenant.GetAnnotations()[LimitAnnotationForResource(kindGroup)]
if !ok {
return 0, fmt.Errorf("resource %s is not limited for the current tenant", kindGroup)
}
limit, err := strconv.ParseInt(limitStr, 10, 10)
if err != nil {
return 0, fmt.Errorf("resource %s limit cannot be parsed, %w", kindGroup, err)
}
return limit, nil
}

View File

@@ -9,6 +9,7 @@ import (
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
@@ -20,9 +21,10 @@ import (
type Manager struct {
client.Client
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
Log logr.Logger
Scheme *runtime.Scheme
Recorder record.EventRecorder
RESTConfig *rest.Config
}
func (r *Manager) SetupWithManager(mgr ctrl.Manager) error {
@@ -55,6 +57,12 @@ func (r Manager) Reconcile(ctx context.Context, request ctrl.Request) (result ct
return
}
r.Log.Info("Ensuring limit resources count is updated")
if err = r.syncCustomResourceQuotaUsages(ctx, instance); err != nil {
r.Log.Error(err, "Cannot count limited resources")
return
}
// Ensuring all namespaces are collected
r.Log.Info("Ensuring all Namespaces are collected")
if err = r.collectNamespaces(instance); err != nil {

View File

@@ -0,0 +1,122 @@
package tenant
import (
"context"
"fmt"
"strings"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/util/retry"
capsulev1beta1 "github.com/clastix/capsule/api/v1beta1"
)
func (r *Manager) syncCustomResourceQuotaUsages(ctx context.Context, tenant *capsulev1beta1.Tenant) error {
type resource struct {
kind string
group string
version string
}
var resourceList []resource
for k := range tenant.GetAnnotations() {
if !strings.HasPrefix(k, capsulev1beta1.ResourceQuotaAnnotationPrefix) {
continue
}
parts := strings.Split(k, "/")
if len(parts) != 2 {
r.Log.Info("non well-formed Resource Limit annotation", "key", k)
continue
}
parts = strings.Split(parts[1], "_")
if len(parts) != 2 {
r.Log.Info("non well-formed Resource Limit annotation, cannot retrieve version", "key", k)
continue
}
groupKindParts := strings.Split(parts[0], ".")
if len(groupKindParts) < 2 {
r.Log.Info("non well-formed Resource Limit annotation, cannot retrieve kind and group", "key", k)
continue
}
resourceList = append(resourceList, resource{
kind: groupKindParts[0],
group: strings.Join(groupKindParts[1:], "."),
version: parts[1],
})
}
errGroup := new(errgroup.Group)
usedMap := make(map[string]int)
defer func() {
for gvk, used := range usedMap {
err := retry.RetryOnConflict(retry.DefaultBackoff, func() (retryErr error) {
tnt := &capsulev1beta1.Tenant{}
if retryErr = r.Client.Get(ctx, types.NamespacedName{Name: tenant.GetName()}, tnt); retryErr != nil {
return
}
if tnt.GetAnnotations() == nil {
tnt.Annotations = make(map[string]string)
}
tnt.Annotations[capsulev1beta1.UsedAnnotationForResource(gvk)] = fmt.Sprintf("%d", used)
return r.Client.Update(ctx, tnt)
})
if err != nil {
r.Log.Error(err, "cannot update custom Resource Quota", "GVK", gvk)
}
}
}()
for _, item := range resourceList {
res := item
errGroup.Go(func() (scopeErr error) {
dynamicClient := dynamic.NewForConfigOrDie(r.RESTConfig)
for _, ns := range tenant.Status.Namespaces {
var list *unstructured.UnstructuredList
list, scopeErr = dynamicClient.Resource(schema.GroupVersionResource{Group: res.group, Version: res.version, Resource: res.kind}).List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("metadata.namespace==%s", ns),
})
if scopeErr != nil {
return scopeErr
}
key := fmt.Sprintf("%s.%s_%s", res.kind, res.group, res.version)
if _, ok := usedMap[key]; !ok {
usedMap[key] = 0
}
usedMap[key] += len(list.Items)
}
return
})
}
if err := errGroup.Wait(); err != nil {
return err
}
return nil
}

11
main.go
View File

@@ -167,10 +167,11 @@ func main() {
if len(ca.Data) > 0 && len(tls.Data) > 0 {
if err = (&tenantcontroller.Manager{
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Scheme: manager.GetScheme(),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
RESTConfig: manager.GetConfig(),
Client: manager.GetClient(),
Log: ctrl.Log.WithName("controllers").WithName("Tenant"),
Scheme: manager.GetScheme(),
Recorder: manager.GetEventRecorderFor("tenant-controller"),
}).SetupWithManager(manager); err != nil {
setupLog.Error(err, "unable to create controller", "controller", "Tenant")
os.Exit(1)
@@ -206,7 +207,7 @@ func main() {
route.NetworkPolicy(utils.InCapsuleGroups(cfg, networkpolicy.Handler())),
route.Tenant(tenant.NameHandler(), tenant.RoleBindingRegexHandler(), tenant.IngressClassRegexHandler(), tenant.StorageClassRegexHandler(), tenant.ContainerRegistryRegexHandler(), tenant.HostnameRegexHandler(), tenant.FreezedEmitter(), tenant.ServiceAccountNameHandler()),
route.OwnerReference(utils.InCapsuleGroups(cfg, ownerreference.Handler(cfg))),
route.Cordoning(tenant.CordoningHandler(cfg)),
route.Cordoning(tenant.CordoningHandler(cfg), tenant.ResourceCounterHandler()),
route.Node(utils.InCapsuleGroups(cfg, node.UserMetadataHandler(cfg, kubeVersion))),
)

View File

@@ -0,0 +1,151 @@
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package tenant
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/webhook/admission"
capsulev1beta1 "github.com/clastix/capsule/api/v1beta1"
capsulewebhook "github.com/clastix/capsule/pkg/webhook"
"github.com/clastix/capsule/pkg/webhook/utils"
)
type resourceCounterHandler struct {
client client.Client
}
func (r *resourceCounterHandler) InjectClient(c client.Client) error {
r.client = c
return nil
}
func ResourceCounterHandler() capsulewebhook.Handler {
return &resourceCounterHandler{}
}
func (r *resourceCounterHandler) getTenantName(ctx context.Context, clt client.Client, req admission.Request) (string, error) {
tntList := &capsulev1beta1.TenantList{}
if err := clt.List(ctx, tntList, client.MatchingFieldsSelector{
Selector: fields.OneTermEqualSelector(".status.namespaces", req.Namespace),
}); err != nil {
return "", err
}
if len(tntList.Items) == 0 {
return "", nil
}
return tntList.Items[0].GetName(), nil
}
func (r *resourceCounterHandler) OnCreate(clt client.Client, decoder *admission.Decoder, recorder record.EventRecorder) capsulewebhook.Func {
return func(ctx context.Context, req admission.Request) *admission.Response {
var tntName string
var err error
if tntName, err = r.getTenantName(ctx, clt, req); err != nil {
return utils.ErroredResponse(err)
}
if len(tntName) == 0 {
return nil
}
kgv := fmt.Sprintf("%s.%s_%s", req.Resource.Resource, req.Resource.Group, req.Resource.Version)
tnt := &capsulev1beta1.Tenant{}
var limit int64
err = retry.RetryOnConflict(retry.DefaultRetry, func() (retryErr error) {
if retryErr = clt.Get(ctx, types.NamespacedName{Name: tntName}, tnt); err != nil {
return retryErr
}
if limit, retryErr = capsulev1beta1.GetLimitResourceFromTenant(*tnt, kgv); retryErr != nil {
return nil
}
used, _ := capsulev1beta1.GetUsedResourceFromTenant(*tnt, kgv)
if used >= limit {
return NewCustomResourceQuotaError(kgv, limit)
}
tnt.Annotations[capsulev1beta1.UsedAnnotationForResource(kgv)] = fmt.Sprintf("%d", used+1)
return clt.Update(ctx, tnt)
})
if err != nil {
if _, ok := err.(*customResourceQuotaError); ok {
recorder.Eventf(tnt, corev1.EventTypeWarning, "ResourceQuota", "Resource %s/%s in API group %s cannot be created, limit usage of %d has been reached", req.Namespace, req.Name, kgv, limit)
}
return utils.ErroredResponse(err)
}
return nil
}
}
func (r *resourceCounterHandler) OnDelete(clt client.Client, decoder *admission.Decoder, recorder record.EventRecorder) capsulewebhook.Func {
return func(ctx context.Context, req admission.Request) *admission.Response {
var tntName string
var err error
if tntName, err = r.getTenantName(ctx, clt, req); err != nil {
return utils.ErroredResponse(err)
}
if len(tntName) == 0 {
return nil
}
kgv := fmt.Sprintf("%s.%s_%s", req.Resource.Resource, req.Resource.Group, req.Resource.Version)
err = retry.RetryOnConflict(retry.DefaultRetry, func() (retryErr error) {
tnt := &capsulev1beta1.Tenant{}
if retryErr = clt.Get(ctx, types.NamespacedName{Name: tntName}, tnt); err != nil {
return
}
if tnt.Annotations == nil {
return
}
if _, ok := tnt.Annotations[capsulev1beta1.UsedAnnotationForResource(kgv)]; !ok {
return
}
used, _ := capsulev1beta1.GetUsedResourceFromTenant(*tnt, kgv)
tnt.Annotations[capsulev1beta1.UsedAnnotationForResource(kgv)] = fmt.Sprintf("%d", used-1)
return clt.Update(ctx, tnt)
})
if err != nil {
return utils.ErroredResponse(err)
}
return nil
}
}
func (r *resourceCounterHandler) OnUpdate(client client.Client, decoder *admission.Decoder, recorder record.EventRecorder) capsulewebhook.Func {
return func(ctx context.Context, req admission.Request) *admission.Response {
return nil
}
}

View File

@@ -0,0 +1,22 @@
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package tenant
import "fmt"
type customResourceQuotaError struct {
kindGroup string
limit int64
}
func NewCustomResourceQuotaError(kindGroup string, limit int64) error {
return &customResourceQuotaError{
kindGroup: kindGroup,
limit: limit,
}
}
func (r customResourceQuotaError) Error() string {
return fmt.Sprintf("resource %s has reached quota limit of %d items", r.kindGroup, r.limit)
}