diff --git a/pkg/common/patcher/patcher.go b/pkg/common/patcher/patcher.go new file mode 100644 index 000000000..4796548ea --- /dev/null +++ b/pkg/common/patcher/patcher.go @@ -0,0 +1,158 @@ +package patcher + +import ( + "context" + "encoding/json" + "fmt" + jsonpatch "github.com/evanphx/json-patch" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/klog/v2" +) + +// Patcher is just the Patch API with a generic to keep use sites type safe. +// This is inspired by the commiter code in https://github.com/kcp-dev/kcp/blob/main/pkg/reconciler/committer/committer.go +type PatchClient[R runtime.Object] interface { + Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (R, error) +} + +type Patcher[R runtime.Object, Sp any, St any] interface { + AddFinalizer(context.Context, R, string) (bool, error) + RemoveFinalizer(context.Context, R, string) error + PatchStatus(context.Context, R, St, St) (bool, error) + PatchSpec(context.Context, R, Sp, Sp) (bool, error) +} + +// Resource is a generic wrapper around resources so we can generate patches. +type Resource[Sp any, St any] struct { + metav1.ObjectMeta `json:"metadata,omitempty"` + Spec Sp `json:"spec"` + Status St `json:"status,omitempty"` +} + +type patcher[R runtime.Object, Sp any, St any] struct { + client PatchClient[R] +} + +func NewPatcher[R runtime.Object, Sp any, St any](client PatchClient[R]) *patcher[R, Sp, St] { + p := &patcher[R, Sp, St]{ + client: client, + } + return p +} + +func (p *patcher[R, Sp, St]) AddFinalizer(ctx context.Context, object R, finalizer string) (bool, error) { + hasFinalizer := false + accessor, err := meta.Accessor(object) + if err != nil { + return !hasFinalizer, err + } + + finalizers := accessor.GetFinalizers() + for i := range finalizers { + if finalizers[i] == finalizer { + hasFinalizer = true + break + } + } + if !hasFinalizer { + finalizerBytes, err := json.Marshal(append(finalizers, finalizer)) + if err != nil { + return !hasFinalizer, err + } + patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes)) + + _, err = p.client.Patch( + ctx, accessor.GetName(), types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + return !hasFinalizer, err + } + + return !hasFinalizer, nil +} + +func (p *patcher[R, Sp, St]) RemoveFinalizer(ctx context.Context, object R, finalizer string) error { + accessor, err := meta.Accessor(object) + if err != nil { + return err + } + + copiedFinalizers := []string{} + finalizers := accessor.GetFinalizers() + for i := range finalizers { + if finalizers[i] == finalizer { + continue + } + copiedFinalizers = append(copiedFinalizers, finalizers[i]) + } + + if len(finalizers) != len(copiedFinalizers) { + finalizerBytes, err := json.Marshal(copiedFinalizers) + if err != nil { + return err + } + patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes)) + + _, err = p.client.Patch( + ctx, accessor.GetName(), types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + return err + } + + return nil +} + +func (p *patcher[R, Sp, St]) patch(ctx context.Context, object R, newObject, oldObject *Resource[Sp, St], subresources ...string) error { + accessor, err := meta.Accessor(object) + if err != nil { + return err + } + + oldData, err := json.Marshal(oldObject) + if err != nil { + return fmt.Errorf("failed to Marshal old data for %s: %w", accessor.GetName(), err) + } + + newObject.UID = accessor.GetUID() + newObject.ResourceVersion = accessor.GetResourceVersion() + newData, err := json.Marshal(newObject) + if err != nil { + return fmt.Errorf("failed to Marshal new data for %s: %w", accessor.GetName(), err) + } + + patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) + if err != nil { + return fmt.Errorf("failed to create patch for %s: %w", accessor.GetName(), err) + } + + _, err = p.client.Patch( + ctx, accessor.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{}, subresources...) + if err != nil { + klog.V(2).Infof("Object with type %t and name %s is patched with patch %s", object, accessor.GetName(), string(patchBytes)) + } + return err +} + +func (p *patcher[R, Sp, St]) PatchStatus(ctx context.Context, object R, newStatus, oldStatus St) (bool, error) { + statusChanged := !equality.Semantic.DeepEqual(oldStatus, newStatus) + if !statusChanged { + return false, nil + } + + oldObject := &Resource[Sp, St]{Status: oldStatus} + newObject := &Resource[Sp, St]{Status: newStatus} + + return true, p.patch(ctx, object, newObject, oldObject, "status") +} + +func (p *patcher[R, Sp, St]) PatchSpec(ctx context.Context, object R, newSpec, oldSpec Sp) (bool, error) { + specChanged := !equality.Semantic.DeepEqual(newSpec, oldSpec) + if !specChanged { + return false, nil + } + + oldObject := &Resource[Sp, St]{Spec: oldSpec} + newObject := &Resource[Sp, St]{Spec: newSpec} + return true, p.patch(ctx, object, newObject, oldObject) +} diff --git a/pkg/common/patcher/patcher_test.go b/pkg/common/patcher/patcher_test.go new file mode 100644 index 000000000..4631dfdec --- /dev/null +++ b/pkg/common/patcher/patcher_test.go @@ -0,0 +1,237 @@ +package patcher + +import ( + "context" + "encoding/json" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clienttesting "k8s.io/client-go/testing" + clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" + clusterv1 "open-cluster-management.io/api/cluster/v1" + testingcommon "open-cluster-management.io/ocm/pkg/common/testing" + testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing" + "testing" +) + +func TestAddFinalizer(t *testing.T) { + cases := []struct { + name string + obj *clusterv1.ManagedCluster + finalizer string + validateActions func(t *testing.T, actions []clienttesting.Action) + }{ + { + name: "add finalizer", + obj: newManagedClusterWithFinalizer(), + finalizer: "test-finalizer", + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() + managedCluster := &clusterv1.ManagedCluster{} + err := json.Unmarshal(patch, managedCluster) + if err != nil { + t.Fatal(err) + } + testinghelpers.AssertFinalizers(t, managedCluster, []string{"test-finalizer"}) + }, + }, + { + name: "no action", + obj: newManagedClusterWithFinalizer("test-finalizer-1", "test-finalizer"), + finalizer: "test-finalizer", + validateActions: testingcommon.AssertNoActions, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.obj) + patcher := NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()) + if _, err := patcher.AddFinalizer(context.TODO(), c.obj, c.finalizer); err != nil { + t.Error(err) + } + c.validateActions(t, clusterClient.Actions()) + }) + } +} + +func TestRemoveFinalizer(t *testing.T) { + cases := []struct { + name string + obj *clusterv1.ManagedCluster + finalizer string + validateActions func(t *testing.T, actions []clienttesting.Action) + }{ + { + name: "remove finalizer", + obj: newManagedClusterWithFinalizer("test-finalizer", "test-finalizer-1"), + finalizer: "test-finalizer", + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() + managedCluster := &clusterv1.ManagedCluster{} + err := json.Unmarshal(patch, managedCluster) + if err != nil { + t.Fatal(err) + } + testinghelpers.AssertFinalizers(t, managedCluster, []string{"test-finalizer-1"}) + }, + }, + { + name: "no action", + obj: newManagedClusterWithFinalizer("test-finalizer-1"), + finalizer: "test-finalizer", + validateActions: testingcommon.AssertNoActions, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.obj) + patcher := NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()) + if err := patcher.RemoveFinalizer(context.TODO(), c.obj, c.finalizer); err != nil { + t.Error(err) + } + c.validateActions(t, clusterClient.Actions()) + }) + } +} + +func TestPatchSpec(t *testing.T) { + cases := []struct { + name string + obj *clusterv1.ManagedCluster + newObj *clusterv1.ManagedCluster + validateActions func(t *testing.T, actions []clienttesting.Action) + }{ + { + name: "patch spec", + obj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}), + newObj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key2"}), + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() + managedCluster := &clusterv1.ManagedCluster{} + err := json.Unmarshal(patch, managedCluster) + if err != nil { + t.Fatal(err) + } + if !equality.Semantic.DeepEqual(managedCluster.Spec, newManagedClusterWithTaint(clusterv1.Taint{Key: "key2"}).Spec) { + t.Errorf("not patched correctly got %v", managedCluster.Spec) + } + }, + }, + { + name: "no patch", + obj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}), + newObj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}), + validateActions: testingcommon.AssertNoActions, + }, + { + name: "no patch with status change", + obj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}), + newObj: newManagedClusterWithConditions(metav1.Condition{Type: "Type2"}), + validateActions: testingcommon.AssertNoActions, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.obj) + patcher := NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()) + if _, err := patcher.PatchSpec(context.TODO(), c.obj, c.newObj.Spec, c.obj.Spec); err != nil { + t.Error(err) + } + c.validateActions(t, clusterClient.Actions()) + }) + } +} + +func TestPatchStatus(t *testing.T) { + cases := []struct { + name string + obj *clusterv1.ManagedCluster + newObj *clusterv1.ManagedCluster + validateActions func(t *testing.T, actions []clienttesting.Action) + }{ + { + name: "patch status", + obj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}), + newObj: newManagedClusterWithConditions(metav1.Condition{Type: "Type2"}), + validateActions: func(t *testing.T, actions []clienttesting.Action) { + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() + managedCluster := &clusterv1.ManagedCluster{} + err := json.Unmarshal(patch, managedCluster) + if err != nil { + t.Fatal(err) + } + if !equality.Semantic.DeepEqual(managedCluster.Status, newManagedClusterWithConditions(metav1.Condition{Type: "Type2"}).Status) { + t.Errorf("not patched correctly got %v", managedCluster.Status) + } + }, + }, + { + name: "no patch", + obj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}), + newObj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}), + validateActions: testingcommon.AssertNoActions, + }, + { + name: "no patch with spec change", + obj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}), + newObj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key2"}), + validateActions: testingcommon.AssertNoActions, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + clusterClient := clusterfake.NewSimpleClientset(c.obj) + patcher := NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()) + if _, err := patcher.PatchStatus(context.TODO(), c.obj, c.newObj.Status, c.obj.Status); err != nil { + t.Error(err) + } + c.validateActions(t, clusterClient.Actions()) + }) + } +} + +func newManagedClusterWithFinalizer(finalizers ...string) *clusterv1.ManagedCluster { + return &clusterv1.ManagedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + Finalizers: finalizers, + }, + } +} + +func newManagedClusterWithTaint(taints ...clusterv1.Taint) *clusterv1.ManagedCluster { + return &clusterv1.ManagedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Spec: clusterv1.ManagedClusterSpec{ + Taints: taints, + }, + } +} + +func newManagedClusterWithConditions(conds ...metav1.Condition) *clusterv1.ManagedCluster { + return &clusterv1.ManagedCluster{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test", + }, + Status: clusterv1.ManagedClusterStatus{ + Conditions: conds, + }, + } +} diff --git a/pkg/registration/helpers/helpers.go b/pkg/registration/helpers/helpers.go index 7a6b64fe8..a215aaa6c 100644 --- a/pkg/registration/helpers/helpers.go +++ b/pkg/registration/helpers/helpers.go @@ -3,16 +3,11 @@ package helpers import ( "context" "embed" - "encoding/json" "fmt" "net/url" - addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" - addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned" - clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned" clusterv1 "open-cluster-management.io/api/cluster/v1" - jsonpatch "github.com/evanphx/json-patch" "github.com/openshift/api" "github.com/openshift/library-go/pkg/assets" "github.com/openshift/library-go/pkg/operator/events" @@ -24,19 +19,15 @@ import ( certificatesv1beta1 "k8s.io/api/certificates/v1beta1" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/discovery/cached/memory" "k8s.io/client-go/kubernetes" "k8s.io/client-go/restmapper" - "k8s.io/client-go/util/retry" ) var ( @@ -49,148 +40,6 @@ func init() { utilruntime.Must(api.InstallKube(genericScheme)) } -type UpdateManagedClusterStatusFunc func(status *clusterv1.ManagedClusterStatus) error - -func UpdateManagedClusterStatus( - ctx context.Context, - client clusterclientset.Interface, - spokeClusterName string, - updateFuncs ...UpdateManagedClusterStatusFunc) (*clusterv1.ManagedClusterStatus, bool, error) { - updated := false - var updatedManagedClusterStatus *clusterv1.ManagedClusterStatus - - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - managedCluster, err := client.ClusterV1().ManagedClusters().Get(ctx, spokeClusterName, metav1.GetOptions{}) - if err != nil { - return err - } - oldStatus := &managedCluster.Status - - newStatus := oldStatus.DeepCopy() - for _, update := range updateFuncs { - if err := update(newStatus); err != nil { - return err - } - } - if equality.Semantic.DeepEqual(oldStatus, newStatus) { - // We return the newStatus which is a deep copy of oldStatus but with all update funcs applied. - updatedManagedClusterStatus = newStatus - return nil - } - - oldData, err := json.Marshal(clusterv1.ManagedCluster{ - Status: *oldStatus, - }) - - if err != nil { - return fmt.Errorf("failed to Marshal old data for cluster status %s: %w", managedCluster.Name, err) - } - - newData, err := json.Marshal(clusterv1.ManagedCluster{ - ObjectMeta: metav1.ObjectMeta{ - UID: managedCluster.UID, - ResourceVersion: managedCluster.ResourceVersion, - }, // to ensure they appear in the patch as preconditions - Status: *newStatus, - }) - if err != nil { - return fmt.Errorf("failed to Marshal new data for cluster status %s: %w", managedCluster.Name, err) - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return fmt.Errorf("failed to create patch for cluster %s: %w", managedCluster.Name, err) - } - - updatedManagedCluster, err := client.ClusterV1().ManagedClusters().Patch(ctx, managedCluster.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - - updatedManagedClusterStatus = &updatedManagedCluster.Status - updated = err == nil - return err - }) - - return updatedManagedClusterStatus, updated, err -} - -func UpdateManagedClusterConditionFn(cond metav1.Condition) UpdateManagedClusterStatusFunc { - return func(oldStatus *clusterv1.ManagedClusterStatus) error { - meta.SetStatusCondition(&oldStatus.Conditions, cond) - return nil - } -} - -type UpdateManagedClusterAddOnStatusFunc func(status *addonv1alpha1.ManagedClusterAddOnStatus) error - -func UpdateManagedClusterAddOnStatus( - ctx context.Context, - client addonv1alpha1client.Interface, - addOnNamespace, addOnName string, - updateFuncs ...UpdateManagedClusterAddOnStatusFunc) (*addonv1alpha1.ManagedClusterAddOnStatus, bool, error) { - updated := false - var updatedAddOnStatus *addonv1alpha1.ManagedClusterAddOnStatus - - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { - addOn, err := client.AddonV1alpha1().ManagedClusterAddOns(addOnNamespace).Get(ctx, addOnName, metav1.GetOptions{}) - if err != nil { - return err - } - oldStatus := &addOn.Status - - newStatus := oldStatus.DeepCopy() - for _, update := range updateFuncs { - if err := update(newStatus); err != nil { - return err - } - } - if equality.Semantic.DeepEqual(oldStatus, newStatus) { - // We return the newStatus which is a deep copy of oldStatus but with all update funcs applied. - updatedAddOnStatus = newStatus - return nil - } - - oldData, err := json.Marshal(addonv1alpha1.ManagedClusterAddOn{ - Status: *oldStatus, - }) - - if err != nil { - return fmt.Errorf("failed to Marshal old data for addon status %s: %w", addOn.Name, err) - } - - newData, err := json.Marshal(addonv1alpha1.ManagedClusterAddOn{ - ObjectMeta: metav1.ObjectMeta{ - UID: addOn.UID, - ResourceVersion: addOn.ResourceVersion, - }, // to ensure they appear in the patch as preconditions - Status: *newStatus, - }) - if err != nil { - return fmt.Errorf("failed to Marshal new data for addon status %s: %w", addOn.Name, err) - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return fmt.Errorf("failed to create patch for cluster %s: %w", addOn.Name, err) - } - - updatedAddOn, err := client.AddonV1alpha1().ManagedClusterAddOns(addOnNamespace).Patch(ctx, addOn.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - if err != nil { - return err - } - updatedAddOnStatus = &updatedAddOn.Status - updated = err == nil - return err - }) - - return updatedAddOnStatus, updated, err -} - -func UpdateManagedClusterAddOnStatusFn(cond metav1.Condition) UpdateManagedClusterAddOnStatusFunc { - return func(oldStatus *addonv1alpha1.ManagedClusterAddOnStatus) error { - meta.SetStatusCondition(&oldStatus.Conditions, cond) - return nil - } -} - // Check whether a CSR is in terminal state func IsCSRInTerminalState(status *certificatesv1.CertificateSigningRequestStatus) bool { for _, c := range status.Conditions { diff --git a/pkg/registration/helpers/helpers_test.go b/pkg/registration/helpers/helpers_test.go index c7216a869..5192f6064 100644 --- a/pkg/registration/helpers/helpers_test.go +++ b/pkg/registration/helpers/helpers_test.go @@ -4,217 +4,22 @@ import ( "context" "encoding/json" "fmt" - "reflect" - "testing" - "time" - - addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" - addonfake "open-cluster-management.io/api/client/addon/clientset/versioned/fake" - clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake" clusterv1 "open-cluster-management.io/api/cluster/v1" testingcommon "open-cluster-management.io/ocm/pkg/common/testing" testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing" + "reflect" + "testing" "github.com/openshift/library-go/pkg/operator/events/eventstesting" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" - "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/diff" fakekube "k8s.io/client-go/kubernetes/fake" clienttesting "k8s.io/client-go/testing" ) -func TestUpdateStatusCondition(t *testing.T) { - nowish := metav1.Now() - beforeish := metav1.Time{Time: nowish.Add(-10 * time.Second)} - afterish := metav1.Time{Time: nowish.Add(10 * time.Second)} - - cases := []struct { - name string - startingConditions []metav1.Condition - newCondition metav1.Condition - expextedUpdated bool - expectedConditions []metav1.Condition - }{ - { - name: "add to empty", - startingConditions: []metav1.Condition{}, - newCondition: testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil), - expextedUpdated: true, - expectedConditions: []metav1.Condition{testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil)}, - }, - { - name: "add to non-conflicting", - startingConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - }, - newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil), - expextedUpdated: true, - expectedConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil), - }, - }, - { - name: "change existing status", - startingConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil), - }, - newCondition: testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil), - expextedUpdated: true, - expectedConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil), - }, - }, - { - name: "leave existing transition time", - startingConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish), - }, - newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &afterish), - expextedUpdated: false, - expectedConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish), - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - fakeClusterClient := clusterfake.NewSimpleClientset(&clusterv1.ManagedCluster{ - ObjectMeta: metav1.ObjectMeta{Name: "testspokecluster"}, - Status: clusterv1.ManagedClusterStatus{ - Conditions: c.startingConditions, - }, - }) - - status, updated, err := UpdateManagedClusterStatus( - context.TODO(), - fakeClusterClient, - "testspokecluster", - UpdateManagedClusterConditionFn(c.newCondition), - ) - if err != nil { - t.Errorf("unexpected err: %v", err) - } - if updated != c.expextedUpdated { - t.Errorf("expected %t, but %t", c.expextedUpdated, updated) - } - for i := range c.expectedConditions { - expected := c.expectedConditions[i] - actual := status.Conditions[i] - if expected.LastTransitionTime == (metav1.Time{}) { - actual.LastTransitionTime = metav1.Time{} - } - if !equality.Semantic.DeepEqual(expected, actual) { - t.Errorf(diff.ObjectDiff(expected, actual)) - } - } - }) - } -} - -func TestUpdateManagedClusterAddOnStatus(t *testing.T) { - nowish := metav1.Now() - beforeish := metav1.Time{Time: nowish.Add(-10 * time.Second)} - afterish := metav1.Time{Time: nowish.Add(10 * time.Second)} - - cases := []struct { - name string - startingConditions []metav1.Condition - newCondition metav1.Condition - expextedUpdated bool - expectedConditions []metav1.Condition - }{ - { - name: "add to empty", - startingConditions: []metav1.Condition{}, - newCondition: testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil), - expextedUpdated: true, - expectedConditions: []metav1.Condition{testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil)}, - }, - { - name: "add to non-conflicting", - startingConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - }, - newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil), - expextedUpdated: true, - expectedConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil), - }, - }, - { - name: "change existing status", - startingConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil), - }, - newCondition: testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil), - expextedUpdated: true, - expectedConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil), - }, - }, - { - name: "leave existing transition time", - startingConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish), - }, - newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &afterish), - expextedUpdated: false, - expectedConditions: []metav1.Condition{ - testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil), - testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish), - }, - }, - } - - for _, c := range cases { - t.Run(c.name, func(t *testing.T) { - fakeAddOnClient := addonfake.NewSimpleClientset(&addonv1alpha1.ManagedClusterAddOn{ - ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "test"}, - Status: addonv1alpha1.ManagedClusterAddOnStatus{ - Conditions: c.startingConditions, - }, - }) - - status, updated, err := UpdateManagedClusterAddOnStatus( - context.TODO(), - fakeAddOnClient, - "test", "test", - UpdateManagedClusterAddOnStatusFn(c.newCondition), - ) - if err != nil { - t.Errorf("unexpected err: %v", err) - } - if updated != c.expextedUpdated { - t.Errorf("expected %t, but %t", c.expextedUpdated, updated) - } - for i := range c.expectedConditions { - expected := c.expectedConditions[i] - actual := status.Conditions[i] - if expected.LastTransitionTime == (metav1.Time{}) { - actual.LastTransitionTime = metav1.Time{} - } - if !equality.Semantic.DeepEqual(expected, actual) { - t.Errorf(diff.ObjectDiff(expected, actual)) - } - } - }) - } -} - func TestIsValidHTTPSURL(t *testing.T) { cases := []struct { name string diff --git a/pkg/registration/hub/addon/healthcheck_controller.go b/pkg/registration/hub/addon/healthcheck_controller.go index d202dc501..8af7330ee 100644 --- a/pkg/registration/hub/addon/healthcheck_controller.go +++ b/pkg/registration/hub/addon/healthcheck_controller.go @@ -2,7 +2,10 @@ package addon import ( "context" + patcher "open-cluster-management.io/ocm/pkg/common/patcher" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" operatorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers" addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" addonclient "open-cluster-management.io/api/client/addon/clientset/versioned" @@ -11,10 +14,6 @@ import ( clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1" clusterv1 "open-cluster-management.io/api/cluster/v1" - "open-cluster-management.io/ocm/pkg/registration/helpers" - - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -87,19 +86,20 @@ func (c *managedClusterAddOnHealthCheckController) sync(ctx context.Context, syn } errs := []error{} + patcher := patcher.NewPatcher[ + *addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]( + c.addOnClient.AddonV1alpha1().ManagedClusterAddOns(managedClusterName), + ) for _, addOn := range addOns { - _, updated, err := helpers.UpdateManagedClusterAddOnStatus( - ctx, - c.addOnClient, - addOn.Namespace, - addOn.Name, - helpers.UpdateManagedClusterAddOnStatusFn(metav1.Condition{ - Type: addonv1alpha1.ManagedClusterAddOnConditionAvailable, - Status: managedClusterAvailableCondition.Status, - Reason: managedClusterAvailableCondition.Reason, - Message: managedClusterAvailableCondition.Message, - }), - ) + newManagedClusterAddon := addOn.DeepCopy() + meta.SetStatusCondition(&newManagedClusterAddon.Status.Conditions, metav1.Condition{ + Type: addonv1alpha1.ManagedClusterAddOnConditionAvailable, + Status: managedClusterAvailableCondition.Status, + Reason: managedClusterAvailableCondition.Reason, + Message: managedClusterAvailableCondition.Message, + }) + + updated, err := patcher.PatchStatus(ctx, newManagedClusterAddon, newManagedClusterAddon.Status, addOn.Status) if err != nil { errs = append(errs, err) } diff --git a/pkg/registration/hub/addon/healthcheck_controller_test.go b/pkg/registration/hub/addon/healthcheck_controller_test.go index 42fc5da34..c745ac17f 100644 --- a/pkg/registration/hub/addon/healthcheck_controller_test.go +++ b/pkg/registration/hub/addon/healthcheck_controller_test.go @@ -57,9 +57,9 @@ func TestSync(t *testing.T) { ObjectMeta: metav1.ObjectMeta{Namespace: testinghelpers.TestManagedClusterName, Name: "test"}, }}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") + testingcommon.AssertActions(t, actions, "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + patch := actions[0].(clienttesting.PatchAction).GetPatch() addOn := &addonv1alpha1.ManagedClusterAddOn{} err := json.Unmarshal(patch, addOn) if err != nil { diff --git a/pkg/registration/hub/lease/controller.go b/pkg/registration/hub/lease/controller.go index bc32ded09..37fdaa9d3 100644 --- a/pkg/registration/hub/lease/controller.go +++ b/pkg/registration/hub/lease/controller.go @@ -2,16 +2,15 @@ package lease import ( "context" + "open-cluster-management.io/ocm/pkg/common/patcher" "time" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" clusterv1 "open-cluster-management.io/api/cluster/v1" - "open-cluster-management.io/ocm/pkg/registration/helpers" - - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" coordv1 "k8s.io/api/coordination/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -35,7 +34,7 @@ var ( // leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available. type leaseController struct { kubeClient kubernetes.Interface - clusterClient clientset.Interface + patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus] clusterLister clusterv1listers.ManagedClusterLister leaseLister coordlisters.LeaseLister eventRecorder events.Recorder @@ -49,8 +48,10 @@ func NewClusterLeaseController( leaseInformer coordinformers.LeaseInformer, recorder events.Recorder) factory.Controller { c := &leaseController{ - kubeClient: kubeClient, - clusterClient: clusterClient, + kubeClient: kubeClient, + patcher: patcher.NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()), clusterLister: clusterInformer.Lister(), leaseLister: leaseInformer.Lister(), eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"), @@ -155,15 +156,15 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus return nil } - // the lease is not constantly updated, update it to unknown - conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(metav1.Condition{ + newCluster := cluster.DeepCopy() + meta.SetStatusCondition(&newCluster.Status.Conditions, metav1.Condition{ Type: clusterv1.ManagedClusterConditionAvailable, Status: metav1.ConditionUnknown, Reason: "ManagedClusterLeaseUpdateStopped", Message: "Registration agent stopped updating its lease.", }) - _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn) + updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status) if updated { c.eventRecorder.Eventf("ManagedClusterAvailableConditionUpdated", "update managed cluster %q available condition to unknown, due to its lease is not updated constantly", diff --git a/pkg/registration/hub/lease/controller_test.go b/pkg/registration/hub/lease/controller_test.go index b01d48916..252869b71 100644 --- a/pkg/registration/hub/lease/controller_test.go +++ b/pkg/registration/hub/lease/controller_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "open-cluster-management.io/ocm/pkg/common/patcher" "testing" "time" @@ -62,8 +63,8 @@ func TestSync(t *testing.T) { Reason: "ManagedClusterLeaseUpdateStopped", Message: "Registration agent stopped updating its lease.", } - testingcommon.AssertActions(t, clusterActions, "get", "patch") - patch := clusterActions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, clusterActions, "patch") + patch := clusterActions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &v1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -90,8 +91,8 @@ func TestSync(t *testing.T) { Reason: "ManagedClusterLeaseUpdateStopped", Message: "Registration agent stopped updating its lease.", } - testingcommon.AssertActions(t, clusterActions, "get", "patch") - patch := clusterActions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, clusterActions, "patch") + patch := clusterActions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &v1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -133,8 +134,10 @@ func TestSync(t *testing.T) { syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName) ctrl := &leaseController{ - kubeClient: leaseClient, - clusterClient: clusterClient, + kubeClient: leaseClient, + patcher: patcher.NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()), clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(), eventRecorder: syncCtx.Recorder(), diff --git a/pkg/registration/hub/managedcluster/controller.go b/pkg/registration/hub/managedcluster/controller.go index 0213541ab..9d397c1c4 100644 --- a/pkg/registration/hub/managedcluster/controller.go +++ b/pkg/registration/hub/managedcluster/controller.go @@ -3,8 +3,8 @@ package managedcluster import ( "context" "embed" - "encoding/json" "fmt" + "open-cluster-management.io/ocm/pkg/common/patcher" clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" informerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" @@ -21,7 +21,6 @@ import ( "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" ) @@ -43,8 +42,8 @@ var staticFiles = []string{ // managedClusterController reconciles instances of ManagedCluster on the hub. type managedClusterController struct { kubeClient kubernetes.Interface - clusterClient clientset.Interface clusterLister listerv1.ManagedClusterLister + patcher patcher.Patcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus] cache resourceapply.ResourceCache eventRecorder events.Recorder } @@ -57,8 +56,10 @@ func NewManagedClusterController( recorder events.Recorder) factory.Controller { c := &managedClusterController{ kubeClient: kubeClient, - clusterClient: clusterClient, clusterLister: clusterInformer.Lister(), + patcher: patcher.NewPatcher[ + *v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()), cache: resourceapply.NewResourceCache(), eventRecorder: recorder.WithComponentSuffix("managed-cluster-controller"), } @@ -83,24 +84,10 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn return err } - managedCluster = managedCluster.DeepCopy() + newManagedCluster := managedCluster.DeepCopy() if managedCluster.DeletionTimestamp.IsZero() { - hasFinalizer := false - for i := range managedCluster.Finalizers { - if managedCluster.Finalizers[i] == managedClusterFinalizer { - hasFinalizer = true - break - } - } - if !hasFinalizer { - finalizerBytes, err := json.Marshal(append(managedCluster.Finalizers, managedClusterFinalizer)) - if err != nil { - return err - } - patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes)) - - _, err = c.clusterClient.ClusterV1().ManagedClusters().Patch( - ctx, managedCluster.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) + updated, err := c.patcher.AddFinalizer(ctx, managedCluster, managedClusterFinalizer) + if err != nil || updated { return err } } @@ -110,7 +97,7 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn if err := c.removeManagedClusterResources(ctx, managedClusterName); err != nil { return err } - return c.removeManagedClusterFinalizer(ctx, managedCluster) + return c.patcher.RemoveFinalizer(ctx, managedCluster, managedClusterFinalizer) } if !managedCluster.Spec.HubAcceptsClient { @@ -126,18 +113,17 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn return err } - _, _, err := helpers.UpdateManagedClusterStatus( - ctx, - c.clusterClient, - managedClusterName, - helpers.UpdateManagedClusterConditionFn(metav1.Condition{ - Type: v1.ManagedClusterConditionHubAccepted, - Status: metav1.ConditionFalse, - Reason: "HubClusterAdminDenied", - Message: "Denied by hub cluster admin", - }), - ) - return err + meta.SetStatusCondition(&newManagedCluster.Status.Conditions, metav1.Condition{ + Type: v1.ManagedClusterConditionHubAccepted, + Status: metav1.ConditionFalse, + Reason: "HubClusterAdminDenied", + Message: "Denied by hub cluster admin", + }) + + if _, err := c.patcher.PatchStatus(ctx, newManagedCluster, newManagedCluster.Status, managedCluster.Status); err != nil { + return err + } + return nil } // TODO consider to add the managedcluster-namespace.yaml back to staticFiles, @@ -178,12 +164,8 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn acceptedCondition.Message = applyErrors.Error() } - _, updated, updatedErr := helpers.UpdateManagedClusterStatus( - ctx, - c.clusterClient, - managedClusterName, - helpers.UpdateManagedClusterConditionFn(acceptedCondition), - ) + meta.SetStatusCondition(&newManagedCluster.Status.Conditions, acceptedCondition) + updated, updatedErr := c.patcher.PatchStatus(ctx, newManagedCluster, newManagedCluster.Status, managedCluster.Status) if updatedErr != nil { errs = append(errs, updatedErr) } @@ -202,27 +184,3 @@ func (c *managedClusterController) removeManagedClusterResources(ctx context.Con } return operatorhelpers.NewMultiLineAggregate(errs) } - -func (c *managedClusterController) removeManagedClusterFinalizer(ctx context.Context, managedCluster *v1.ManagedCluster) error { - copiedFinalizers := []string{} - for i := range managedCluster.Finalizers { - if managedCluster.Finalizers[i] == managedClusterFinalizer { - continue - } - copiedFinalizers = append(copiedFinalizers, managedCluster.Finalizers[i]) - } - - if len(managedCluster.Finalizers) != len(copiedFinalizers) { - finalizerBytes, err := json.Marshal(copiedFinalizers) - if err != nil { - return err - } - patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes)) - - _, err = c.clusterClient.ClusterV1().ManagedClusters().Patch( - ctx, managedCluster.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{}) - return err - } - - return nil -} diff --git a/pkg/registration/hub/managedcluster/controller_test.go b/pkg/registration/hub/managedcluster/controller_test.go index be2527ea1..ed6cf518f 100644 --- a/pkg/registration/hub/managedcluster/controller_test.go +++ b/pkg/registration/hub/managedcluster/controller_test.go @@ -3,6 +3,7 @@ package managedcluster import ( "context" "encoding/json" + "open-cluster-management.io/ocm/pkg/common/patcher" "testing" "time" @@ -58,8 +59,8 @@ func TestSyncManagedCluster(t *testing.T) { Reason: "HubClusterAdminAccepted", Message: "Accepted by hub cluster admin", } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &v1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -72,7 +73,7 @@ func TestSyncManagedCluster(t *testing.T) { name: "sync an accepted spoke cluster", startingObjects: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get") + testingcommon.AssertNoActions(t, actions) }, }, { @@ -85,8 +86,8 @@ func TestSyncManagedCluster(t *testing.T) { Reason: "HubClusterAdminDenied", Message: "Denied by hub cluster admin", } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &v1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -123,7 +124,12 @@ func TestSyncManagedCluster(t *testing.T) { } } - ctrl := managedClusterController{kubeClient, clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), resourceapply.NewResourceCache(), eventstesting.NewTestingEventRecorder(t)} + ctrl := managedClusterController{ + kubeClient, + clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), + patcher.NewPatcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](clusterClient.ClusterV1().ManagedClusters()), + resourceapply.NewResourceCache(), + eventstesting.NewTestingEventRecorder(t)} syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)) if syncErr != nil { t.Errorf("unexpected err: %v", syncErr) diff --git a/pkg/registration/hub/managedclusterset/controller.go b/pkg/registration/hub/managedclusterset/controller.go index 738457e12..018d49c83 100644 --- a/pkg/registration/hub/managedclusterset/controller.go +++ b/pkg/registration/hub/managedclusterset/controller.go @@ -3,6 +3,7 @@ package managedclusterset import ( "context" "fmt" + "open-cluster-management.io/ocm/pkg/common/patcher" "reflect" "github.com/openshift/library-go/pkg/controller/factory" @@ -27,7 +28,7 @@ import ( // managedClusterSetController reconciles instances of ManagedClusterSet on the hub. type managedClusterSetController struct { - clusterClient clientset.Interface + patcher patcher.Patcher[*clusterv1beta2.ManagedClusterSet, clusterv1beta2.ManagedClusterSetSpec, clusterv1beta2.ManagedClusterSetStatus] clusterLister clusterlisterv1.ManagedClusterLister clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister eventRecorder events.Recorder @@ -45,7 +46,9 @@ func NewManagedClusterSetController( syncCtx := factory.NewSyncContext(controllerName, recorder) c := &managedClusterSetController{ - clusterClient: clusterClient, + patcher: patcher.NewPatcher[ + *clusterv1beta2.ManagedClusterSet, clusterv1beta2.ManagedClusterSetSpec, clusterv1beta2.ManagedClusterSetStatus]( + clusterClient.ClusterV1beta2().ManagedClusterSets()), clusterLister: clusterInformer.Lister(), clusterSetLister: clusterSetInformer.Lister(), eventRecorder: recorder.WithComponentSuffix("managed-cluster-set-controller"), @@ -164,12 +167,7 @@ func (c *managedClusterSetController) syncClusterSet(ctx context.Context, origin } meta.SetStatusCondition(&clusterSet.Status.Conditions, emptyCondition) - // skip update if cluster set status does not change - if reflect.DeepEqual(clusterSet.Status.Conditions, originalClusterSet.Status.Conditions) { - return nil - } - - _, err = c.clusterClient.ClusterV1beta2().ManagedClusterSets().UpdateStatus(ctx, clusterSet, metav1.UpdateOptions{}) + _, err = c.patcher.PatchStatus(ctx, clusterSet, clusterSet.Status, originalClusterSet.Status) if err != nil { return fmt.Errorf("failed to update status of ManagedClusterSet %q: %w", clusterSet.Name, err) } diff --git a/pkg/registration/hub/managedclusterset/controller_test.go b/pkg/registration/hub/managedclusterset/controller_test.go index 0d36a8c4f..3e635faa8 100644 --- a/pkg/registration/hub/managedclusterset/controller_test.go +++ b/pkg/registration/hub/managedclusterset/controller_test.go @@ -2,6 +2,7 @@ package managedclusterset import ( "context" + "open-cluster-management.io/ocm/pkg/common/patcher" testingcommon "open-cluster-management.io/ocm/pkg/common/testing" "reflect" "testing" @@ -228,7 +229,9 @@ func TestSyncClusterSet(t *testing.T) { } ctrl := managedClusterSetController{ - clusterClient: clusterClient, + patcher: patcher.NewPatcher[ + *clusterv1beta2.ManagedClusterSet, clusterv1beta2.ManagedClusterSetSpec, clusterv1beta2.ManagedClusterSetStatus]( + clusterClient.ClusterV1beta2().ManagedClusterSets()), clusterLister: informerFactory.Cluster().V1().ManagedClusters().Lister(), clusterSetLister: informerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), eventRecorder: eventstesting.NewTestingEventRecorder(t), @@ -380,7 +383,6 @@ func TestEnqueueUpdateClusterClusterSet(t *testing.T) { syncCtx := testingcommon.NewFakeSyncContext(t, "fake") ctrl := managedClusterSetController{ - clusterClient: clusterClient, clusterSetLister: informerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(), eventRecorder: eventstesting.NewTestingEventRecorder(t), queue: syncCtx.Queue(), diff --git a/pkg/registration/hub/managedclustersetbinding/controller.go b/pkg/registration/hub/managedclustersetbinding/controller.go index 805026cf7..402be2505 100644 --- a/pkg/registration/hub/managedclustersetbinding/controller.go +++ b/pkg/registration/hub/managedclustersetbinding/controller.go @@ -2,18 +2,13 @@ package managedclustersetbinding import ( "context" - "encoding/json" "fmt" - - jsonpatch "github.com/evanphx/json-patch" "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" - "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/types" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" @@ -22,6 +17,7 @@ import ( clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2" clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2" clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta2" + "open-cluster-management.io/ocm/pkg/common/patcher" ) const ( @@ -148,6 +144,10 @@ func (c *managedClusterSetBindingController) sync(ctx context.Context, syncCtx f return err } + patcher := patcher.NewPatcher[ + *clusterv1beta2.ManagedClusterSetBinding, clusterv1beta2.ManagedClusterSetBindingSpec, clusterv1beta2.ManagedClusterSetBindingStatus]( + c.clusterClient.ClusterV1beta2().ManagedClusterSetBindings(bindingNamespace)) + if len(bindingNamespace) == 0 { return nil } @@ -170,7 +170,10 @@ func (c *managedClusterSetBindingController) sync(ctx context.Context, syncCtx f Status: metav1.ConditionFalse, Reason: "ClusterSetNotFound", }) - return c.patchCondition(ctx, binding, bindingCopy) + if _, err := patcher.PatchStatus(ctx, bindingCopy, bindingCopy.Status, binding.Status); err != nil { + return err + } + return nil case err != nil: return err } @@ -181,43 +184,9 @@ func (c *managedClusterSetBindingController) sync(ctx context.Context, syncCtx f Reason: "ClusterSetBound", }) - return c.patchCondition(ctx, binding, bindingCopy) -} - -func (c *managedClusterSetBindingController) patchCondition(ctx context.Context, old, new *clusterv1beta2.ManagedClusterSetBinding) error { - if equality.Semantic.DeepEqual(old.Status.Conditions, new.Status.Conditions) { - return nil - } - - oldData, err := json.Marshal(clusterv1beta2.ManagedClusterSetBinding{ - Status: clusterv1beta2.ManagedClusterSetBindingStatus{ - Conditions: old.Status.Conditions, - }, - }) - if err != nil { - return fmt.Errorf("failed to Marshal old data for workspace %s: %w", old.Name, err) - } - - newData, err := json.Marshal(clusterv1beta2.ManagedClusterSetBinding{ - ObjectMeta: metav1.ObjectMeta{ - UID: old.UID, - ResourceVersion: old.ResourceVersion, - }, // to ensure they appear in the patch as preconditions - Status: clusterv1beta2.ManagedClusterSetBindingStatus{ - Conditions: new.Status.Conditions, - }, - }) - if err != nil { - return fmt.Errorf("failed to Marshal new data for workspace %s: %w", new.Name, err) - } - - patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData) - if err != nil { - return fmt.Errorf("failed to create patch for workspace %s: %w", new.Name, err) - } - - c.eventRecorder.Eventf("PatchClusterSetBindingCondition", "patch clustersetbinding %s/%s condition", new.Namespace, new.Name) - - _, err = c.clusterClient.ClusterV1beta2().ManagedClusterSetBindings(new.Namespace).Patch(ctx, new.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status") - return err + if _, err := patcher.PatchStatus(ctx, bindingCopy, bindingCopy.Status, binding.Status); err != nil { + return err + } + + return nil } diff --git a/pkg/registration/hub/taint/controller.go b/pkg/registration/hub/taint/controller.go index a959344b9..aa6190d73 100644 --- a/pkg/registration/hub/taint/controller.go +++ b/pkg/registration/hub/taint/controller.go @@ -2,6 +2,7 @@ package taint import ( "context" + "open-cluster-management.io/ocm/pkg/common/patcher" "github.com/openshift/library-go/pkg/controller/factory" "github.com/openshift/library-go/pkg/operator/events" @@ -31,7 +32,7 @@ var ( // taintController type taintController struct { - clusterClient clientset.Interface + patcher patcher.Patcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus] clusterLister listerv1.ManagedClusterLister eventRecorder events.Recorder } @@ -42,7 +43,9 @@ func NewTaintController( clusterInformer informerv1.ManagedClusterInformer, recorder events.Recorder) factory.Controller { c := &taintController{ - clusterClient: clusterClient, + patcher: patcher.NewPatcher[ + *v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()), clusterLister: clusterInformer.Lister(), eventRecorder: recorder.WithComponentSuffix("taint-controller"), } @@ -70,9 +73,9 @@ func (c *taintController) sync(ctx context.Context, syncCtx factory.SyncContext) return nil } - managedCluster = managedCluster.DeepCopy() - newTaints := managedCluster.Spec.Taints - cond := meta.FindStatusCondition(managedCluster.Status.Conditions, v1.ManagedClusterConditionAvailable) + newManagedCluster := managedCluster.DeepCopy() + newTaints := newManagedCluster.Spec.Taints + cond := meta.FindStatusCondition(newManagedCluster.Status.Conditions, v1.ManagedClusterConditionAvailable) var updated bool switch { @@ -87,8 +90,8 @@ func (c *taintController) sync(ctx context.Context, syncCtx factory.SyncContext) } if updated { - managedCluster.Spec.Taints = newTaints - if _, err = c.clusterClient.ClusterV1().ManagedClusters().Update(ctx, managedCluster, metav1.UpdateOptions{}); err != nil { + newManagedCluster.Spec.Taints = newTaints + if _, err = c.patcher.PatchSpec(ctx, newManagedCluster, newManagedCluster.Spec, managedCluster.Spec); err != nil { return err } c.eventRecorder.Eventf("ManagedClusterConditionAvailableUpdated", "Update the original taints to the %+v", newTaints) diff --git a/pkg/registration/hub/taint/controller_test.go b/pkg/registration/hub/taint/controller_test.go index bf449f71e..97289defb 100644 --- a/pkg/registration/hub/taint/controller_test.go +++ b/pkg/registration/hub/taint/controller_test.go @@ -2,6 +2,8 @@ package taint import ( "context" + "encoding/json" + "open-cluster-management.io/ocm/pkg/common/patcher" "reflect" "testing" "time" @@ -36,8 +38,13 @@ func TestSyncTaintCluster(t *testing.T) { name: "ManagedClusterConditionAvailable conditionStatus is False", startingObjects: []runtime.Object{testinghelpers.NewUnAvailableManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "update") - managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) + testingcommon.AssertActions(t, actions, "patch") + patchData := actions[0].(clienttesting.PatchActionImpl).Patch + managedCluster := &v1.ManagedCluster{} + err := json.Unmarshal(patchData, managedCluster) + if err != nil { + t.Fatal(err) + } taints := []v1.Taint{UnavailableTaint} if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) { t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints) @@ -48,8 +55,13 @@ func TestSyncTaintCluster(t *testing.T) { name: "There is no ManagedClusterConditionAvailable", startingObjects: []runtime.Object{testinghelpers.NewManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "update") - managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) + testingcommon.AssertActions(t, actions, "patch") + patchData := actions[0].(clienttesting.PatchActionImpl).Patch + managedCluster := &v1.ManagedCluster{} + err := json.Unmarshal(patchData, managedCluster) + if err != nil { + t.Fatal(err) + } taints := []v1.Taint{UnreachableTaint} if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) { t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints) @@ -60,8 +72,13 @@ func TestSyncTaintCluster(t *testing.T) { name: "ManagedClusterConditionAvailable conditionStatus is Unknown", startingObjects: []runtime.Object{testinghelpers.NewUnknownManagedCluster()}, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "update") - managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster) + testingcommon.AssertActions(t, actions, "patch") + patchData := actions[0].(clienttesting.PatchActionImpl).Patch + managedCluster := &v1.ManagedCluster{} + err := json.Unmarshal(patchData, managedCluster) + if err != nil { + t.Fatal(err) + } taints := []v1.Taint{UnreachableTaint} if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) { t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints) @@ -88,7 +105,11 @@ func TestSyncTaintCluster(t *testing.T) { } } - ctrl := taintController{clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)} + ctrl := taintController{ + patcher.NewPatcher[ + *v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]( + clusterClient.ClusterV1().ManagedClusters()), + clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)} syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)) if syncErr != nil { t.Errorf("unexpected err: %v", syncErr) diff --git a/pkg/registration/spoke/addon/lease_controller.go b/pkg/registration/spoke/addon/lease_controller.go index 46339a71f..ea5479b65 100644 --- a/pkg/registration/spoke/addon/lease_controller.go +++ b/pkg/registration/spoke/addon/lease_controller.go @@ -3,16 +3,15 @@ package addon import ( "context" "fmt" + "open-cluster-management.io/ocm/pkg/common/patcher" "time" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" addonclient "open-cluster-management.io/api/client/addon/clientset/versioned" addoninformerv1alpha1 "open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1" addonlisterv1alpha1 "open-cluster-management.io/api/client/addon/listers/addon/v1alpha1" - "open-cluster-management.io/ocm/pkg/registration/helpers" - - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -33,9 +32,10 @@ var AddOnLeaseControllerLeaseDurationSeconds = 60 // managedClusterAddOnLeaseController updates the managed cluster addons status on the hub cluster through checking the add-on // lease on the managed/management cluster. type managedClusterAddOnLeaseController struct { - clusterName string - clock clock.Clock - addOnClient addonclient.Interface + clusterName string + clock clock.Clock + patcher patcher.Patcher[ + *addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus] addOnLister addonlisterv1alpha1.ManagedClusterAddOnLister hubLeaseClient coordv1client.CoordinationV1Interface managementLeaseClient coordv1client.CoordinationV1Interface @@ -52,9 +52,11 @@ func NewManagedClusterAddOnLeaseController(clusterName string, resyncInterval time.Duration, recorder events.Recorder) factory.Controller { c := &managedClusterAddOnLeaseController{ - clusterName: clusterName, - clock: clock.RealClock{}, - addOnClient: addOnClient, + clusterName: clusterName, + clock: clock.RealClock{}, + patcher: patcher.NewPatcher[ + *addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]( + addOnClient.AddonV1alpha1().ManagedClusterAddOns(clusterName)), addOnLister: addOnInformer.Lister(), hubLeaseClient: hubLeaseClient, managementLeaseClient: managementLeaseClient, @@ -183,18 +185,9 @@ func (c *managedClusterAddOnLeaseController) syncSingle(ctx context.Context, } } - if meta.IsStatusConditionPresentAndEqual(addOn.Status.Conditions, condition.Type, condition.Status) { - // addon status is not changed, do nothing - return nil - } - - _, updated, err := helpers.UpdateManagedClusterAddOnStatus( - ctx, - c.addOnClient, - c.clusterName, - addOn.Name, - helpers.UpdateManagedClusterAddOnStatusFn(condition), - ) + newAddon := addOn.DeepCopy() + meta.SetStatusCondition(&newAddon.Status.Conditions, condition) + updated, err := c.patcher.PatchStatus(ctx, newAddon, newAddon.Status, addOn.Status) if err != nil { return err } diff --git a/pkg/registration/spoke/addon/lease_controller_test.go b/pkg/registration/spoke/addon/lease_controller_test.go index 0c87f3818..cd6a02f93 100644 --- a/pkg/registration/spoke/addon/lease_controller_test.go +++ b/pkg/registration/spoke/addon/lease_controller_test.go @@ -3,6 +3,7 @@ package addon import ( "context" "encoding/json" + "open-cluster-management.io/ocm/pkg/common/patcher" "testing" "time" @@ -141,8 +142,8 @@ func TestSync(t *testing.T) { hubLeases: []runtime.Object{}, spokeLeases: []runtime.Object{}, validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() addOn := &addonv1alpha1.ManagedClusterAddOn{} err := json.Unmarshal(patch, addOn) if err != nil { @@ -175,8 +176,8 @@ func TestSync(t *testing.T) { testinghelpers.NewAddOnLease("test", "test", now.Add(-5*time.Minute)), }, validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() addOn := &addonv1alpha1.ManagedClusterAddOn{} err := json.Unmarshal(patch, addOn) if err != nil { @@ -209,8 +210,8 @@ func TestSync(t *testing.T) { testinghelpers.NewAddOnLease("test", "test", now), }, validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() addOn := &addonv1alpha1.ManagedClusterAddOn{} err := json.Unmarshal(patch, addOn) if err != nil { @@ -243,7 +244,7 @@ func TestSync(t *testing.T) { Type: "Available", Status: metav1.ConditionTrue, Reason: "ManagedClusterAddOnLeaseUpdated", - Message: "Managed cluster addon agent updates its lease constantly.", + Message: "test add-on is available.", }, }, }, @@ -306,8 +307,8 @@ func TestSync(t *testing.T) { testinghelpers.NewAddOnLease("test", "test", now), }, validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() addOn := &addonv1alpha1.ManagedClusterAddOn{} err := json.Unmarshal(patch, addOn) if err != nil { @@ -335,8 +336,8 @@ func TestSync(t *testing.T) { hubLeases: []runtime.Object{testinghelpers.NewAddOnLease(testinghelpers.TestManagedClusterName, "test", now)}, spokeLeases: []runtime.Object{}, validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() addOn := &addonv1alpha1.ManagedClusterAddOn{} err := json.Unmarshal(patch, addOn) if err != nil { @@ -390,10 +391,12 @@ func TestSync(t *testing.T) { spokeLeaseClient := kubefake.NewSimpleClientset(c.spokeLeases...) ctrl := &managedClusterAddOnLeaseController{ - clusterName: testinghelpers.TestManagedClusterName, - clock: clocktesting.NewFakeClock(time.Now()), - hubLeaseClient: hubClient.CoordinationV1(), - addOnClient: addOnClient, + clusterName: testinghelpers.TestManagedClusterName, + clock: clocktesting.NewFakeClock(time.Now()), + hubLeaseClient: hubClient.CoordinationV1(), + patcher: patcher.NewPatcher[ + *addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]( + addOnClient.AddonV1alpha1().ManagedClusterAddOns(testinghelpers.TestManagedClusterName)), addOnLister: addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns().Lister(), managementLeaseClient: managementLeaseClient.CoordinationV1(), spokeLeaseClient: spokeLeaseClient.CoordinationV1(), diff --git a/pkg/registration/spoke/addon/registration_controller.go b/pkg/registration/spoke/addon/registration_controller.go index b0fca7854..e30dd88cd 100644 --- a/pkg/registration/spoke/addon/registration_controller.go +++ b/pkg/registration/spoke/addon/registration_controller.go @@ -3,6 +3,7 @@ package addon import ( "context" "fmt" + "open-cluster-management.io/ocm/pkg/common/patcher" "time" addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" @@ -25,7 +26,6 @@ import ( addoninformerv1alpha1 "open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1" addonlisterv1alpha1 "open-cluster-management.io/api/client/addon/listers/addon/v1alpha1" "open-cluster-management.io/ocm/pkg/registration/clientcert" - "open-cluster-management.io/ocm/pkg/registration/helpers" ) const ( @@ -46,10 +46,11 @@ type addOnRegistrationController struct { managementKubeClient kubernetes.Interface // in-cluster local management kubeClient spokeKubeClient kubernetes.Interface hubAddOnLister addonlisterv1alpha1.ManagedClusterAddOnLister - addOnClient addonclient.Interface - csrControl clientcert.CSRControl - recorder events.Recorder - csrIndexer cache.Indexer + patcher patcher.Patcher[ + *addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus] + csrControl clientcert.CSRControl + recorder events.Recorder + csrIndexer cache.Indexer startRegistrationFunc func(ctx context.Context, config registrationConfig) context.CancelFunc @@ -71,14 +72,16 @@ func NewAddOnRegistrationController( recorder events.Recorder, ) factory.Controller { c := &addOnRegistrationController{ - clusterName: clusterName, - agentName: agentName, - kubeconfigData: kubeconfigData, - managementKubeClient: managementKubeClient, - spokeKubeClient: managedKubeClient, - hubAddOnLister: hubAddOnInformers.Lister(), - csrControl: csrControl, - addOnClient: addOnClient, + clusterName: clusterName, + agentName: agentName, + kubeconfigData: kubeconfigData, + managementKubeClient: managementKubeClient, + spokeKubeClient: managedKubeClient, + hubAddOnLister: hubAddOnInformers.Lister(), + csrControl: csrControl, + patcher: patcher.NewPatcher[ + *addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]( + addOnClient.AddonV1alpha1().ManagedClusterAddOns(clusterName)), recorder: recorder, csrIndexer: csrControl.Informer().GetIndexer(), addOnRegistrationConfigs: map[string]map[string]registrationConfig{}, @@ -276,11 +279,18 @@ func (c *addOnRegistrationController) haltCSRCreationFunc(addonName string) func func (c *addOnRegistrationController) generateStatusUpdate(clusterName, addonName string) clientcert.StatusUpdateFunc { return func(ctx context.Context, cond metav1.Condition) error { - _, _, updatedErr := helpers.UpdateManagedClusterAddOnStatus( - ctx, c.addOnClient, clusterName, addonName, helpers.UpdateManagedClusterAddOnStatusFn(cond), - ) + addon, err := c.hubAddOnLister.ManagedClusterAddOns(clusterName).Get(addonName) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } - return updatedErr + newAddon := addon.DeepCopy() + meta.SetStatusCondition(&newAddon.Status.Conditions, cond) + _, err = c.patcher.PatchStatus(ctx, newAddon, newAddon.Status, addon.Status) + return err } } diff --git a/pkg/registration/spoke/managedcluster/lease_controller.go b/pkg/registration/spoke/lease/lease_controller.go similarity index 99% rename from pkg/registration/spoke/managedcluster/lease_controller.go rename to pkg/registration/spoke/lease/lease_controller.go index 444bd36cb..1c93339e1 100644 --- a/pkg/registration/spoke/managedcluster/lease_controller.go +++ b/pkg/registration/spoke/lease/lease_controller.go @@ -1,4 +1,4 @@ -package managedcluster +package lease import ( "context" diff --git a/pkg/registration/spoke/managedcluster/lease_controller_test.go b/pkg/registration/spoke/lease/lease_controller_test.go similarity index 99% rename from pkg/registration/spoke/managedcluster/lease_controller_test.go rename to pkg/registration/spoke/lease/lease_controller_test.go index 4b7d021c4..c39e8071e 100644 --- a/pkg/registration/spoke/managedcluster/lease_controller_test.go +++ b/pkg/registration/spoke/lease/lease_controller_test.go @@ -1,4 +1,4 @@ -package managedcluster +package lease import ( "context" diff --git a/pkg/registration/spoke/managedcluster/claim_controller.go b/pkg/registration/spoke/managedcluster/claim_controller.go deleted file mode 100644 index dd1ee32b5..000000000 --- a/pkg/registration/spoke/managedcluster/claim_controller.go +++ /dev/null @@ -1,150 +0,0 @@ -package managedcluster - -import ( - "context" - "fmt" - "sort" - - "k8s.io/apimachinery/pkg/selection" - - clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" - clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" - clusterv1alpha1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1" - clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" - clusterv1alpha1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1" - clusterv1 "open-cluster-management.io/api/cluster/v1" - clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" - "open-cluster-management.io/ocm/pkg/registration/helpers" - - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" - - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/klog/v2" -) - -const labelCustomizedOnly = "open-cluster-management.io/spoke-only" - -// managedClusterClaimController exposes cluster claims created on managed cluster on hub after it joins the hub. -type managedClusterClaimController struct { - clusterName string - hubClusterClient clientset.Interface - hubClusterLister clusterv1listers.ManagedClusterLister - claimLister clusterv1alpha1listers.ClusterClaimLister - maxCustomClusterClaims int -} - -// NewManagedClusterClaimController creates a new managed cluster claim controller on the managed cluster. -func NewManagedClusterClaimController( - clusterName string, - maxCustomClusterClaims int, - hubClusterClient clientset.Interface, - hubManagedClusterInformer clusterv1informer.ManagedClusterInformer, - claimInformer clusterv1alpha1informer.ClusterClaimInformer, - recorder events.Recorder) factory.Controller { - c := &managedClusterClaimController{ - clusterName: clusterName, - maxCustomClusterClaims: maxCustomClusterClaims, - hubClusterClient: hubClusterClient, - hubClusterLister: hubManagedClusterInformer.Lister(), - claimLister: claimInformer.Lister(), - } - - return factory.New(). - WithInformers(claimInformer.Informer()). - WithInformersQueueKeyFunc(func(obj runtime.Object) string { - accessor, _ := meta.Accessor(obj) - return accessor.GetName() - }, hubManagedClusterInformer.Informer()). - WithSync(c.sync). - ToController("ClusterClaimController", recorder) -} - -// sync maintains the cluster claims in status of the managed cluster on hub once it joins the hub. -func (c managedClusterClaimController) sync(ctx context.Context, syncCtx factory.SyncContext) error { - managedCluster, err := c.hubClusterLister.Get(c.clusterName) - if err != nil { - return fmt.Errorf("unable to get managed cluster with name %q from hub: %w", c.clusterName, err) - } - - // current managed cluster has not joined the hub yet, do nothing. - if !meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) { - syncCtx.Recorder().Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q does not join the hub yet", c.clusterName) - return nil - } - - return c.exposeClaims(ctx, syncCtx, managedCluster) -} - -// exposeClaims saves cluster claims fetched on managed cluster into status of the -// managed cluster on hub. Some of the customized claims might not be exposed once -// the total number of the claims exceeds the value of `cluster-claims-max`. -func (c managedClusterClaimController) exposeClaims(ctx context.Context, syncCtx factory.SyncContext, - managedCluster *clusterv1.ManagedCluster) error { - reservedClaims := []clusterv1.ManagedClusterClaim{} - customClaims := []clusterv1.ManagedClusterClaim{} - - // clusterClaim with label `open-cluster-management.io/spoke-only` will not be synced to managedCluster.Status at hub. - requirement, _ := labels.NewRequirement(labelCustomizedOnly, selection.DoesNotExist, []string{}) - selector := labels.NewSelector().Add(*requirement) - clusterClaims, err := c.claimLister.List(selector) - if err != nil { - return fmt.Errorf("unable to list cluster claims: %w", err) - } - - reservedClaimNames := sets.NewString(clusterv1alpha1.ReservedClusterClaimNames[:]...) - for _, clusterClaim := range clusterClaims { - managedClusterClaim := clusterv1.ManagedClusterClaim{ - Name: clusterClaim.Name, - Value: clusterClaim.Spec.Value, - } - if reservedClaimNames.Has(clusterClaim.Name) { - reservedClaims = append(reservedClaims, managedClusterClaim) - continue - } - customClaims = append(customClaims, managedClusterClaim) - } - - // sort claims by name - sort.SliceStable(reservedClaims, func(i, j int) bool { - return reservedClaims[i].Name < reservedClaims[j].Name - }) - - sort.SliceStable(customClaims, func(i, j int) bool { - return customClaims[i].Name < customClaims[j].Name - }) - - // truncate custom claims if the number exceeds `max-custom-cluster-claims` - if n := len(customClaims); n > c.maxCustomClusterClaims { - customClaims = customClaims[:c.maxCustomClusterClaims] - syncCtx.Recorder().Eventf("CustomClusterClaimsTruncated", "%d cluster claims are found. It exceeds the max number of custom cluster claims (%d). %d custom cluster claims are not exposed.", - n, c.maxCustomClusterClaims, n-c.maxCustomClusterClaims) - } - - // merge reserved claims and custom claims - claims := append(reservedClaims, customClaims...) - - // update the status of the managed cluster - updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{updateClusterClaimsFn(clusterv1.ManagedClusterStatus{ - ClusterClaims: claims, - })} - - _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...) - if err != nil { - return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err) - } - if updated { - klog.V(4).Infof("The cluster claims in status of managed cluster %q has been updated", c.clusterName) - } - return nil -} - -func updateClusterClaimsFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc { - return func(oldStatus *clusterv1.ManagedClusterStatus) error { - oldStatus.ClusterClaims = status.ClusterClaims - return nil - } -} diff --git a/pkg/registration/spoke/managedcluster/claim_reconcile.go b/pkg/registration/spoke/managedcluster/claim_reconcile.go new file mode 100644 index 000000000..2eb901737 --- /dev/null +++ b/pkg/registration/spoke/managedcluster/claim_reconcile.go @@ -0,0 +1,89 @@ +package managedcluster + +import ( + "context" + "fmt" + "github.com/openshift/library-go/pkg/operator/events" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/selection" + "k8s.io/apimachinery/pkg/util/sets" + clusterv1alpha1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1" + clusterv1 "open-cluster-management.io/api/cluster/v1" + clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1" + ocmfeature "open-cluster-management.io/api/feature" + "open-cluster-management.io/ocm/pkg/features" + "sort" +) + +const labelCustomizedOnly = "open-cluster-management.io/spoke-only" + +type claimReconcile struct { + recorder events.Recorder + claimLister clusterv1alpha1listers.ClusterClaimLister + maxCustomClusterClaims int +} + +func (r *claimReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) { + if !features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) { + return cluster, reconcileContinue, nil + } + // current managed cluster has not joined the hub yet, do nothing. + if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) { + r.recorder.Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q does not join the hub yet", cluster.Name) + return cluster, reconcileContinue, nil + } + + err := r.exposeClaims(ctx, cluster) + return cluster, reconcileContinue, err +} + +// exposeClaims saves cluster claims fetched on managed cluster into status of the +// managed cluster on hub. Some of the customized claims might not be exposed once +// the total number of the claims exceeds the value of `cluster-claims-max`. +func (r *claimReconcile) exposeClaims(ctx context.Context, cluster *clusterv1.ManagedCluster) error { + reservedClaims := []clusterv1.ManagedClusterClaim{} + customClaims := []clusterv1.ManagedClusterClaim{} + + // clusterClaim with label `open-cluster-management.io/spoke-only` will not be synced to managedCluster.Status at hub. + requirement, _ := labels.NewRequirement(labelCustomizedOnly, selection.DoesNotExist, []string{}) + selector := labels.NewSelector().Add(*requirement) + clusterClaims, err := r.claimLister.List(selector) + if err != nil { + return fmt.Errorf("unable to list cluster claims: %w", err) + } + + reservedClaimNames := sets.NewString(clusterv1alpha1.ReservedClusterClaimNames[:]...) + for _, clusterClaim := range clusterClaims { + managedClusterClaim := clusterv1.ManagedClusterClaim{ + Name: clusterClaim.Name, + Value: clusterClaim.Spec.Value, + } + if reservedClaimNames.Has(clusterClaim.Name) { + reservedClaims = append(reservedClaims, managedClusterClaim) + continue + } + customClaims = append(customClaims, managedClusterClaim) + } + + // sort claims by name + sort.SliceStable(reservedClaims, func(i, j int) bool { + return reservedClaims[i].Name < reservedClaims[j].Name + }) + + sort.SliceStable(customClaims, func(i, j int) bool { + return customClaims[i].Name < customClaims[j].Name + }) + + // truncate custom claims if the number exceeds `max-custom-cluster-claims` + if n := len(customClaims); n > r.maxCustomClusterClaims { + customClaims = customClaims[:r.maxCustomClusterClaims] + r.recorder.Eventf("CustomClusterClaimsTruncated", "%d cluster claims are found. It exceeds the max number of custom cluster claims (%d). %d custom cluster claims are not exposed.", + n, r.maxCustomClusterClaims, n-r.maxCustomClusterClaims) + } + + // merge reserved claims and custom claims + claims := append(reservedClaims, customClaims...) + cluster.Status.ClusterClaims = claims + return nil +} diff --git a/pkg/registration/spoke/managedcluster/claim_controller_test.go b/pkg/registration/spoke/managedcluster/claim_reconcile_test.go similarity index 78% rename from pkg/registration/spoke/managedcluster/claim_controller_test.go rename to pkg/registration/spoke/managedcluster/claim_reconcile_test.go index 8918036fd..2fa218719 100644 --- a/pkg/registration/spoke/managedcluster/claim_controller_test.go +++ b/pkg/registration/spoke/managedcluster/claim_reconcile_test.go @@ -3,6 +3,9 @@ package managedcluster import ( "context" "encoding/json" + "github.com/openshift/library-go/pkg/operator/events/eventstesting" + kubeinformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" "reflect" "testing" "time" @@ -30,7 +33,7 @@ func TestSync(t *testing.T) { { name: "sync no managed cluster", validateActions: testingcommon.AssertNoActions, - expectedErr: "unable to get managed cluster with name \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found", + expectedErr: "unable to get managed cluster \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found", }, { name: "skip when managed cluster does not join the hub yet", @@ -51,8 +54,8 @@ func TestSync(t *testing.T) { }, }, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() cluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, cluster) if err != nil { @@ -72,6 +75,11 @@ func TestSync(t *testing.T) { }, } + apiServer, discoveryClient := newDiscoveryServer(t, nil) + defer apiServer.Close() + kubeClient := kubefake.NewSimpleClientset() + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10) + for _, c := range cases { t.Run(c.name, func(t *testing.T) { objects := []runtime.Object{} @@ -93,13 +101,16 @@ func TestSync(t *testing.T) { } } - ctrl := managedClusterClaimController{ - clusterName: testinghelpers.TestManagedClusterName, - maxCustomClusterClaims: 20, - hubClusterClient: clusterClient, - hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), - claimLister: clusterInformerFactory.Cluster().V1alpha1().ClusterClaims().Lister(), - } + ctrl := newManagedClusterStatusController( + testinghelpers.TestManagedClusterName, + clusterClient, + clusterInformerFactory.Cluster().V1().ManagedClusters(), + discoveryClient, + clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), + kubeInformerFactory.Core().V1().Nodes(), + 20, + eventstesting.NewTestingEventRecorder(t), + ) syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, "")) testingcommon.AssertError(t, syncErr, c.expectedErr) @@ -132,8 +143,8 @@ func TestExposeClaims(t *testing.T) { }, }, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() cluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, cluster) if err != nil { @@ -190,8 +201,8 @@ func TestExposeClaims(t *testing.T) { }, maxCustomClusterClaims: 2, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() cluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, cluster) if err != nil { @@ -226,8 +237,8 @@ func TestExposeClaims(t *testing.T) { }, }), validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() cluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, cluster) if err != nil { @@ -262,8 +273,8 @@ func TestExposeClaims(t *testing.T) { }, }, validateActions: func(t *testing.T, actions []clienttesting.Action) { - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() cluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, cluster) if err != nil { @@ -283,6 +294,11 @@ func TestExposeClaims(t *testing.T) { }, } + apiServer, discoveryClient := newDiscoveryServer(t, nil) + defer apiServer.Close() + kubeClient := kubefake.NewSimpleClientset() + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10) + for _, c := range cases { t.Run(c.name, func(t *testing.T) { objects := []runtime.Object{} @@ -308,15 +324,18 @@ func TestExposeClaims(t *testing.T) { c.maxCustomClusterClaims = 20 } - ctrl := managedClusterClaimController{ - clusterName: testinghelpers.TestManagedClusterName, - maxCustomClusterClaims: c.maxCustomClusterClaims, - hubClusterClient: clusterClient, - hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), - claimLister: clusterInformerFactory.Cluster().V1alpha1().ClusterClaims().Lister(), - } + ctrl := newManagedClusterStatusController( + testinghelpers.TestManagedClusterName, + clusterClient, + clusterInformerFactory.Cluster().V1().ManagedClusters(), + discoveryClient, + clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), + kubeInformerFactory.Core().V1().Nodes(), + c.maxCustomClusterClaims, + eventstesting.NewTestingEventRecorder(t), + ) - syncErr := ctrl.exposeClaims(context.TODO(), testingcommon.NewFakeSyncContext(t, c.cluster.Name), c.cluster) + syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, c.cluster.Name)) testingcommon.AssertError(t, syncErr, c.expectedErr) c.validateActions(t, clusterClient.Actions()) diff --git a/pkg/registration/spoke/managedcluster/joining_controller.go b/pkg/registration/spoke/managedcluster/joining_controller.go deleted file mode 100644 index 20011b4ee..000000000 --- a/pkg/registration/spoke/managedcluster/joining_controller.go +++ /dev/null @@ -1,86 +0,0 @@ -package managedcluster - -import ( - "context" - "fmt" - "time" - - clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" - clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" - clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" - clusterv1 "open-cluster-management.io/api/cluster/v1" - "open-cluster-management.io/ocm/pkg/registration/helpers" - - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -// managedClusterJoiningController add the joined condition to a ManagedCluster on the managed cluster after it is accepted by hub cluster admin. -type managedClusterJoiningController struct { - clusterName string - hubClusterClient clientset.Interface - hubClusterLister clusterv1listers.ManagedClusterLister -} - -// NewManagedClusterJoiningController creates a new managed cluster joining controller on the managed cluster. -func NewManagedClusterJoiningController( - clusterName string, - hubClusterClient clientset.Interface, - hubManagedClusterInformer clusterv1informer.ManagedClusterInformer, - recorder events.Recorder) factory.Controller { - c := &managedClusterJoiningController{ - clusterName: clusterName, - hubClusterClient: hubClusterClient, - hubClusterLister: hubManagedClusterInformer.Lister(), - } - - return factory.New(). - WithInformers(hubManagedClusterInformer.Informer()). - WithSync(c.sync). - ResyncEvery(5*time.Minute). - ToController("ManagedClusterJoiningController", recorder) -} - -// sync maintains the managed cluster side status of a ManagedCluster, it maintains the ManagedClusterJoined condition according to -// the value of the ManagedClusterHubAccepted condition. -func (c managedClusterJoiningController) sync(ctx context.Context, syncCtx factory.SyncContext) error { - managedCluster, err := c.hubClusterLister.Get(c.clusterName) - if err != nil { - return fmt.Errorf("unable to get managed cluster with name %q from hub: %w", c.clusterName, err) - } - - // current managed cluster is not accepted, do nothing. - if !meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) { - syncCtx.Recorder().Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q is not accepted by hub yet", c.clusterName) - return nil - } - - joined := meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) - if joined { - // current managed cluster is joined, do nothing. - return nil - } - - // current managed cluster did not join the hub cluster, join it. - _, updated, err := helpers.UpdateManagedClusterStatus( - ctx, - c.hubClusterClient, - c.clusterName, - helpers.UpdateManagedClusterConditionFn(metav1.Condition{ - Type: clusterv1.ManagedClusterConditionJoined, - Status: metav1.ConditionTrue, - Reason: "ManagedClusterJoined", - Message: "Managed cluster joined", - }), - ) - if err != nil { - return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err) - } - if updated { - syncCtx.Recorder().Eventf("ManagedClusterJoined", "Managed cluster %q joined hub", c.clusterName) - } - return nil -} diff --git a/pkg/registration/spoke/managedcluster/joining_controller_test.go b/pkg/registration/spoke/managedcluster/joining_controller_test.go index 45f14f6c1..a777ceaf2 100644 --- a/pkg/registration/spoke/managedcluster/joining_controller_test.go +++ b/pkg/registration/spoke/managedcluster/joining_controller_test.go @@ -3,6 +3,9 @@ package managedcluster import ( "context" "encoding/json" + "github.com/openshift/library-go/pkg/operator/events/eventstesting" + kubeinformers "k8s.io/client-go/informers" + kubefake "k8s.io/client-go/kubernetes/fake" "testing" "time" @@ -28,7 +31,7 @@ func TestSyncManagedCluster(t *testing.T) { name: "sync no managed cluster", startingObjects: []runtime.Object{}, validateActions: testingcommon.AssertNoActions, - expectedErr: "unable to get managed cluster with name \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found", + expectedErr: "unable to get managed cluster \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found", }, { name: "sync an unaccepted managed cluster", @@ -45,8 +48,8 @@ func TestSyncManagedCluster(t *testing.T) { Reason: "ManagedClusterJoined", Message: "Managed cluster joined", } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -57,6 +60,11 @@ func TestSyncManagedCluster(t *testing.T) { }, } + apiServer, discoveryClient := newDiscoveryServer(t, nil) + defer apiServer.Close() + kubeClient := kubefake.NewSimpleClientset() + kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10) + for _, c := range cases { t.Run(c.name, func(t *testing.T) { clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...) @@ -68,11 +76,16 @@ func TestSyncManagedCluster(t *testing.T) { } } - ctrl := managedClusterJoiningController{ - clusterName: testinghelpers.TestManagedClusterName, - hubClusterClient: clusterClient, - hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), - } + ctrl := newManagedClusterStatusController( + testinghelpers.TestManagedClusterName, + clusterClient, + clusterInformerFactory.Cluster().V1().ManagedClusters(), + discoveryClient, + clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), + kubeInformerFactory.Core().V1().Nodes(), + 20, + eventstesting.NewTestingEventRecorder(t), + ) syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, "")) testingcommon.AssertError(t, syncErr, c.expectedErr) diff --git a/pkg/registration/spoke/managedcluster/joining_reconcile.go b/pkg/registration/spoke/managedcluster/joining_reconcile.go new file mode 100644 index 000000000..870ea8c23 --- /dev/null +++ b/pkg/registration/spoke/managedcluster/joining_reconcile.go @@ -0,0 +1,36 @@ +package managedcluster + +import ( + "context" + "github.com/openshift/library-go/pkg/operator/events" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + clusterv1 "open-cluster-management.io/api/cluster/v1" +) + +type joiningReconcile struct { + recorder events.Recorder +} + +func (r *joiningReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) { + // current managed cluster is not accepted, do nothing. + if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) { + r.recorder.Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q is not accepted by hub yet", cluster.Name) + return cluster, reconcileStop, nil + } + + joined := meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) + if joined { + // current managed cluster is joined, do nothing. + return cluster, reconcileContinue, nil + } + + meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{ + Type: clusterv1.ManagedClusterConditionJoined, + Status: metav1.ConditionTrue, + Reason: "ManagedClusterJoined", + Message: "Managed cluster joined", + }) + + return cluster, reconcileContinue, nil +} diff --git a/pkg/registration/spoke/managedcluster/resource_reconcile.go b/pkg/registration/spoke/managedcluster/resource_reconcile.go new file mode 100644 index 000000000..81114bc84 --- /dev/null +++ b/pkg/registration/spoke/managedcluster/resource_reconcile.go @@ -0,0 +1,125 @@ +package managedcluster + +import ( + "context" + "fmt" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/client-go/discovery" + corev1lister "k8s.io/client-go/listers/core/v1" + "net/http" + clusterv1 "open-cluster-management.io/api/cluster/v1" +) + +type resoureReconcile struct { + managedClusterDiscoveryClient discovery.DiscoveryInterface + nodeLister corev1lister.NodeLister +} + +func (r *resoureReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) { + // check the kube-apiserver health on managed cluster. + condition := r.checkKubeAPIServerStatus(ctx) + + // the managed cluster kube-apiserver is health, update its version and resources if necessary. + if condition.Status == metav1.ConditionTrue { + clusterVersion, err := r.getClusterVersion() + if err != nil { + return cluster, reconcileStop, fmt.Errorf("unable to get server version of managed cluster %q: %w", cluster.Name, err) + } + + capacity, allocatable, err := r.getClusterResources() + if err != nil { + return cluster, reconcileStop, fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", cluster.Name, err) + } + + cluster.Status.Capacity = capacity + cluster.Status.Allocatable = allocatable + cluster.Status.Version = *clusterVersion + } + + meta.SetStatusCondition(&cluster.Status.Conditions, condition) + return cluster, reconcileContinue, nil +} + +// using readyz api to check the status of kube apiserver +func (r *resoureReconcile) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition { + statusCode := 0 + condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable} + result := r.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode) + if statusCode == http.StatusOK { + condition.Status = metav1.ConditionTrue + condition.Reason = "ManagedClusterAvailable" + condition.Message = "Managed cluster is available" + return condition + } + + // for backward compatible, the livez endpoint is supported from Kubernetes 1.16, so if the livez is not found or + // forbidden, the healthz endpoint will be used. + if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden { + result = r.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode) + if statusCode == http.StatusOK { + condition.Status = metav1.ConditionTrue + condition.Reason = "ManagedClusterAvailable" + condition.Message = "Managed cluster is available" + return condition + } + } + + condition.Status = metav1.ConditionFalse + condition.Reason = "ManagedClusterKubeAPIServerUnavailable" + body, err := result.Raw() + if err == nil { + condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body)) + return condition + } + + condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err) + return condition +} + +func (r *resoureReconcile) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) { + serverVersion, err := r.managedClusterDiscoveryClient.ServerVersion() + if err != nil { + return nil, err + } + return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil +} + +func (r *resoureReconcile) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) { + nodes, err := r.nodeLister.List(labels.Everything()) + if err != nil { + return nil, nil, err + } + + capacityList := make(map[clusterv1.ResourceName]resource.Quantity) + allocatableList := make(map[clusterv1.ResourceName]resource.Quantity) + + for _, node := range nodes { + for key, value := range node.Status.Capacity { + if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist { + capacity.Add(value) + capacityList[clusterv1.ResourceName(key)] = capacity + } else { + capacityList[clusterv1.ResourceName(key)] = value + } + } + + // the node is unschedulable, ignore its allocatable resources + if node.Spec.Unschedulable { + continue + } + + for key, value := range node.Status.Allocatable { + if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist { + allocatable.Add(value) + allocatableList[clusterv1.ResourceName(key)] = allocatable + } else { + allocatableList[clusterv1.ResourceName(key)] = value + } + } + } + + return capacityList, allocatableList, nil +} diff --git a/pkg/registration/spoke/managedcluster/status_controller_test.go b/pkg/registration/spoke/managedcluster/resource_reconcile_test.go similarity index 87% rename from pkg/registration/spoke/managedcluster/status_controller_test.go rename to pkg/registration/spoke/managedcluster/resource_reconcile_test.go index edebc2f51..7523c8a20 100644 --- a/pkg/registration/spoke/managedcluster/status_controller_test.go +++ b/pkg/registration/spoke/managedcluster/resource_reconcile_test.go @@ -3,6 +3,7 @@ package managedcluster import ( "context" "encoding/json" + "github.com/openshift/library-go/pkg/operator/events/eventstesting" "net/http" "net/http/httptest" "testing" @@ -31,8 +32,13 @@ type serverResponse struct { responseMsg string } -func TestHealthCheck(t *testing.T) { - serverResponse := &serverResponse{} +func newDiscoveryServer(t *testing.T, resp *serverResponse) (*httptest.Server, *discovery.DiscoveryClient) { + serverResponse := &serverResponse{ + httpStatus: http.StatusOK, + } + if resp != nil { + serverResponse = resp + } apiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { if req.URL.Path == "/healthz" { w.WriteHeader(http.StatusOK) @@ -60,9 +66,14 @@ func TestHealthCheck(t *testing.T) { t.Fatal(err) } })) - defer apiServer.Close() - discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(&rest.Config{Host: apiServer.URL}) + return apiServer, discoveryClient +} + +func TestHealthCheck(t *testing.T) { + serverResponse := &serverResponse{} + apiServer, discoveryClient := newDiscoveryServer(t, serverResponse) + defer apiServer.Close() cases := []struct { name string @@ -91,8 +102,8 @@ func TestHealthCheck(t *testing.T) { Reason: "ManagedClusterKubeAPIServerUnavailable", Message: "The kube-apiserver is not ok, status code: 500, an error on the server (\"internal server error\") has prevented the request from succeeding", } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -128,8 +139,8 @@ func TestHealthCheck(t *testing.T) { clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*32), resource.BinarySI), }, } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -151,8 +162,8 @@ func TestHealthCheck(t *testing.T) { Reason: "ManagedClusterAvailable", Message: "Managed cluster is available", } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -173,8 +184,8 @@ func TestHealthCheck(t *testing.T) { Reason: "ManagedClusterAvailable", Message: "Managed cluster is available", } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -220,8 +231,8 @@ func TestHealthCheck(t *testing.T) { clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI), }, } - testingcommon.AssertActions(t, actions, "get", "patch") - patch := actions[1].(clienttesting.PatchAction).GetPatch() + testingcommon.AssertActions(t, actions, "patch") + patch := actions[0].(clienttesting.PatchAction).GetPatch() managedCluster := &clusterv1.ManagedCluster{} err := json.Unmarshal(patch, managedCluster) if err != nil { @@ -254,14 +265,16 @@ func TestHealthCheck(t *testing.T) { serverResponse.httpStatus = c.httpStatus serverResponse.responseMsg = c.responseMsg - - ctrl := &managedClusterStatusController{ - clusterName: testinghelpers.TestManagedClusterName, - hubClusterClient: clusterClient, - hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), - managedClusterDiscoveryClient: discoveryClient, - nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(), - } + ctrl := newManagedClusterStatusController( + testinghelpers.TestManagedClusterName, + clusterClient, + clusterInformerFactory.Cluster().V1().ManagedClusters(), + discoveryClient, + clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), + kubeInformerFactory.Core().V1().Nodes(), + 20, + eventstesting.NewTestingEventRecorder(t), + ) syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, "")) testingcommon.AssertError(t, syncErr, c.expectedErr) diff --git a/pkg/registration/spoke/managedcluster/status_controller.go b/pkg/registration/spoke/managedcluster/status_controller.go index e8aaa87c1..d8c3a12a1 100644 --- a/pkg/registration/spoke/managedcluster/status_controller.go +++ b/pkg/registration/spoke/managedcluster/status_controller.go @@ -3,197 +3,118 @@ package managedcluster import ( "context" "fmt" - "net/http" + "k8s.io/apimachinery/pkg/util/errors" + clusterv1alpha1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1" + "open-cluster-management.io/ocm/pkg/common/patcher" "time" + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1" clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" clusterv1 "open-cluster-management.io/api/cluster/v1" - "open-cluster-management.io/ocm/pkg/registration/helpers" - "github.com/openshift/library-go/pkg/controller/factory" - "github.com/openshift/library-go/pkg/operator/events" - - "k8s.io/apimachinery/pkg/api/resource" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" discovery "k8s.io/client-go/discovery" corev1informers "k8s.io/client-go/informers/core/v1" - corev1lister "k8s.io/client-go/listers/core/v1" ) // managedClusterStatusController checks the kube-apiserver health on managed cluster to determine it whether is available // and ensure that the managed cluster resources and version are up to date. type managedClusterStatusController struct { - clusterName string - hubClusterClient clientset.Interface - hubClusterLister clusterv1listers.ManagedClusterLister - managedClusterDiscoveryClient discovery.DiscoveryInterface - nodeLister corev1lister.NodeLister + clusterName string + reconcilers []statusReconcile + patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus] + hubClusterLister clusterv1listers.ManagedClusterLister } +type statusReconcile interface { + reconcile(ctx context.Context, cm *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) +} + +type reconcileState int64 + +const ( + reconcileStop reconcileState = iota + reconcileContinue +) + // NewManagedClusterStatusController creates a managed cluster status controller on managed cluster. func NewManagedClusterStatusController( clusterName string, hubClusterClient clientset.Interface, hubClusterInformer clusterv1informer.ManagedClusterInformer, managedClusterDiscoveryClient discovery.DiscoveryInterface, + claimInformer clusterv1alpha1informer.ClusterClaimInformer, nodeInformer corev1informers.NodeInformer, + maxCustomClusterClaims int, resyncInterval time.Duration, recorder events.Recorder) factory.Controller { - c := &managedClusterStatusController{ - clusterName: clusterName, - hubClusterClient: hubClusterClient, - hubClusterLister: hubClusterInformer.Lister(), - managedClusterDiscoveryClient: managedClusterDiscoveryClient, - nodeLister: nodeInformer.Lister(), - } + c := newManagedClusterStatusController( + clusterName, + hubClusterClient, + hubClusterInformer, + managedClusterDiscoveryClient, + claimInformer, + nodeInformer, + maxCustomClusterClaims, + recorder, + ) return factory.New(). - WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer()). + WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer(), claimInformer.Informer()). WithSync(c.sync). ResyncEvery(resyncInterval). ToController("ManagedClusterStatusController", recorder) } +func newManagedClusterStatusController( + clusterName string, + hubClusterClient clientset.Interface, + hubClusterInformer clusterv1informer.ManagedClusterInformer, + managedClusterDiscoveryClient discovery.DiscoveryInterface, + claimInformer clusterv1alpha1informer.ClusterClaimInformer, + nodeInformer corev1informers.NodeInformer, + maxCustomClusterClaims int, + recorder events.Recorder) *managedClusterStatusController { + return &managedClusterStatusController{ + clusterName: clusterName, + patcher: patcher.NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + hubClusterClient.ClusterV1().ManagedClusters()), + reconcilers: []statusReconcile{ + &joiningReconcile{recorder: recorder}, + &resoureReconcile{managedClusterDiscoveryClient: managedClusterDiscoveryClient, nodeLister: nodeInformer.Lister()}, + &claimReconcile{claimLister: claimInformer.Lister(), recorder: recorder, maxCustomClusterClaims: maxCustomClusterClaims}, + }, + hubClusterLister: hubClusterInformer.Lister(), + } +} + // sync updates managed cluster available condition by checking kube-apiserver health on managed cluster. // if the kube-apiserver is health, it will ensure that managed cluster resources and version are up to date. func (c *managedClusterStatusController) sync(ctx context.Context, syncCtx factory.SyncContext) error { - if _, err := c.hubClusterLister.Get(c.clusterName); err != nil { + cluster, err := c.hubClusterLister.Get(c.clusterName) + if err != nil { return fmt.Errorf("unable to get managed cluster %q from hub: %w", c.clusterName, err) } - updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{} - - // check the kube-apiserver health on managed cluster. - condition := c.checkKubeAPIServerStatus(ctx) - - // the managed cluster kube-apiserver is health, update its version and resources if necessary. - if condition.Status == metav1.ConditionTrue { - clusterVersion, err := c.getClusterVersion() + newCluster := cluster.DeepCopy() + var errs []error + for _, reconciler := range c.reconcilers { + var state reconcileState + newCluster, state, err = reconciler.reconcile(ctx, newCluster) if err != nil { - return fmt.Errorf("unable to get server version of managed cluster %q: %w", c.clusterName, err) + errs = append(errs, err) } - - capacity, allocatable, err := c.getClusterResources() - if err != nil { - return fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", c.clusterName, err) + if state == reconcileStop { + break } - - updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.ManagedClusterStatus{ - Capacity: capacity, - Allocatable: allocatable, - Version: *clusterVersion, - })) } - updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateManagedClusterConditionFn(condition)) - _, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...) - if err != nil { - return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err) - } - if updated { - syncCtx.Recorder().Eventf("ManagedClusterStatusUpdated", "the status of managed cluster %q has been updated, available condition is %q, due to %q", - c.clusterName, condition.Status, condition.Message) - } - return nil -} - -// using readyz api to check the status of kube apiserver -func (c *managedClusterStatusController) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition { - statusCode := 0 - condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable} - result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode) - if statusCode == http.StatusOK { - condition.Status = metav1.ConditionTrue - condition.Reason = "ManagedClusterAvailable" - condition.Message = "Managed cluster is available" - return condition - } - - // for backward compatible, the livez endpoint is supported from Kubernetes 1.16, so if the livez is not found or - // forbidden, the healthz endpoint will be used. - if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden { - result = c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode) - if statusCode == http.StatusOK { - condition.Status = metav1.ConditionTrue - condition.Reason = "ManagedClusterAvailable" - condition.Message = "Managed cluster is available" - return condition - } - } - - condition.Status = metav1.ConditionFalse - condition.Reason = "ManagedClusterKubeAPIServerUnavailable" - body, err := result.Raw() - if err == nil { - condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body)) - return condition - } - - condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err) - return condition -} - -func (c *managedClusterStatusController) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) { - serverVersion, err := c.managedClusterDiscoveryClient.ServerVersion() - if err != nil { - return nil, err - } - return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil -} - -func (c *managedClusterStatusController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) { - nodes, err := c.nodeLister.List(labels.Everything()) - if err != nil { - return nil, nil, err - } - - capacityList := make(map[clusterv1.ResourceName]resource.Quantity) - allocatableList := make(map[clusterv1.ResourceName]resource.Quantity) - - for _, node := range nodes { - for key, value := range node.Status.Capacity { - if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist { - capacity.Add(value) - capacityList[clusterv1.ResourceName(key)] = capacity - } else { - capacityList[clusterv1.ResourceName(key)] = value - } - } - - // the node is unschedulable, ignore its allocatable resources - if node.Spec.Unschedulable { - continue - } - - for key, value := range node.Status.Allocatable { - if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist { - allocatable.Add(value) - allocatableList[clusterv1.ResourceName(key)] = allocatable - } else { - allocatableList[clusterv1.ResourceName(key)] = value - } - } - } - - return capacityList, allocatableList, nil -} - -func updateClusterResourcesFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc { - return func(oldStatus *clusterv1.ManagedClusterStatus) error { - // merge the old capacity to new capacity, if one old capacity entry does not exist in new capacity, - // we add it back to new capacity - for key, val := range oldStatus.Capacity { - if _, ok := status.Capacity[key]; !ok { - status.Capacity[key] = val - continue - } - } - oldStatus.Capacity = status.Capacity - oldStatus.Allocatable = status.Allocatable - oldStatus.Version = status.Version - return nil + if _, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status); err != nil { + errs = append(errs, err) } + + return errors.NewAggregate(errs) } diff --git a/pkg/registration/spoke/managedcluster/creating_controller.go b/pkg/registration/spoke/registration/creating_controller.go similarity index 99% rename from pkg/registration/spoke/managedcluster/creating_controller.go rename to pkg/registration/spoke/registration/creating_controller.go index 29010907d..cb8a01ded 100644 --- a/pkg/registration/spoke/managedcluster/creating_controller.go +++ b/pkg/registration/spoke/registration/creating_controller.go @@ -1,4 +1,4 @@ -package managedcluster +package registration import ( "context" diff --git a/pkg/registration/spoke/managedcluster/creating_controller_test.go b/pkg/registration/spoke/registration/creating_controller_test.go similarity index 98% rename from pkg/registration/spoke/managedcluster/creating_controller_test.go rename to pkg/registration/spoke/registration/creating_controller_test.go index d2b20e7e0..539f3a5ba 100644 --- a/pkg/registration/spoke/managedcluster/creating_controller_test.go +++ b/pkg/registration/spoke/registration/creating_controller_test.go @@ -1,4 +1,4 @@ -package managedcluster +package registration import ( "context" diff --git a/pkg/registration/spoke/managedcluster/registration.go b/pkg/registration/spoke/registration/registration.go similarity index 87% rename from pkg/registration/spoke/managedcluster/registration.go rename to pkg/registration/spoke/registration/registration.go index 2433fa2d5..e4f7f99b3 100644 --- a/pkg/registration/spoke/managedcluster/registration.go +++ b/pkg/registration/spoke/registration/registration.go @@ -1,8 +1,11 @@ -package managedcluster +package registration import ( "crypto/x509/pkix" "fmt" + "k8s.io/apimachinery/pkg/api/errors" + clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1" + "open-cluster-management.io/ocm/pkg/common/patcher" "strings" addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1" @@ -22,7 +25,6 @@ import ( clientset "open-cluster-management.io/api/client/cluster/clientset/versioned" "open-cluster-management.io/ocm/pkg/registration/clientcert" - "open-cluster-management.io/ocm/pkg/registration/helpers" "open-cluster-management.io/ocm/pkg/registration/hub/user" ) @@ -143,13 +145,22 @@ func GenerateBootstrapStatusUpdater() clientcert.StatusUpdateFunc { } // GenerateStatusUpdater generates status update func for the certificate management -func GenerateStatusUpdater(hubClusterClient clientset.Interface, clusterName string) clientcert.StatusUpdateFunc { +func GenerateStatusUpdater(hubClusterClient clientset.Interface, hubClusterLister clusterv1listers.ManagedClusterLister, clusterName string) clientcert.StatusUpdateFunc { return func(ctx context.Context, cond metav1.Condition) error { - _, _, updatedErr := helpers.UpdateManagedClusterStatus( - ctx, hubClusterClient, clusterName, helpers.UpdateManagedClusterConditionFn(cond), - ) - - return updatedErr + cluster, err := hubClusterLister.Get(clusterName) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + newCluster := cluster.DeepCopy() + meta.SetStatusCondition(&newCluster.Status.Conditions, cond) + patcher := patcher.NewPatcher[ + *clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]( + hubClusterClient.ClusterV1().ManagedClusters()) + _, err = patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status) + return err } } diff --git a/pkg/registration/spoke/managedcluster/registration_test.go b/pkg/registration/spoke/registration/registration_test.go similarity index 98% rename from pkg/registration/spoke/managedcluster/registration_test.go rename to pkg/registration/spoke/registration/registration_test.go index 0588d0de7..baeb0c90b 100644 --- a/pkg/registration/spoke/managedcluster/registration_test.go +++ b/pkg/registration/spoke/registration/registration_test.go @@ -1,4 +1,4 @@ -package managedcluster +package registration import ( "testing" diff --git a/pkg/registration/spoke/managedcluster/secret_controller.go b/pkg/registration/spoke/registration/secret_controller.go similarity index 99% rename from pkg/registration/spoke/managedcluster/secret_controller.go rename to pkg/registration/spoke/registration/secret_controller.go index 464c6763b..3a04e7762 100644 --- a/pkg/registration/spoke/managedcluster/secret_controller.go +++ b/pkg/registration/spoke/registration/secret_controller.go @@ -1,4 +1,4 @@ -package managedcluster +package registration import ( "bytes" diff --git a/pkg/registration/spoke/managedcluster/secret_controller_test.go b/pkg/registration/spoke/registration/secret_controller_test.go similarity index 99% rename from pkg/registration/spoke/managedcluster/secret_controller_test.go rename to pkg/registration/spoke/registration/secret_controller_test.go index 7bc5e6a66..38932013e 100644 --- a/pkg/registration/spoke/managedcluster/secret_controller_test.go +++ b/pkg/registration/spoke/registration/secret_controller_test.go @@ -1,4 +1,4 @@ -package managedcluster +package registration import ( "context" diff --git a/pkg/registration/spoke/spokeagent.go b/pkg/registration/spoke/spokeagent.go index b9c03fd4f..7c1748f93 100644 --- a/pkg/registration/spoke/spokeagent.go +++ b/pkg/registration/spoke/spokeagent.go @@ -4,6 +4,8 @@ import ( "context" "errors" "fmt" + "open-cluster-management.io/ocm/pkg/registration/spoke/lease" + "open-cluster-management.io/ocm/pkg/registration/spoke/registration" "os" "path" "time" @@ -164,7 +166,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext } // start a SpokeClusterCreatingController to make sure there is a spoke cluster on hub cluster - spokeClusterCreatingController := managedcluster.NewManagedClusterCreatingController( + spokeClusterCreatingController := registration.NewManagedClusterCreatingController( o.AgentOptions.SpokeClusterName, o.SpokeExternalServerURLs, spokeClusterCABundle, bootstrapClusterClient, @@ -172,7 +174,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext ) go spokeClusterCreatingController.Run(ctx, 1) - hubKubeconfigSecretController := managedcluster.NewHubKubeconfigSecretController( + hubKubeconfigSecretController := registration.NewHubKubeconfigSecretController( o.HubKubeconfigDir, o.ComponentNamespace, o.HubKubeconfigSecret, // the hub kubeconfig secret stored in the cluster where the agent pod runs managementKubeClient.CoreV1(), @@ -212,7 +214,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext } controllerName := fmt.Sprintf("BootstrapClientCertController@cluster:%s", o.AgentOptions.SpokeClusterName) - clientCertForHubController := managedcluster.NewClientCertForHubController( + clientCertForHubController := registration.NewClientCertForHubController( o.AgentOptions.SpokeClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret, kubeconfigData, // store the secret in the cluster where the agent pod runs @@ -220,7 +222,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext csrControl, o.ClientCertExpirationSeconds, managementKubeClient, - managedcluster.GenerateBootstrapStatusUpdater(), + registration.GenerateBootstrapStatusUpdater(), controllerContext.EventRecorder, controllerName, ) @@ -299,14 +301,17 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext // create another ClientCertForHubController for client certificate rotation controllerName := fmt.Sprintf("ClientCertController@cluster:%s", o.AgentOptions.SpokeClusterName) - clientCertForHubController := managedcluster.NewClientCertForHubController( + clientCertForHubController := registration.NewClientCertForHubController( o.AgentOptions.SpokeClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret, kubeconfigData, namespacedManagementKubeInformerFactory.Core().V1().Secrets(), csrControl, o.ClientCertExpirationSeconds, managementKubeClient, - managedcluster.GenerateStatusUpdater(hubClusterClient, o.AgentOptions.SpokeClusterName), + registration.GenerateStatusUpdater( + hubClusterClient, + hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), + o.AgentOptions.SpokeClusterName), controllerContext.EventRecorder, controllerName, ) @@ -314,50 +319,32 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext return err } - // create ManagedClusterJoiningController to reconcile instances of ManagedCluster on the managed cluster - managedClusterJoiningController := managedcluster.NewManagedClusterJoiningController( - o.AgentOptions.SpokeClusterName, - hubClusterClient, - hubClusterInformerFactory.Cluster().V1().ManagedClusters(), - controllerContext.EventRecorder, - ) - // create ManagedClusterLeaseController to keep the spoke cluster heartbeat - managedClusterLeaseController := managedcluster.NewManagedClusterLeaseController( + managedClusterLeaseController := lease.NewManagedClusterLeaseController( o.AgentOptions.SpokeClusterName, hubKubeClient, hubClusterInformerFactory.Cluster().V1().ManagedClusters(), controllerContext.EventRecorder, ) + spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig) + if err != nil { + return err + } + spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute) + // create NewManagedClusterStatusController to update the spoke cluster status managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController( o.AgentOptions.SpokeClusterName, hubClusterClient, hubClusterInformerFactory.Cluster().V1().ManagedClusters(), spokeKubeClient.Discovery(), + spokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), spokeKubeInformerFactory.Core().V1().Nodes(), + o.MaxCustomClusterClaims, o.ClusterHealthCheckPeriod, controllerContext.EventRecorder, ) - spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig) - if err != nil { - return err - } - spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute) - - var managedClusterClaimController factory.Controller - if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) { - // create managedClusterClaimController to sync cluster claims - managedClusterClaimController = managedcluster.NewManagedClusterClaimController( - o.AgentOptions.SpokeClusterName, - o.MaxCustomClusterClaims, - hubClusterClient, - hubClusterInformerFactory.Cluster().V1().ManagedClusters(), - spokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(), - controllerContext.EventRecorder, - ) - } var addOnLeaseController factory.Controller var addOnRegistrationController factory.Controller @@ -390,16 +377,14 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext go hubClusterInformerFactory.Start(ctx.Done()) go spokeKubeInformerFactory.Start(ctx.Done()) go namespacedManagementKubeInformerFactory.Start(ctx.Done()) - go spokeClusterInformerFactory.Start(ctx.Done()) go addOnInformerFactory.Start(ctx.Done()) + if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) { + go spokeClusterInformerFactory.Start(ctx.Done()) + } go clientCertForHubController.Run(ctx, 1) - go managedClusterJoiningController.Run(ctx, 1) go managedClusterLeaseController.Run(ctx, 1) go managedClusterHealthCheckController.Run(ctx, 1) - if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) { - go managedClusterClaimController.Run(ctx, 1) - } if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.AddonManagement) { go addOnLeaseController.Run(ctx, 1) go addOnRegistrationController.Run(ctx, 1) @@ -474,7 +459,7 @@ func (o *SpokeAgentOptions) Complete(coreV1Client corev1client.CoreV1Interface, } // dump data in hub kubeconfig secret into file system if it exists - err = managedcluster.DumpSecret(coreV1Client, o.ComponentNamespace, o.HubKubeconfigSecret, + err = registration.DumpSecret(coreV1Client, o.ComponentNamespace, o.HubKubeconfigSecret, o.HubKubeconfigDir, ctx, recorder) if err != nil { return err @@ -527,7 +512,7 @@ func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) { } // check if the tls certificate is issued for the current cluster/agent - clusterName, agentName, err := managedcluster.GetClusterAgentNamesFromCertificate(certData) + clusterName, agentName, err := registration.GetClusterAgentNamesFromCertificate(certData) if err != nil { return false, nil } @@ -559,7 +544,7 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) { certPath := path.Join(o.HubKubeconfigDir, clientcert.TLSCertFile) certData, certErr := os.ReadFile(path.Clean(certPath)) if certErr == nil { - clusterNameInCert, agentNameInCert, _ = managedcluster.GetClusterAgentNamesFromCertificate(certData) + clusterNameInCert, agentNameInCert, _ = registration.GetClusterAgentNamesFromCertificate(certData) } clusterName := o.AgentOptions.SpokeClusterName diff --git a/test/integration/registration/integration_suite_test.go b/test/integration/registration/integration_suite_test.go index 0a7d3f098..860dc4394 100644 --- a/test/integration/registration/integration_suite_test.go +++ b/test/integration/registration/integration_suite_test.go @@ -3,6 +3,7 @@ package registration_test import ( "context" "fmt" + "open-cluster-management.io/ocm/pkg/registration/spoke/registration" "open-cluster-management.io/ocm/test/integration/util" "os" "path" @@ -28,7 +29,6 @@ import ( "open-cluster-management.io/ocm/pkg/registration/hub" "open-cluster-management.io/ocm/pkg/registration/spoke" "open-cluster-management.io/ocm/pkg/registration/spoke/addon" - "open-cluster-management.io/ocm/pkg/registration/spoke/managedcluster" "sigs.k8s.io/controller-runtime/pkg/envtest" logf "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/log/zap" @@ -101,7 +101,7 @@ var _ = ginkgo.BeforeSuite(func() { // crank up the sync speed transport.CertCallbackRefreshDuration = 5 * time.Second clientcert.ControllerResyncInterval = 5 * time.Second - managedcluster.CreatingControllerSyncInterval = 1 * time.Second + registration.CreatingControllerSyncInterval = 1 * time.Second hub.ResyncInterval = 5 * time.Second // crank up the addon lease sync and udpate speed