Use patcher in work (#190)

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2023-06-21 11:38:59 +08:00
committed by GitHub
parent 6c4dbb90fa
commit e344e26a5e
28 changed files with 587 additions and 821 deletions

View File

@@ -10,7 +10,7 @@ rules:
# Allow work agent to get/list/watch/update manifestworks
- apiGroups: ["work.open-cluster-management.io"]
resources: ["manifestworks"]
verbs: ["get", "list", "watch", "update"]
verbs: ["get", "list", "watch", "update", "patch"]
# Allow work agent to update the status of manifestwork
- apiGroups: ["work.open-cluster-management.io"]
resources: ["manifestworks/status"]

View File

@@ -6,13 +6,11 @@ import (
"fmt"
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@@ -20,7 +18,6 @@ import (
fakedynamic "k8s.io/client-go/dynamic/fake"
clienttesting "k8s.io/client-go/testing"
fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
@@ -39,13 +36,6 @@ func newCondition(name, status, reason, message string, lastTransition *metav1.T
return ret
}
func updateSpokeClusterConditionFn(cond metav1.Condition) UpdateManifestWorkStatusFunc {
return func(oldStatus *workapiv1.ManifestWorkStatus) error {
meta.SetStatusCondition(&oldStatus.Conditions, cond)
return nil
}
}
func newManifestCondition(ordinal int32, resource string, conds ...metav1.Condition) workapiv1.ManifestCondition {
return workapiv1.ManifestCondition{
ResourceMeta: workapiv1.ManifestResourceMeta{Ordinal: ordinal, Resource: resource},
@@ -77,102 +67,6 @@ func newSecret(namespace, name string, terminated bool, uid string, owner ...met
return secret
}
// TestUpdateStatusCondition tests UpdateManifestWorkStatus function
func TestUpdateStatusCondition(t *testing.T) {
nowish := metav1.Now()
beforeish := metav1.Time{Time: nowish.Add(-10 * time.Second)}
afterish := metav1.Time{Time: nowish.Add(10 * time.Second)}
cases := []struct {
name string
startingConditions []metav1.Condition
newCondition metav1.Condition
expectedUpdated bool
expectedConditions []metav1.Condition
}{
{
name: "add to empty",
startingConditions: []metav1.Condition{},
newCondition: newCondition("test", "True", "my-reason", "my-message", nil),
expectedUpdated: true,
expectedConditions: []metav1.Condition{newCondition("test", "True", "my-reason", "my-message", nil)},
},
{
name: "add to non-conflicting",
startingConditions: []metav1.Condition{
newCondition("two", "True", "my-reason", "my-message", nil),
},
newCondition: newCondition("one", "True", "my-reason", "my-message", nil),
expectedUpdated: true,
expectedConditions: []metav1.Condition{
newCondition("two", "True", "my-reason", "my-message", nil),
newCondition("one", "True", "my-reason", "my-message", nil),
},
},
{
name: "change existing status",
startingConditions: []metav1.Condition{
newCondition("two", "True", "my-reason", "my-message", nil),
newCondition("one", "True", "my-reason", "my-message", nil),
},
newCondition: newCondition("one", "False", "my-different-reason", "my-othermessage", nil),
expectedUpdated: true,
expectedConditions: []metav1.Condition{
newCondition("two", "True", "my-reason", "my-message", nil),
newCondition("one", "False", "my-different-reason", "my-othermessage", nil),
},
},
{
name: "leave existing transition time",
startingConditions: []metav1.Condition{
newCondition("two", "True", "my-reason", "my-message", nil),
newCondition("one", "True", "my-reason", "my-message", &beforeish),
},
newCondition: newCondition("one", "True", "my-reason", "my-message", &afterish),
expectedUpdated: false,
expectedConditions: []metav1.Condition{
newCondition("two", "True", "my-reason", "my-message", nil),
newCondition("one", "True", "my-reason", "my-message", &beforeish),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
manifestWork := &workapiv1.ManifestWork{
ObjectMeta: metav1.ObjectMeta{Name: "work1", Namespace: "cluster1"},
Status: workapiv1.ManifestWorkStatus{
Conditions: c.startingConditions,
},
}
fakeWorkClient := fakeworkclient.NewSimpleClientset(manifestWork)
status, updated, err := UpdateManifestWorkStatus(
context.TODO(),
fakeWorkClient.WorkV1().ManifestWorks("cluster1"),
manifestWork,
updateSpokeClusterConditionFn(c.newCondition),
)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
if updated != c.expectedUpdated {
t.Errorf("expected %t, but %t", c.expectedUpdated, updated)
}
for i := range c.expectedConditions {
expected := c.expectedConditions[i]
actual := status.Conditions[i]
if expected.LastTransitionTime == (metav1.Time{}) {
actual.LastTransitionTime = metav1.Time{}
}
if !equality.Semantic.DeepEqual(expected, actual) {
t.Errorf(cmp.Diff(expected, actual))
}
}
})
}
}
// TestSetManifestCondition tests SetManifestCondition function
func TestMergeManifestConditions(t *testing.T) {
transitionTime := metav1.Now()
@@ -456,45 +350,6 @@ func TestDeleteAppliedResourcess(t *testing.T) {
}
}
func TestRemoveFinalizer(t *testing.T) {
cases := []struct {
name string
obj runtime.Object
finalizerToRemove string
expectedFinalizers []string
}{
{
name: "No finalizers in object",
obj: &workapiv1.ManifestWork{},
finalizerToRemove: "a",
expectedFinalizers: []string{},
},
{
name: "remove finalizer",
obj: &workapiv1.ManifestWork{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{"a"}}},
finalizerToRemove: "a",
expectedFinalizers: []string{},
},
{
name: "multiple finalizers",
obj: &workapiv1.ManifestWork{ObjectMeta: metav1.ObjectMeta{Finalizers: []string{"b", "a", "c"}}},
finalizerToRemove: "a",
expectedFinalizers: []string{"b", "c"},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
RemoveFinalizer(c.obj, c.finalizerToRemove)
accessor, _ := meta.Accessor(c.obj)
finalizers := accessor.GetFinalizers()
if !equality.Semantic.DeepEqual(finalizers, c.expectedFinalizers) {
t.Errorf("Expected finalizers are same, but got %v", finalizers)
}
})
}
}
func TestHubHash(t *testing.T) {
cases := []struct {
name string

View File

@@ -15,7 +15,6 @@ import (
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -26,11 +25,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
clusterlister "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta1"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1 "open-cluster-management.io/api/work/v1"
)
@@ -141,59 +138,6 @@ func MergeStatusConditions(conditions []metav1.Condition, newConditions []metav1
return merged
}
type UpdateManifestWorkStatusFunc func(status *workapiv1.ManifestWorkStatus) error
func UpdateManifestWorkStatus(
ctx context.Context,
client workv1client.ManifestWorkInterface,
manifestWork *workapiv1.ManifestWork,
updateFuncs ...UpdateManifestWorkStatusFunc) (*workapiv1.ManifestWorkStatus, bool, error) {
// in order to reduce the number of GET requests to hub apiserver, try to update the manifestwork
// fetched from informer cache (with lister).
updatedWorkStatus, updated, err := updateManifestWorkStatus(ctx, client, manifestWork, updateFuncs...)
if err == nil {
return updatedWorkStatus, updated, nil
}
// if the update failed, retry with the manifestwork resource fetched with work client.
err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
manifestWork, err := client.Get(ctx, manifestWork.Name, metav1.GetOptions{})
if err != nil {
return err
}
updatedWorkStatus, updated, err = updateManifestWorkStatus(ctx, client, manifestWork, updateFuncs...)
return err
})
return updatedWorkStatus, updated, err
}
// updateManifestWorkStatus updates the status of the given manifestWork. The manifestWork is mutated.
func updateManifestWorkStatus(
ctx context.Context,
client workv1client.ManifestWorkInterface,
manifestWork *workapiv1.ManifestWork,
updateFuncs ...UpdateManifestWorkStatusFunc) (*workapiv1.ManifestWorkStatus, bool, error) {
oldStatus := &manifestWork.Status
newStatus := oldStatus.DeepCopy()
for _, update := range updateFuncs {
if err := update(newStatus); err != nil {
return nil, false, err
}
}
if equality.Semantic.DeepEqual(oldStatus, newStatus) {
// We return the newStatus which is a deep copy of oldStatus but with all update funcs applied.
return newStatus, false, nil
}
manifestWork.Status = *newStatus
updatedManifestWork, err := client.UpdateStatus(ctx, manifestWork, metav1.UpdateOptions{})
if err != nil {
return nil, false, err
}
return &updatedManifestWork.Status, true, nil
}
// DeleteAppliedResources deletes all given applied resources and returns those pending for finalization
// If the uid recorded in resources is different from what we get by client, ignore the deletion.
func DeleteAppliedResources(
@@ -327,27 +271,6 @@ func GuessObjectGroupVersionKind(object runtime.Object) (*schema.GroupVersionKin
return nil, fmt.Errorf("cannot get gvk of %v", object)
}
// RemoveFinalizer removes a finalizer from the list. It mutates its input.
func RemoveFinalizer(object runtime.Object, finalizerName string) (finalizersUpdated bool) {
accessor, err := meta.Accessor(object)
if err != nil {
return false
}
finalizers := accessor.GetFinalizers()
newFinalizers := []string{}
for i := range finalizers {
if finalizers[i] == finalizerName {
finalizersUpdated = true
continue
}
newFinalizers = append(newFinalizers, finalizers[i])
}
accessor.SetFinalizers(newFinalizers)
return finalizersUpdated
}
func HasFinalizer(finalizers []string, finalizer string) bool {
found := false
for i := range finalizers {

View File

@@ -3,10 +3,10 @@ package manifestworkreplicasetcontroller
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
"open-cluster-management.io/ocm/pkg/common/patcher"
)
// addFinalizerReconciler is to add finalizer to the manifestworkreplicaset.
@@ -21,14 +21,15 @@ func (a *addFinalizerReconciler) reconcile(ctx context.Context, pw *workapiv1alp
return pw, reconcileStop, nil
}
// don't add finalizer to instances that already have it
for i := range pw.Finalizers {
if pw.Finalizers[i] == ManifestWorkReplicaSetFinalizer {
return pw, reconcileContinue, nil
}
}
workSetPatcher := patcher.NewPatcher[
*workapiv1alpha1.ManifestWorkReplicaSet, workapiv1alpha1.ManifestWorkReplicaSetSpec, workapiv1alpha1.ManifestWorkReplicaSetStatus](
a.workClient.WorkV1alpha1().ManifestWorkReplicaSets(pw.Namespace))
updated, err := workSetPatcher.AddFinalizer(ctx, pw, ManifestWorkReplicaSetFinalizer)
// if this conflicts, we'll simply try again later
pw.Finalizers = append(pw.Finalizers, ManifestWorkReplicaSetFinalizer)
_, err := a.workClient.WorkV1alpha1().ManifestWorkReplicaSets(pw.Namespace).Update(ctx, pw, metav1.UpdateOptions{})
return pw, reconcileStop, err
if updated {
return pw, reconcileStop, err
}
return pw, reconcileContinue, err
}

View File

@@ -21,12 +21,17 @@ func TestAddFinalizerReconcile(t *testing.T) {
workClient: fakeClient,
}
mwrSetTest, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
_, _, err := addFinalizerController.reconcile(context.TODO(), mwrSetTest)
if err != nil {
t.Fatal(err)
}
if !slices.Contains(mwrSetTest.Finalizers, ManifestWorkReplicaSetFinalizer) {
updatetSet, err := fakeClient.WorkV1alpha1().ManifestWorkReplicaSets(mwrSetTest.Namespace).Get(context.TODO(), mwrSetTest.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
if !slices.Contains(updatetSet.Finalizers, ManifestWorkReplicaSetFinalizer) {
t.Fatal("Finalizer did not added")
}

View File

@@ -2,21 +2,16 @@ package manifestworkreplicasetcontroller
import (
"context"
"encoding/json"
"fmt"
"strings"
jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
apiequality "k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
@@ -31,6 +26,8 @@ import (
"open-cluster-management.io/api/utils/work/v1/workapplier"
workapiv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
"open-cluster-management.io/ocm/pkg/common/patcher"
)
const (
@@ -73,20 +70,8 @@ func NewManifestWorkReplicaSetController(
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) factory.Controller {
controller := &ManifestWorkReplicaSetController{
workClient: workClient,
manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),
manifestWorkReplicaSetIndexer: manifestWorkReplicaSetInformer.Informer().GetIndexer(),
reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister()},
&addFinalizerReconciler{workClient: workClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
manifestWorkLister: manifestWorkInformer.Lister(), placementLister: placementInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
&statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
},
}
controller := newController(
workClient, manifestWorkReplicaSetInformer, manifestWorkInformer, placementInformer, placeDecisionInformer)
err := manifestWorkReplicaSetInformer.Informer().AddIndexers(
cache.Indexers{
@@ -131,6 +116,27 @@ func NewManifestWorkReplicaSetController(
WithSync(controller.sync).ToController("ManifestWorkReplicaSetController", recorder)
}
func newController(workClient workclientset.Interface,
manifestWorkReplicaSetInformer workinformerv1alpha1.ManifestWorkReplicaSetInformer,
manifestWorkInformer workinformerv1.ManifestWorkInformer,
placementInformer clusterinformerv1beta1.PlacementInformer,
placeDecisionInformer clusterinformerv1beta1.PlacementDecisionInformer) *ManifestWorkReplicaSetController {
return &ManifestWorkReplicaSetController{
workClient: workClient,
manifestWorkReplicaSetLister: manifestWorkReplicaSetInformer.Lister(),
manifestWorkReplicaSetIndexer: manifestWorkReplicaSetInformer.Informer().GetIndexer(),
reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
workClient: workClient, manifestWorkLister: manifestWorkInformer.Lister()},
&addFinalizerReconciler{workClient: workClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(workClient, manifestWorkInformer.Lister()),
manifestWorkLister: manifestWorkInformer.Lister(), placementLister: placementInformer.Lister(), placeDecisionLister: placeDecisionInformer.Lister()},
&statusReconciler{manifestWorkLister: manifestWorkInformer.Lister()},
},
}
}
// sync is the main reconcile loop for placeManifest work. It is triggered every 15sec
func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
key := controllerContext.QueueKey()
@@ -143,7 +149,7 @@ func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerC
return nil
}
manifestWorkReplicaSet, err := m.manifestWorkReplicaSetLister.ManifestWorkReplicaSets(namespace).Get(name)
oldManifestWorkReplicaSet, err := m.manifestWorkReplicaSetLister.ManifestWorkReplicaSets(namespace).Get(name)
switch {
case errors.IsNotFound(err):
return nil
@@ -151,8 +157,7 @@ func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerC
return err
}
oldManifestWorkReplicaSet := manifestWorkReplicaSet
manifestWorkReplicaSet = manifestWorkReplicaSet.DeepCopy()
manifestWorkReplicaSet := oldManifestWorkReplicaSet.DeepCopy()
var state reconcileState
var errs []error
@@ -166,46 +171,18 @@ func (m *ManifestWorkReplicaSetController) sync(ctx context.Context, controllerC
}
}
workSetPatcher := patcher.NewPatcher[
*workapiv1alpha1.ManifestWorkReplicaSet, workapiv1alpha1.ManifestWorkReplicaSetSpec, workapiv1alpha1.ManifestWorkReplicaSetStatus](
m.workClient.WorkV1alpha1().ManifestWorkReplicaSets(namespace))
// Patch status
if err := m.patchPlaceManifestStatus(ctx, oldManifestWorkReplicaSet, manifestWorkReplicaSet); err != nil {
if _, err := workSetPatcher.PatchStatus(ctx, manifestWorkReplicaSet, manifestWorkReplicaSet.Status, oldManifestWorkReplicaSet.Status); err != nil {
errs = append(errs, err)
}
return utilerrors.NewAggregate(errs)
}
func (m *ManifestWorkReplicaSetController) patchPlaceManifestStatus(ctx context.Context, old, new *workapiv1alpha1.ManifestWorkReplicaSet) error {
if apiequality.Semantic.DeepEqual(old.Status, new.Status) {
return nil
}
oldData, err := json.Marshal(workapiv1alpha1.ManifestWorkReplicaSet{
Status: old.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for ManifestWorkReplicaSet status %s: %w", old.Name, err)
}
newData, err := json.Marshal(workapiv1alpha1.ManifestWorkReplicaSet{
ObjectMeta: metav1.ObjectMeta{
UID: old.UID,
ResourceVersion: old.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: new.Status,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for work status %s: %w", old.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for work %s: %w", old.Name, err)
}
_, err = m.workClient.WorkV1alpha1().ManifestWorkReplicaSets(old.Namespace).Patch(ctx,
old.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
}
func listManifestWorksByManifestWorkReplicaSet(mwrs *workapiv1alpha1.ManifestWorkReplicaSet,
manifestWorkLister worklisterv1.ManifestWorkLister) ([]*workapiv1.ManifestWork, error) {
req, err := labels.NewRequirement(ManifestWorkReplicaSetControllerNameLabelKey, selection.Equals, []string{manifestWorkReplicaSetKey(mwrs)})

View File

@@ -2,81 +2,242 @@ package manifestworkreplicasetcontroller
import (
"context"
"encoding/json"
"reflect"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
clienttesting "k8s.io/client-go/testing"
fakeclusterclient "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
"open-cluster-management.io/api/utils/work/v1/workapplier"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
helpertest "open-cluster-management.io/ocm/pkg/work/hub/test"
)
func TestManifestWorkReplicaSetControllerPatchStatus(t *testing.T) {
mwrSetTest := helpertest.CreateTestManifestWorkReplicaSet("mwrSet-test", "default", "place-test")
mwrSetTest.Status.Summary.Total = 1
mw, _ := CreateManifestWork(mwrSetTest, "cls1")
fWorkClient := fakeworkclient.NewSimpleClientset(mwrSetTest, mw)
workInformerFactory := workinformers.NewSharedInformerFactoryWithOptions(fWorkClient, 1*time.Second)
if err := workInformerFactory.Work().V1().ManifestWorks().Informer().GetStore().Add(mw); err != nil {
t.Fatal(err)
}
if err := workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets().Informer().GetStore().Add(mwrSetTest); err != nil {
t.Fatal(err)
}
placement, placementDecision := helpertest.CreateTestPlacement("place-test", "default", "cls1")
fClusterClient := fakeclusterclient.NewSimpleClientset(placement, placementDecision)
clusterInformerFactory := clusterinformers.NewSharedInformerFactoryWithOptions(fClusterClient, 1*time.Second)
if err := clusterInformerFactory.Cluster().V1beta1().Placements().Informer().GetStore().Add(placement); err != nil {
t.Fatal(err)
}
if err := clusterInformerFactory.Cluster().V1beta1().PlacementDecisions().Informer().GetStore().Add(placementDecision); err != nil {
t.Fatal(err)
}
mwLister := workInformerFactory.Work().V1().ManifestWorks().Lister()
placementLister := clusterInformerFactory.Cluster().V1beta1().Placements().Lister()
placementDecisionLister := clusterInformerFactory.Cluster().V1beta1().PlacementDecisions().Lister()
pmwController := &ManifestWorkReplicaSetController{
workClient: fWorkClient,
manifestWorkReplicaSetLister: workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets().Lister(),
manifestWorkReplicaSetIndexer: workInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets().Informer().GetIndexer(),
reconcilers: []ManifestWorkReplicaSetReconcile{
&finalizeReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(fWorkClient, mwLister),
workClient: fWorkClient, manifestWorkLister: mwLister},
&addFinalizerReconciler{workClient: fWorkClient},
&deployReconciler{workApplier: workapplier.NewWorkApplierWithTypedClient(fWorkClient, mwLister),
manifestWorkLister: mwLister, placementLister: placementLister, placeDecisionLister: placementDecisionLister},
&statusReconciler{manifestWorkLister: mwLister},
cases := []struct {
name string
works []runtime.Object
mwrSet *workapiv1alpha1.ManifestWorkReplicaSet
placement *clusterv1beta1.Placement
decision *clusterv1beta1.PlacementDecision
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "add finalizer",
mwrSet: helpertest.CreateTestManifestWorkReplicaSet("test", "default", "placement"),
works: []runtime.Object{},
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("default", "placement")
return p
}(),
decision: func() *clusterv1beta1.PlacementDecision {
_, d := helpertest.CreateTestPlacement("default", "placement")
return d
}(),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
workSet := &workapiv1alpha1.ManifestWorkReplicaSet{}
if err := json.Unmarshal(p, workSet); err != nil {
t.Fatal(err)
}
if !reflect.DeepEqual(workSet.Finalizers, []string{ManifestWorkReplicaSetFinalizer}) {
t.Fatal(spew.Sdump(actions))
}
},
},
{
name: "placement not found",
mwrSet: func() *workapiv1alpha1.ManifestWorkReplicaSet {
w := helpertest.CreateTestManifestWorkReplicaSet("test", "default", "placement")
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: []runtime.Object{},
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement1", "default")
return p
}(),
decision: func() *clusterv1beta1.PlacementDecision {
_, d := helpertest.CreateTestPlacement("placement", "default")
return d
}(),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
workSet := &workapiv1alpha1.ManifestWorkReplicaSet{}
if err := json.Unmarshal(p, workSet); err != nil {
t.Fatal(err)
}
if !meta.IsStatusConditionFalse(workSet.Status.Conditions, workapiv1alpha1.ManifestWorkReplicaSetConditionPlacementVerified) {
t.Fatal(spew.Sdump(workSet.Status.Conditions))
}
},
},
{
name: "placement decision not found",
mwrSet: func() *workapiv1alpha1.ManifestWorkReplicaSet {
w := helpertest.CreateTestManifestWorkReplicaSet("test", "default", "placement")
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: []runtime.Object{},
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default")
return p
}(),
decision: func() *clusterv1beta1.PlacementDecision {
_, d := helpertest.CreateTestPlacement("placement1", "default")
return d
}(),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
workSet := &workapiv1alpha1.ManifestWorkReplicaSet{}
if err := json.Unmarshal(p, workSet); err != nil {
t.Fatal(err)
}
if !meta.IsStatusConditionFalse(workSet.Status.Conditions, workapiv1alpha1.ManifestWorkReplicaSetConditionManifestworkApplied) {
t.Fatal(spew.Sdump(workSet.Status.Conditions))
}
},
},
{
name: "apply correctly",
mwrSet: func() *workapiv1alpha1.ManifestWorkReplicaSet {
w := helpertest.CreateTestManifestWorkReplicaSet("test", "default", "placement")
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: []runtime.Object{},
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster1", "cluster2")
return p
}(),
decision: func() *clusterv1beta1.PlacementDecision {
_, d := helpertest.CreateTestPlacement("placement", "default", "cluster1", "cluster2")
return d
}(),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "create", "create", "patch")
p := actions[2].(clienttesting.PatchActionImpl).Patch
workSet := &workapiv1alpha1.ManifestWorkReplicaSet{}
if err := json.Unmarshal(p, workSet); err != nil {
t.Fatal(err)
}
if workSet.Status.Summary.Total != 2 {
t.Error(spew.Sdump(workSet.Status.Summary))
}
if !meta.IsStatusConditionFalse(workSet.Status.Conditions, workapiv1alpha1.ManifestWorkReplicaSetConditionManifestworkApplied) {
t.Error(spew.Sdump(workSet.Status.Conditions))
}
},
},
{
name: "no additonal apply needed",
mwrSet: func() *workapiv1alpha1.ManifestWorkReplicaSet {
w := helpertest.CreateTestManifestWorkReplicaSet("test", "default", "placement")
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: helpertest.CreateTestManifestWorks("test", "default", "cluster1", "cluster2"),
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster1", "cluster2")
return p
}(),
decision: func() *clusterv1beta1.PlacementDecision {
_, d := helpertest.CreateTestPlacement("placement", "default", "cluster1", "cluster2")
return d
}(),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
workSet := &workapiv1alpha1.ManifestWorkReplicaSet{}
if err := json.Unmarshal(p, workSet); err != nil {
t.Fatal(err)
}
if workSet.Status.Summary.Applied != 2 {
t.Error(spew.Sdump(workSet.Status.Summary))
}
if !meta.IsStatusConditionTrue(workSet.Status.Conditions, workapiv1alpha1.ManifestWorkReplicaSetConditionManifestworkApplied) {
t.Error(spew.Sdump(workSet.Status.Conditions))
}
},
},
{
name: "add and delete",
mwrSet: func() *workapiv1alpha1.ManifestWorkReplicaSet {
w := helpertest.CreateTestManifestWorkReplicaSet("test", "default", "placement")
w.Finalizers = []string{ManifestWorkReplicaSetFinalizer}
return w
}(),
works: helpertest.CreateTestManifestWorks("test", "default", "cluster1", "cluster2"),
placement: func() *clusterv1beta1.Placement {
p, _ := helpertest.CreateTestPlacement("placement", "default", "cluster2", "cluster3", "cluster4")
return p
}(),
decision: func() *clusterv1beta1.PlacementDecision {
_, d := helpertest.CreateTestPlacement("placement", "default", "cluster2", "cluster3", "cluster4")
return d
}(),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "create", "create", "delete", "patch")
p := actions[3].(clienttesting.PatchActionImpl).Patch
workSet := &workapiv1alpha1.ManifestWorkReplicaSet{}
if err := json.Unmarshal(p, workSet); err != nil {
t.Fatal(err)
}
if workSet.Status.Summary.Total != 3 {
t.Error(spew.Sdump(workSet.Status.Summary))
}
if !meta.IsStatusConditionFalse(workSet.Status.Conditions, workapiv1alpha1.ManifestWorkReplicaSetConditionManifestworkApplied) {
t.Error(spew.Sdump(workSet.Status.Conditions))
}
},
},
}
// create new mwrSetTestNew with status
mwrSetTestNew := helpertest.CreateTestManifestWorkReplicaSet("mwrSet-test-new", "default", "place-test-new")
mwrSetTestNew.Status.Summary.Total = mwrSetTest.Status.Summary.Total + 3
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
workObjects := []runtime.Object{c.mwrSet}
workObjects = append(workObjects, c.works...)
fakeClient := fakeworkclient.NewSimpleClientset(workObjects...)
workInformers := workinformers.NewSharedInformerFactory(fakeClient, 10*time.Minute)
workInformers.Work().V1alpha1().ManifestWorkReplicaSets().Informer().GetStore().Add(c.mwrSet)
for _, o := range c.works {
workInformers.Work().V1().ManifestWorks().Informer().GetStore().Add(o)
}
err := pmwController.patchPlaceManifestStatus(context.TODO(), mwrSetTest, mwrSetTestNew)
if err != nil {
t.Fatal(err)
}
fakeClusterClient := fakeclusterclient.NewSimpleClientset(c.placement, c.decision)
clusterInformers := clusterinformers.NewSharedInformerFactory(fakeClusterClient, 10*time.Minute)
clusterInformers.Cluster().V1beta1().Placements().Informer().GetStore().Add(c.placement)
clusterInformers.Cluster().V1beta1().PlacementDecisions().Informer().GetStore().Add(c.decision)
// Check kubeClient has a patch action to apply
actions := ([]clienttesting.Action)(fWorkClient.Actions())
if len(actions) == 0 {
t.Fatal("fWorkClient Should have patch action ")
}
// Check placeMW patch name
if mwrSetTest.Name != actions[0].(clienttesting.PatchAction).GetName() {
t.Fatal("PlaceMW patch action not match ", actions[0])
ctrl := newController(
fakeClient,
workInformers.Work().V1alpha1().ManifestWorkReplicaSets(),
workInformers.Work().V1().ManifestWorks(),
clusterInformers.Cluster().V1beta1().Placements(),
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
)
controllerContext := testingcommon.NewFakeSyncContext(t, c.mwrSet.Namespace+"/"+c.mwrSet.Name)
err := ctrl.sync(context.TODO(), controllerContext)
if err != nil {
t.Error(err)
}
c.validateActions(t, fakeClient.Actions())
})
}
}

View File

@@ -34,10 +34,11 @@ func (d *deployReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alpha
var placements []*clusterv1beta1.Placement
for _, placementRef := range mwrSet.Spec.PlacementRefs {
placement, err := d.placementLister.Placements(mwrSet.Namespace).Get(placementRef.Name)
if errors.IsNotFound(err) {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonPlacementDecisionNotFound, ""))
return mwrSet, reconcileStop, nil
}
if err != nil {
if errors.IsNotFound(err) {
apimeta.SetStatusCondition(&mwrSet.Status.Conditions, GetPlacementDecisionVerified(workapiv1alpha1.ReasonPlacementDecisionNotFound, ""))
}
return mwrSet, reconcileContinue, fmt.Errorf("Failed get placement %w", err)
}
placements = append(placements, placement)

View File

@@ -179,8 +179,8 @@ func TestDeployReconcileAsPlacementNotExist(t *testing.T) {
}
mwrSet, _, err := pmwDeployController.reconcile(context.TODO(), mwrSet)
if err == nil {
t.Fatal("Expected Not Found Error ", err)
if err != nil {
t.Fatal(err)
}
// Check the PlacedManifestWork conditions

View File

@@ -3,7 +3,7 @@ package manifestworkreplicasetcontroller
import (
"context"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/api/errors"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
@@ -11,7 +11,7 @@ import (
"open-cluster-management.io/api/utils/work/v1/workapplier"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/common/patcher"
)
// finalizeReconciler is to finalize the manifestWorkReplicaSet by deleting all related manifestWorks.
@@ -27,28 +27,17 @@ func (f *finalizeReconciler) reconcile(ctx context.Context, mwrSet *workapiv1alp
return mwrSet, reconcileContinue, nil
}
var found bool
for i := range mwrSet.Finalizers {
if mwrSet.Finalizers[i] == ManifestWorkReplicaSetFinalizer {
found = true
break
}
}
// if there is no finalizer, we do not need to reconcile anymore.
if !found {
return mwrSet, reconcileStop, nil
}
if err := f.finalizeManifestWorkReplicaSet(ctx, mwrSet); err != nil {
return mwrSet, reconcileContinue, err
}
workSetPatcher := patcher.NewPatcher[
*workapiv1alpha1.ManifestWorkReplicaSet, workapiv1alpha1.ManifestWorkReplicaSetSpec, workapiv1alpha1.ManifestWorkReplicaSetStatus](
f.workClient.WorkV1alpha1().ManifestWorkReplicaSets(mwrSet.Namespace))
// Remove finalizer after delete all created Manifestworks
if helper.RemoveFinalizer(mwrSet, ManifestWorkReplicaSetFinalizer) {
_, err := f.workClient.WorkV1alpha1().ManifestWorkReplicaSets(mwrSet.Namespace).Update(ctx, mwrSet, metav1.UpdateOptions{})
if err != nil {
return mwrSet, reconcileContinue, err
}
if err := workSetPatcher.RemoveFinalizer(ctx, mwrSet, ManifestWorkReplicaSetFinalizer); err != nil {
return mwrSet, reconcileContinue, err
}
return mwrSet, reconcileStop, nil
@@ -63,7 +52,7 @@ func (m *finalizeReconciler) finalizeManifestWorkReplicaSet(ctx context.Context,
errs := []error{}
for _, mw := range manifestWorks {
err = m.workApplier.Delete(ctx, mw.Namespace, mw.Name)
if err != nil {
if err != nil && !errors.IsNotFound(err) {
errs = append(errs, err)
}
}

View File

@@ -34,13 +34,18 @@ func TestFinalizeReconcile(t *testing.T) {
mwrSetTest.DeletionTimestamp = &timeNow
mwrSetTest.Finalizers = append(mwrSetTest.Finalizers, ManifestWorkReplicaSetFinalizer)
mwrSetTest, _, err := finalizerController.reconcile(context.TODO(), mwrSetTest)
_, _, err := finalizerController.reconcile(context.TODO(), mwrSetTest)
if err != nil {
t.Fatal(err)
}
updatetSet, err := fakeClient.WorkV1alpha1().ManifestWorkReplicaSets(mwrSetTest.Namespace).Get(context.TODO(), mwrSetTest.Name, metav1.GetOptions{})
if err != nil {
t.Fatal(err)
}
// Check mwrSetTest finalizer removed
if slices.Contains(mwrSetTest.Finalizers, ManifestWorkReplicaSetFinalizer) {
if slices.Contains(updatetSet.Finalizers, ManifestWorkReplicaSetFinalizer) {
t.Fatal("Finalizer not deleted", mwrSetTest.Finalizers)
}
}

View File

@@ -1,9 +1,14 @@
package test
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clusterv1beta1 "open-cluster-management.io/api/cluster/v1beta1"
workapiv1 "open-cluster-management.io/api/work/v1"
workapiv1alpha1 "open-cluster-management.io/api/work/v1alpha1"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
@@ -29,6 +34,29 @@ func CreateTestManifestWorkReplicaSet(name string, ns string, placementName stri
return mwrs
}
func CreateTestManifestWorks(name, namespace string, clusters ...string) []runtime.Object {
obj := spoketesting.NewUnstructured("v1", "kind", "test-ns", "test-name")
works := []runtime.Object{}
for _, c := range clusters {
mw, _ := spoketesting.NewManifestWork(0, obj)
mw.Name = name
mw.Namespace = c
mw.Labels = map[string]string{
"work.open-cluster-management.io/manifestworkreplicaset": fmt.Sprintf("%s.%s", namespace, name),
}
meta.SetStatusCondition(&mw.Status.Conditions, metav1.Condition{
Type: workapiv1.WorkApplied,
Status: metav1.ConditionTrue,
})
meta.SetStatusCondition(&mw.Status.Conditions, metav1.Condition{
Type: workapiv1.WorkAvailable,
Status: metav1.ConditionTrue,
})
works = append(works, mw)
}
return works
}
// Return placement with predicate of label cluster name
func CreateTestPlacement(name string, ns string, clusters ...string) (*clusterv1beta1.Placement, *clusterv1beta1.PlacementDecision) {
namereq := metav1.LabelSelectorRequirement{}

View File

@@ -24,15 +24,15 @@ import (
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/helper"
)
// AppliedManifestWorkController is to sync the applied resources of appliedmanifestwork with related
// manifestwork and delete any resource which is no longer maintained by the manifestwork
type AppliedManifestWorkController struct {
manifestWorkClient workv1client.ManifestWorkInterface
patcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
spokeDynamicClient dynamic.Interface
hubHash string
@@ -43,7 +43,6 @@ type AppliedManifestWorkController struct {
func NewAppliedManifestWorkController(
recorder events.Recorder,
spokeDynamicClient dynamic.Interface,
manifestWorkClient workv1client.ManifestWorkInterface,
manifestWorkInformer workinformer.ManifestWorkInformer,
manifestWorkLister worklister.ManifestWorkNamespaceLister,
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
@@ -51,9 +50,10 @@ func NewAppliedManifestWorkController(
hubHash string) factory.Controller {
controller := &AppliedManifestWorkController{
manifestWorkClient: manifestWorkClient,
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
appliedManifestWorkClient),
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
spokeDynamicClient: spokeDynamicClient,
hubHash: hubHash,
@@ -211,7 +211,7 @@ func (m *AppliedManifestWorkController) syncManifestWork(
// update appliedmanifestwork status with latest applied resources. if this conflicts, we'll try again later
// for retrying update without reassessing the status can cause overwriting of valid information.
appliedManifestWork.Status.AppliedResources = appliedResources
_, err := m.appliedManifestWorkClient.UpdateStatus(ctx, appliedManifestWork, metav1.UpdateOptions{})
_, err := m.patcher.PatchStatus(ctx, appliedManifestWork, appliedManifestWork.Status, originalAppliedManifestWork.Status)
return err
}

View File

@@ -2,6 +2,7 @@ package appliedmanifestcontroller
import (
"context"
"encoding/json"
"reflect"
"testing"
"time"
@@ -19,6 +20,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
@@ -58,13 +60,9 @@ func TestSyncManifestWork(t *testing.T) {
appliedResources: []workapiv1.AppliedManifestResourceMeta{
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns1", Name: "n1"}, UID: "ns1-n1"},
},
manifests: []workapiv1.ManifestCondition{newManifest("", "v1", "secrets", "ns1", "n1")},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) > 0 {
t.Fatal(spew.Sdump(actions))
}
},
expectedDeleteActions: []clienttesting.DeleteActionImpl{},
manifests: []workapiv1.ManifestCondition{newManifest("", "v1", "secrets", "ns1", "n1")},
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
expectedDeleteActions: []clienttesting.DeleteActionImpl{},
},
{
name: "delete untracked resources",
@@ -89,10 +87,12 @@ func TestSyncManifestWork(t *testing.T) {
newManifest("", "v1", "secrets", "ns6", "n6"),
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.AppliedManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if !reflect.DeepEqual(work.Status.AppliedResources, []workapiv1.AppliedManifestResourceMeta{
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Group: "", Resource: "secrets", Namespace: "ns1", Name: "n1"}, UID: "ns1-n1"},
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Group: "", Resource: "secrets", Namespace: "ns2", Name: "n2"}, UID: "ns2-n2"},
@@ -125,13 +125,9 @@ func TestSyncManifestWork(t *testing.T) {
newManifest("", "v1", "secrets", "ns1", "n1"),
newManifest("", "v1", "secrets", "ns2", "n2"),
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) > 0 {
t.Fatal(spew.Sdump(actions))
}
},
expectedDeleteActions: []clienttesting.DeleteActionImpl{},
expectedQueueLen: 1,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
expectedDeleteActions: []clienttesting.DeleteActionImpl{},
expectedQueueLen: 1,
},
{
name: "ignore re-created resource",
@@ -149,10 +145,12 @@ func TestSyncManifestWork(t *testing.T) {
newManifest("", "v1", "secrets", "ns5", "n5"),
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.AppliedManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if !reflect.DeepEqual(work.Status.AppliedResources, []workapiv1.AppliedManifestResourceMeta{
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns1", Name: "n1"}, UID: "ns1-n1"},
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns5", Name: "n5"}, UID: "ns5-n5"},
@@ -177,10 +175,12 @@ func TestSyncManifestWork(t *testing.T) {
newManifest("", "v1", "secrets", "ns2", "n2"),
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.AppliedManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if !reflect.DeepEqual(work.Status.AppliedResources, []workapiv1.AppliedManifestResourceMeta{
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns1", Name: "n1"}, UID: "ns1-n1"},
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns2", Name: "n2"}, UID: "ns2-n2-updated"},
@@ -211,9 +211,10 @@ func TestSyncManifestWork(t *testing.T) {
}
controller := AppliedManifestWorkController{
manifestWorkClient: fakeClient.WorkV1().ManifestWorks(testingWork.Namespace),
manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"),
appliedManifestWorkClient: fakeClient.WorkV1().AppliedManifestWorks(),
manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"),
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
fakeClient.WorkV1().AppliedManifestWorks()),
appliedManifestWorkLister: informerFactory.Work().V1().AppliedManifestWorks().Lister(),
spokeDynamicClient: fakeDynamicClient,
hubHash: "test",

View File

@@ -7,7 +7,6 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
@@ -16,13 +15,13 @@ import (
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
)
// AddFinalizerController is to add the cluster.open-cluster-management.io/manifest-work-cleanup finalizer to manifestworks.
type AddFinalizerController struct {
manifestWorkClient workv1client.ManifestWorkInterface
patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
manifestWorkLister worklister.ManifestWorkNamespaceLister
}
@@ -35,7 +34,9 @@ func NewAddFinalizerController(
) factory.Controller {
controller := &AddFinalizerController{
manifestWorkClient: manifestWorkClient,
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
manifestWorkClient),
manifestWorkLister: manifestWorkLister,
}
@@ -70,13 +71,8 @@ func (m *AddFinalizerController) syncManifestWork(ctx context.Context, originalM
return nil
}
// don't add finalizer to instances that already have it
if helper.HasFinalizer(manifestWork.Finalizers, controllers.ManifestWorkFinalizer) {
return nil
}
// if this conflicts, we'll simply try again later
manifestWork.Finalizers = append(manifestWork.Finalizers, controllers.ManifestWorkFinalizer)
_, err := m.manifestWorkClient.Update(ctx, manifestWork, metav1.UpdateOptions{})
_, err := m.patcher.AddFinalizer(ctx, manifestWork, controllers.ManifestWorkFinalizer)
return err
}

View File

@@ -2,6 +2,7 @@ package finalizercontroller
import (
"context"
"encoding/json"
"reflect"
"testing"
@@ -12,6 +13,8 @@ import (
fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
)
@@ -27,10 +30,12 @@ func TestAddFinalizer(t *testing.T) {
{
name: "add when empty",
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if !reflect.DeepEqual(work.Finalizers, []string{controllers.ManifestWorkFinalizer}) {
t.Fatal(spew.Sdump(actions))
}
@@ -40,32 +45,26 @@ func TestAddFinalizer(t *testing.T) {
name: "add when missing",
existingFinalizers: []string{"other"},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if !reflect.DeepEqual(work.Finalizers, []string{"other", controllers.ManifestWorkFinalizer}) {
t.Fatal(spew.Sdump(actions))
}
},
},
{
name: "skip when deleted",
terminated: true,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) > 0 {
t.Fatal(spew.Sdump(actions))
}
},
name: "skip when deleted",
terminated: true,
validateActions: testingcommon.AssertNoActions,
},
{
name: "skip when present",
existingFinalizers: []string{controllers.ManifestWorkFinalizer},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) > 0 {
t.Fatal(spew.Sdump(actions))
}
},
validateActions: testingcommon.AssertNoActions,
},
}
@@ -80,7 +79,9 @@ func TestAddFinalizer(t *testing.T) {
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
controller := AddFinalizerController{
manifestWorkClient: fakeClient.WorkV1().ManifestWorks(testingWork.Namespace),
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
fakeClient.WorkV1().ManifestWorks(testingWork.Namespace)),
}
err := controller.syncManifestWork(context.TODO(), testingWork)

View File

@@ -9,7 +9,6 @@ import (
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/dynamic"
@@ -21,6 +20,7 @@ import (
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
)
@@ -28,7 +28,7 @@ import (
// AppliedManifestWorkFinalizeController handles cleanup of appliedmanifestwork resources before deletion is allowed.
// It should handle all appliedmanifestworks belonging to this agent identified by the agentID.
type AppliedManifestWorkFinalizeController struct {
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
patcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]
appliedManifestWorkLister worklister.AppliedManifestWorkLister
spokeDynamicClient dynamic.Interface
rateLimiter workqueue.RateLimiter
@@ -43,7 +43,9 @@ func NewAppliedManifestWorkFinalizeController(
) factory.Controller {
controller := &AppliedManifestWorkFinalizeController{
appliedManifestWorkClient: appliedManifestWorkClient,
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
appliedManifestWorkClient),
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
spokeDynamicClient: spokeDynamicClient,
rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
@@ -89,8 +91,6 @@ func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx cont
return nil
}
var err error
owner := helper.NewAppliedManifestWorkOwner(appliedManifestWork)
// Work is deleting, we remove its related resources on spoke cluster
@@ -99,38 +99,29 @@ func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx cont
reason := fmt.Sprintf("manifestwork %s is terminating", appliedManifestWork.Spec.ManifestWorkName)
resourcesPendingFinalization, errs := helper.DeleteAppliedResources(
ctx, appliedManifestWork.Status.AppliedResources, reason, m.spokeDynamicClient, controllerContext.Recorder(), *owner)
updatedAppliedManifestWork := false
if len(appliedManifestWork.Status.AppliedResources) != len(resourcesPendingFinalization) {
// update the status of the manifest work accordingly
appliedManifestWork.Status.AppliedResources = resourcesPendingFinalization
appliedManifestWork, err = m.appliedManifestWorkClient.UpdateStatus(ctx, appliedManifestWork, metav1.UpdateOptions{})
if err != nil {
errs = append(errs, fmt.Errorf(
"failed to update status of AppliedManifestWork %s: %w", originalManifestWork.Name, err))
} else {
updatedAppliedManifestWork = true
}
appliedManifestWork.Status.AppliedResources = resourcesPendingFinalization
updatedAppliedManifestWork, err := m.patcher.PatchStatus(ctx, appliedManifestWork, appliedManifestWork.Status, originalManifestWork.Status)
if err != nil {
errs = append(errs, fmt.Errorf(
"failed to update status of AppliedManifestWork %s: %w", originalManifestWork.Name, err))
}
if len(errs) != 0 {
// return quickly when there is update event or err
if updatedAppliedManifestWork || len(errs) != 0 {
return utilerrors.NewAggregate(errs)
}
// requeue the work until all applied resources are deleted and finalized if the appliedmanifestwork itself is not updated
if len(resourcesPendingFinalization) != 0 {
klog.V(4).Infof("%d resources pending deletions %v", len(resourcesPendingFinalization))
if !updatedAppliedManifestWork {
controllerContext.Queue().AddAfter(appliedManifestWork.Name, m.rateLimiter.When(appliedManifestWork.Name))
}
controllerContext.Queue().AddAfter(appliedManifestWork.Name, m.rateLimiter.When(appliedManifestWork.Name))
return nil
}
// reset the rate limiter for the appliedmanifestwork
m.rateLimiter.Forget(appliedManifestWork.Name)
helper.RemoveFinalizer(appliedManifestWork, controllers.AppliedManifestWorkFinalizer)
_, err = m.appliedManifestWorkClient.Update(ctx, appliedManifestWork, metav1.UpdateOptions{})
if err != nil {
if err := m.patcher.RemoveFinalizer(ctx, appliedManifestWork, controllers.AppliedManifestWorkFinalizer); err != nil {
return fmt.Errorf("failed to remove finalizer from AppliedManifestWork %s: %w", appliedManifestWork.Name, err)
}
return nil

View File

@@ -2,6 +2,7 @@ package finalizercontroller
import (
"context"
"encoding/json"
"reflect"
"testing"
"time"
@@ -18,6 +19,7 @@ import (
fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
@@ -42,15 +44,15 @@ func TestFinalize(t *testing.T) {
{
name: "skip when not delete",
existingFinalizers: []string{controllers.ManifestWorkFinalizer},
validateAppliedManifestWorkActions: noAction,
validateDynamicActions: noAction,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
validateDynamicActions: testingcommon.AssertNoActions,
},
{
name: "skip when finalizer gone",
terminated: true,
existingFinalizers: []string{"other-finalizer"},
validateAppliedManifestWorkActions: noAction,
validateDynamicActions: noAction,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
validateDynamicActions: testingcommon.AssertNoActions,
},
{
name: "get resources and remove finalizer",
@@ -63,18 +65,15 @@ func TestFinalize(t *testing.T) {
{Version: "v4", ResourceIdentifier: workapiv1.ResourceIdentifier{Group: "g4", Resource: "r4", Namespace: "", Name: "n4"}},
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.AppliedManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if len(work.Status.AppliedResources) != 0 {
t.Fatal(spew.Sdump(actions[0]))
}
work = actions[1].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if !reflect.DeepEqual(work.Finalizers, []string{"a", "b"}) {
t.Fatal(spew.Sdump(actions[1]))
}
},
validateDynamicActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 4 {
@@ -115,7 +114,7 @@ func TestFinalize(t *testing.T) {
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns1", Name: "n1"}, UID: "ns1-n1"},
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns2", Name: "n2"}, UID: "ns2-n2"},
},
validateAppliedManifestWorkActions: noAction,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
validateDynamicActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
@@ -146,19 +145,15 @@ func TestFinalize(t *testing.T) {
{Version: "v1", ResourceIdentifier: workapiv1.ResourceIdentifier{Resource: "secrets", Namespace: "ns2", Name: "n2"}, UID: "n2"},
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.AppliedManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if len(work.Status.AppliedResources) != 0 {
t.Fatal(spew.Sdump(actions[0]))
}
work = actions[1].(clienttesting.UpdateAction).GetObject().(*workapiv1.AppliedManifestWork)
if !reflect.DeepEqual(work.Finalizers, []string{}) {
t.Fatal(spew.Sdump(actions[0]))
}
},
validateDynamicActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
@@ -193,9 +188,11 @@ func TestFinalize(t *testing.T) {
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...)
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
controller := AppliedManifestWorkFinalizeController{
appliedManifestWorkClient: fakeClient.WorkV1().AppliedManifestWorks(),
spokeDynamicClient: fakeDynamicClient,
rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(0, 1*time.Second),
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
fakeClient.WorkV1().AppliedManifestWorks()),
spokeDynamicClient: fakeDynamicClient,
rateLimiter: workqueue.NewItemExponentialFailureRateLimiter(0, 1*time.Second),
}
controllerContext := testingcommon.NewFakeSyncContext(t, testingWork.Name)
@@ -213,9 +210,3 @@ func TestFinalize(t *testing.T) {
})
}
}
func noAction(t *testing.T, actions []clienttesting.Action) {
if len(actions) > 0 {
t.Fatal(spew.Sdump(actions))
}
}

View File

@@ -17,14 +17,16 @@ import (
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workinformer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
)
// ManifestWorkFinalizeController handles cleanup of manifestwork resources before deletion is allowed.
type ManifestWorkFinalizeController struct {
manifestWorkClient workv1client.ManifestWorkInterface
patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
@@ -43,7 +45,9 @@ func NewManifestWorkFinalizeController(
) factory.Controller {
controller := &ManifestWorkFinalizeController{
manifestWorkClient: manifestWorkClient,
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
manifestWorkClient),
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
@@ -107,9 +111,7 @@ func (m *ManifestWorkFinalizeController) sync(ctx context.Context, controllerCon
m.rateLimiter.Forget(manifestWorkName)
manifestWork = manifestWork.DeepCopy()
helper.RemoveFinalizer(manifestWork, controllers.ManifestWorkFinalizer)
_, err = m.manifestWorkClient.Update(ctx, manifestWork, metav1.UpdateOptions{})
if err != nil {
if err := m.patcher.RemoveFinalizer(ctx, manifestWork, controllers.ManifestWorkFinalizer); err != nil {
return fmt.Errorf("failed to remove finalizer from ManifestWork %s/%s: %w", manifestWork.Namespace, manifestWork.Name, err)
}

View File

@@ -2,6 +2,7 @@ package finalizercontroller
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
@@ -14,6 +15,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
)
@@ -44,12 +46,8 @@ func TestSyncManifestWorkController(t *testing.T) {
t.Errorf("Suppose nothing done for appliedmanifestwork")
}
},
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Suppose nothing done for manifestwork")
}
},
expectedQueueLen: 0,
validateManifestWorkActions: testingcommon.AssertNoActions,
expectedQueueLen: 0,
},
{
name: "delete appliedmanifestworkwork when work has no finalizer on that",
@@ -69,12 +67,8 @@ func TestSyncManifestWorkController(t *testing.T) {
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "delete")
},
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Suppose nothing done for manifestwork")
}
},
expectedQueueLen: 1,
validateManifestWorkActions: testingcommon.AssertNoActions,
expectedQueueLen: 1,
},
{
name: "delete applied work when work is deleting",
@@ -95,12 +89,8 @@ func TestSyncManifestWorkController(t *testing.T) {
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "delete")
},
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Suppose nothing done for manifestwork")
}
},
expectedQueueLen: 1,
validateManifestWorkActions: testingcommon.AssertNoActions,
expectedQueueLen: 1,
},
{
name: "requeue work when applied work is deleting",
@@ -119,17 +109,9 @@ func TestSyncManifestWorkController(t *testing.T) {
DeletionTimestamp: &now,
},
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Expect 0 actions on appliedmanifestwork, but have %d", len(actions))
}
},
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Suppose nothing done for manifestwork")
}
},
expectedQueueLen: 1,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
validateManifestWorkActions: testingcommon.AssertNoActions,
expectedQueueLen: 1,
},
{
name: "remove finalizer when applied work is cleaned",
@@ -147,16 +129,15 @@ func TestSyncManifestWorkController(t *testing.T) {
Name: "fake",
},
},
validateAppliedManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Expect 0 actions on appliedmanifestwork, but have %d", len(actions))
}
},
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "update")
updateAction := actions[0].(clienttesting.UpdateActionImpl)
obj := updateAction.Object.(*workapiv1.ManifestWork)
if len(obj.Finalizers) != 0 {
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
if len(work.Finalizers) != 0 {
t.Errorf("Expect finalizer is cleaned")
}
},
@@ -176,13 +157,9 @@ func TestSyncManifestWorkController(t *testing.T) {
Name: fmt.Sprintf("%s-work", hubHash),
},
},
validateAppliedManifestWorkActions: noAction,
validateManifestWorkActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Errorf("Suppose nothing done for manifestwork")
}
},
expectedQueueLen: 0,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
validateManifestWorkActions: testingcommon.AssertNoActions,
expectedQueueLen: 0,
},
}
@@ -197,7 +174,9 @@ func TestSyncManifestWorkController(t *testing.T) {
t.Fatal(err)
}
controller := &ManifestWorkFinalizeController{
manifestWorkClient: fakeClient.WorkV1().ManifestWorks("cluster1"),
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
fakeClient.WorkV1().ManifestWorks("cluster1")),
manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"),
appliedManifestWorkClient: fakeClient.WorkV1().AppliedManifestWorks(),
appliedManifestWorkLister: informerFactory.Work().V1().AppliedManifestWorks().Lister(),

View File

@@ -2,19 +2,16 @@ package finalizercontroller
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
@@ -23,12 +20,14 @@ import (
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/helper"
)
type unmanagedAppliedWorkController struct {
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
patcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]
appliedManifestWorkLister worklister.AppliedManifestWorkLister
hubHash string
agentID string
@@ -57,6 +56,9 @@ func NewUnManagedAppliedWorkController(
controller := &unmanagedAppliedWorkController{
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
appliedManifestWorkClient),
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
hubHash: hubHash,
agentID: agentID,
@@ -138,33 +140,8 @@ func (m *unmanagedAppliedWorkController) stopToEvictAppliedManifestWork(
func (m *unmanagedAppliedWorkController) patchEvictionStartTime(ctx context.Context,
appliedManifestWork *workapiv1.AppliedManifestWork, evictionStartTime *metav1.Time) error {
oldData, err := json.Marshal(workapiv1.AppliedManifestWork{
Status: workapiv1.AppliedManifestWorkStatus{
EvictionStartTime: appliedManifestWork.Status.EvictionStartTime,
},
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for appliedmanifestwork status %s: %w", appliedManifestWork.Name, err)
}
newData, err := json.Marshal(workapiv1.AppliedManifestWork{
ObjectMeta: metav1.ObjectMeta{
UID: appliedManifestWork.UID,
ResourceVersion: appliedManifestWork.ResourceVersion,
},
Status: workapiv1.AppliedManifestWorkStatus{
EvictionStartTime: evictionStartTime,
},
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for appliedmanifestwork status %s: %w", appliedManifestWork.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for cluster %s: %w", appliedManifestWork.Name, err)
}
_, err = m.appliedManifestWorkClient.Patch(ctx, appliedManifestWork.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
newAppliedWork := appliedManifestWork.DeepCopy()
newAppliedWork.Status.EvictionStartTime = evictionStartTime
_, err := m.patcher.PatchStatus(ctx, newAppliedWork, newAppliedWork.Status, appliedManifestWork.Status)
return err
}

View File

@@ -14,6 +14,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
)
@@ -36,7 +37,7 @@ func TestSyncUnamanagedAppliedWork(t *testing.T) {
agentID: "test-agent",
works: []runtime.Object{},
appliedWorks: []runtime.Object{},
validateAppliedManifestWorkActions: noAction,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
},
{
name: "evict appliedmanifestwork when its relating manifestwork is missing on the hub",
@@ -183,7 +184,7 @@ func TestSyncUnamanagedAppliedWork(t *testing.T) {
},
},
expectedQueueLen: 1,
validateAppliedManifestWorkActions: noAction,
validateAppliedManifestWorkActions: testingcommon.AssertNoActions,
},
}
@@ -205,6 +206,9 @@ func TestSyncUnamanagedAppliedWork(t *testing.T) {
controller := &unmanagedAppliedWorkController{
manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("test"),
appliedManifestWorkClient: fakeClient.WorkV1().AppliedManifestWorks(),
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
fakeClient.WorkV1().AppliedManifestWorks()),
appliedManifestWorkLister: informerFactory.Work().V1().AppliedManifestWorks().Lister(),
hubHash: c.hubHash,
agentID: c.agentID,

View File

@@ -2,16 +2,13 @@ package manifestcontroller
import (
"context"
"encoding/json"
"fmt"
"time"
jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/pkg/errors"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -29,6 +26,7 @@ import (
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/apply"
"open-cluster-management.io/ocm/pkg/work/spoke/auth"
@@ -44,16 +42,17 @@ var (
// ManifestWorkController is to reconcile the workload resources
// fetched from hub cluster on spoke cluster.
type ManifestWorkController struct {
manifestWorkClient workv1client.ManifestWorkInterface
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkLister worklister.AppliedManifestWorkLister
spokeDynamicClient dynamic.Interface
hubHash string
agentID string
restMapper meta.RESTMapper
appliers *apply.Appliers
validator auth.ExecutorValidator
manifestWorkPatcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
appliedManifestWorkPatcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]
appliedManifestWorkLister worklister.AppliedManifestWorkLister
spokeDynamicClient dynamic.Interface
hubHash string
agentID string
restMapper meta.RESTMapper
appliers *apply.Appliers
validator auth.ExecutorValidator
}
type applyResult struct {
@@ -79,9 +78,14 @@ func NewManifestWorkController(
validator auth.ExecutorValidator) factory.Controller {
controller := &ManifestWorkController{
manifestWorkClient: manifestWorkClient,
manifestWorkPatcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
manifestWorkClient),
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
appliedManifestWorkPatcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
appliedManifestWorkClient),
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
spokeDynamicClient: spokeDynamicClient,
hubHash: hubHash,
@@ -110,7 +114,7 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
manifestWorkName := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling ManifestWork %q", manifestWorkName)
manifestWork, err := m.manifestWorkLister.Get(manifestWorkName)
oldManifestWork, err := m.manifestWorkLister.Get(manifestWorkName)
if apierrors.IsNotFound(err) {
// work not found, could have been deleted, do nothing.
return nil
@@ -118,7 +122,7 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
if err != nil {
return err
}
manifestWork = manifestWork.DeepCopy()
manifestWork := oldManifestWork.DeepCopy()
// no work to do if we're deleted
if !manifestWork.DeletionTimestamp.IsZero() {
@@ -190,10 +194,28 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
errs = append(errs, result.Error)
}
}
manifestWork.Status.ResourceStatus.Manifests = helper.MergeManifestConditions(
manifestWork.Status.ResourceStatus.Manifests, newManifestConditions)
// handle condition type Applied
// #1: Applied - work status condition (with type Applied) is applied if all manifest conditions (with type Applied) are applied
if inCondition, exists := allInCondition(string(workapiv1.ManifestApplied), newManifestConditions); exists {
appliedCondition := metav1.Condition{
Type: workapiv1.WorkApplied,
ObservedGeneration: manifestWork.Generation,
Status: metav1.ConditionFalse,
Reason: "AppliedManifestWorkFailed",
Message: "Failed to apply manifest work",
}
if inCondition {
appliedCondition.Status = metav1.ConditionTrue
appliedCondition.Reason = "AppliedManifestWorkComplete"
appliedCondition.Message = "Apply manifest work complete"
}
meta.SetStatusCondition(&manifestWork.Status.Conditions, appliedCondition)
}
// Update work status
_, updated, err := helper.UpdateManifestWorkStatus(
ctx, m.manifestWorkClient, manifestWork, m.generateUpdateStatusFunc(manifestWork.Generation, newManifestConditions))
updated, err := m.manifestWorkPatcher.PatchStatus(ctx, manifestWork, manifestWork.Status, oldManifestWork.Status)
if err != nil {
errs = append(errs, fmt.Errorf("failed to update work status with err %w", err))
}
@@ -233,38 +255,7 @@ func (m *ManifestWorkController) applyAppliedManifestWork(ctx context.Context, w
return nil, err
}
if equality.Semantic.DeepEqual(appliedManifestWork.Spec, requiredAppliedWork.Spec) {
return appliedManifestWork, nil
}
oldData, err := json.Marshal(workapiv1.AppliedManifestWork{
ObjectMeta: metav1.ObjectMeta{
Finalizers: appliedManifestWork.Finalizers,
},
Spec: appliedManifestWork.Spec,
})
if err != nil {
return nil, fmt.Errorf("failed to Marshal old data for appliedManifestWork %s: %w", appliedManifestWorkName, err)
}
newData, err := json.Marshal(workapiv1.AppliedManifestWork{
ObjectMeta: metav1.ObjectMeta{
UID: appliedManifestWork.UID,
ResourceVersion: appliedManifestWork.ResourceVersion,
Finalizers: requiredAppliedWork.Finalizers,
}, // to ensure they appear in the patch as preconditions
Spec: requiredAppliedWork.Spec,
})
if err != nil {
return nil, fmt.Errorf("failed to Marshal new data for appliedManifestWork %s: %w", appliedManifestWorkName, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return nil, fmt.Errorf("failed to create patch for appliedManifestWork %s: %w", appliedManifestWorkName, err)
}
appliedManifestWork, err = m.appliedManifestWorkClient.Patch(ctx, appliedManifestWorkName, types.MergePatchType, patchBytes, metav1.PatchOptions{})
_, err = m.appliedManifestWorkPatcher.PatchSpec(ctx, appliedManifestWork, requiredAppliedWork.Spec, appliedManifestWork.Spec)
return appliedManifestWork, err
}
@@ -360,43 +351,6 @@ func manageOwnerRef(
return *ownerCopy
}
// generateUpdateStatusFunc returns a function which aggregates manifest conditions and generates work conditions.
// Rules to generate work status conditions from manifest conditions
// #1: Applied - work status condition (with type Applied) is applied if all manifest conditions (with type Applied) are applied
// TODO: add rules for other condition types, like Progressing, Available, Degraded
func (m *ManifestWorkController) generateUpdateStatusFunc(generation int64,
newManifestConditions []workapiv1.ManifestCondition) helper.UpdateManifestWorkStatusFunc {
return func(oldStatus *workapiv1.ManifestWorkStatus) error {
// merge the new manifest conditions with the existing manifest conditions
oldStatus.ResourceStatus.Manifests = helper.MergeManifestConditions(
oldStatus.ResourceStatus.Manifests, newManifestConditions)
// aggregate manifest condition to generate work condition
newConditions := []metav1.Condition{}
// handle condition type Applied
if inCondition, exists := allInCondition(string(workapiv1.ManifestApplied), newManifestConditions); exists {
appliedCondition := metav1.Condition{
Type: workapiv1.WorkApplied,
ObservedGeneration: generation,
}
if inCondition {
appliedCondition.Status = metav1.ConditionTrue
appliedCondition.Reason = "AppliedManifestWorkComplete"
appliedCondition.Message = "Apply manifest work complete"
} else {
appliedCondition.Status = metav1.ConditionFalse
appliedCondition.Reason = "AppliedManifestWorkFailed"
appliedCondition.Message = "Failed to apply manifest work"
}
newConditions = append(newConditions, appliedCondition)
}
oldStatus.Conditions = helper.MergeStatusConditions(oldStatus.Conditions, newConditions)
return nil
}
}
// allInCondition checks status of conditions with a particular type in ManifestCondition array.
// Return true only if conditions with the condition type exist and they are all in condition.
func allInCondition(conditionType string, manifests []workapiv1.ManifestCondition) (inCondition bool, exists bool) {

View File

@@ -2,6 +2,7 @@ package manifestcontroller
import (
"context"
"encoding/json"
"fmt"
"testing"
"time"
@@ -23,6 +24,7 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/apply"
@@ -43,8 +45,13 @@ func newController(t *testing.T, work *workapiv1.ManifestWork, appliedWork *work
workInformerFactory := workinformers.NewSharedInformerFactoryWithOptions(fakeWorkClient, 5*time.Minute, workinformers.WithNamespace("cluster1"))
spokeKubeClient := fakekube.NewSimpleClientset()
controller := &ManifestWorkController{
manifestWorkClient: fakeWorkClient.WorkV1().ManifestWorks("cluster1"),
manifestWorkLister: workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"),
manifestWorkPatcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
fakeWorkClient.WorkV1().ManifestWorks("cluster1")),
manifestWorkLister: workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"),
appliedManifestWorkPatcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
fakeWorkClient.WorkV1().AppliedManifestWorks()),
appliedManifestWorkClient: fakeWorkClient.WorkV1().AppliedManifestWorks(),
appliedManifestWorkLister: workInformerFactory.Work().V1().AppliedManifestWorks().Lister(),
restMapper: mapper,
@@ -212,11 +219,15 @@ func (t *testCase) validate(
spokeKubeActions := kubeClient.Actions()
testingcommon.AssertActions(ts, spokeKubeActions, t.expectedKubeAction...)
actual, ok := actualWorkActions[len(actualWorkActions)-1].(clienttesting.UpdateActionImpl)
actual, ok := actualWorkActions[len(actualWorkActions)-1].(clienttesting.PatchActionImpl)
if !ok {
ts.Errorf("Expected to get update action")
ts.Errorf("Expected to get patch action")
}
p := actual.Patch
actualWork := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, actualWork); err != nil {
ts.Fatal(err)
}
actualWork := actual.Object.(*workapiv1.ManifestWork)
for index, cond := range t.expectedManifestConditions {
assertManifestCondition(ts, actualWork.Status.ResourceStatus.Manifests, int32(index), cond.conditionType, cond.status)
}
@@ -265,14 +276,14 @@ func TestSync(t *testing.T) {
cases := []*testCase{
newTestCase("create single resource").
withWorkManifest(spoketesting.NewUnstructured("v1", "Secret", "ns1", "test")).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedKubeAction("get", "create").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
withExpectedWorkCondition(expectedCondition{string(workapiv1.WorkApplied), metav1.ConditionTrue}),
newTestCase("create single deployment resource").
withWorkManifest(spoketesting.NewUnstructured("apps/v1", "Deployment", "ns1", "test")).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "create").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -280,14 +291,14 @@ func TestSync(t *testing.T) {
newTestCase("update single resource").
withWorkManifest(spoketesting.NewUnstructured("v1", "Secret", "ns1", "test")).
withSpokeObject(spoketesting.NewSecret("test", "ns1", "value2")).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedKubeAction("get", "delete", "create").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
withExpectedWorkCondition(expectedCondition{string(workapiv1.WorkApplied), metav1.ConditionTrue}),
newTestCase("create single unstructured resource").
withWorkManifest(spoketesting.NewUnstructured("v1", "NewObject", "ns1", "test")).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "create").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -295,7 +306,7 @@ func TestSync(t *testing.T) {
newTestCase("update single unstructured resource").
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "update").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -303,7 +314,7 @@ func TestSync(t *testing.T) {
newTestCase("multiple create&update resource").
withWorkManifest(spoketesting.NewUnstructured("v1", "Secret", "ns1", "test"), spoketesting.NewUnstructured("v1", "Secret", "ns2", "test")).
withSpokeObject(spoketesting.NewSecret("test", "ns1", "value2")).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedKubeAction("get", "delete", "create", "get", "create").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}, expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -333,7 +344,7 @@ func TestFailedToApplyResource(t *testing.T) {
tc := newTestCase("multiple create&update resource").
withWorkManifest(spoketesting.NewUnstructured("v1", "Secret", "ns1", "test"), spoketesting.NewUnstructured("v1", "Secret", "ns2", "test")).
withSpokeObject(spoketesting.NewSecret("test", "ns1", "value2")).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedKubeAction("get", "delete", "create", "get", "create").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}, expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionFalse}).
@@ -372,7 +383,7 @@ func TestUpdateStrategy(t *testing.T) {
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n1", nil)).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "update").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -381,7 +392,7 @@ func TestUpdateStrategy(t *testing.T) {
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n1", &workapiv1.UpdateStrategy{Type: workapiv1.UpdateStrategyTypeUpdate})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "update").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -390,7 +401,7 @@ func TestUpdateStrategy(t *testing.T) {
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n2", &workapiv1.UpdateStrategy{Type: workapiv1.UpdateStrategyTypeServerSideApply})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "update").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -398,7 +409,7 @@ func TestUpdateStrategy(t *testing.T) {
newTestCase("create single resource with server side apply updateStrategy").
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n1", &workapiv1.UpdateStrategy{Type: workapiv1.UpdateStrategyTypeServerSideApply})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("patch", "patch").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -407,7 +418,7 @@ func TestUpdateStrategy(t *testing.T) {
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n1", &workapiv1.UpdateStrategy{Type: workapiv1.UpdateStrategyTypeServerSideApply})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("patch", "patch").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -416,7 +427,7 @@ func TestUpdateStrategy(t *testing.T) {
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n1", &workapiv1.UpdateStrategy{Type: workapiv1.UpdateStrategyTypeCreateOnly})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("get", "patch").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionTrue}).
@@ -452,7 +463,7 @@ func TestServerSideApplyConflict(t *testing.T) {
withWorkManifest(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val1"}})).
withSpokeDynamicObject(spoketesting.NewUnstructuredWithContent("v1", "NewObject", "ns1", "n1", map[string]interface{}{"spec": map[string]interface{}{"key1": "val2"}})).
withManifestConfig(newManifestConfigOption("", "newobjects", "ns1", "n1", &workapiv1.UpdateStrategy{Type: workapiv1.UpdateStrategyTypeServerSideApply})).
withExpectedWorkAction("update").
withExpectedWorkAction("patch").
withAppliedWorkAction("create").
withExpectedDynamicAction("patch").
withExpectedManifestCondition(expectedCondition{string(workapiv1.ManifestApplied), metav1.ConditionFalse}).
@@ -490,97 +501,6 @@ func newManifestConfigOption(group, resource, namespace, name string, strategy *
}
}
func TestGenerateUpdateStatusFunc(t *testing.T) {
transitionTime := metav1.Now()
cases := []struct {
name string
startingStatusConditions []metav1.Condition
manifestConditions []workapiv1.ManifestCondition
generation int64
expectedStatusConditions []metav1.Condition
}{
{
name: "no manifest condition exists",
manifestConditions: []workapiv1.ManifestCondition{},
expectedStatusConditions: []metav1.Condition{},
},
{
name: "all manifests are applied successfully",
manifestConditions: []workapiv1.ManifestCondition{
newManifestCondition(0, "resource0", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionTrue), "my-reason", "my-message", 0, nil)),
newManifestCondition(1, "resource1", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionTrue), "my-reason", "my-message", 0, nil)),
},
expectedStatusConditions: []metav1.Condition{
newCondition(string(workapiv1.WorkApplied), string(metav1.ConditionTrue), "AppliedManifestWorkComplete", "Apply manifest work complete", 0, nil),
},
},
{
name: "one of manifests is not applied",
manifestConditions: []workapiv1.ManifestCondition{
newManifestCondition(0, "resource0", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionTrue), "my-reason", "my-message", 0, nil)),
newManifestCondition(1, "resource1", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionFalse), "my-reason", "my-message", 0, nil)),
},
expectedStatusConditions: []metav1.Condition{
newCondition(string(workapiv1.WorkApplied), string(metav1.ConditionFalse), "AppliedManifestWorkFailed", "Failed to apply manifest work", 0, nil),
},
},
{
name: "update existing status condition",
startingStatusConditions: []metav1.Condition{
newCondition(string(workapiv1.WorkApplied), string(metav1.ConditionTrue), "AppliedManifestWorkComplete", "Apply manifest work complete", 0, &transitionTime),
},
generation: 1,
manifestConditions: []workapiv1.ManifestCondition{
newManifestCondition(0, "resource0", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionTrue), "my-reason", "my-message", 0, nil)),
newManifestCondition(1, "resource1", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionTrue), "my-reason", "my-message", 0, nil)),
},
expectedStatusConditions: []metav1.Condition{
newCondition(string(workapiv1.WorkApplied), string(metav1.ConditionTrue), "AppliedManifestWorkComplete", "Apply manifest work complete", 1, &transitionTime),
},
},
{
name: "override existing status conditions",
startingStatusConditions: []metav1.Condition{
newCondition(string(workapiv1.WorkApplied), string(metav1.ConditionTrue), "AppliedManifestWorkComplete", "Apply manifest work complete", 0, nil),
},
manifestConditions: []workapiv1.ManifestCondition{
newManifestCondition(0, "resource0", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionTrue), "my-reason", "my-message", 0, nil)),
newManifestCondition(1, "resource1", newCondition(string(workapiv1.ManifestApplied), string(metav1.ConditionFalse), "my-reason", "my-message", 0, nil)),
},
generation: 1,
expectedStatusConditions: []metav1.Condition{
newCondition(string(workapiv1.WorkApplied), string(metav1.ConditionFalse), "AppliedManifestWorkFailed", "Failed to apply manifest work", 1, nil),
},
},
}
controller := &ManifestWorkController{}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
updateStatusFunc := controller.generateUpdateStatusFunc(c.generation, c.manifestConditions)
manifestWorkStatus := &workapiv1.ManifestWorkStatus{
Conditions: c.startingStatusConditions,
}
err := updateStatusFunc(manifestWorkStatus)
if err != nil {
t.Errorf("Should be success with no err: %v", err)
}
for i, expect := range c.expectedStatusConditions {
actual := manifestWorkStatus.Conditions[i]
if expect.LastTransitionTime == (metav1.Time{}) {
actual.LastTransitionTime = metav1.Time{}
}
if !equality.Semantic.DeepEqual(actual, expect) {
t.Errorf(diff.ObjectDiff(actual, expect))
}
}
})
}
}
func TestAllInCondition(t *testing.T) {
cases := []struct {
name string

View File

@@ -24,6 +24,7 @@ import (
worklister "open-cluster-management.io/api/client/work/listers/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
"open-cluster-management.io/ocm/pkg/work/spoke/statusfeedback"
@@ -36,7 +37,7 @@ const statusFeedbackConditionType = "StatusFeedbackSynced"
// are logically disinct, however, they are put in the same control loop to reduce live get call to spoke apiserver
// and status update call to hub apiserver.
type AvailableStatusController struct {
manifestWorkClient workv1client.ManifestWorkInterface
patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
manifestWorkLister worklister.ManifestWorkNamespaceLister
spokeDynamicClient dynamic.Interface
statusReader *statusfeedback.StatusReader
@@ -52,7 +53,9 @@ func NewAvailableStatusController(
syncInterval time.Duration,
) factory.Controller {
controller := &AvailableStatusController{
manifestWorkClient: manifestWorkClient,
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
manifestWorkClient),
manifestWorkLister: manifestWorkLister,
spokeDynamicClient: spokeDynamicClient,
statusReader: statusfeedback.NewStatusReader(),
@@ -141,7 +144,7 @@ func (c *AvailableStatusController) syncManifestWork(ctx context.Context, origin
}
// update status of manifestwork. if this conflicts, try again later
_, err := c.manifestWorkClient.UpdateStatus(ctx, manifestWork, metav1.UpdateOptions{})
_, err := c.patcher.PatchStatus(ctx, manifestWork, manifestWork.Status, originalManifestWork.Status)
return err
}

View File

@@ -2,6 +2,7 @@ package statuscontroller
import (
"context"
"encoding/json"
"testing"
"github.com/davecgh/go-spew/spew"
@@ -15,6 +16,8 @@ import (
fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
"open-cluster-management.io/ocm/pkg/work/spoke/statusfeedback"
@@ -39,12 +42,13 @@ func TestSyncManifestWork(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if !hasStatusCondition(work.Status.Conditions, workapiv1.WorkAvailable, metav1.ConditionUnknown) {
t.Fatal(spew.Sdump(work.Status.Conditions))
}
@@ -58,11 +62,7 @@ func TestSyncManifestWork(t *testing.T) {
manifests: []workapiv1.ManifestCondition{
newManifestWthCondition("", "v1", "secrets", "ns1", "n1"),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Fatal(spew.Sdump(actions))
}
},
validateActions: testingcommon.AssertNoActions,
},
{
name: "Do not update if existing conditions are correct",
@@ -83,11 +83,7 @@ func TestSyncManifestWork(t *testing.T) {
Message: "All resources are available",
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 0 {
t.Fatal(spew.Sdump(actions))
}
},
validateActions: testingcommon.AssertNoActions,
},
{
name: "build status with existing resource",
@@ -103,11 +99,12 @@ func TestSyncManifestWork(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if len(work.Status.ResourceStatus.Manifests) != 1 {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests))
}
@@ -135,13 +132,11 @@ func TestSyncManifestWork(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if len(work.Status.ResourceStatus.Manifests) != 2 {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
if !hasStatusCondition(work.Status.ResourceStatus.Manifests[0].Conditions, string(workapiv1.ManifestAvailable), metav1.ConditionTrue) {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests[0].Conditions))
@@ -170,11 +165,12 @@ func TestSyncManifestWork(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if len(work.Status.ResourceStatus.Manifests) != 2 {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests))
}
@@ -206,8 +202,10 @@ func TestSyncManifestWork(t *testing.T) {
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...)
controller := AvailableStatusController{
manifestWorkClient: fakeClient.WorkV1().ManifestWorks(testingWork.Namespace),
spokeDynamicClient: fakeDynamicClient,
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
fakeClient.WorkV1().ManifestWorks(testingWork.Namespace)),
}
err := controller.syncManifestWork(context.TODO(), testingWork)
@@ -241,11 +239,12 @@ func TestStatusFeedback(t *testing.T) {
newManifest("", "v1", "secrets", "ns1", "n1"),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if len(work.Status.ResourceStatus.Manifests) != 1 {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests))
}
@@ -277,11 +276,12 @@ func TestStatusFeedback(t *testing.T) {
newManifest("apps", "v1", "deployments", "ns1", "deploy1"),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if len(work.Status.ResourceStatus.Manifests) != 2 {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests))
}
@@ -351,11 +351,12 @@ func TestStatusFeedback(t *testing.T) {
newManifest("apps", "v1", "deployments", "ns1", "deploy1"),
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatal(spew.Sdump(actions))
testingcommon.AssertActions(t, actions, "patch")
p := actions[0].(clienttesting.PatchActionImpl).Patch
work := &workapiv1.ManifestWork{}
if err := json.Unmarshal(p, work); err != nil {
t.Fatal(err)
}
work := actions[0].(clienttesting.UpdateAction).GetObject().(*workapiv1.ManifestWork)
if len(work.Status.ResourceStatus.Manifests) != 1 {
t.Fatal(spew.Sdump(work.Status.ResourceStatus.Manifests))
}
@@ -396,9 +397,11 @@ func TestStatusFeedback(t *testing.T) {
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...)
controller := AvailableStatusController{
manifestWorkClient: fakeClient.WorkV1().ManifestWorks(testingWork.Namespace),
spokeDynamicClient: fakeDynamicClient,
statusReader: statusfeedback.NewStatusReader(),
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
fakeClient.WorkV1().ManifestWorks(testingWork.Namespace)),
}
err := controller.syncManifestWork(context.TODO(), testingWork)

View File

@@ -183,7 +183,6 @@ func (o *WorkloadAgentOptions) RunWorkloadAgent(ctx context.Context, controllerC
appliedManifestWorkController := appliedmanifestcontroller.NewAppliedManifestWorkController(
controllerContext.EventRecorder,
spokeDynamicClient,
hubWorkClient.WorkV1().ManifestWorks(o.AgentOptions.SpokeClusterName),
workInformerFactory.Work().V1().ManifestWorks(),
workInformerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks(o.AgentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),