feat: refactor resources controller

Co-authored-by: Maksim Fedotov <m_fedotov@wargaming.net>
This commit is contained in:
Max Fedotov
2022-10-27 22:29:37 +03:00
committed by Dario Tranchitella
parent 6403b60590
commit b1ec9fed50
12 changed files with 662 additions and 607 deletions

View File

@@ -7,13 +7,16 @@ import (
"context"
"github.com/hashicorp/go-multierror"
"k8s.io/apimachinery/pkg/api/errors"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -60,21 +63,9 @@ func (r *Global) enqueueRequestFromTenant(object client.Object) (reqs []reconcil
}
func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
unstructuredCachingClient, err := client.NewDelegatingClient(
client.NewDelegatingClientInput{
Client: mgr.GetClient(),
CacheReader: mgr.GetCache(),
CacheUnstructured: true,
},
)
if err != nil {
return err
}
r.client = mgr.GetClient()
r.processor = Processor{
client: r.client,
unstructuredClient: unstructuredCachingClient,
client: mgr.GetClient(),
}
return ctrl.NewControllerManagedBy(mgr).
@@ -83,14 +74,15 @@ func (r *Global) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
//nolint:dupl
func (r *Global) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
log.Info("start processing")
tntResource := capsulev1beta2.GlobalTenantResource{}
if err := r.client.Get(ctx, request.NamespacedName, &tntResource); err != nil {
if errors.IsNotFound(err) {
// Retrieving the GlobalTenantResource
tntResource := &capsulev1beta2.GlobalTenantResource{}
if err := r.client.Get(ctx, request.NamespacedName, tntResource); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Request object not found, could have been deleted after reconcile request")
return reconcile.Result{}, nil
@@ -98,15 +90,40 @@ func (r *Global) Reconcile(ctx context.Context, request reconcile.Request) (reco
return reconcile.Result{}, err
}
// Adding the default value for the status
patchHelper, err := patch.NewHelper(tntResource, r.client)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to init patch helper")
}
defer func() {
if e := patchHelper.Patch(ctx, tntResource); e != nil {
if err == nil {
err = errors.Wrap(e, "failed to patch GlobalTenantResource")
}
}
}()
// Handle deleted GlobalTenantResource
if !tntResource.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, tntResource)
}
// Handle non-deleted GlobalTenantResource
return r.reconcileNormal(ctx, tntResource)
}
func (r *Global) reconcileNormal(ctx context.Context, tntResource *capsulev1beta2.GlobalTenantResource) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
if *tntResource.Spec.PruningOnDelete {
controllerutil.AddFinalizer(tntResource, finalizer)
}
if tntResource.Status.ProcessedItems == nil {
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0, 0)
}
// Handling the finalizer section for the given GlobalTenantResource
enqueueBack, err := r.processor.HandleFinalizer(ctx, &tntResource, *tntResource.Spec.PruningOnDelete, tntResource.Status.ProcessedItems)
if err != nil || enqueueBack {
return reconcile.Result{}, err
}
// Retrieving the list of the Tenants up to the selector provided by the GlobalTenantResource resource.
tntSelector, err := metav1.LabelSelectorAsSelector(&tntResource.Spec.TenantSelector)
if err != nil {
@@ -158,9 +175,7 @@ func (r *Global) Reconcile(ctx context.Context, request reconcile.Request) (reco
return reconcile.Result{}, err
}
shouldUpdateStatus := !sets.NewString(tntResource.Status.SelectedTenants...).Equal(tntSet)
if r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems, processedItems) {
if r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems.AsSet(), processedItems) {
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0, len(processedItems))
for _, item := range processedItems.List() {
@@ -168,16 +183,22 @@ func (r *Global) Reconcile(ctx context.Context, request reconcile.Request) (reco
tntResource.Status.ProcessedItems = append(tntResource.Status.ProcessedItems, or)
}
}
shouldUpdateStatus = true
}
if shouldUpdateStatus {
tntResource.Status.SelectedTenants = tntSet.List()
tntResource.Status.SelectedTenants = tntSet.List()
if updateErr := r.client.Status().Update(ctx, &tntResource); updateErr != nil {
log.Error(updateErr, "unable to update TenantResource status")
}
log.Info("processing completed")
return reconcile.Result{Requeue: true, RequeueAfter: tntResource.Spec.ResyncPeriod.Duration}, nil
}
func (r *Global) reconcileDelete(ctx context.Context, tntResource *capsulev1beta2.GlobalTenantResource) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
if *tntResource.Spec.PruningOnDelete {
r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems.AsSet(), nil)
controllerutil.RemoveFinalizer(tntResource, finalizer)
}
log.Info("processing completed")

View File

@@ -7,12 +7,14 @@ import (
"context"
"github.com/hashicorp/go-multierror"
apierr "k8s.io/apimachinery/pkg/api/errors"
"github.com/pkg/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/util/retry"
"sigs.k8s.io/cluster-api/util/patch"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
@@ -21,25 +23,13 @@ import (
type Namespaced struct {
client client.Client
finalizer Processor
processor Processor
}
func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error {
unstructuredCachingClient, err := client.NewDelegatingClient(
client.NewDelegatingClientInput{
Client: mgr.GetClient(),
CacheReader: mgr.GetCache(),
CacheUnstructured: true,
},
)
if err != nil {
return err
}
r.client = mgr.GetClient()
r.finalizer = Processor{
client: r.client,
unstructuredClient: unstructuredCachingClient,
r.processor = Processor{
client: mgr.GetClient(),
}
return ctrl.NewControllerManagedBy(mgr).
@@ -47,37 +37,62 @@ func (r *Namespaced) SetupWithManager(mgr ctrl.Manager) error {
Complete(r)
}
//nolint:dupl
func (r *Namespaced) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
log.Info("start processing")
// Retrieving the TenantResource
tntResource := capsulev1beta2.TenantResource{}
if err := r.client.Get(ctx, request.NamespacedName, &tntResource); err != nil {
if apierr.IsNotFound(err) {
tntResource := &capsulev1beta2.TenantResource{}
if err := r.client.Get(ctx, request.NamespacedName, tntResource); err != nil {
if apierrors.IsNotFound(err) {
log.Info("Request object not found, could have been deleted after reconcile request")
return reconcile.Result{}, nil
}
log.Error(err, "cannot retrieve capsulev1beta2.TenantResource")
return reconcile.Result{}, err
}
patchHelper, err := patch.NewHelper(tntResource, r.client)
if err != nil {
return reconcile.Result{}, errors.Wrap(err, "failed to init patch helper")
}
defer func() {
if e := patchHelper.Patch(ctx, tntResource); e != nil {
if err == nil {
err = errors.Wrap(e, "failed to patch TenantResource")
}
}
}()
// Handle deleted TenantResource
if !tntResource.DeletionTimestamp.IsZero() {
return r.reconcileDelete(ctx, tntResource)
}
// Handle non-deleted TenantResource
return r.reconcileNormal(ctx, tntResource)
}
func (r *Namespaced) reconcileNormal(ctx context.Context, tntResource *capsulev1beta2.TenantResource) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
if *tntResource.Spec.PruningOnDelete {
controllerutil.AddFinalizer(tntResource, finalizer)
}
// Adding the default value for the status
if tntResource.Status.ProcessedItems == nil {
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0, 0)
}
// Handling the finalizer section for the given TenantResource
enqueueBack, err := r.finalizer.HandleFinalizer(ctx, &tntResource, *tntResource.Spec.PruningOnDelete, tntResource.Status.ProcessedItems)
if err != nil || enqueueBack {
return reconcile.Result{}, err
}
// Retrieving the parent of the Global Resource:
// Retrieving the parent of the Tenant Resource:
// can be owned, or being deployed in one of its Namespace.
tl := &capsulev1beta2.TenantList{}
if err = r.client.List(ctx, tl, client.MatchingFieldsSelector{Selector: fields.OneTermEqualSelector(".status.namespaces", tntResource.GetNamespace())}); err != nil {
log.Error(err, "unable to detect the Global for the given TenantResource")
if err := r.client.List(ctx, tl, client.MatchingFieldsSelector{Selector: fields.OneTermEqualSelector(".status.namespaces", tntResource.GetNamespace())}); err != nil {
log.Error(err, "unable to detect the Tenant for the given TenantResource")
return reconcile.Result{}, err
}
@@ -88,7 +103,7 @@ func (r *Namespaced) Reconcile(ctx context.Context, request reconcile.Request) (
return reconcile.Result{}, nil
}
err = new(multierror.Error)
err := new(multierror.Error)
// A TenantResource is made of several Resource sections, each one with specific options:
// the Status can be updated only in case of no errors across all of them to guarantee a valid and coherent status.
processedItems := sets.NewString()
@@ -101,7 +116,7 @@ func (r *Namespaced) Reconcile(ctx context.Context, request reconcile.Request) (
}
for index, resource := range tntResource.Spec.Resources {
items, sectionErr := r.finalizer.HandleSection(ctx, tl.Items[0], false, tenantLabel, index, resource)
items, sectionErr := r.processor.HandleSection(ctx, tl.Items[0], false, tenantLabel, index, resource)
if sectionErr != nil {
// Upon a process error storing the last error occurred and continuing to iterate,
// avoid to block the whole processing.
@@ -111,30 +126,19 @@ func (r *Namespaced) Reconcile(ctx context.Context, request reconcile.Request) (
}
}
if err.(*multierror.Error).ErrorOrNil() != nil { //nolint:errorlint,forcetypeassert
if err.ErrorOrNil() != nil {
log.Error(err, "unable to replicate the requested resources")
return reconcile.Result{}, err
}
if r.finalizer.HandlePruning(ctx, tntResource.Status.ProcessedItems, processedItems) {
statusErr := retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
if err = r.client.Get(ctx, request.NamespacedName, &tntResource); err != nil {
return err
if r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems.AsSet(), processedItems) {
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0, len(processedItems))
for _, item := range processedItems.List() {
if or := (capsulev1beta2.ObjectReferenceStatus{}); or.ParseFromString(item) == nil {
tntResource.Status.ProcessedItems = append(tntResource.Status.ProcessedItems, or)
}
tntResource.Status.ProcessedItems = make([]capsulev1beta2.ObjectReferenceStatus, 0, len(processedItems))
for _, item := range processedItems.List() {
if or := (capsulev1beta2.ObjectReferenceStatus{}); or.ParseFromString(item) == nil {
tntResource.Status.ProcessedItems = append(tntResource.Status.ProcessedItems, or)
}
}
return r.client.Status().Update(ctx, &tntResource)
})
if statusErr != nil {
log.Error(statusErr, "unable to update TenantResource status")
}
}
@@ -142,3 +146,17 @@ func (r *Namespaced) Reconcile(ctx context.Context, request reconcile.Request) (
return reconcile.Result{Requeue: true, RequeueAfter: tntResource.Spec.ResyncPeriod.Duration}, nil
}
func (r *Namespaced) reconcileDelete(ctx context.Context, tntResource *capsulev1beta2.TenantResource) (reconcile.Result, error) {
log := ctrllog.FromContext(ctx)
if *tntResource.Spec.PruningOnDelete {
r.processor.HandlePruning(ctx, tntResource.Status.ProcessedItems.AsSet(), nil)
}
controllerutil.RemoveFinalizer(tntResource, finalizer)
log.Info("processing completed")
return reconcile.Result{Requeue: true, RequeueAfter: tntResource.Spec.ResyncPeriod.Duration}, nil
}

View File

@@ -4,7 +4,24 @@
package resources
import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
corev1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
capsulev1beta2 "github.com/clastix/capsule/api/v1beta2"
)
const (
@@ -12,6 +29,248 @@ const (
)
type Processor struct {
client client.Client
unstructuredClient client.Client
client client.Client
}
func (r *Processor) HandlePruning(ctx context.Context, current, desired sets.String) (updateStatus bool) {
log := ctrllog.FromContext(ctx)
diff := current.Difference(desired)
// We don't want to trigger a reconciliation of the Status every time,
// rather, only in case of a difference between the processed and the actual status.
// This can happen upon the first reconciliation, or a removal, or a change, of a resource.
updateStatus = diff.Len() > 0 || current.Len() != desired.Len()
if diff.Len() > 0 {
log.Info("starting processing pruning", "length", diff.Len())
}
// The outer resources must be removed, iterating over these to clean-up
for item := range diff {
or := capsulev1beta2.ObjectReferenceStatus{}
if err := or.ParseFromString(item); err != nil {
log.Error(err, "unable to parse resource to prune", "resource", item)
continue
}
obj := unstructured.Unstructured{}
obj.SetNamespace(or.Namespace)
obj.SetName(or.Name)
obj.SetGroupVersionKind(schema.FromAPIVersionAndKind(or.APIVersion, or.Kind))
if err := r.client.Delete(ctx, &obj); err != nil {
if apierr.IsNotFound(err) {
// Object may have been already deleted, we can ignore this error
continue
}
log.Error(err, "unable to prune resource", "resource", item)
continue
}
log.Info("resource has been pruned", "resource", item)
}
return updateStatus
}
func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant, allowCrossNamespaceSelection bool, tenantLabel string, resourceIndex int, spec capsulev1beta2.ResourceSpec) ([]string, error) {
log := ctrllog.FromContext(ctx)
var err error
// Creating Namespace selector
var selector labels.Selector
if spec.NamespaceSelector != nil {
selector, err = metav1.LabelSelectorAsSelector(spec.NamespaceSelector)
if err != nil {
log.Error(err, "cannot create Namespace selector for Namespace filtering and resource replication", "index", resourceIndex)
return nil, err
}
} else {
selector = labels.NewSelector()
}
// Resources can be replicated only on Namespaces belonging to the same Global:
// preventing a boundary cross by enforcing the selection.
tntRequirement, err := labels.NewRequirement(tenantLabel, selection.Equals, []string{tnt.GetName()})
if err != nil {
log.Error(err, "unable to create requirement for Namespace filtering and resource replication", "index", resourceIndex)
return nil, err
}
selector = selector.Add(*tntRequirement)
// Selecting the targeted Namespace according to the TenantResource specification.
namespaces := corev1.NamespaceList{}
if err = r.client.List(ctx, &namespaces, client.MatchingLabelsSelector{Selector: selector}); err != nil {
log.Error(err, "cannot retrieve Namespaces for resource", "index", resourceIndex)
return nil, err
}
// Generating additional metadata
objAnnotations, objLabels := map[string]string{}, map[string]string{}
if spec.AdditionalMetadata != nil {
objAnnotations = spec.AdditionalMetadata.Annotations
objLabels = spec.AdditionalMetadata.Labels
}
objAnnotations[tenantLabel] = tnt.GetName()
objLabels["capsule.clastix.io/resources"] = fmt.Sprintf("%d", resourceIndex)
objLabels[tenantLabel] = tnt.GetName()
// processed will contain the sets of resources replicated, both for the raw and the Namespaced ones:
// these are required to perform a final pruning once the replication has been occurred.
processed := sets.NewString()
tntNamespaces := sets.NewString(tnt.Status.Namespaces...)
syncErr := new(multierror.Error)
for nsIndex, item := range spec.NamespacedItems {
keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace}
// A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned
// Namespace: this must be blocked by checking it this is the case.
if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) {
log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Global", keysAndValues...)
continue
}
// Namespaced Items are relying on selecting resources, rather than specifying a specific name:
// creating it to get used by the client List action.
itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector)
if err != nil {
log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...)
continue
}
objs := unstructured.UnstructuredList{}
objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind)))
if clientErr := r.client.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil {
log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...)
syncErr = multierror.Append(syncErr, clientErr)
continue
}
multiErr := new(multierror.Group)
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
obj := o
multiErr.Go(func() error {
nsItems, nsErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if nsErr != nil {
log.Error(err, "unable to sync namespacedItems", keysAndValues...)
return nsErr
}
processed.Insert(nsItems...)
return nil
})
}
if objsErr := multiErr.Wait(); objsErr != nil {
syncErr = multierror.Append(syncErr, objsErr)
}
}
codecFactory := serializer.NewCodecFactory(r.client.Scheme())
for rawIndex, item := range spec.RawItems {
obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex}
if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil {
log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...)
syncErr = multierror.Append(syncErr, decodeErr)
continue
}
syncedRaw, rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if rawErr != nil {
log.Info("unable to sync rawItem", keysAndValues...)
// In case of error processing an item in one of any selected Namespaces, storing it to report it lately
// to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation.
syncErr = multierror.Append(syncErr, rawErr)
} else {
processed.Insert(syncedRaw...)
}
}
return processed.List(), syncErr.ErrorOrNil()
}
// createOrUpdate replicates the provided unstructured object to all the provided Namespaces:
// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated,
// along adding the additional metadata, if required.
func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string, namespaces corev1.NamespaceList) ([]string, error) {
log := ctrllog.FromContext(ctx)
errGroup := new(multierror.Group)
var items []string
for _, item := range namespaces.Items {
ns := item.GetName()
errGroup.Go(func() (err error) {
actual, desired := obj.DeepCopy(), obj.DeepCopy()
// Using a deferred function to properly log the results, and adding the item to the processed set.
defer func() {
keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())}
if err != nil {
log.Error(err, "unable to replicate resource", keysAndValues...)
return
}
log.Info("resource has been replicated", keysAndValues...)
replicatedItem := &capsulev1beta2.ObjectReferenceStatus{
Name: obj.GetName(),
}
replicatedItem.Kind = obj.GetKind()
replicatedItem.Namespace = ns
replicatedItem.APIVersion = obj.GetAPIVersion()
items = append(items, replicatedItem.String())
}()
actual.SetNamespace(ns)
_, err = controllerutil.CreateOrUpdate(ctx, r.client, actual, func() error {
UID := actual.GetUID()
actual.SetUnstructuredContent(desired.Object)
actual.SetNamespace(ns)
actual.SetLabels(labels)
actual.SetAnnotations(annotations)
actual.SetResourceVersion("")
actual.SetUID(UID)
return nil
})
return
})
}
// Wait returns *multierror.Error that implements stdlib error:
// the nil check must be performed down here rather than at the caller level to avoid wrong casting.
if err := errGroup.Wait(); err != nil {
return items, err
}
return items, nil
}

View File

@@ -1,54 +0,0 @@
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
capsulev1beta2 "github.com/clastix/capsule/api/v1beta2"
)
func (r *Processor) HandleFinalizer(ctx context.Context, obj client.Object, shouldPrune bool, items []capsulev1beta2.ObjectReferenceStatus) (enqueueBack bool, err error) {
log := ctrllog.FromContext(ctx)
// If the object has been marked for deletion,
// we have to clean up the created resources before removing the finalizer.
if obj.GetDeletionTimestamp() != nil {
log.Info("pruning prior finalizer removal")
if shouldPrune {
_ = r.HandlePruning(ctx, items, nil)
}
obj.SetFinalizers(nil)
if err = r.client.Update(ctx, obj); err != nil {
log.Error(err, "cannot remove finalizer")
return true, err
}
return true, nil
}
// When the pruning for the given resource is enabled, a finalizer is required when the TenantResource is marked
// for deletion: this allows to perform a clean-up of all the underlying resources.
if shouldPrune && !sets.NewString(obj.GetFinalizers()...).Has(finalizer) {
obj.SetFinalizers(append(obj.GetFinalizers(), finalizer))
if err = r.client.Update(ctx, obj); err != nil {
log.Error(err, "cannot add finalizer")
return true, err
}
log.Info("added finalizer, enqueuing back for processing")
return true, nil
}
return false, nil
}

View File

@@ -1,67 +0,0 @@
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
capsulev1beta2 "github.com/clastix/capsule/api/v1beta2"
)
func (r *Processor) HandlePruning(ctx context.Context, current []capsulev1beta2.ObjectReferenceStatus, desired sets.String) (updateStatus bool) {
log := ctrllog.FromContext(ctx)
// The status items are the actual replicated resources, these must be collected in order to perform the resulting
// diff that will be cleaned-up.
status := sets.NewString()
for _, item := range current {
status.Insert(item.String())
}
diff := status.Difference(desired)
// We don't want to trigger a reconciliation of the Status every time,
// rather, only in case of a difference between the processed and the actual status.
// This can happen upon the first reconciliation, or a removal, or a change, of a resource.
updateStatus = diff.Len() > 0 || status.Len() != desired.Len()
if diff.Len() > 0 {
log.Info("starting processing pruning", "length", diff.Len())
}
// The outer resources must be removed, iterating over these to clean-up
for item := range diff {
or := capsulev1beta2.ObjectReferenceStatus{}
if err := or.ParseFromString(item); err != nil {
log.Error(err, "unable to parse resource to prune", "resource", item)
continue
}
obj := unstructured.Unstructured{}
obj.SetNamespace(or.Namespace)
obj.SetName(or.Name)
obj.SetGroupVersionKind(schema.FromAPIVersionAndKind(or.APIVersion, or.Kind))
if err := r.unstructuredClient.Delete(ctx, &obj); err != nil {
if apierr.IsNotFound(err) {
// Object may have been already deleted, we can ignore this error
continue
}
log.Error(err, "unable to prune resource", "resource", item)
continue
}
log.Info("resource has been pruned", "resource", item)
}
return updateStatus
}

View File

@@ -1,223 +0,0 @@
// Copyright 2020-2021 Clastix Labs
// SPDX-License-Identifier: Apache-2.0
package resources
import (
"context"
"fmt"
"github.com/hashicorp/go-multierror"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
ctrllog "sigs.k8s.io/controller-runtime/pkg/log"
capsulev1beta2 "github.com/clastix/capsule/api/v1beta2"
)
func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant, allowCrossNamespaceSelection bool, tenantLabel string, resourceIndex int, spec capsulev1beta2.ResourceSpec) ([]string, error) {
log := ctrllog.FromContext(ctx)
var err error
// Creating Namespace selector
var selector labels.Selector
if spec.NamespaceSelector != nil {
selector, err = metav1.LabelSelectorAsSelector(spec.NamespaceSelector)
if err != nil {
log.Error(err, "cannot create Namespace selector for Namespace filtering and resource replication", "index", resourceIndex)
return nil, err
}
} else {
selector = labels.NewSelector()
}
// Resources can be replicated only on Namespaces belonging to the same Global:
// preventing a boundary cross by enforcing the selection.
tntRequirement, err := labels.NewRequirement(tenantLabel, selection.Equals, []string{tnt.GetName()})
if err != nil {
log.Error(err, "unable to create requirement for Namespace filtering and resource replication", "index", resourceIndex)
return nil, err
}
selector = selector.Add(*tntRequirement)
// Selecting the targeted Namespace according to the TenantResource specification.
namespaces := corev1.NamespaceList{}
if err = r.client.List(ctx, &namespaces, client.MatchingLabelsSelector{Selector: selector}); err != nil {
log.Error(err, "cannot retrieve Namespaces for resource", "index", resourceIndex)
return nil, err
}
// Generating additional metadata
objAnnotations, objLabels := map[string]string{}, map[string]string{}
if spec.AdditionalMetadata != nil {
objAnnotations = spec.AdditionalMetadata.Annotations
objLabels = spec.AdditionalMetadata.Labels
}
objAnnotations[tenantLabel] = tnt.GetName()
objLabels["capsule.clastix.io/resources"] = fmt.Sprintf("%d", resourceIndex)
objLabels[tenantLabel] = tnt.GetName()
// processed will contain the sets of resources replicated, both for the raw and the Namespaced ones:
// these are required to perform a final pruning once the replication has been occurred.
processed := sets.NewString()
tntNamespaces := sets.NewString(tnt.Status.Namespaces...)
syncErr := new(multierror.Error)
for nsIndex, item := range spec.NamespacedItems {
keysAndValues := []interface{}{"index", nsIndex, "namespace", item.Namespace}
// A TenantResource is created by a TenantOwner, and potentially, they could point to a resource in a non-owned
// Namespace: this must be blocked by checking it this is the case.
if !allowCrossNamespaceSelection && !tntNamespaces.Has(item.Namespace) {
log.Info("skipping processing of namespacedItem, referring a Namespace that is not part of the given Global", keysAndValues...)
continue
}
// Namespaced Items are relying on selecting resources, rather than specifying a specific name:
// creating it to get used by the client List action.
itemSelector, selectorErr := metav1.LabelSelectorAsSelector(&item.Selector)
if err != nil {
log.Error(selectorErr, "cannot create Selector for namespacedItem", keysAndValues...)
continue
}
objs := unstructured.UnstructuredList{}
objs.SetGroupVersionKind(schema.FromAPIVersionAndKind(item.APIVersion, fmt.Sprintf("%sList", item.Kind)))
if clientErr := r.unstructuredClient.List(ctx, &objs, client.InNamespace(item.Namespace), client.MatchingLabelsSelector{Selector: itemSelector}); clientErr != nil {
log.Error(clientErr, "cannot retrieve object for namespacedItem", keysAndValues...)
syncErr = multierror.Append(syncErr, clientErr)
continue
}
multiErr := new(multierror.Group)
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
obj := o
multiErr.Go(func() error {
nsItems, nsErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if nsErr != nil {
log.Error(err, "unable to sync namespacedItems", keysAndValues...)
return nsErr
}
processed.Insert(nsItems...)
return nil
})
}
if objsErr := multiErr.Wait(); objsErr != nil {
syncErr = multierror.Append(syncErr, objsErr)
}
}
codecFactory := serializer.NewCodecFactory(r.client.Scheme())
for rawIndex, item := range spec.RawItems {
obj, keysAndValues := unstructured.Unstructured{}, []interface{}{"index", rawIndex}
if _, _, decodeErr := codecFactory.UniversalDeserializer().Decode(item.Raw, nil, &obj); decodeErr != nil {
log.Error(decodeErr, "unable to deserialize rawItem", keysAndValues...)
syncErr = multierror.Append(syncErr, decodeErr)
continue
}
syncedRaw, rawErr := r.createOrUpdate(ctx, &obj, objLabels, objAnnotations, namespaces)
if rawErr != nil {
log.Info("unable to sync rawItem", keysAndValues...)
// In case of error processing an item in one of any selected Namespaces, storing it to report it lately
// to the upper call to ensure a partial sync that will be fixed by a subsequent reconciliation.
syncErr = multierror.Append(syncErr, rawErr)
} else {
processed.Insert(syncedRaw...)
}
}
return processed.List(), syncErr.ErrorOrNil()
}
// createOrUpdate replicates the provided unstructured object to all the provided Namespaces:
// this function mimics the CreateOrUpdate, by retrieving the object to understand if it must be created or updated,
// along adding the additional metadata, if required.
func (r *Processor) createOrUpdate(ctx context.Context, obj *unstructured.Unstructured, labels map[string]string, annotations map[string]string, namespaces corev1.NamespaceList) ([]string, error) {
log := ctrllog.FromContext(ctx)
errGroup := new(multierror.Group)
var items []string
for _, item := range namespaces.Items {
ns := item.GetName()
errGroup.Go(func() (err error) {
actual, desired := obj.DeepCopy(), obj.DeepCopy()
// Using a deferred function to properly log the results, and adding the item to the processed set.
defer func() {
keysAndValues := []interface{}{"resource", fmt.Sprintf("%s/%s", ns, desired.GetName())}
if err != nil {
log.Error(err, "unable to replicate resource", keysAndValues...)
return
}
log.Info("resource has been replicated", keysAndValues...)
replicatedItem := &capsulev1beta2.ObjectReferenceStatus{
Name: obj.GetName(),
}
replicatedItem.Kind = obj.GetKind()
replicatedItem.Namespace = ns
replicatedItem.APIVersion = obj.GetAPIVersion()
items = append(items, replicatedItem.String())
}()
actual.SetNamespace(ns)
_, err = controllerutil.CreateOrUpdate(ctx, r.unstructuredClient, actual, func() error {
UID := actual.GetUID()
actual.SetUnstructuredContent(desired.Object)
actual.SetNamespace(ns)
actual.SetLabels(labels)
actual.SetAnnotations(annotations)
actual.SetResourceVersion("")
actual.SetUID(UID)
return nil
})
return
})
}
// Wait returns *multierror.Error that implements stdlib error:
// the nil check must be performed down here rather than at the caller level to avoid wrong casting.
if err := errGroup.Wait(); err != nil {
return items, err
}
return items, nil
}