mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-05-16 14:18:42 +00:00
separate finalizer controller
This commit is contained in:
148
pkg/spoke/controllers/finalizercontroller/finalize_controller.go
Normal file
148
pkg/spoke/controllers/finalizercontroller/finalize_controller.go
Normal file
@@ -0,0 +1,148 @@
|
||||
package finalizercontroller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
workv1client "github.com/open-cluster-management/api/client/work/clientset/versioned/typed/work/v1"
|
||||
workinformer "github.com/open-cluster-management/api/client/work/informers/externalversions/work/v1"
|
||||
worklister "github.com/open-cluster-management/api/client/work/listers/work/v1"
|
||||
workapiv1 "github.com/open-cluster-management/api/work/v1"
|
||||
"github.com/openshift/library-go/pkg/controller/factory"
|
||||
"github.com/openshift/library-go/pkg/operator/events"
|
||||
"k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/klog"
|
||||
)
|
||||
|
||||
// FinalizeController handles cleanup of manifestwork resources before deletion is allowed.
|
||||
type FinalizeController struct {
|
||||
manifestWorkClient workv1client.ManifestWorkInterface
|
||||
manifestWorkLister worklister.ManifestWorkNamespaceLister
|
||||
spokeDynamicClient dynamic.Interface
|
||||
}
|
||||
|
||||
func NewFinalizeController(
|
||||
recorder events.Recorder,
|
||||
spokeDynamicClient dynamic.Interface,
|
||||
manifestWorkClient workv1client.ManifestWorkInterface,
|
||||
manifestWorkInformer workinformer.ManifestWorkInformer,
|
||||
manifestWorkLister worklister.ManifestWorkNamespaceLister,
|
||||
) factory.Controller {
|
||||
|
||||
controller := &FinalizeController{
|
||||
manifestWorkClient: manifestWorkClient,
|
||||
manifestWorkLister: manifestWorkLister,
|
||||
spokeDynamicClient: spokeDynamicClient,
|
||||
}
|
||||
|
||||
return factory.New().
|
||||
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
|
||||
accessor, _ := meta.Accessor(obj)
|
||||
return accessor.GetName()
|
||||
}, manifestWorkInformer.Informer()).
|
||||
WithSync(controller.sync).ToController("ManifestWorkFinalizer", recorder)
|
||||
}
|
||||
|
||||
func (m *FinalizeController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
|
||||
manifestWorkName := controllerContext.QueueKey()
|
||||
klog.V(4).Infof("Reconciling ManifestWork %q", manifestWorkName)
|
||||
|
||||
manifestWork, err := m.manifestWorkLister.Get(manifestWorkName)
|
||||
if errors.IsNotFound(err) {
|
||||
// work not found, could have been deleted, do nothing.
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return m.syncManifestWork(ctx, manifestWork)
|
||||
}
|
||||
|
||||
// syncManifestWork ensures that when a manifestwork has been deleted, everything it created is also deleted before removing
|
||||
// the finalizer
|
||||
func (m *FinalizeController) syncManifestWork(ctx context.Context, originalManifestWork *workapiv1.ManifestWork) error {
|
||||
manifestWork := originalManifestWork.DeepCopy()
|
||||
|
||||
// no work to do until we're deleted
|
||||
if manifestWork.DeletionTimestamp.IsZero() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// don't do work if the finalizer is not present
|
||||
found := false
|
||||
for i := range manifestWork.Finalizers {
|
||||
if manifestWork.Finalizers[i] == manifestWorkFinalizer {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Work is deleting, we remove its related resources on spoke cluster
|
||||
if errs := m.cleanupResourceOfWork(manifestWork); len(errs) != 0 {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
// We consider the case of deletion of created resources, with finalization still to come, as sufficient to remove the finalizer
|
||||
// We do this because resources cannot be un-deleted. This means that deletion is inevitable.
|
||||
// Also, since we don't track UIDs, we have no reliable way of know when "this" particular resource has been removed as
|
||||
// compared with a case where this controller deletes it and another controller (or manifestwork) creates it.
|
||||
|
||||
removeFinalizer(manifestWork, manifestWorkFinalizer)
|
||||
_, err := m.manifestWorkClient.Update(ctx, manifestWork, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *FinalizeController) cleanupResourceOfWork(work *workapiv1.ManifestWork) []error {
|
||||
klog.V(4).Infof("cleaning up %q", work.Name)
|
||||
|
||||
errs := []error{}
|
||||
|
||||
// TODO this can later be based on a list of all resources created by this manifest work
|
||||
// not just the resources currently managed. This overlaps with the need to remove resources we have created
|
||||
// separate from the application of current resources.
|
||||
for _, resourceStatus := range work.Status.ResourceStatus.Manifests {
|
||||
gvr := schema.GroupVersionResource{Group: resourceStatus.ResourceMeta.Group, Version: resourceStatus.ResourceMeta.Version, Resource: resourceStatus.ResourceMeta.Resource}
|
||||
if len(gvr.Resource) == 0 || len(gvr.Version) == 0 || len(resourceStatus.ResourceMeta.Name) == 0 {
|
||||
// without a resource or version, the request cannot be constructed, so we must not have created this either
|
||||
continue
|
||||
}
|
||||
|
||||
err := m.spokeDynamicClient.
|
||||
Resource(gvr).
|
||||
Namespace(resourceStatus.ResourceMeta.Namespace).
|
||||
Delete(context.TODO(), resourceStatus.ResourceMeta.Name, metav1.DeleteOptions{})
|
||||
switch {
|
||||
case errors.IsNotFound(err):
|
||||
// no-oop
|
||||
case err != nil:
|
||||
errs = append(errs, fmt.Errorf(
|
||||
"Failed to delete resource %v with key %s/%s: %w",
|
||||
gvr, resourceStatus.ResourceMeta.Namespace, resourceStatus.ResourceMeta.Name, err))
|
||||
continue
|
||||
}
|
||||
klog.Infof("Successfully delete resource %v with key %s/%s", gvr, resourceStatus.ResourceMeta.Namespace, resourceStatus.ResourceMeta.Name)
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
// removeFinalizer removes a finalizer from the list. It mutates its input.
|
||||
func removeFinalizer(manifestWork *workapiv1.ManifestWork, finalizerName string) {
|
||||
newFinalizers := []string{}
|
||||
for i := range manifestWork.Finalizers {
|
||||
if manifestWork.Finalizers[i] == finalizerName {
|
||||
continue
|
||||
}
|
||||
newFinalizers = append(newFinalizers, manifestWork.Finalizers[i])
|
||||
}
|
||||
manifestWork.Finalizers = newFinalizers
|
||||
}
|
||||
@@ -0,0 +1,134 @@
|
||||
package finalizercontroller
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
fakeworkclient "github.com/open-cluster-management/api/client/work/clientset/versioned/fake"
|
||||
workapiv1 "github.com/open-cluster-management/api/work/v1"
|
||||
"github.com/open-cluster-management/work/pkg/spoke/spoketesting"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
fakedynamic "k8s.io/client-go/dynamic/fake"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
)
|
||||
|
||||
func TestFinalize(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
existingFinalizers []string
|
||||
resourcesToRemove []workapiv1.ManifestResourceMeta
|
||||
terminated bool
|
||||
|
||||
validateManifestWorkActions func(t *testing.T, actions []clienttesting.Action)
|
||||
validateDynamicActions func(t *testing.T, actions []clienttesting.Action)
|
||||
}{
|
||||
{
|
||||
name: "skip when not delete",
|
||||
existingFinalizers: []string{manifestWorkFinalizer},
|
||||
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) > 0 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
validateDynamicActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) > 0 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "skip when finalizer gone",
|
||||
terminated: true,
|
||||
existingFinalizers: []string{"other-finalizer"},
|
||||
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) > 0 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
validateDynamicActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) > 0 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "delete resources",
|
||||
terminated: true,
|
||||
existingFinalizers: []string{"a", manifestWorkFinalizer, "b"},
|
||||
resourcesToRemove: []workapiv1.ManifestResourceMeta{
|
||||
{Group: "g1", Version: "v1", Resource: "r1", Namespace: "", Name: "n1"},
|
||||
{Group: "g2", Version: "v2", Resource: "r2", Namespace: "ns2", Name: "n2"},
|
||||
{Group: "g3", Version: "v3", Resource: "r3", Namespace: "ns3", Name: "n3"},
|
||||
{Group: "g4", Version: "v4", Resource: "r4", Namespace: "", Name: "n4"},
|
||||
},
|
||||
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 1 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
|
||||
if !reflect.DeepEqual(work.Finalizers, []string{"a", "b"}) {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
validateDynamicActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
if len(actions) != 4 {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
|
||||
action := actions[0].(clienttesting.DeleteAction)
|
||||
resource, namespace, name := action.GetResource(), action.GetNamespace(), action.GetName()
|
||||
if !reflect.DeepEqual(resource, schema.GroupVersionResource{Group: "g1", Version: "v1", Resource: "r1"}) || namespace != "" || name != "n1" {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
action = actions[1].(clienttesting.DeleteAction)
|
||||
resource, namespace, name = action.GetResource(), action.GetNamespace(), action.GetName()
|
||||
if !reflect.DeepEqual(resource, schema.GroupVersionResource{Group: "g2", Version: "v2", Resource: "r2"}) || namespace != "ns2" || name != "n2" {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
action = actions[2].(clienttesting.DeleteAction)
|
||||
resource, namespace, name = action.GetResource(), action.GetNamespace(), action.GetName()
|
||||
if !reflect.DeepEqual(resource, schema.GroupVersionResource{Group: "g3", Version: "v3", Resource: "r3"}) || namespace != "ns3" || name != "n3" {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
action = actions[3].(clienttesting.DeleteAction)
|
||||
resource, namespace, name = action.GetResource(), action.GetNamespace(), action.GetName()
|
||||
if !reflect.DeepEqual(resource, schema.GroupVersionResource{Group: "g4", Version: "v4", Resource: "r4"}) || namespace != "" || name != "n4" {
|
||||
t.Fatal(spew.Sdump(actions))
|
||||
}
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
testingWork, _ := spoketesting.NewManifestWork(0)
|
||||
testingWork.Finalizers = c.existingFinalizers
|
||||
if c.terminated {
|
||||
now := metav1.Now()
|
||||
testingWork.DeletionTimestamp = &now
|
||||
}
|
||||
for _, curr := range c.resourcesToRemove {
|
||||
testingWork.Status.ResourceStatus.Manifests = append(testingWork.Status.ResourceStatus.Manifests, workapiv1.ManifestCondition{ResourceMeta: curr})
|
||||
}
|
||||
|
||||
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme())
|
||||
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
|
||||
controller := FinalizeController{
|
||||
manifestWorkClient: fakeClient.WorkV1().ManifestWorks(testingWork.Namespace),
|
||||
spokeDynamicClient: fakeDynamicClient,
|
||||
}
|
||||
|
||||
err := controller.syncManifestWork(context.TODO(), testingWork)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
c.validateManifestWorkActions(t, fakeClient.Actions())
|
||||
c.validateDynamicActions(t, fakeDynamicClient.Actions())
|
||||
})
|
||||
}
|
||||
|
||||
}
|
||||
@@ -103,16 +103,6 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
|
||||
}
|
||||
manifestWork = manifestWork.DeepCopy()
|
||||
|
||||
// Work is deleting, we remove its related resources on spoke cluster
|
||||
// TODO: once we make this work initially, the finalizer would live in a different loop.
|
||||
// It will have different backoff considerations.
|
||||
if !manifestWork.DeletionTimestamp.IsZero() {
|
||||
if errs := m.cleanupResourceOfWork(manifestWork); len(errs) != 0 {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
return m.removeWorkFinalizer(ctx, manifestWork)
|
||||
}
|
||||
|
||||
errs := []error{}
|
||||
// Apply resources on spoke cluster.
|
||||
resourceResults := m.applyManifest(manifestWork.Spec.Workload.Manifests, controllerContext.Recorder())
|
||||
@@ -155,53 +145,6 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
|
||||
return err
|
||||
}
|
||||
|
||||
func (m *ManifestWorkController) cleanupResourceOfWork(work *workapiv1.ManifestWork) []error {
|
||||
errs := []error{}
|
||||
for _, manifest := range work.Spec.Workload.Manifests {
|
||||
gvr, object, err := m.decodeUnstructured(manifest.Raw)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to decode object: %v", err)
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
err = m.spokeDynamicClient.
|
||||
Resource(gvr).
|
||||
Namespace(object.GetNamespace()).
|
||||
Delete(context.TODO(), object.GetName(), metav1.DeleteOptions{})
|
||||
switch {
|
||||
case errors.IsNotFound(err):
|
||||
// no-oop
|
||||
case err != nil:
|
||||
errs = append(errs, fmt.Errorf(
|
||||
"Failed to delete resource %v with key %s/%s: %w",
|
||||
gvr, object.GetNamespace(), object.GetName(), err))
|
||||
continue
|
||||
}
|
||||
klog.V(4).Infof("Successfully delete resource %v with key %s/%s", gvr, object.GetNamespace(), object.GetName())
|
||||
}
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
func (m *ManifestWorkController) removeWorkFinalizer(ctx context.Context, manifestWork *workapiv1.ManifestWork) error {
|
||||
copiedFinalizers := []string{}
|
||||
for i := range manifestWork.Finalizers {
|
||||
if manifestWork.Finalizers[i] == manifestWorkFinalizer {
|
||||
continue
|
||||
}
|
||||
copiedFinalizers = append(copiedFinalizers, manifestWork.Finalizers[i])
|
||||
}
|
||||
|
||||
if len(manifestWork.Finalizers) != len(copiedFinalizers) {
|
||||
manifestWork.Finalizers = copiedFinalizers
|
||||
_, err := m.manifestWorkClient.Update(ctx, manifestWork, metav1.UpdateOptions{})
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *ManifestWorkController) applyManifest(manifests []workapiv1.Manifest, recorder events.Recorder) []resourceapply.ApplyResult {
|
||||
clientHolder := resourceapply.NewClientHolder().
|
||||
WithAPIExtensionsClient(m.spokeAPIExtensionClient).
|
||||
|
||||
@@ -298,39 +298,6 @@ func TestSync(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestDeleteWork tests the action when work is deleted
|
||||
func TestDeleteWork(t *testing.T) {
|
||||
tc := newTestCase("delete multiple resources").
|
||||
withWorkManifest(spoketesting.NewUnstructured("v1", "Secret", "ns1", "test"), spoketesting.NewUnstructured("v1", "Secret", "ns2", "test")).
|
||||
withSpokeObject(spoketesting.NewSecret("test", "ns1", "value2")).
|
||||
withExpectedWorkAction("update").
|
||||
withExpectedDynamicAction("delete", "delete")
|
||||
|
||||
work, workKey := spoketesting.NewManifestWork(0, tc.workManifest...)
|
||||
work.Finalizers = []string{manifestWorkFinalizer}
|
||||
now := metav1.Now()
|
||||
work.ObjectMeta.SetDeletionTimestamp(&now)
|
||||
controller := newController(work, spoketesting.NewFakeRestMapper()).withKubeObject(tc.spokeObject...).withUnstructuredObject()
|
||||
syncContext := spoketesting.NewFakeSyncContext(t, workKey)
|
||||
err := controller.controller.sync(nil, syncContext)
|
||||
if err != nil {
|
||||
t.Errorf("Should be success with no err: %v", err)
|
||||
}
|
||||
|
||||
tc.validate(t, controller.dynamicClient, controller.workClient, controller.kubeClient)
|
||||
|
||||
// Verify that finalizer is removed
|
||||
workActions := controller.workClient.Actions()
|
||||
actual, ok := workActions[len(workActions)-1].(clienttesting.UpdateActionImpl)
|
||||
if !ok {
|
||||
t.Errorf("Expected to get update action")
|
||||
}
|
||||
actualWork := actual.Object.(*workapiv1.ManifestWork)
|
||||
if len(actualWork.Finalizers) != 0 {
|
||||
t.Errorf("Expected 0 finailizer but got %#v", actualWork.Finalizers)
|
||||
}
|
||||
}
|
||||
|
||||
// Test applying resource failed
|
||||
func TestFailedToApplyResource(t *testing.T) {
|
||||
tc := newTestCase("multiple create&update resource").
|
||||
|
||||
@@ -96,9 +96,17 @@ func (o *WorkloadAgentOptions) RunWorkloadAgent(ctx context.Context, controllerC
|
||||
workInformerFactory.Work().V1().ManifestWorks(),
|
||||
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.SpokeClusterName),
|
||||
)
|
||||
finalizeController := finalizercontroller.NewFinalizeController(
|
||||
controllerContext.EventRecorder,
|
||||
spokeDynamicClient,
|
||||
hubWorkClient.WorkV1().ManifestWorks(o.SpokeClusterName),
|
||||
workInformerFactory.Work().V1().ManifestWorks(),
|
||||
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.SpokeClusterName),
|
||||
)
|
||||
|
||||
go workInformerFactory.Start(ctx.Done())
|
||||
go addFinalizerController.Run(ctx, 1)
|
||||
go finalizeController.Run(ctx, 1)
|
||||
go manifestWorkController.Run(ctx, 1)
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
|
||||
Reference in New Issue
Block a user