🌱 Add a common patcher and adopt it in registration (#178)

* Add a common patcher and adopt it in registration

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Add patcher test

Signed-off-by: Jian Qiu <jqiu@redhat.com>

---------

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2023-06-12 14:40:09 +08:00
committed by GitHub
parent 6d4c48822c
commit 3c9bfea949
37 changed files with 1066 additions and 1074 deletions

View File

@@ -0,0 +1,158 @@
package patcher
import (
"context"
"encoding/json"
"fmt"
jsonpatch "github.com/evanphx/json-patch"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog/v2"
)
// Patcher is just the Patch API with a generic to keep use sites type safe.
// This is inspired by the commiter code in https://github.com/kcp-dev/kcp/blob/main/pkg/reconciler/committer/committer.go
type PatchClient[R runtime.Object] interface {
Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (R, error)
}
type Patcher[R runtime.Object, Sp any, St any] interface {
AddFinalizer(context.Context, R, string) (bool, error)
RemoveFinalizer(context.Context, R, string) error
PatchStatus(context.Context, R, St, St) (bool, error)
PatchSpec(context.Context, R, Sp, Sp) (bool, error)
}
// Resource is a generic wrapper around resources so we can generate patches.
type Resource[Sp any, St any] struct {
metav1.ObjectMeta `json:"metadata,omitempty"`
Spec Sp `json:"spec"`
Status St `json:"status,omitempty"`
}
type patcher[R runtime.Object, Sp any, St any] struct {
client PatchClient[R]
}
func NewPatcher[R runtime.Object, Sp any, St any](client PatchClient[R]) *patcher[R, Sp, St] {
p := &patcher[R, Sp, St]{
client: client,
}
return p
}
func (p *patcher[R, Sp, St]) AddFinalizer(ctx context.Context, object R, finalizer string) (bool, error) {
hasFinalizer := false
accessor, err := meta.Accessor(object)
if err != nil {
return !hasFinalizer, err
}
finalizers := accessor.GetFinalizers()
for i := range finalizers {
if finalizers[i] == finalizer {
hasFinalizer = true
break
}
}
if !hasFinalizer {
finalizerBytes, err := json.Marshal(append(finalizers, finalizer))
if err != nil {
return !hasFinalizer, err
}
patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes))
_, err = p.client.Patch(
ctx, accessor.GetName(), types.MergePatchType, []byte(patch), metav1.PatchOptions{})
return !hasFinalizer, err
}
return !hasFinalizer, nil
}
func (p *patcher[R, Sp, St]) RemoveFinalizer(ctx context.Context, object R, finalizer string) error {
accessor, err := meta.Accessor(object)
if err != nil {
return err
}
copiedFinalizers := []string{}
finalizers := accessor.GetFinalizers()
for i := range finalizers {
if finalizers[i] == finalizer {
continue
}
copiedFinalizers = append(copiedFinalizers, finalizers[i])
}
if len(finalizers) != len(copiedFinalizers) {
finalizerBytes, err := json.Marshal(copiedFinalizers)
if err != nil {
return err
}
patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes))
_, err = p.client.Patch(
ctx, accessor.GetName(), types.MergePatchType, []byte(patch), metav1.PatchOptions{})
return err
}
return nil
}
func (p *patcher[R, Sp, St]) patch(ctx context.Context, object R, newObject, oldObject *Resource[Sp, St], subresources ...string) error {
accessor, err := meta.Accessor(object)
if err != nil {
return err
}
oldData, err := json.Marshal(oldObject)
if err != nil {
return fmt.Errorf("failed to Marshal old data for %s: %w", accessor.GetName(), err)
}
newObject.UID = accessor.GetUID()
newObject.ResourceVersion = accessor.GetResourceVersion()
newData, err := json.Marshal(newObject)
if err != nil {
return fmt.Errorf("failed to Marshal new data for %s: %w", accessor.GetName(), err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for %s: %w", accessor.GetName(), err)
}
_, err = p.client.Patch(
ctx, accessor.GetName(), types.MergePatchType, patchBytes, metav1.PatchOptions{}, subresources...)
if err != nil {
klog.V(2).Infof("Object with type %t and name %s is patched with patch %s", object, accessor.GetName(), string(patchBytes))
}
return err
}
func (p *patcher[R, Sp, St]) PatchStatus(ctx context.Context, object R, newStatus, oldStatus St) (bool, error) {
statusChanged := !equality.Semantic.DeepEqual(oldStatus, newStatus)
if !statusChanged {
return false, nil
}
oldObject := &Resource[Sp, St]{Status: oldStatus}
newObject := &Resource[Sp, St]{Status: newStatus}
return true, p.patch(ctx, object, newObject, oldObject, "status")
}
func (p *patcher[R, Sp, St]) PatchSpec(ctx context.Context, object R, newSpec, oldSpec Sp) (bool, error) {
specChanged := !equality.Semantic.DeepEqual(newSpec, oldSpec)
if !specChanged {
return false, nil
}
oldObject := &Resource[Sp, St]{Spec: oldSpec}
newObject := &Resource[Sp, St]{Spec: newSpec}
return true, p.patch(ctx, object, newObject, oldObject)
}

View File

@@ -0,0 +1,237 @@
package patcher
import (
"context"
"encoding/json"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clienttesting "k8s.io/client-go/testing"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterv1 "open-cluster-management.io/api/cluster/v1"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"testing"
)
func TestAddFinalizer(t *testing.T) {
cases := []struct {
name string
obj *clusterv1.ManagedCluster
finalizer string
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "add finalizer",
obj: newManagedClusterWithFinalizer(),
finalizer: "test-finalizer",
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testinghelpers.AssertFinalizers(t, managedCluster, []string{"test-finalizer"})
},
},
{
name: "no action",
obj: newManagedClusterWithFinalizer("test-finalizer-1", "test-finalizer"),
finalizer: "test-finalizer",
validateActions: testingcommon.AssertNoActions,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.obj)
patcher := NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters())
if _, err := patcher.AddFinalizer(context.TODO(), c.obj, c.finalizer); err != nil {
t.Error(err)
}
c.validateActions(t, clusterClient.Actions())
})
}
}
func TestRemoveFinalizer(t *testing.T) {
cases := []struct {
name string
obj *clusterv1.ManagedCluster
finalizer string
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "remove finalizer",
obj: newManagedClusterWithFinalizer("test-finalizer", "test-finalizer-1"),
finalizer: "test-finalizer",
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
testinghelpers.AssertFinalizers(t, managedCluster, []string{"test-finalizer-1"})
},
},
{
name: "no action",
obj: newManagedClusterWithFinalizer("test-finalizer-1"),
finalizer: "test-finalizer",
validateActions: testingcommon.AssertNoActions,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.obj)
patcher := NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters())
if err := patcher.RemoveFinalizer(context.TODO(), c.obj, c.finalizer); err != nil {
t.Error(err)
}
c.validateActions(t, clusterClient.Actions())
})
}
}
func TestPatchSpec(t *testing.T) {
cases := []struct {
name string
obj *clusterv1.ManagedCluster
newObj *clusterv1.ManagedCluster
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "patch spec",
obj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}),
newObj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key2"}),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
if !equality.Semantic.DeepEqual(managedCluster.Spec, newManagedClusterWithTaint(clusterv1.Taint{Key: "key2"}).Spec) {
t.Errorf("not patched correctly got %v", managedCluster.Spec)
}
},
},
{
name: "no patch",
obj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}),
newObj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}),
validateActions: testingcommon.AssertNoActions,
},
{
name: "no patch with status change",
obj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}),
newObj: newManagedClusterWithConditions(metav1.Condition{Type: "Type2"}),
validateActions: testingcommon.AssertNoActions,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.obj)
patcher := NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters())
if _, err := patcher.PatchSpec(context.TODO(), c.obj, c.newObj.Spec, c.obj.Spec); err != nil {
t.Error(err)
}
c.validateActions(t, clusterClient.Actions())
})
}
}
func TestPatchStatus(t *testing.T) {
cases := []struct {
name string
obj *clusterv1.ManagedCluster
newObj *clusterv1.ManagedCluster
validateActions func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "patch status",
obj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}),
newObj: newManagedClusterWithConditions(metav1.Condition{Type: "Type2"}),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
t.Fatal(err)
}
if !equality.Semantic.DeepEqual(managedCluster.Status, newManagedClusterWithConditions(metav1.Condition{Type: "Type2"}).Status) {
t.Errorf("not patched correctly got %v", managedCluster.Status)
}
},
},
{
name: "no patch",
obj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}),
newObj: newManagedClusterWithConditions(metav1.Condition{Type: "Type1"}),
validateActions: testingcommon.AssertNoActions,
},
{
name: "no patch with spec change",
obj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key1"}),
newObj: newManagedClusterWithTaint(clusterv1.Taint{Key: "key2"}),
validateActions: testingcommon.AssertNoActions,
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.obj)
patcher := NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters())
if _, err := patcher.PatchStatus(context.TODO(), c.obj, c.newObj.Status, c.obj.Status); err != nil {
t.Error(err)
}
c.validateActions(t, clusterClient.Actions())
})
}
}
func newManagedClusterWithFinalizer(finalizers ...string) *clusterv1.ManagedCluster {
return &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
Finalizers: finalizers,
},
}
}
func newManagedClusterWithTaint(taints ...clusterv1.Taint) *clusterv1.ManagedCluster {
return &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Spec: clusterv1.ManagedClusterSpec{
Taints: taints,
},
}
}
func newManagedClusterWithConditions(conds ...metav1.Condition) *clusterv1.ManagedCluster {
return &clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
Name: "test",
},
Status: clusterv1.ManagedClusterStatus{
Conditions: conds,
},
}
}

View File

@@ -3,16 +3,11 @@ package helpers
import (
"context"
"embed"
"encoding/json"
"fmt"
"net/url"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned"
clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1 "open-cluster-management.io/api/cluster/v1"
jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/api"
"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/operator/events"
@@ -24,19 +19,15 @@ import (
certificatesv1beta1 "k8s.io/api/certificates/v1beta1"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/client-go/util/retry"
)
var (
@@ -49,148 +40,6 @@ func init() {
utilruntime.Must(api.InstallKube(genericScheme))
}
type UpdateManagedClusterStatusFunc func(status *clusterv1.ManagedClusterStatus) error
func UpdateManagedClusterStatus(
ctx context.Context,
client clusterclientset.Interface,
spokeClusterName string,
updateFuncs ...UpdateManagedClusterStatusFunc) (*clusterv1.ManagedClusterStatus, bool, error) {
updated := false
var updatedManagedClusterStatus *clusterv1.ManagedClusterStatus
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
managedCluster, err := client.ClusterV1().ManagedClusters().Get(ctx, spokeClusterName, metav1.GetOptions{})
if err != nil {
return err
}
oldStatus := &managedCluster.Status
newStatus := oldStatus.DeepCopy()
for _, update := range updateFuncs {
if err := update(newStatus); err != nil {
return err
}
}
if equality.Semantic.DeepEqual(oldStatus, newStatus) {
// We return the newStatus which is a deep copy of oldStatus but with all update funcs applied.
updatedManagedClusterStatus = newStatus
return nil
}
oldData, err := json.Marshal(clusterv1.ManagedCluster{
Status: *oldStatus,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for cluster status %s: %w", managedCluster.Name, err)
}
newData, err := json.Marshal(clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{
UID: managedCluster.UID,
ResourceVersion: managedCluster.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: *newStatus,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for cluster status %s: %w", managedCluster.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for cluster %s: %w", managedCluster.Name, err)
}
updatedManagedCluster, err := client.ClusterV1().ManagedClusters().Patch(ctx, managedCluster.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
updatedManagedClusterStatus = &updatedManagedCluster.Status
updated = err == nil
return err
})
return updatedManagedClusterStatus, updated, err
}
func UpdateManagedClusterConditionFn(cond metav1.Condition) UpdateManagedClusterStatusFunc {
return func(oldStatus *clusterv1.ManagedClusterStatus) error {
meta.SetStatusCondition(&oldStatus.Conditions, cond)
return nil
}
}
type UpdateManagedClusterAddOnStatusFunc func(status *addonv1alpha1.ManagedClusterAddOnStatus) error
func UpdateManagedClusterAddOnStatus(
ctx context.Context,
client addonv1alpha1client.Interface,
addOnNamespace, addOnName string,
updateFuncs ...UpdateManagedClusterAddOnStatusFunc) (*addonv1alpha1.ManagedClusterAddOnStatus, bool, error) {
updated := false
var updatedAddOnStatus *addonv1alpha1.ManagedClusterAddOnStatus
err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
addOn, err := client.AddonV1alpha1().ManagedClusterAddOns(addOnNamespace).Get(ctx, addOnName, metav1.GetOptions{})
if err != nil {
return err
}
oldStatus := &addOn.Status
newStatus := oldStatus.DeepCopy()
for _, update := range updateFuncs {
if err := update(newStatus); err != nil {
return err
}
}
if equality.Semantic.DeepEqual(oldStatus, newStatus) {
// We return the newStatus which is a deep copy of oldStatus but with all update funcs applied.
updatedAddOnStatus = newStatus
return nil
}
oldData, err := json.Marshal(addonv1alpha1.ManagedClusterAddOn{
Status: *oldStatus,
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for addon status %s: %w", addOn.Name, err)
}
newData, err := json.Marshal(addonv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{
UID: addOn.UID,
ResourceVersion: addOn.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: *newStatus,
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for addon status %s: %w", addOn.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for cluster %s: %w", addOn.Name, err)
}
updatedAddOn, err := client.AddonV1alpha1().ManagedClusterAddOns(addOnNamespace).Patch(ctx, addOn.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
if err != nil {
return err
}
updatedAddOnStatus = &updatedAddOn.Status
updated = err == nil
return err
})
return updatedAddOnStatus, updated, err
}
func UpdateManagedClusterAddOnStatusFn(cond metav1.Condition) UpdateManagedClusterAddOnStatusFunc {
return func(oldStatus *addonv1alpha1.ManagedClusterAddOnStatus) error {
meta.SetStatusCondition(&oldStatus.Conditions, cond)
return nil
}
}
// Check whether a CSR is in terminal state
func IsCSRInTerminalState(status *certificatesv1.CertificateSigningRequestStatus) bool {
for _, c := range status.Conditions {

View File

@@ -4,217 +4,22 @@ import (
"context"
"encoding/json"
"fmt"
"reflect"
"testing"
"time"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonfake "open-cluster-management.io/api/client/addon/clientset/versioned/fake"
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
clusterv1 "open-cluster-management.io/api/cluster/v1"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
testinghelpers "open-cluster-management.io/ocm/pkg/registration/helpers/testing"
"reflect"
"testing"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
fakekube "k8s.io/client-go/kubernetes/fake"
clienttesting "k8s.io/client-go/testing"
)
func TestUpdateStatusCondition(t *testing.T) {
nowish := metav1.Now()
beforeish := metav1.Time{Time: nowish.Add(-10 * time.Second)}
afterish := metav1.Time{Time: nowish.Add(10 * time.Second)}
cases := []struct {
name string
startingConditions []metav1.Condition
newCondition metav1.Condition
expextedUpdated bool
expectedConditions []metav1.Condition
}{
{
name: "add to empty",
startingConditions: []metav1.Condition{},
newCondition: testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil),
expextedUpdated: true,
expectedConditions: []metav1.Condition{testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil)},
},
{
name: "add to non-conflicting",
startingConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
},
newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil),
expextedUpdated: true,
expectedConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil),
},
},
{
name: "change existing status",
startingConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil),
},
newCondition: testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil),
expextedUpdated: true,
expectedConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil),
},
},
{
name: "leave existing transition time",
startingConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish),
},
newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &afterish),
expextedUpdated: false,
expectedConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fakeClusterClient := clusterfake.NewSimpleClientset(&clusterv1.ManagedCluster{
ObjectMeta: metav1.ObjectMeta{Name: "testspokecluster"},
Status: clusterv1.ManagedClusterStatus{
Conditions: c.startingConditions,
},
})
status, updated, err := UpdateManagedClusterStatus(
context.TODO(),
fakeClusterClient,
"testspokecluster",
UpdateManagedClusterConditionFn(c.newCondition),
)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
if updated != c.expextedUpdated {
t.Errorf("expected %t, but %t", c.expextedUpdated, updated)
}
for i := range c.expectedConditions {
expected := c.expectedConditions[i]
actual := status.Conditions[i]
if expected.LastTransitionTime == (metav1.Time{}) {
actual.LastTransitionTime = metav1.Time{}
}
if !equality.Semantic.DeepEqual(expected, actual) {
t.Errorf(diff.ObjectDiff(expected, actual))
}
}
})
}
}
func TestUpdateManagedClusterAddOnStatus(t *testing.T) {
nowish := metav1.Now()
beforeish := metav1.Time{Time: nowish.Add(-10 * time.Second)}
afterish := metav1.Time{Time: nowish.Add(10 * time.Second)}
cases := []struct {
name string
startingConditions []metav1.Condition
newCondition metav1.Condition
expextedUpdated bool
expectedConditions []metav1.Condition
}{
{
name: "add to empty",
startingConditions: []metav1.Condition{},
newCondition: testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil),
expextedUpdated: true,
expectedConditions: []metav1.Condition{testinghelpers.NewManagedClusterCondition("test", "True", "my-reason", "my-message", nil)},
},
{
name: "add to non-conflicting",
startingConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
},
newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil),
expextedUpdated: true,
expectedConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil),
},
},
{
name: "change existing status",
startingConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", nil),
},
newCondition: testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil),
expextedUpdated: true,
expectedConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "False", "my-different-reason", "my-othermessage", nil),
},
},
{
name: "leave existing transition time",
startingConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish),
},
newCondition: testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &afterish),
expextedUpdated: false,
expectedConditions: []metav1.Condition{
testinghelpers.NewManagedClusterCondition("two", "True", "my-reason", "my-message", nil),
testinghelpers.NewManagedClusterCondition("one", "True", "my-reason", "my-message", &beforeish),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fakeAddOnClient := addonfake.NewSimpleClientset(&addonv1alpha1.ManagedClusterAddOn{
ObjectMeta: metav1.ObjectMeta{Namespace: "test", Name: "test"},
Status: addonv1alpha1.ManagedClusterAddOnStatus{
Conditions: c.startingConditions,
},
})
status, updated, err := UpdateManagedClusterAddOnStatus(
context.TODO(),
fakeAddOnClient,
"test", "test",
UpdateManagedClusterAddOnStatusFn(c.newCondition),
)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
if updated != c.expextedUpdated {
t.Errorf("expected %t, but %t", c.expextedUpdated, updated)
}
for i := range c.expectedConditions {
expected := c.expectedConditions[i]
actual := status.Conditions[i]
if expected.LastTransitionTime == (metav1.Time{}) {
actual.LastTransitionTime = metav1.Time{}
}
if !equality.Semantic.DeepEqual(expected, actual) {
t.Errorf(diff.ObjectDiff(expected, actual))
}
}
})
}
}
func TestIsValidHTTPSURL(t *testing.T) {
cases := []struct {
name string

View File

@@ -2,7 +2,10 @@ package addon
import (
"context"
patcher "open-cluster-management.io/ocm/pkg/common/patcher"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
operatorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
@@ -11,10 +14,6 @@ import (
clusterinformerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterlisterv1 "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -87,19 +86,20 @@ func (c *managedClusterAddOnHealthCheckController) sync(ctx context.Context, syn
}
errs := []error{}
patcher := patcher.NewPatcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus](
c.addOnClient.AddonV1alpha1().ManagedClusterAddOns(managedClusterName),
)
for _, addOn := range addOns {
_, updated, err := helpers.UpdateManagedClusterAddOnStatus(
ctx,
c.addOnClient,
addOn.Namespace,
addOn.Name,
helpers.UpdateManagedClusterAddOnStatusFn(metav1.Condition{
Type: addonv1alpha1.ManagedClusterAddOnConditionAvailable,
Status: managedClusterAvailableCondition.Status,
Reason: managedClusterAvailableCondition.Reason,
Message: managedClusterAvailableCondition.Message,
}),
)
newManagedClusterAddon := addOn.DeepCopy()
meta.SetStatusCondition(&newManagedClusterAddon.Status.Conditions, metav1.Condition{
Type: addonv1alpha1.ManagedClusterAddOnConditionAvailable,
Status: managedClusterAvailableCondition.Status,
Reason: managedClusterAvailableCondition.Reason,
Message: managedClusterAvailableCondition.Message,
})
updated, err := patcher.PatchStatus(ctx, newManagedClusterAddon, newManagedClusterAddon.Status, addOn.Status)
if err != nil {
errs = append(errs, err)
}

View File

@@ -57,9 +57,9 @@ func TestSync(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{Namespace: testinghelpers.TestManagedClusterName, Name: "test"},
}},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
testingcommon.AssertActions(t, actions, "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
patch := actions[0].(clienttesting.PatchAction).GetPatch()
addOn := &addonv1alpha1.ManagedClusterAddOn{}
err := json.Unmarshal(patch, addOn)
if err != nil {

View File

@@ -2,16 +2,15 @@ package lease
import (
"context"
"open-cluster-management.io/ocm/pkg/common/patcher"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
coordv1 "k8s.io/api/coordination/v1"
"k8s.io/apimachinery/pkg/api/errors"
@@ -35,7 +34,7 @@ var (
// leaseController checks the lease of managed clusters on hub cluster to determine whether a managed cluster is available.
type leaseController struct {
kubeClient kubernetes.Interface
clusterClient clientset.Interface
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
clusterLister clusterv1listers.ManagedClusterLister
leaseLister coordlisters.LeaseLister
eventRecorder events.Recorder
@@ -49,8 +48,10 @@ func NewClusterLeaseController(
leaseInformer coordinformers.LeaseInformer,
recorder events.Recorder) factory.Controller {
c := &leaseController{
kubeClient: kubeClient,
clusterClient: clusterClient,
kubeClient: kubeClient,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformer.Lister(),
leaseLister: leaseInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-lease-controller"),
@@ -155,15 +156,15 @@ func (c *leaseController) updateClusterStatus(ctx context.Context, cluster *clus
return nil
}
// the lease is not constantly updated, update it to unknown
conditionUpdateFn := helpers.UpdateManagedClusterConditionFn(metav1.Condition{
newCluster := cluster.DeepCopy()
meta.SetStatusCondition(&newCluster.Status.Conditions, metav1.Condition{
Type: clusterv1.ManagedClusterConditionAvailable,
Status: metav1.ConditionUnknown,
Reason: "ManagedClusterLeaseUpdateStopped",
Message: "Registration agent stopped updating its lease.",
})
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.clusterClient, cluster.Name, conditionUpdateFn)
updated, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
if updated {
c.eventRecorder.Eventf("ManagedClusterAvailableConditionUpdated",
"update managed cluster %q available condition to unknown, due to its lease is not updated constantly",

View File

@@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"open-cluster-management.io/ocm/pkg/common/patcher"
"testing"
"time"
@@ -62,8 +63,8 @@ func TestSync(t *testing.T) {
Reason: "ManagedClusterLeaseUpdateStopped",
Message: "Registration agent stopped updating its lease.",
}
testingcommon.AssertActions(t, clusterActions, "get", "patch")
patch := clusterActions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, clusterActions, "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -90,8 +91,8 @@ func TestSync(t *testing.T) {
Reason: "ManagedClusterLeaseUpdateStopped",
Message: "Registration agent stopped updating its lease.",
}
testingcommon.AssertActions(t, clusterActions, "get", "patch")
patch := clusterActions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, clusterActions, "patch")
patch := clusterActions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -133,8 +134,10 @@ func TestSync(t *testing.T) {
syncCtx := testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName)
ctrl := &leaseController{
kubeClient: leaseClient,
clusterClient: clusterClient,
kubeClient: leaseClient,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
leaseLister: leaseInformerFactory.Coordination().V1().Leases().Lister(),
eventRecorder: syncCtx.Recorder(),

View File

@@ -3,8 +3,8 @@ package managedcluster
import (
"context"
"embed"
"encoding/json"
"fmt"
"open-cluster-management.io/ocm/pkg/common/patcher"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
informerv1 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
@@ -21,7 +21,6 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
)
@@ -43,8 +42,8 @@ var staticFiles = []string{
// managedClusterController reconciles instances of ManagedCluster on the hub.
type managedClusterController struct {
kubeClient kubernetes.Interface
clusterClient clientset.Interface
clusterLister listerv1.ManagedClusterLister
patcher patcher.Patcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]
cache resourceapply.ResourceCache
eventRecorder events.Recorder
}
@@ -57,8 +56,10 @@ func NewManagedClusterController(
recorder events.Recorder) factory.Controller {
c := &managedClusterController{
kubeClient: kubeClient,
clusterClient: clusterClient,
clusterLister: clusterInformer.Lister(),
patcher: patcher.NewPatcher[
*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
cache: resourceapply.NewResourceCache(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-controller"),
}
@@ -83,24 +84,10 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn
return err
}
managedCluster = managedCluster.DeepCopy()
newManagedCluster := managedCluster.DeepCopy()
if managedCluster.DeletionTimestamp.IsZero() {
hasFinalizer := false
for i := range managedCluster.Finalizers {
if managedCluster.Finalizers[i] == managedClusterFinalizer {
hasFinalizer = true
break
}
}
if !hasFinalizer {
finalizerBytes, err := json.Marshal(append(managedCluster.Finalizers, managedClusterFinalizer))
if err != nil {
return err
}
patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes))
_, err = c.clusterClient.ClusterV1().ManagedClusters().Patch(
ctx, managedCluster.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
updated, err := c.patcher.AddFinalizer(ctx, managedCluster, managedClusterFinalizer)
if err != nil || updated {
return err
}
}
@@ -110,7 +97,7 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn
if err := c.removeManagedClusterResources(ctx, managedClusterName); err != nil {
return err
}
return c.removeManagedClusterFinalizer(ctx, managedCluster)
return c.patcher.RemoveFinalizer(ctx, managedCluster, managedClusterFinalizer)
}
if !managedCluster.Spec.HubAcceptsClient {
@@ -126,18 +113,17 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn
return err
}
_, _, err := helpers.UpdateManagedClusterStatus(
ctx,
c.clusterClient,
managedClusterName,
helpers.UpdateManagedClusterConditionFn(metav1.Condition{
Type: v1.ManagedClusterConditionHubAccepted,
Status: metav1.ConditionFalse,
Reason: "HubClusterAdminDenied",
Message: "Denied by hub cluster admin",
}),
)
return err
meta.SetStatusCondition(&newManagedCluster.Status.Conditions, metav1.Condition{
Type: v1.ManagedClusterConditionHubAccepted,
Status: metav1.ConditionFalse,
Reason: "HubClusterAdminDenied",
Message: "Denied by hub cluster admin",
})
if _, err := c.patcher.PatchStatus(ctx, newManagedCluster, newManagedCluster.Status, managedCluster.Status); err != nil {
return err
}
return nil
}
// TODO consider to add the managedcluster-namespace.yaml back to staticFiles,
@@ -178,12 +164,8 @@ func (c *managedClusterController) sync(ctx context.Context, syncCtx factory.Syn
acceptedCondition.Message = applyErrors.Error()
}
_, updated, updatedErr := helpers.UpdateManagedClusterStatus(
ctx,
c.clusterClient,
managedClusterName,
helpers.UpdateManagedClusterConditionFn(acceptedCondition),
)
meta.SetStatusCondition(&newManagedCluster.Status.Conditions, acceptedCondition)
updated, updatedErr := c.patcher.PatchStatus(ctx, newManagedCluster, newManagedCluster.Status, managedCluster.Status)
if updatedErr != nil {
errs = append(errs, updatedErr)
}
@@ -202,27 +184,3 @@ func (c *managedClusterController) removeManagedClusterResources(ctx context.Con
}
return operatorhelpers.NewMultiLineAggregate(errs)
}
func (c *managedClusterController) removeManagedClusterFinalizer(ctx context.Context, managedCluster *v1.ManagedCluster) error {
copiedFinalizers := []string{}
for i := range managedCluster.Finalizers {
if managedCluster.Finalizers[i] == managedClusterFinalizer {
continue
}
copiedFinalizers = append(copiedFinalizers, managedCluster.Finalizers[i])
}
if len(managedCluster.Finalizers) != len(copiedFinalizers) {
finalizerBytes, err := json.Marshal(copiedFinalizers)
if err != nil {
return err
}
patch := fmt.Sprintf("{\"metadata\": {\"finalizers\": %s}}", string(finalizerBytes))
_, err = c.clusterClient.ClusterV1().ManagedClusters().Patch(
ctx, managedCluster.Name, types.MergePatchType, []byte(patch), metav1.PatchOptions{})
return err
}
return nil
}

View File

@@ -3,6 +3,7 @@ package managedcluster
import (
"context"
"encoding/json"
"open-cluster-management.io/ocm/pkg/common/patcher"
"testing"
"time"
@@ -58,8 +59,8 @@ func TestSyncManagedCluster(t *testing.T) {
Reason: "HubClusterAdminAccepted",
Message: "Accepted by hub cluster admin",
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -72,7 +73,7 @@ func TestSyncManagedCluster(t *testing.T) {
name: "sync an accepted spoke cluster",
startingObjects: []runtime.Object{testinghelpers.NewAcceptedManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get")
testingcommon.AssertNoActions(t, actions)
},
},
{
@@ -85,8 +86,8 @@ func TestSyncManagedCluster(t *testing.T) {
Reason: "HubClusterAdminDenied",
Message: "Denied by hub cluster admin",
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -123,7 +124,12 @@ func TestSyncManagedCluster(t *testing.T) {
}
}
ctrl := managedClusterController{kubeClient, clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), resourceapply.NewResourceCache(), eventstesting.NewTestingEventRecorder(t)}
ctrl := managedClusterController{
kubeClient,
clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
patcher.NewPatcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](clusterClient.ClusterV1().ManagedClusters()),
resourceapply.NewResourceCache(),
eventstesting.NewTestingEventRecorder(t)}
syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName))
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)

View File

@@ -3,6 +3,7 @@ package managedclusterset
import (
"context"
"fmt"
"open-cluster-management.io/ocm/pkg/common/patcher"
"reflect"
"github.com/openshift/library-go/pkg/controller/factory"
@@ -27,7 +28,7 @@ import (
// managedClusterSetController reconciles instances of ManagedClusterSet on the hub.
type managedClusterSetController struct {
clusterClient clientset.Interface
patcher patcher.Patcher[*clusterv1beta2.ManagedClusterSet, clusterv1beta2.ManagedClusterSetSpec, clusterv1beta2.ManagedClusterSetStatus]
clusterLister clusterlisterv1.ManagedClusterLister
clusterSetLister clusterlisterv1beta2.ManagedClusterSetLister
eventRecorder events.Recorder
@@ -45,7 +46,9 @@ func NewManagedClusterSetController(
syncCtx := factory.NewSyncContext(controllerName, recorder)
c := &managedClusterSetController{
clusterClient: clusterClient,
patcher: patcher.NewPatcher[
*clusterv1beta2.ManagedClusterSet, clusterv1beta2.ManagedClusterSetSpec, clusterv1beta2.ManagedClusterSetStatus](
clusterClient.ClusterV1beta2().ManagedClusterSets()),
clusterLister: clusterInformer.Lister(),
clusterSetLister: clusterSetInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("managed-cluster-set-controller"),
@@ -164,12 +167,7 @@ func (c *managedClusterSetController) syncClusterSet(ctx context.Context, origin
}
meta.SetStatusCondition(&clusterSet.Status.Conditions, emptyCondition)
// skip update if cluster set status does not change
if reflect.DeepEqual(clusterSet.Status.Conditions, originalClusterSet.Status.Conditions) {
return nil
}
_, err = c.clusterClient.ClusterV1beta2().ManagedClusterSets().UpdateStatus(ctx, clusterSet, metav1.UpdateOptions{})
_, err = c.patcher.PatchStatus(ctx, clusterSet, clusterSet.Status, originalClusterSet.Status)
if err != nil {
return fmt.Errorf("failed to update status of ManagedClusterSet %q: %w", clusterSet.Name, err)
}

View File

@@ -2,6 +2,7 @@ package managedclusterset
import (
"context"
"open-cluster-management.io/ocm/pkg/common/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"reflect"
"testing"
@@ -228,7 +229,9 @@ func TestSyncClusterSet(t *testing.T) {
}
ctrl := managedClusterSetController{
clusterClient: clusterClient,
patcher: patcher.NewPatcher[
*clusterv1beta2.ManagedClusterSet, clusterv1beta2.ManagedClusterSetSpec, clusterv1beta2.ManagedClusterSetStatus](
clusterClient.ClusterV1beta2().ManagedClusterSets()),
clusterLister: informerFactory.Cluster().V1().ManagedClusters().Lister(),
clusterSetLister: informerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
eventRecorder: eventstesting.NewTestingEventRecorder(t),
@@ -380,7 +383,6 @@ func TestEnqueueUpdateClusterClusterSet(t *testing.T) {
syncCtx := testingcommon.NewFakeSyncContext(t, "fake")
ctrl := managedClusterSetController{
clusterClient: clusterClient,
clusterSetLister: informerFactory.Cluster().V1beta2().ManagedClusterSets().Lister(),
eventRecorder: eventstesting.NewTestingEventRecorder(t),
queue: syncCtx.Queue(),

View File

@@ -2,18 +2,13 @@ package managedclustersetbinding
import (
"context"
"encoding/json"
"fmt"
jsonpatch "github.com/evanphx/json-patch"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@@ -22,6 +17,7 @@ import (
clusterinformerv1beta2 "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1beta2"
clusterlisterv1beta2 "open-cluster-management.io/api/client/cluster/listers/cluster/v1beta2"
clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta2"
"open-cluster-management.io/ocm/pkg/common/patcher"
)
const (
@@ -148,6 +144,10 @@ func (c *managedClusterSetBindingController) sync(ctx context.Context, syncCtx f
return err
}
patcher := patcher.NewPatcher[
*clusterv1beta2.ManagedClusterSetBinding, clusterv1beta2.ManagedClusterSetBindingSpec, clusterv1beta2.ManagedClusterSetBindingStatus](
c.clusterClient.ClusterV1beta2().ManagedClusterSetBindings(bindingNamespace))
if len(bindingNamespace) == 0 {
return nil
}
@@ -170,7 +170,10 @@ func (c *managedClusterSetBindingController) sync(ctx context.Context, syncCtx f
Status: metav1.ConditionFalse,
Reason: "ClusterSetNotFound",
})
return c.patchCondition(ctx, binding, bindingCopy)
if _, err := patcher.PatchStatus(ctx, bindingCopy, bindingCopy.Status, binding.Status); err != nil {
return err
}
return nil
case err != nil:
return err
}
@@ -181,43 +184,9 @@ func (c *managedClusterSetBindingController) sync(ctx context.Context, syncCtx f
Reason: "ClusterSetBound",
})
return c.patchCondition(ctx, binding, bindingCopy)
}
func (c *managedClusterSetBindingController) patchCondition(ctx context.Context, old, new *clusterv1beta2.ManagedClusterSetBinding) error {
if equality.Semantic.DeepEqual(old.Status.Conditions, new.Status.Conditions) {
return nil
}
oldData, err := json.Marshal(clusterv1beta2.ManagedClusterSetBinding{
Status: clusterv1beta2.ManagedClusterSetBindingStatus{
Conditions: old.Status.Conditions,
},
})
if err != nil {
return fmt.Errorf("failed to Marshal old data for workspace %s: %w", old.Name, err)
}
newData, err := json.Marshal(clusterv1beta2.ManagedClusterSetBinding{
ObjectMeta: metav1.ObjectMeta{
UID: old.UID,
ResourceVersion: old.ResourceVersion,
}, // to ensure they appear in the patch as preconditions
Status: clusterv1beta2.ManagedClusterSetBindingStatus{
Conditions: new.Status.Conditions,
},
})
if err != nil {
return fmt.Errorf("failed to Marshal new data for workspace %s: %w", new.Name, err)
}
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
if err != nil {
return fmt.Errorf("failed to create patch for workspace %s: %w", new.Name, err)
}
c.eventRecorder.Eventf("PatchClusterSetBindingCondition", "patch clustersetbinding %s/%s condition", new.Namespace, new.Name)
_, err = c.clusterClient.ClusterV1beta2().ManagedClusterSetBindings(new.Namespace).Patch(ctx, new.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
return err
if _, err := patcher.PatchStatus(ctx, bindingCopy, bindingCopy.Status, binding.Status); err != nil {
return err
}
return nil
}

View File

@@ -2,6 +2,7 @@ package taint
import (
"context"
"open-cluster-management.io/ocm/pkg/common/patcher"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
@@ -31,7 +32,7 @@ var (
// taintController
type taintController struct {
clusterClient clientset.Interface
patcher patcher.Patcher[*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus]
clusterLister listerv1.ManagedClusterLister
eventRecorder events.Recorder
}
@@ -42,7 +43,9 @@ func NewTaintController(
clusterInformer informerv1.ManagedClusterInformer,
recorder events.Recorder) factory.Controller {
c := &taintController{
clusterClient: clusterClient,
patcher: patcher.NewPatcher[
*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterLister: clusterInformer.Lister(),
eventRecorder: recorder.WithComponentSuffix("taint-controller"),
}
@@ -70,9 +73,9 @@ func (c *taintController) sync(ctx context.Context, syncCtx factory.SyncContext)
return nil
}
managedCluster = managedCluster.DeepCopy()
newTaints := managedCluster.Spec.Taints
cond := meta.FindStatusCondition(managedCluster.Status.Conditions, v1.ManagedClusterConditionAvailable)
newManagedCluster := managedCluster.DeepCopy()
newTaints := newManagedCluster.Spec.Taints
cond := meta.FindStatusCondition(newManagedCluster.Status.Conditions, v1.ManagedClusterConditionAvailable)
var updated bool
switch {
@@ -87,8 +90,8 @@ func (c *taintController) sync(ctx context.Context, syncCtx factory.SyncContext)
}
if updated {
managedCluster.Spec.Taints = newTaints
if _, err = c.clusterClient.ClusterV1().ManagedClusters().Update(ctx, managedCluster, metav1.UpdateOptions{}); err != nil {
newManagedCluster.Spec.Taints = newTaints
if _, err = c.patcher.PatchSpec(ctx, newManagedCluster, newManagedCluster.Spec, managedCluster.Spec); err != nil {
return err
}
c.eventRecorder.Eventf("ManagedClusterConditionAvailableUpdated", "Update the original taints to the %+v", newTaints)

View File

@@ -2,6 +2,8 @@ package taint
import (
"context"
"encoding/json"
"open-cluster-management.io/ocm/pkg/common/patcher"
"reflect"
"testing"
"time"
@@ -36,8 +38,13 @@ func TestSyncTaintCluster(t *testing.T) {
name: "ManagedClusterConditionAvailable conditionStatus is False",
startingObjects: []runtime.Object{testinghelpers.NewUnAvailableManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "update")
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
testingcommon.AssertActions(t, actions, "patch")
patchData := actions[0].(clienttesting.PatchActionImpl).Patch
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patchData, managedCluster)
if err != nil {
t.Fatal(err)
}
taints := []v1.Taint{UnavailableTaint}
if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) {
t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints)
@@ -48,8 +55,13 @@ func TestSyncTaintCluster(t *testing.T) {
name: "There is no ManagedClusterConditionAvailable",
startingObjects: []runtime.Object{testinghelpers.NewManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "update")
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
testingcommon.AssertActions(t, actions, "patch")
patchData := actions[0].(clienttesting.PatchActionImpl).Patch
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patchData, managedCluster)
if err != nil {
t.Fatal(err)
}
taints := []v1.Taint{UnreachableTaint}
if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) {
t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints)
@@ -60,8 +72,13 @@ func TestSyncTaintCluster(t *testing.T) {
name: "ManagedClusterConditionAvailable conditionStatus is Unknown",
startingObjects: []runtime.Object{testinghelpers.NewUnknownManagedCluster()},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "update")
managedCluster := (actions[0].(clienttesting.UpdateActionImpl).Object).(*v1.ManagedCluster)
testingcommon.AssertActions(t, actions, "patch")
patchData := actions[0].(clienttesting.PatchActionImpl).Patch
managedCluster := &v1.ManagedCluster{}
err := json.Unmarshal(patchData, managedCluster)
if err != nil {
t.Fatal(err)
}
taints := []v1.Taint{UnreachableTaint}
if !reflect.DeepEqual(managedCluster.Spec.Taints, taints) {
t.Errorf("expected taint %#v, but actualTaints: %#v", taints, managedCluster.Spec.Taints)
@@ -88,7 +105,11 @@ func TestSyncTaintCluster(t *testing.T) {
}
}
ctrl := taintController{clusterClient, clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)}
ctrl := taintController{
patcher.NewPatcher[
*v1.ManagedCluster, v1.ManagedClusterSpec, v1.ManagedClusterStatus](
clusterClient.ClusterV1().ManagedClusters()),
clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(), eventstesting.NewTestingEventRecorder(t)}
syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, testinghelpers.TestManagedClusterName))
if syncErr != nil {
t.Errorf("unexpected err: %v", syncErr)

View File

@@ -3,16 +3,15 @@ package addon
import (
"context"
"fmt"
"open-cluster-management.io/ocm/pkg/common/patcher"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformerv1alpha1 "open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1"
addonlisterv1alpha1 "open-cluster-management.io/api/client/addon/listers/addon/v1alpha1"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@@ -33,9 +32,10 @@ var AddOnLeaseControllerLeaseDurationSeconds = 60
// managedClusterAddOnLeaseController updates the managed cluster addons status on the hub cluster through checking the add-on
// lease on the managed/management cluster.
type managedClusterAddOnLeaseController struct {
clusterName string
clock clock.Clock
addOnClient addonclient.Interface
clusterName string
clock clock.Clock
patcher patcher.Patcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]
addOnLister addonlisterv1alpha1.ManagedClusterAddOnLister
hubLeaseClient coordv1client.CoordinationV1Interface
managementLeaseClient coordv1client.CoordinationV1Interface
@@ -52,9 +52,11 @@ func NewManagedClusterAddOnLeaseController(clusterName string,
resyncInterval time.Duration,
recorder events.Recorder) factory.Controller {
c := &managedClusterAddOnLeaseController{
clusterName: clusterName,
clock: clock.RealClock{},
addOnClient: addOnClient,
clusterName: clusterName,
clock: clock.RealClock{},
patcher: patcher.NewPatcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus](
addOnClient.AddonV1alpha1().ManagedClusterAddOns(clusterName)),
addOnLister: addOnInformer.Lister(),
hubLeaseClient: hubLeaseClient,
managementLeaseClient: managementLeaseClient,
@@ -183,18 +185,9 @@ func (c *managedClusterAddOnLeaseController) syncSingle(ctx context.Context,
}
}
if meta.IsStatusConditionPresentAndEqual(addOn.Status.Conditions, condition.Type, condition.Status) {
// addon status is not changed, do nothing
return nil
}
_, updated, err := helpers.UpdateManagedClusterAddOnStatus(
ctx,
c.addOnClient,
c.clusterName,
addOn.Name,
helpers.UpdateManagedClusterAddOnStatusFn(condition),
)
newAddon := addOn.DeepCopy()
meta.SetStatusCondition(&newAddon.Status.Conditions, condition)
updated, err := c.patcher.PatchStatus(ctx, newAddon, newAddon.Status, addOn.Status)
if err != nil {
return err
}

View File

@@ -3,6 +3,7 @@ package addon
import (
"context"
"encoding/json"
"open-cluster-management.io/ocm/pkg/common/patcher"
"testing"
"time"
@@ -141,8 +142,8 @@ func TestSync(t *testing.T) {
hubLeases: []runtime.Object{},
spokeLeases: []runtime.Object{},
validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
addOn := &addonv1alpha1.ManagedClusterAddOn{}
err := json.Unmarshal(patch, addOn)
if err != nil {
@@ -175,8 +176,8 @@ func TestSync(t *testing.T) {
testinghelpers.NewAddOnLease("test", "test", now.Add(-5*time.Minute)),
},
validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
addOn := &addonv1alpha1.ManagedClusterAddOn{}
err := json.Unmarshal(patch, addOn)
if err != nil {
@@ -209,8 +210,8 @@ func TestSync(t *testing.T) {
testinghelpers.NewAddOnLease("test", "test", now),
},
validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
addOn := &addonv1alpha1.ManagedClusterAddOn{}
err := json.Unmarshal(patch, addOn)
if err != nil {
@@ -243,7 +244,7 @@ func TestSync(t *testing.T) {
Type: "Available",
Status: metav1.ConditionTrue,
Reason: "ManagedClusterAddOnLeaseUpdated",
Message: "Managed cluster addon agent updates its lease constantly.",
Message: "test add-on is available.",
},
},
},
@@ -306,8 +307,8 @@ func TestSync(t *testing.T) {
testinghelpers.NewAddOnLease("test", "test", now),
},
validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
addOn := &addonv1alpha1.ManagedClusterAddOn{}
err := json.Unmarshal(patch, addOn)
if err != nil {
@@ -335,8 +336,8 @@ func TestSync(t *testing.T) {
hubLeases: []runtime.Object{testinghelpers.NewAddOnLease(testinghelpers.TestManagedClusterName, "test", now)},
spokeLeases: []runtime.Object{},
validateActions: func(t *testing.T, ctx *testingcommon.FakeSyncContext, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
addOn := &addonv1alpha1.ManagedClusterAddOn{}
err := json.Unmarshal(patch, addOn)
if err != nil {
@@ -390,10 +391,12 @@ func TestSync(t *testing.T) {
spokeLeaseClient := kubefake.NewSimpleClientset(c.spokeLeases...)
ctrl := &managedClusterAddOnLeaseController{
clusterName: testinghelpers.TestManagedClusterName,
clock: clocktesting.NewFakeClock(time.Now()),
hubLeaseClient: hubClient.CoordinationV1(),
addOnClient: addOnClient,
clusterName: testinghelpers.TestManagedClusterName,
clock: clocktesting.NewFakeClock(time.Now()),
hubLeaseClient: hubClient.CoordinationV1(),
patcher: patcher.NewPatcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus](
addOnClient.AddonV1alpha1().ManagedClusterAddOns(testinghelpers.TestManagedClusterName)),
addOnLister: addOnInformerFactory.Addon().V1alpha1().ManagedClusterAddOns().Lister(),
managementLeaseClient: managementLeaseClient.CoordinationV1(),
spokeLeaseClient: spokeLeaseClient.CoordinationV1(),

View File

@@ -3,6 +3,7 @@ package addon
import (
"context"
"fmt"
"open-cluster-management.io/ocm/pkg/common/patcher"
"time"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
@@ -25,7 +26,6 @@ import (
addoninformerv1alpha1 "open-cluster-management.io/api/client/addon/informers/externalversions/addon/v1alpha1"
addonlisterv1alpha1 "open-cluster-management.io/api/client/addon/listers/addon/v1alpha1"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/helpers"
)
const (
@@ -46,10 +46,11 @@ type addOnRegistrationController struct {
managementKubeClient kubernetes.Interface // in-cluster local management kubeClient
spokeKubeClient kubernetes.Interface
hubAddOnLister addonlisterv1alpha1.ManagedClusterAddOnLister
addOnClient addonclient.Interface
csrControl clientcert.CSRControl
recorder events.Recorder
csrIndexer cache.Indexer
patcher patcher.Patcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus]
csrControl clientcert.CSRControl
recorder events.Recorder
csrIndexer cache.Indexer
startRegistrationFunc func(ctx context.Context, config registrationConfig) context.CancelFunc
@@ -71,14 +72,16 @@ func NewAddOnRegistrationController(
recorder events.Recorder,
) factory.Controller {
c := &addOnRegistrationController{
clusterName: clusterName,
agentName: agentName,
kubeconfigData: kubeconfigData,
managementKubeClient: managementKubeClient,
spokeKubeClient: managedKubeClient,
hubAddOnLister: hubAddOnInformers.Lister(),
csrControl: csrControl,
addOnClient: addOnClient,
clusterName: clusterName,
agentName: agentName,
kubeconfigData: kubeconfigData,
managementKubeClient: managementKubeClient,
spokeKubeClient: managedKubeClient,
hubAddOnLister: hubAddOnInformers.Lister(),
csrControl: csrControl,
patcher: patcher.NewPatcher[
*addonv1alpha1.ManagedClusterAddOn, addonv1alpha1.ManagedClusterAddOnSpec, addonv1alpha1.ManagedClusterAddOnStatus](
addOnClient.AddonV1alpha1().ManagedClusterAddOns(clusterName)),
recorder: recorder,
csrIndexer: csrControl.Informer().GetIndexer(),
addOnRegistrationConfigs: map[string]map[string]registrationConfig{},
@@ -276,11 +279,18 @@ func (c *addOnRegistrationController) haltCSRCreationFunc(addonName string) func
func (c *addOnRegistrationController) generateStatusUpdate(clusterName, addonName string) clientcert.StatusUpdateFunc {
return func(ctx context.Context, cond metav1.Condition) error {
_, _, updatedErr := helpers.UpdateManagedClusterAddOnStatus(
ctx, c.addOnClient, clusterName, addonName, helpers.UpdateManagedClusterAddOnStatusFn(cond),
)
addon, err := c.hubAddOnLister.ManagedClusterAddOns(clusterName).Get(addonName)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
return updatedErr
newAddon := addon.DeepCopy()
meta.SetStatusCondition(&newAddon.Status.Conditions, cond)
_, err = c.patcher.PatchStatus(ctx, newAddon, newAddon.Status, addon.Status)
return err
}
}

View File

@@ -1,4 +1,4 @@
package managedcluster
package lease
import (
"context"

View File

@@ -1,4 +1,4 @@
package managedcluster
package lease
import (
"context"

View File

@@ -1,150 +0,0 @@
package managedcluster
import (
"context"
"fmt"
"sort"
"k8s.io/apimachinery/pkg/selection"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1alpha1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1alpha1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
)
const labelCustomizedOnly = "open-cluster-management.io/spoke-only"
// managedClusterClaimController exposes cluster claims created on managed cluster on hub after it joins the hub.
type managedClusterClaimController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.ManagedClusterLister
claimLister clusterv1alpha1listers.ClusterClaimLister
maxCustomClusterClaims int
}
// NewManagedClusterClaimController creates a new managed cluster claim controller on the managed cluster.
func NewManagedClusterClaimController(
clusterName string,
maxCustomClusterClaims int,
hubClusterClient clientset.Interface,
hubManagedClusterInformer clusterv1informer.ManagedClusterInformer,
claimInformer clusterv1alpha1informer.ClusterClaimInformer,
recorder events.Recorder) factory.Controller {
c := &managedClusterClaimController{
clusterName: clusterName,
maxCustomClusterClaims: maxCustomClusterClaims,
hubClusterClient: hubClusterClient,
hubClusterLister: hubManagedClusterInformer.Lister(),
claimLister: claimInformer.Lister(),
}
return factory.New().
WithInformers(claimInformer.Informer()).
WithInformersQueueKeyFunc(func(obj runtime.Object) string {
accessor, _ := meta.Accessor(obj)
return accessor.GetName()
}, hubManagedClusterInformer.Informer()).
WithSync(c.sync).
ToController("ClusterClaimController", recorder)
}
// sync maintains the cluster claims in status of the managed cluster on hub once it joins the hub.
func (c managedClusterClaimController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
managedCluster, err := c.hubClusterLister.Get(c.clusterName)
if err != nil {
return fmt.Errorf("unable to get managed cluster with name %q from hub: %w", c.clusterName, err)
}
// current managed cluster has not joined the hub yet, do nothing.
if !meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) {
syncCtx.Recorder().Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q does not join the hub yet", c.clusterName)
return nil
}
return c.exposeClaims(ctx, syncCtx, managedCluster)
}
// exposeClaims saves cluster claims fetched on managed cluster into status of the
// managed cluster on hub. Some of the customized claims might not be exposed once
// the total number of the claims exceeds the value of `cluster-claims-max`.
func (c managedClusterClaimController) exposeClaims(ctx context.Context, syncCtx factory.SyncContext,
managedCluster *clusterv1.ManagedCluster) error {
reservedClaims := []clusterv1.ManagedClusterClaim{}
customClaims := []clusterv1.ManagedClusterClaim{}
// clusterClaim with label `open-cluster-management.io/spoke-only` will not be synced to managedCluster.Status at hub.
requirement, _ := labels.NewRequirement(labelCustomizedOnly, selection.DoesNotExist, []string{})
selector := labels.NewSelector().Add(*requirement)
clusterClaims, err := c.claimLister.List(selector)
if err != nil {
return fmt.Errorf("unable to list cluster claims: %w", err)
}
reservedClaimNames := sets.NewString(clusterv1alpha1.ReservedClusterClaimNames[:]...)
for _, clusterClaim := range clusterClaims {
managedClusterClaim := clusterv1.ManagedClusterClaim{
Name: clusterClaim.Name,
Value: clusterClaim.Spec.Value,
}
if reservedClaimNames.Has(clusterClaim.Name) {
reservedClaims = append(reservedClaims, managedClusterClaim)
continue
}
customClaims = append(customClaims, managedClusterClaim)
}
// sort claims by name
sort.SliceStable(reservedClaims, func(i, j int) bool {
return reservedClaims[i].Name < reservedClaims[j].Name
})
sort.SliceStable(customClaims, func(i, j int) bool {
return customClaims[i].Name < customClaims[j].Name
})
// truncate custom claims if the number exceeds `max-custom-cluster-claims`
if n := len(customClaims); n > c.maxCustomClusterClaims {
customClaims = customClaims[:c.maxCustomClusterClaims]
syncCtx.Recorder().Eventf("CustomClusterClaimsTruncated", "%d cluster claims are found. It exceeds the max number of custom cluster claims (%d). %d custom cluster claims are not exposed.",
n, c.maxCustomClusterClaims, n-c.maxCustomClusterClaims)
}
// merge reserved claims and custom claims
claims := append(reservedClaims, customClaims...)
// update the status of the managed cluster
updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{updateClusterClaimsFn(clusterv1.ManagedClusterStatus{
ClusterClaims: claims,
})}
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...)
if err != nil {
return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err)
}
if updated {
klog.V(4).Infof("The cluster claims in status of managed cluster %q has been updated", c.clusterName)
}
return nil
}
func updateClusterClaimsFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc {
return func(oldStatus *clusterv1.ManagedClusterStatus) error {
oldStatus.ClusterClaims = status.ClusterClaims
return nil
}
}

View File

@@ -0,0 +1,89 @@
package managedcluster
import (
"context"
"fmt"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/util/sets"
clusterv1alpha1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
clusterv1alpha1 "open-cluster-management.io/api/cluster/v1alpha1"
ocmfeature "open-cluster-management.io/api/feature"
"open-cluster-management.io/ocm/pkg/features"
"sort"
)
const labelCustomizedOnly = "open-cluster-management.io/spoke-only"
type claimReconcile struct {
recorder events.Recorder
claimLister clusterv1alpha1listers.ClusterClaimLister
maxCustomClusterClaims int
}
func (r *claimReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) {
if !features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
return cluster, reconcileContinue, nil
}
// current managed cluster has not joined the hub yet, do nothing.
if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) {
r.recorder.Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q does not join the hub yet", cluster.Name)
return cluster, reconcileContinue, nil
}
err := r.exposeClaims(ctx, cluster)
return cluster, reconcileContinue, err
}
// exposeClaims saves cluster claims fetched on managed cluster into status of the
// managed cluster on hub. Some of the customized claims might not be exposed once
// the total number of the claims exceeds the value of `cluster-claims-max`.
func (r *claimReconcile) exposeClaims(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
reservedClaims := []clusterv1.ManagedClusterClaim{}
customClaims := []clusterv1.ManagedClusterClaim{}
// clusterClaim with label `open-cluster-management.io/spoke-only` will not be synced to managedCluster.Status at hub.
requirement, _ := labels.NewRequirement(labelCustomizedOnly, selection.DoesNotExist, []string{})
selector := labels.NewSelector().Add(*requirement)
clusterClaims, err := r.claimLister.List(selector)
if err != nil {
return fmt.Errorf("unable to list cluster claims: %w", err)
}
reservedClaimNames := sets.NewString(clusterv1alpha1.ReservedClusterClaimNames[:]...)
for _, clusterClaim := range clusterClaims {
managedClusterClaim := clusterv1.ManagedClusterClaim{
Name: clusterClaim.Name,
Value: clusterClaim.Spec.Value,
}
if reservedClaimNames.Has(clusterClaim.Name) {
reservedClaims = append(reservedClaims, managedClusterClaim)
continue
}
customClaims = append(customClaims, managedClusterClaim)
}
// sort claims by name
sort.SliceStable(reservedClaims, func(i, j int) bool {
return reservedClaims[i].Name < reservedClaims[j].Name
})
sort.SliceStable(customClaims, func(i, j int) bool {
return customClaims[i].Name < customClaims[j].Name
})
// truncate custom claims if the number exceeds `max-custom-cluster-claims`
if n := len(customClaims); n > r.maxCustomClusterClaims {
customClaims = customClaims[:r.maxCustomClusterClaims]
r.recorder.Eventf("CustomClusterClaimsTruncated", "%d cluster claims are found. It exceeds the max number of custom cluster claims (%d). %d custom cluster claims are not exposed.",
n, r.maxCustomClusterClaims, n-r.maxCustomClusterClaims)
}
// merge reserved claims and custom claims
claims := append(reservedClaims, customClaims...)
cluster.Status.ClusterClaims = claims
return nil
}

View File

@@ -3,6 +3,9 @@ package managedcluster
import (
"context"
"encoding/json"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
"reflect"
"testing"
"time"
@@ -30,7 +33,7 @@ func TestSync(t *testing.T) {
{
name: "sync no managed cluster",
validateActions: testingcommon.AssertNoActions,
expectedErr: "unable to get managed cluster with name \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found",
expectedErr: "unable to get managed cluster \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found",
},
{
name: "skip when managed cluster does not join the hub yet",
@@ -51,8 +54,8 @@ func TestSync(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
cluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, cluster)
if err != nil {
@@ -72,6 +75,11 @@ func TestSync(t *testing.T) {
},
}
apiServer, discoveryClient := newDiscoveryServer(t, nil)
defer apiServer.Close()
kubeClient := kubefake.NewSimpleClientset()
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
objects := []runtime.Object{}
@@ -93,13 +101,16 @@ func TestSync(t *testing.T) {
}
}
ctrl := managedClusterClaimController{
clusterName: testinghelpers.TestManagedClusterName,
maxCustomClusterClaims: 20,
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
claimLister: clusterInformerFactory.Cluster().V1alpha1().ClusterClaims().Lister(),
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
clusterInformerFactory.Cluster().V1().ManagedClusters(),
discoveryClient,
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
kubeInformerFactory.Core().V1().Nodes(),
20,
eventstesting.NewTestingEventRecorder(t),
)
syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
testingcommon.AssertError(t, syncErr, c.expectedErr)
@@ -132,8 +143,8 @@ func TestExposeClaims(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
cluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, cluster)
if err != nil {
@@ -190,8 +201,8 @@ func TestExposeClaims(t *testing.T) {
},
maxCustomClusterClaims: 2,
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
cluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, cluster)
if err != nil {
@@ -226,8 +237,8 @@ func TestExposeClaims(t *testing.T) {
},
}),
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
cluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, cluster)
if err != nil {
@@ -262,8 +273,8 @@ func TestExposeClaims(t *testing.T) {
},
},
validateActions: func(t *testing.T, actions []clienttesting.Action) {
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
cluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, cluster)
if err != nil {
@@ -283,6 +294,11 @@ func TestExposeClaims(t *testing.T) {
},
}
apiServer, discoveryClient := newDiscoveryServer(t, nil)
defer apiServer.Close()
kubeClient := kubefake.NewSimpleClientset()
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
objects := []runtime.Object{}
@@ -308,15 +324,18 @@ func TestExposeClaims(t *testing.T) {
c.maxCustomClusterClaims = 20
}
ctrl := managedClusterClaimController{
clusterName: testinghelpers.TestManagedClusterName,
maxCustomClusterClaims: c.maxCustomClusterClaims,
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
claimLister: clusterInformerFactory.Cluster().V1alpha1().ClusterClaims().Lister(),
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
clusterInformerFactory.Cluster().V1().ManagedClusters(),
discoveryClient,
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
kubeInformerFactory.Core().V1().Nodes(),
c.maxCustomClusterClaims,
eventstesting.NewTestingEventRecorder(t),
)
syncErr := ctrl.exposeClaims(context.TODO(), testingcommon.NewFakeSyncContext(t, c.cluster.Name), c.cluster)
syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, c.cluster.Name))
testingcommon.AssertError(t, syncErr, c.expectedErr)
c.validateActions(t, clusterClient.Actions())

View File

@@ -1,86 +0,0 @@
package managedcluster
import (
"context"
"fmt"
"time"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// managedClusterJoiningController add the joined condition to a ManagedCluster on the managed cluster after it is accepted by hub cluster admin.
type managedClusterJoiningController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.ManagedClusterLister
}
// NewManagedClusterJoiningController creates a new managed cluster joining controller on the managed cluster.
func NewManagedClusterJoiningController(
clusterName string,
hubClusterClient clientset.Interface,
hubManagedClusterInformer clusterv1informer.ManagedClusterInformer,
recorder events.Recorder) factory.Controller {
c := &managedClusterJoiningController{
clusterName: clusterName,
hubClusterClient: hubClusterClient,
hubClusterLister: hubManagedClusterInformer.Lister(),
}
return factory.New().
WithInformers(hubManagedClusterInformer.Informer()).
WithSync(c.sync).
ResyncEvery(5*time.Minute).
ToController("ManagedClusterJoiningController", recorder)
}
// sync maintains the managed cluster side status of a ManagedCluster, it maintains the ManagedClusterJoined condition according to
// the value of the ManagedClusterHubAccepted condition.
func (c managedClusterJoiningController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
managedCluster, err := c.hubClusterLister.Get(c.clusterName)
if err != nil {
return fmt.Errorf("unable to get managed cluster with name %q from hub: %w", c.clusterName, err)
}
// current managed cluster is not accepted, do nothing.
if !meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) {
syncCtx.Recorder().Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q is not accepted by hub yet", c.clusterName)
return nil
}
joined := meta.IsStatusConditionTrue(managedCluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined)
if joined {
// current managed cluster is joined, do nothing.
return nil
}
// current managed cluster did not join the hub cluster, join it.
_, updated, err := helpers.UpdateManagedClusterStatus(
ctx,
c.hubClusterClient,
c.clusterName,
helpers.UpdateManagedClusterConditionFn(metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}),
)
if err != nil {
return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err)
}
if updated {
syncCtx.Recorder().Eventf("ManagedClusterJoined", "Managed cluster %q joined hub", c.clusterName)
}
return nil
}

View File

@@ -3,6 +3,9 @@ package managedcluster
import (
"context"
"encoding/json"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
kubeinformers "k8s.io/client-go/informers"
kubefake "k8s.io/client-go/kubernetes/fake"
"testing"
"time"
@@ -28,7 +31,7 @@ func TestSyncManagedCluster(t *testing.T) {
name: "sync no managed cluster",
startingObjects: []runtime.Object{},
validateActions: testingcommon.AssertNoActions,
expectedErr: "unable to get managed cluster with name \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found",
expectedErr: "unable to get managed cluster \"testmanagedcluster\" from hub: managedcluster.cluster.open-cluster-management.io \"testmanagedcluster\" not found",
},
{
name: "sync an unaccepted managed cluster",
@@ -45,8 +48,8 @@ func TestSyncManagedCluster(t *testing.T) {
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -57,6 +60,11 @@ func TestSyncManagedCluster(t *testing.T) {
},
}
apiServer, discoveryClient := newDiscoveryServer(t, nil)
defer apiServer.Close()
kubeClient := kubefake.NewSimpleClientset()
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
@@ -68,11 +76,16 @@ func TestSyncManagedCluster(t *testing.T) {
}
}
ctrl := managedClusterJoiningController{
clusterName: testinghelpers.TestManagedClusterName,
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
clusterInformerFactory.Cluster().V1().ManagedClusters(),
discoveryClient,
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
kubeInformerFactory.Core().V1().Nodes(),
20,
eventstesting.NewTestingEventRecorder(t),
)
syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
testingcommon.AssertError(t, syncErr, c.expectedErr)

View File

@@ -0,0 +1,36 @@
package managedcluster
import (
"context"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
)
type joiningReconcile struct {
recorder events.Recorder
}
func (r *joiningReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) {
// current managed cluster is not accepted, do nothing.
if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionHubAccepted) {
r.recorder.Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q is not accepted by hub yet", cluster.Name)
return cluster, reconcileStop, nil
}
joined := meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined)
if joined {
// current managed cluster is joined, do nothing.
return cluster, reconcileContinue, nil
}
meta.SetStatusCondition(&cluster.Status.Conditions, metav1.Condition{
Type: clusterv1.ManagedClusterConditionJoined,
Status: metav1.ConditionTrue,
Reason: "ManagedClusterJoined",
Message: "Managed cluster joined",
})
return cluster, reconcileContinue, nil
}

View File

@@ -0,0 +1,125 @@
package managedcluster
import (
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/discovery"
corev1lister "k8s.io/client-go/listers/core/v1"
"net/http"
clusterv1 "open-cluster-management.io/api/cluster/v1"
)
type resoureReconcile struct {
managedClusterDiscoveryClient discovery.DiscoveryInterface
nodeLister corev1lister.NodeLister
}
func (r *resoureReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) {
// check the kube-apiserver health on managed cluster.
condition := r.checkKubeAPIServerStatus(ctx)
// the managed cluster kube-apiserver is health, update its version and resources if necessary.
if condition.Status == metav1.ConditionTrue {
clusterVersion, err := r.getClusterVersion()
if err != nil {
return cluster, reconcileStop, fmt.Errorf("unable to get server version of managed cluster %q: %w", cluster.Name, err)
}
capacity, allocatable, err := r.getClusterResources()
if err != nil {
return cluster, reconcileStop, fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", cluster.Name, err)
}
cluster.Status.Capacity = capacity
cluster.Status.Allocatable = allocatable
cluster.Status.Version = *clusterVersion
}
meta.SetStatusCondition(&cluster.Status.Conditions, condition)
return cluster, reconcileContinue, nil
}
// using readyz api to check the status of kube apiserver
func (r *resoureReconcile) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition {
statusCode := 0
condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable}
result := r.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
// for backward compatible, the livez endpoint is supported from Kubernetes 1.16, so if the livez is not found or
// forbidden, the healthz endpoint will be used.
if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden {
result = r.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
}
condition.Status = metav1.ConditionFalse
condition.Reason = "ManagedClusterKubeAPIServerUnavailable"
body, err := result.Raw()
if err == nil {
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body))
return condition
}
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err)
return condition
}
func (r *resoureReconcile) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) {
serverVersion, err := r.managedClusterDiscoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil
}
func (r *resoureReconcile) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) {
nodes, err := r.nodeLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
capacityList := make(map[clusterv1.ResourceName]resource.Quantity)
allocatableList := make(map[clusterv1.ResourceName]resource.Quantity)
for _, node := range nodes {
for key, value := range node.Status.Capacity {
if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist {
capacity.Add(value)
capacityList[clusterv1.ResourceName(key)] = capacity
} else {
capacityList[clusterv1.ResourceName(key)] = value
}
}
// the node is unschedulable, ignore its allocatable resources
if node.Spec.Unschedulable {
continue
}
for key, value := range node.Status.Allocatable {
if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist {
allocatable.Add(value)
allocatableList[clusterv1.ResourceName(key)] = allocatable
} else {
allocatableList[clusterv1.ResourceName(key)] = value
}
}
}
return capacityList, allocatableList, nil
}

View File

@@ -3,6 +3,7 @@ package managedcluster
import (
"context"
"encoding/json"
"github.com/openshift/library-go/pkg/operator/events/eventstesting"
"net/http"
"net/http/httptest"
"testing"
@@ -31,8 +32,13 @@ type serverResponse struct {
responseMsg string
}
func TestHealthCheck(t *testing.T) {
serverResponse := &serverResponse{}
func newDiscoveryServer(t *testing.T, resp *serverResponse) (*httptest.Server, *discovery.DiscoveryClient) {
serverResponse := &serverResponse{
httpStatus: http.StatusOK,
}
if resp != nil {
serverResponse = resp
}
apiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
if req.URL.Path == "/healthz" {
w.WriteHeader(http.StatusOK)
@@ -60,9 +66,14 @@ func TestHealthCheck(t *testing.T) {
t.Fatal(err)
}
}))
defer apiServer.Close()
discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(&rest.Config{Host: apiServer.URL})
return apiServer, discoveryClient
}
func TestHealthCheck(t *testing.T) {
serverResponse := &serverResponse{}
apiServer, discoveryClient := newDiscoveryServer(t, serverResponse)
defer apiServer.Close()
cases := []struct {
name string
@@ -91,8 +102,8 @@ func TestHealthCheck(t *testing.T) {
Reason: "ManagedClusterKubeAPIServerUnavailable",
Message: "The kube-apiserver is not ok, status code: 500, an error on the server (\"internal server error\") has prevented the request from succeeding",
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -128,8 +139,8 @@ func TestHealthCheck(t *testing.T) {
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*32), resource.BinarySI),
},
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -151,8 +162,8 @@ func TestHealthCheck(t *testing.T) {
Reason: "ManagedClusterAvailable",
Message: "Managed cluster is available",
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -173,8 +184,8 @@ func TestHealthCheck(t *testing.T) {
Reason: "ManagedClusterAvailable",
Message: "Managed cluster is available",
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -220,8 +231,8 @@ func TestHealthCheck(t *testing.T) {
clusterv1.ResourceMemory: *resource.NewQuantity(int64(1024*1024*64), resource.BinarySI),
},
}
testingcommon.AssertActions(t, actions, "get", "patch")
patch := actions[1].(clienttesting.PatchAction).GetPatch()
testingcommon.AssertActions(t, actions, "patch")
patch := actions[0].(clienttesting.PatchAction).GetPatch()
managedCluster := &clusterv1.ManagedCluster{}
err := json.Unmarshal(patch, managedCluster)
if err != nil {
@@ -254,14 +265,16 @@ func TestHealthCheck(t *testing.T) {
serverResponse.httpStatus = c.httpStatus
serverResponse.responseMsg = c.responseMsg
ctrl := &managedClusterStatusController{
clusterName: testinghelpers.TestManagedClusterName,
hubClusterClient: clusterClient,
hubClusterLister: clusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
managedClusterDiscoveryClient: discoveryClient,
nodeLister: kubeInformerFactory.Core().V1().Nodes().Lister(),
}
ctrl := newManagedClusterStatusController(
testinghelpers.TestManagedClusterName,
clusterClient,
clusterInformerFactory.Cluster().V1().ManagedClusters(),
discoveryClient,
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
kubeInformerFactory.Core().V1().Nodes(),
20,
eventstesting.NewTestingEventRecorder(t),
)
syncErr := ctrl.sync(context.TODO(), testingcommon.NewFakeSyncContext(t, ""))
testingcommon.AssertError(t, syncErr, c.expectedErr)

View File

@@ -3,197 +3,118 @@ package managedcluster
import (
"context"
"fmt"
"net/http"
"k8s.io/apimachinery/pkg/util/errors"
clusterv1alpha1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"time"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
clusterv1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
discovery "k8s.io/client-go/discovery"
corev1informers "k8s.io/client-go/informers/core/v1"
corev1lister "k8s.io/client-go/listers/core/v1"
)
// managedClusterStatusController checks the kube-apiserver health on managed cluster to determine it whether is available
// and ensure that the managed cluster resources and version are up to date.
type managedClusterStatusController struct {
clusterName string
hubClusterClient clientset.Interface
hubClusterLister clusterv1listers.ManagedClusterLister
managedClusterDiscoveryClient discovery.DiscoveryInterface
nodeLister corev1lister.NodeLister
clusterName string
reconcilers []statusReconcile
patcher patcher.Patcher[*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus]
hubClusterLister clusterv1listers.ManagedClusterLister
}
type statusReconcile interface {
reconcile(ctx context.Context, cm *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error)
}
type reconcileState int64
const (
reconcileStop reconcileState = iota
reconcileContinue
)
// NewManagedClusterStatusController creates a managed cluster status controller on managed cluster.
func NewManagedClusterStatusController(
clusterName string,
hubClusterClient clientset.Interface,
hubClusterInformer clusterv1informer.ManagedClusterInformer,
managedClusterDiscoveryClient discovery.DiscoveryInterface,
claimInformer clusterv1alpha1informer.ClusterClaimInformer,
nodeInformer corev1informers.NodeInformer,
maxCustomClusterClaims int,
resyncInterval time.Duration,
recorder events.Recorder) factory.Controller {
c := &managedClusterStatusController{
clusterName: clusterName,
hubClusterClient: hubClusterClient,
hubClusterLister: hubClusterInformer.Lister(),
managedClusterDiscoveryClient: managedClusterDiscoveryClient,
nodeLister: nodeInformer.Lister(),
}
c := newManagedClusterStatusController(
clusterName,
hubClusterClient,
hubClusterInformer,
managedClusterDiscoveryClient,
claimInformer,
nodeInformer,
maxCustomClusterClaims,
recorder,
)
return factory.New().
WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer()).
WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer(), claimInformer.Informer()).
WithSync(c.sync).
ResyncEvery(resyncInterval).
ToController("ManagedClusterStatusController", recorder)
}
func newManagedClusterStatusController(
clusterName string,
hubClusterClient clientset.Interface,
hubClusterInformer clusterv1informer.ManagedClusterInformer,
managedClusterDiscoveryClient discovery.DiscoveryInterface,
claimInformer clusterv1alpha1informer.ClusterClaimInformer,
nodeInformer corev1informers.NodeInformer,
maxCustomClusterClaims int,
recorder events.Recorder) *managedClusterStatusController {
return &managedClusterStatusController{
clusterName: clusterName,
patcher: patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
hubClusterClient.ClusterV1().ManagedClusters()),
reconcilers: []statusReconcile{
&joiningReconcile{recorder: recorder},
&resoureReconcile{managedClusterDiscoveryClient: managedClusterDiscoveryClient, nodeLister: nodeInformer.Lister()},
&claimReconcile{claimLister: claimInformer.Lister(), recorder: recorder, maxCustomClusterClaims: maxCustomClusterClaims},
},
hubClusterLister: hubClusterInformer.Lister(),
}
}
// sync updates managed cluster available condition by checking kube-apiserver health on managed cluster.
// if the kube-apiserver is health, it will ensure that managed cluster resources and version are up to date.
func (c *managedClusterStatusController) sync(ctx context.Context, syncCtx factory.SyncContext) error {
if _, err := c.hubClusterLister.Get(c.clusterName); err != nil {
cluster, err := c.hubClusterLister.Get(c.clusterName)
if err != nil {
return fmt.Errorf("unable to get managed cluster %q from hub: %w", c.clusterName, err)
}
updateStatusFuncs := []helpers.UpdateManagedClusterStatusFunc{}
// check the kube-apiserver health on managed cluster.
condition := c.checkKubeAPIServerStatus(ctx)
// the managed cluster kube-apiserver is health, update its version and resources if necessary.
if condition.Status == metav1.ConditionTrue {
clusterVersion, err := c.getClusterVersion()
newCluster := cluster.DeepCopy()
var errs []error
for _, reconciler := range c.reconcilers {
var state reconcileState
newCluster, state, err = reconciler.reconcile(ctx, newCluster)
if err != nil {
return fmt.Errorf("unable to get server version of managed cluster %q: %w", c.clusterName, err)
errs = append(errs, err)
}
capacity, allocatable, err := c.getClusterResources()
if err != nil {
return fmt.Errorf("unable to get capacity and allocatable of managed cluster %q: %w", c.clusterName, err)
if state == reconcileStop {
break
}
updateStatusFuncs = append(updateStatusFuncs, updateClusterResourcesFn(clusterv1.ManagedClusterStatus{
Capacity: capacity,
Allocatable: allocatable,
Version: *clusterVersion,
}))
}
updateStatusFuncs = append(updateStatusFuncs, helpers.UpdateManagedClusterConditionFn(condition))
_, updated, err := helpers.UpdateManagedClusterStatus(ctx, c.hubClusterClient, c.clusterName, updateStatusFuncs...)
if err != nil {
return fmt.Errorf("unable to update status of managed cluster %q: %w", c.clusterName, err)
}
if updated {
syncCtx.Recorder().Eventf("ManagedClusterStatusUpdated", "the status of managed cluster %q has been updated, available condition is %q, due to %q",
c.clusterName, condition.Status, condition.Message)
}
return nil
}
// using readyz api to check the status of kube apiserver
func (c *managedClusterStatusController) checkKubeAPIServerStatus(ctx context.Context) metav1.Condition {
statusCode := 0
condition := metav1.Condition{Type: clusterv1.ManagedClusterConditionAvailable}
result := c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/livez").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
// for backward compatible, the livez endpoint is supported from Kubernetes 1.16, so if the livez is not found or
// forbidden, the healthz endpoint will be used.
if statusCode == http.StatusNotFound || statusCode == http.StatusForbidden {
result = c.managedClusterDiscoveryClient.RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&statusCode)
if statusCode == http.StatusOK {
condition.Status = metav1.ConditionTrue
condition.Reason = "ManagedClusterAvailable"
condition.Message = "Managed cluster is available"
return condition
}
}
condition.Status = metav1.ConditionFalse
condition.Reason = "ManagedClusterKubeAPIServerUnavailable"
body, err := result.Raw()
if err == nil {
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, string(body))
return condition
}
condition.Message = fmt.Sprintf("The kube-apiserver is not ok, status code: %d, %v", statusCode, err)
return condition
}
func (c *managedClusterStatusController) getClusterVersion() (*clusterv1.ManagedClusterVersion, error) {
serverVersion, err := c.managedClusterDiscoveryClient.ServerVersion()
if err != nil {
return nil, err
}
return &clusterv1.ManagedClusterVersion{Kubernetes: serverVersion.String()}, nil
}
func (c *managedClusterStatusController) getClusterResources() (capacity, allocatable clusterv1.ResourceList, err error) {
nodes, err := c.nodeLister.List(labels.Everything())
if err != nil {
return nil, nil, err
}
capacityList := make(map[clusterv1.ResourceName]resource.Quantity)
allocatableList := make(map[clusterv1.ResourceName]resource.Quantity)
for _, node := range nodes {
for key, value := range node.Status.Capacity {
if capacity, exist := capacityList[clusterv1.ResourceName(key)]; exist {
capacity.Add(value)
capacityList[clusterv1.ResourceName(key)] = capacity
} else {
capacityList[clusterv1.ResourceName(key)] = value
}
}
// the node is unschedulable, ignore its allocatable resources
if node.Spec.Unschedulable {
continue
}
for key, value := range node.Status.Allocatable {
if allocatable, exist := allocatableList[clusterv1.ResourceName(key)]; exist {
allocatable.Add(value)
allocatableList[clusterv1.ResourceName(key)] = allocatable
} else {
allocatableList[clusterv1.ResourceName(key)] = value
}
}
}
return capacityList, allocatableList, nil
}
func updateClusterResourcesFn(status clusterv1.ManagedClusterStatus) helpers.UpdateManagedClusterStatusFunc {
return func(oldStatus *clusterv1.ManagedClusterStatus) error {
// merge the old capacity to new capacity, if one old capacity entry does not exist in new capacity,
// we add it back to new capacity
for key, val := range oldStatus.Capacity {
if _, ok := status.Capacity[key]; !ok {
status.Capacity[key] = val
continue
}
}
oldStatus.Capacity = status.Capacity
oldStatus.Allocatable = status.Allocatable
oldStatus.Version = status.Version
return nil
if _, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status); err != nil {
errs = append(errs, err)
}
return errors.NewAggregate(errs)
}

View File

@@ -1,4 +1,4 @@
package managedcluster
package registration
import (
"context"

View File

@@ -1,4 +1,4 @@
package managedcluster
package registration
import (
"context"

View File

@@ -1,8 +1,11 @@
package managedcluster
package registration
import (
"crypto/x509/pkix"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
"open-cluster-management.io/ocm/pkg/common/patcher"
"strings"
addonv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
@@ -22,7 +25,6 @@ import (
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
"open-cluster-management.io/ocm/pkg/registration/clientcert"
"open-cluster-management.io/ocm/pkg/registration/helpers"
"open-cluster-management.io/ocm/pkg/registration/hub/user"
)
@@ -143,13 +145,22 @@ func GenerateBootstrapStatusUpdater() clientcert.StatusUpdateFunc {
}
// GenerateStatusUpdater generates status update func for the certificate management
func GenerateStatusUpdater(hubClusterClient clientset.Interface, clusterName string) clientcert.StatusUpdateFunc {
func GenerateStatusUpdater(hubClusterClient clientset.Interface, hubClusterLister clusterv1listers.ManagedClusterLister, clusterName string) clientcert.StatusUpdateFunc {
return func(ctx context.Context, cond metav1.Condition) error {
_, _, updatedErr := helpers.UpdateManagedClusterStatus(
ctx, hubClusterClient, clusterName, helpers.UpdateManagedClusterConditionFn(cond),
)
return updatedErr
cluster, err := hubClusterLister.Get(clusterName)
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
newCluster := cluster.DeepCopy()
meta.SetStatusCondition(&newCluster.Status.Conditions, cond)
patcher := patcher.NewPatcher[
*clusterv1.ManagedCluster, clusterv1.ManagedClusterSpec, clusterv1.ManagedClusterStatus](
hubClusterClient.ClusterV1().ManagedClusters())
_, err = patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
return err
}
}

View File

@@ -1,4 +1,4 @@
package managedcluster
package registration
import (
"testing"

View File

@@ -1,4 +1,4 @@
package managedcluster
package registration
import (
"bytes"

View File

@@ -1,4 +1,4 @@
package managedcluster
package registration
import (
"context"

View File

@@ -4,6 +4,8 @@ import (
"context"
"errors"
"fmt"
"open-cluster-management.io/ocm/pkg/registration/spoke/lease"
"open-cluster-management.io/ocm/pkg/registration/spoke/registration"
"os"
"path"
"time"
@@ -164,7 +166,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
}
// start a SpokeClusterCreatingController to make sure there is a spoke cluster on hub cluster
spokeClusterCreatingController := managedcluster.NewManagedClusterCreatingController(
spokeClusterCreatingController := registration.NewManagedClusterCreatingController(
o.AgentOptions.SpokeClusterName, o.SpokeExternalServerURLs,
spokeClusterCABundle,
bootstrapClusterClient,
@@ -172,7 +174,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
)
go spokeClusterCreatingController.Run(ctx, 1)
hubKubeconfigSecretController := managedcluster.NewHubKubeconfigSecretController(
hubKubeconfigSecretController := registration.NewHubKubeconfigSecretController(
o.HubKubeconfigDir, o.ComponentNamespace, o.HubKubeconfigSecret,
// the hub kubeconfig secret stored in the cluster where the agent pod runs
managementKubeClient.CoreV1(),
@@ -212,7 +214,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
}
controllerName := fmt.Sprintf("BootstrapClientCertController@cluster:%s", o.AgentOptions.SpokeClusterName)
clientCertForHubController := managedcluster.NewClientCertForHubController(
clientCertForHubController := registration.NewClientCertForHubController(
o.AgentOptions.SpokeClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret,
kubeconfigData,
// store the secret in the cluster where the agent pod runs
@@ -220,7 +222,7 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
csrControl,
o.ClientCertExpirationSeconds,
managementKubeClient,
managedcluster.GenerateBootstrapStatusUpdater(),
registration.GenerateBootstrapStatusUpdater(),
controllerContext.EventRecorder,
controllerName,
)
@@ -299,14 +301,17 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
// create another ClientCertForHubController for client certificate rotation
controllerName := fmt.Sprintf("ClientCertController@cluster:%s", o.AgentOptions.SpokeClusterName)
clientCertForHubController := managedcluster.NewClientCertForHubController(
clientCertForHubController := registration.NewClientCertForHubController(
o.AgentOptions.SpokeClusterName, o.AgentName, o.ComponentNamespace, o.HubKubeconfigSecret,
kubeconfigData,
namespacedManagementKubeInformerFactory.Core().V1().Secrets(),
csrControl,
o.ClientCertExpirationSeconds,
managementKubeClient,
managedcluster.GenerateStatusUpdater(hubClusterClient, o.AgentOptions.SpokeClusterName),
registration.GenerateStatusUpdater(
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters().Lister(),
o.AgentOptions.SpokeClusterName),
controllerContext.EventRecorder,
controllerName,
)
@@ -314,50 +319,32 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
return err
}
// create ManagedClusterJoiningController to reconcile instances of ManagedCluster on the managed cluster
managedClusterJoiningController := managedcluster.NewManagedClusterJoiningController(
o.AgentOptions.SpokeClusterName,
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
controllerContext.EventRecorder,
)
// create ManagedClusterLeaseController to keep the spoke cluster heartbeat
managedClusterLeaseController := managedcluster.NewManagedClusterLeaseController(
managedClusterLeaseController := lease.NewManagedClusterLeaseController(
o.AgentOptions.SpokeClusterName,
hubKubeClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
controllerContext.EventRecorder,
)
spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
if err != nil {
return err
}
spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)
// create NewManagedClusterStatusController to update the spoke cluster status
managedClusterHealthCheckController := managedcluster.NewManagedClusterStatusController(
o.AgentOptions.SpokeClusterName,
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
spokeKubeClient.Discovery(),
spokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
spokeKubeInformerFactory.Core().V1().Nodes(),
o.MaxCustomClusterClaims,
o.ClusterHealthCheckPeriod,
controllerContext.EventRecorder,
)
spokeClusterClient, err := clusterv1client.NewForConfig(spokeClientConfig)
if err != nil {
return err
}
spokeClusterInformerFactory := clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute)
var managedClusterClaimController factory.Controller
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
// create managedClusterClaimController to sync cluster claims
managedClusterClaimController = managedcluster.NewManagedClusterClaimController(
o.AgentOptions.SpokeClusterName,
o.MaxCustomClusterClaims,
hubClusterClient,
hubClusterInformerFactory.Cluster().V1().ManagedClusters(),
spokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
controllerContext.EventRecorder,
)
}
var addOnLeaseController factory.Controller
var addOnRegistrationController factory.Controller
@@ -390,16 +377,14 @@ func (o *SpokeAgentOptions) RunSpokeAgent(ctx context.Context, controllerContext
go hubClusterInformerFactory.Start(ctx.Done())
go spokeKubeInformerFactory.Start(ctx.Done())
go namespacedManagementKubeInformerFactory.Start(ctx.Done())
go spokeClusterInformerFactory.Start(ctx.Done())
go addOnInformerFactory.Start(ctx.Done())
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
go spokeClusterInformerFactory.Start(ctx.Done())
}
go clientCertForHubController.Run(ctx, 1)
go managedClusterJoiningController.Run(ctx, 1)
go managedClusterLeaseController.Run(ctx, 1)
go managedClusterHealthCheckController.Run(ctx, 1)
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
go managedClusterClaimController.Run(ctx, 1)
}
if features.DefaultSpokeRegistrationMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
go addOnLeaseController.Run(ctx, 1)
go addOnRegistrationController.Run(ctx, 1)
@@ -474,7 +459,7 @@ func (o *SpokeAgentOptions) Complete(coreV1Client corev1client.CoreV1Interface,
}
// dump data in hub kubeconfig secret into file system if it exists
err = managedcluster.DumpSecret(coreV1Client, o.ComponentNamespace, o.HubKubeconfigSecret,
err = registration.DumpSecret(coreV1Client, o.ComponentNamespace, o.HubKubeconfigSecret,
o.HubKubeconfigDir, ctx, recorder)
if err != nil {
return err
@@ -527,7 +512,7 @@ func (o *SpokeAgentOptions) hasValidHubClientConfig() (bool, error) {
}
// check if the tls certificate is issued for the current cluster/agent
clusterName, agentName, err := managedcluster.GetClusterAgentNamesFromCertificate(certData)
clusterName, agentName, err := registration.GetClusterAgentNamesFromCertificate(certData)
if err != nil {
return false, nil
}
@@ -559,7 +544,7 @@ func (o *SpokeAgentOptions) getOrGenerateClusterAgentNames() (string, string) {
certPath := path.Join(o.HubKubeconfigDir, clientcert.TLSCertFile)
certData, certErr := os.ReadFile(path.Clean(certPath))
if certErr == nil {
clusterNameInCert, agentNameInCert, _ = managedcluster.GetClusterAgentNamesFromCertificate(certData)
clusterNameInCert, agentNameInCert, _ = registration.GetClusterAgentNamesFromCertificate(certData)
}
clusterName := o.AgentOptions.SpokeClusterName