mirror of
https://github.com/open-cluster-management-io/ocm.git
synced 2026-02-14 18:09:57 +00:00
✨ Added support for about-api for cluster properties (#1006)
Some checks failed
Post / coverage (push) Failing after 33m23s
Post / images (amd64) (push) Failing after 8m28s
Post / images (arm64) (push) Failing after 7m59s
Post / image manifest (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Scorecard supply-chain security / Scorecard analysis (push) Failing after 1m20s
Close stale issues and PRs / stale (push) Successful in 57s
Some checks failed
Post / coverage (push) Failing after 33m23s
Post / images (amd64) (push) Failing after 8m28s
Post / images (arm64) (push) Failing after 7m59s
Post / image manifest (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Scorecard supply-chain security / Scorecard analysis (push) Failing after 1m20s
Close stale issues and PRs / stale (push) Successful in 57s
* Added support for about-api for cluster properties Signed-off-by: gnana997 <gnana097@gmail.com> * refactored failing registration test cases Signed-off-by: gnana997 <gnana097@gmail.com> * Added new fake classes and test cases Signed-off-by: gnana997 <gnana097@gmail.com> * Refactored test cases and vendors Signed-off-by: gnana997 <gnana097@gmail.com> * updated the open-cluster api package and updated cluster property Signed-off-by: gnana997 <gnana097@gmail.com> * Refactored the pr with just registration details and crds Signed-off-by: gnana997 <gnana097@gmail.com> * Fix fake client Signed-off-by: Jian Qiu <jqiu@redhat.com> * Add integration test for clusterproperty Signed-off-by: Jian Qiu <jqiu@redhat.com> --------- Signed-off-by: gnana997 <gnana097@gmail.com> Signed-off-by: Jian Qiu <jqiu@redhat.com> Co-authored-by: gnana997 <gnana097@gmail.com>
This commit is contained in:
@@ -11,6 +11,7 @@ import (
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/selection"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
aboutv1alpha1listers "sigs.k8s.io/about-api/pkg/generated/listers/apis/v1alpha1"
|
||||
|
||||
clusterv1alpha1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1alpha1"
|
||||
clusterv1 "open-cluster-management.io/api/cluster/v1"
|
||||
@@ -25,14 +26,12 @@ const labelCustomizedOnly = "open-cluster-management.io/spoke-only"
|
||||
type claimReconcile struct {
|
||||
recorder events.Recorder
|
||||
claimLister clusterv1alpha1listers.ClusterClaimLister
|
||||
aboutLister aboutv1alpha1listers.ClusterPropertyLister
|
||||
maxCustomClusterClaims int
|
||||
reservedClusterClaimSuffixes []string
|
||||
}
|
||||
|
||||
func (r *claimReconcile) reconcile(ctx context.Context, cluster *clusterv1.ManagedCluster) (*clusterv1.ManagedCluster, reconcileState, error) {
|
||||
if !features.SpokeMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
|
||||
return cluster, reconcileContinue, nil
|
||||
}
|
||||
// current managed cluster has not joined the hub yet, do nothing.
|
||||
if !meta.IsStatusConditionTrue(cluster.Status.Conditions, clusterv1.ManagedClusterConditionJoined) {
|
||||
r.recorder.Eventf("ManagedClusterIsNotAccepted", "Managed cluster %q does not join the hub yet", cluster.Name)
|
||||
@@ -46,31 +45,61 @@ func (r *claimReconcile) reconcile(ctx context.Context, cluster *clusterv1.Manag
|
||||
// exposeClaims saves cluster claims fetched on managed cluster into status of the
|
||||
// managed cluster on hub. Some of the customized claims might not be exposed once
|
||||
// the total number of the claims exceeds the value of `cluster-claims-max`.
|
||||
func (r *claimReconcile) exposeClaims(ctx context.Context, cluster *clusterv1.ManagedCluster) error {
|
||||
func (r *claimReconcile) exposeClaims(_ context.Context, cluster *clusterv1.ManagedCluster) error {
|
||||
var reservedClaims, customClaims []clusterv1.ManagedClusterClaim
|
||||
var clusterClaims []*clusterv1alpha1.ClusterClaim
|
||||
claimsMap := map[string]clusterv1.ManagedClusterClaim{}
|
||||
|
||||
// clusterClaim with label `open-cluster-management.io/spoke-only` will not be synced to managedCluster.Status at hub.
|
||||
requirement, _ := labels.NewRequirement(labelCustomizedOnly, selection.DoesNotExist, []string{})
|
||||
selector := labels.NewSelector().Add(*requirement)
|
||||
clusterClaims, err := r.claimLister.List(selector)
|
||||
requirement, err := labels.NewRequirement(labelCustomizedOnly, selection.DoesNotExist, []string{})
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to list cluster claims: %w", err)
|
||||
return err
|
||||
}
|
||||
selector := labels.NewSelector().Add(*requirement)
|
||||
|
||||
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.ClusterProperty) {
|
||||
clusterProperties, err := r.aboutLister.List(selector)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to list cluster properties: %w", err)
|
||||
}
|
||||
|
||||
for _, property := range clusterProperties {
|
||||
claimsMap[property.Name] = clusterv1.ManagedClusterClaim{
|
||||
Name: property.Name,
|
||||
Value: property.Spec.Value,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
|
||||
clusterClaims, err = r.claimLister.List(selector)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to list cluster claims: %w", err)
|
||||
}
|
||||
|
||||
for _, claim := range clusterClaims {
|
||||
// if the claim has the same name with the property, ignore it.
|
||||
if _, ok := claimsMap[claim.Name]; !ok {
|
||||
claimsMap[claim.Name] = clusterv1.ManagedClusterClaim{
|
||||
Name: claim.Name,
|
||||
Value: claim.Spec.Value,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// check if the cluster claim is one of the reserved claims or has a reserved suffix.
|
||||
// if so, it will be treated as a reserved claim and will always be exposed.
|
||||
reservedClaimNames := sets.New(clusterv1alpha1.ReservedClusterClaimNames[:]...)
|
||||
reservedClaimSuffixes := sets.New(r.reservedClusterClaimSuffixes...)
|
||||
for _, clusterClaim := range clusterClaims {
|
||||
managedClusterClaim := clusterv1.ManagedClusterClaim{
|
||||
Name: clusterClaim.Name,
|
||||
Value: clusterClaim.Spec.Value,
|
||||
}
|
||||
|
||||
for _, managedClusterClaim := range claimsMap {
|
||||
if matchReservedClaims(reservedClaimNames, reservedClaimSuffixes, managedClusterClaim) {
|
||||
reservedClaims = append(reservedClaims, managedClusterClaim)
|
||||
// reservedClaimNames.Insert(managedClusterClaim.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
customClaims = append(customClaims, managedClusterClaim)
|
||||
}
|
||||
|
||||
|
||||
@@ -12,9 +12,11 @@ import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
fakekube "k8s.io/client-go/kubernetes/fake"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
aboutv1alpha1 "sigs.k8s.io/about-api/pkg/apis/v1alpha1"
|
||||
aboutclusterfake "sigs.k8s.io/about-api/pkg/generated/clientset/versioned/fake"
|
||||
aboutinformers "sigs.k8s.io/about-api/pkg/generated/informers/externalversions"
|
||||
|
||||
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
|
||||
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
|
||||
@@ -37,6 +39,7 @@ func TestSync(t *testing.T) {
|
||||
cases := []struct {
|
||||
name string
|
||||
cluster runtime.Object
|
||||
properties []runtime.Object
|
||||
claims []runtime.Object
|
||||
validateActions func(t *testing.T, actions []clienttesting.Action)
|
||||
expectedErr string
|
||||
@@ -55,6 +58,16 @@ func TestSync(t *testing.T) {
|
||||
{
|
||||
name: "sync a joined managed cluster",
|
||||
cluster: testinghelpers.NewJoinedManagedCluster(),
|
||||
properties: []runtime.Object{
|
||||
&aboutv1alpha1.ClusterProperty{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "name",
|
||||
},
|
||||
Spec: aboutv1alpha1.ClusterPropertySpec{
|
||||
Value: "test",
|
||||
},
|
||||
},
|
||||
},
|
||||
claims: []runtime.Object{
|
||||
&clusterv1alpha1.ClusterClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
@@ -78,6 +91,65 @@ func TestSync(t *testing.T) {
|
||||
Name: "a",
|
||||
Value: "b",
|
||||
},
|
||||
{
|
||||
Name: "name",
|
||||
Value: "test",
|
||||
},
|
||||
}
|
||||
actual := cluster.Status.ClusterClaims
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("expected cluster claim %v but got: %v", expected, actual)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync a joined managed cluster with same property and claims",
|
||||
cluster: testinghelpers.NewJoinedManagedCluster(),
|
||||
properties: []runtime.Object{
|
||||
&aboutv1alpha1.ClusterProperty{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "key1",
|
||||
},
|
||||
Spec: aboutv1alpha1.ClusterPropertySpec{
|
||||
Value: "value1",
|
||||
},
|
||||
},
|
||||
&aboutv1alpha1.ClusterProperty{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "key2",
|
||||
},
|
||||
Spec: aboutv1alpha1.ClusterPropertySpec{
|
||||
Value: "value2",
|
||||
},
|
||||
},
|
||||
},
|
||||
claims: []runtime.Object{
|
||||
&clusterv1alpha1.ClusterClaim{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "key1",
|
||||
},
|
||||
Spec: clusterv1alpha1.ClusterClaimSpec{
|
||||
Value: "value3",
|
||||
},
|
||||
},
|
||||
},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testingcommon.AssertActions(t, actions, "patch")
|
||||
patch := actions[0].(clienttesting.PatchAction).GetPatch()
|
||||
cluster := &clusterv1.ManagedCluster{}
|
||||
err := json.Unmarshal(patch, cluster)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expected := []clusterv1.ManagedClusterClaim{
|
||||
{
|
||||
Name: "key1",
|
||||
Value: "value1",
|
||||
},
|
||||
{
|
||||
Name: "key2",
|
||||
Value: "value2",
|
||||
},
|
||||
}
|
||||
actual := cluster.Status.ClusterClaims
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
@@ -89,8 +161,12 @@ func TestSync(t *testing.T) {
|
||||
|
||||
apiServer, discoveryClient := newDiscoveryServer(t, nil)
|
||||
defer apiServer.Close()
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
|
||||
err := features.SpokeMutableFeatureGate.Set("ClusterProperty=true")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
@@ -100,6 +176,8 @@ func TestSync(t *testing.T) {
|
||||
}
|
||||
|
||||
clusterClient := clusterfake.NewSimpleClientset(objects...)
|
||||
aboutClusterClient := aboutclusterfake.NewSimpleClientset()
|
||||
clusterPropertyInformerFactory := aboutinformers.NewSharedInformerFactory(aboutClusterClient, time.Minute*10)
|
||||
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
|
||||
if c.cluster != nil {
|
||||
if err := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore().Add(c.cluster); err != nil {
|
||||
@@ -113,7 +191,13 @@ func TestSync(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
fakeHubClient := fakekube.NewSimpleClientset()
|
||||
for _, property := range c.properties {
|
||||
if err := clusterPropertyInformerFactory.About().V1alpha1().ClusterProperties().Informer().GetStore().Add(property); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
fakeHubClient := kubefake.NewClientset()
|
||||
ctx := context.TODO()
|
||||
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
|
||||
clusterscheme.Scheme, fakeHubClient.EventsV1(), "test")
|
||||
@@ -126,6 +210,7 @@ func TestSync(t *testing.T) {
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
discoveryClient,
|
||||
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
|
||||
clusterPropertyInformerFactory.About().V1alpha1().ClusterProperties(),
|
||||
kubeInformerFactory.Core().V1().Nodes(),
|
||||
20,
|
||||
[]string{},
|
||||
@@ -146,20 +231,21 @@ func TestExposeClaims(t *testing.T) {
|
||||
name string
|
||||
cluster *clusterv1.ManagedCluster
|
||||
claims []*clusterv1alpha1.ClusterClaim
|
||||
properties []*aboutv1alpha1.ClusterProperty
|
||||
maxCustomClusterClaims int
|
||||
reservedClusterClaimSuffixes []string
|
||||
validateActions func(t *testing.T, actions []clienttesting.Action)
|
||||
expectedErr string
|
||||
}{
|
||||
{
|
||||
name: "sync claims into status of the managed cluster",
|
||||
name: "sync properties into status of the managed cluster",
|
||||
cluster: testinghelpers.NewJoinedManagedCluster(),
|
||||
claims: []*clusterv1alpha1.ClusterClaim{
|
||||
properties: []*aboutv1alpha1.ClusterProperty{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "a",
|
||||
},
|
||||
Spec: clusterv1alpha1.ClusterClaimSpec{
|
||||
Spec: aboutv1alpha1.ClusterPropertySpec{
|
||||
Value: "b",
|
||||
},
|
||||
},
|
||||
@@ -359,6 +445,48 @@ func TestExposeClaims(t *testing.T) {
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync non-customized-only properties into status of the managed cluster",
|
||||
cluster: testinghelpers.NewJoinedManagedCluster(),
|
||||
properties: []*aboutv1alpha1.ClusterProperty{
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "a",
|
||||
Labels: map[string]string{labelCustomizedOnly: ""},
|
||||
},
|
||||
Spec: aboutv1alpha1.ClusterPropertySpec{
|
||||
Value: "b",
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: "c",
|
||||
},
|
||||
Spec: aboutv1alpha1.ClusterPropertySpec{
|
||||
Value: "d",
|
||||
},
|
||||
},
|
||||
},
|
||||
validateActions: func(t *testing.T, actions []clienttesting.Action) {
|
||||
testingcommon.AssertActions(t, actions, "patch")
|
||||
patch := actions[0].(clienttesting.PatchAction).GetPatch()
|
||||
cluster := &clusterv1.ManagedCluster{}
|
||||
err := json.Unmarshal(patch, cluster)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
expected := []clusterv1.ManagedClusterClaim{
|
||||
{
|
||||
Name: "c",
|
||||
Value: "d",
|
||||
},
|
||||
}
|
||||
actual := cluster.Status.ClusterClaims
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("expected cluster claim %v but got: %v", expected, actual)
|
||||
}
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sync non-customized-only claims into status of the managed cluster",
|
||||
cluster: testinghelpers.NewJoinedManagedCluster(),
|
||||
@@ -405,8 +533,12 @@ func TestExposeClaims(t *testing.T) {
|
||||
|
||||
apiServer, discoveryClient := newDiscoveryServer(t, nil)
|
||||
defer apiServer.Close()
|
||||
kubeClient := kubefake.NewSimpleClientset()
|
||||
kubeClient := kubefake.NewClientset()
|
||||
kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Minute*10)
|
||||
err := features.SpokeMutableFeatureGate.Set("ClusterProperty=true")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
@@ -416,6 +548,8 @@ func TestExposeClaims(t *testing.T) {
|
||||
}
|
||||
|
||||
clusterClient := clusterfake.NewSimpleClientset(objects...)
|
||||
aboutClusterClient := aboutclusterfake.NewSimpleClientset()
|
||||
clusterPropertyInformerFactory := aboutinformers.NewSharedInformerFactory(aboutClusterClient, time.Minute*10)
|
||||
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
|
||||
if c.cluster != nil {
|
||||
if err := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore().Add(c.cluster); err != nil {
|
||||
@@ -429,11 +563,17 @@ func TestExposeClaims(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
for _, property := range c.properties {
|
||||
if err := clusterPropertyInformerFactory.About().V1alpha1().ClusterProperties().Informer().GetStore().Add(property); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
if c.maxCustomClusterClaims == 0 {
|
||||
c.maxCustomClusterClaims = 20
|
||||
}
|
||||
|
||||
fakeHubClient := fakekube.NewSimpleClientset()
|
||||
fakeHubClient := kubefake.NewClientset()
|
||||
ctx := context.TODO()
|
||||
hubEventRecorder, err := helpers.NewEventRecorder(ctx,
|
||||
clusterscheme.Scheme, fakeHubClient.EventsV1(), "test")
|
||||
@@ -446,6 +586,7 @@ func TestExposeClaims(t *testing.T) {
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
discoveryClient,
|
||||
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
|
||||
clusterPropertyInformerFactory.About().V1alpha1().ClusterProperties(),
|
||||
kubeInformerFactory.Core().V1().Nodes(),
|
||||
c.maxCustomClusterClaims,
|
||||
c.reservedClusterClaimSuffixes,
|
||||
|
||||
@@ -13,6 +13,8 @@ import (
|
||||
fakekube "k8s.io/client-go/kubernetes/fake"
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
aboutclusterfake "sigs.k8s.io/about-api/pkg/generated/clientset/versioned/fake"
|
||||
aboutinformers "sigs.k8s.io/about-api/pkg/generated/informers/externalversions"
|
||||
|
||||
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
|
||||
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
|
||||
@@ -73,6 +75,8 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.startingObjects...)
|
||||
aboutClusterClient := aboutclusterfake.NewSimpleClientset()
|
||||
clusterPropertyInformerFactory := aboutinformers.NewSharedInformerFactory(aboutClusterClient, time.Minute*10)
|
||||
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
|
||||
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
|
||||
for _, cluster := range c.startingObjects {
|
||||
@@ -94,6 +98,7 @@ func TestSyncManagedCluster(t *testing.T) {
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
discoveryClient,
|
||||
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
|
||||
clusterPropertyInformerFactory.About().V1alpha1().ClusterProperties(),
|
||||
kubeInformerFactory.Core().V1().Nodes(),
|
||||
20,
|
||||
[]string{},
|
||||
|
||||
@@ -19,6 +19,8 @@ import (
|
||||
kubefake "k8s.io/client-go/kubernetes/fake"
|
||||
"k8s.io/client-go/rest"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
aboutclusterfake "sigs.k8s.io/about-api/pkg/generated/clientset/versioned/fake"
|
||||
aboutinformers "sigs.k8s.io/about-api/pkg/generated/informers/externalversions"
|
||||
|
||||
clusterfake "open-cluster-management.io/api/client/cluster/clientset/versioned/fake"
|
||||
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
|
||||
@@ -294,6 +296,8 @@ func TestHealthCheck(t *testing.T) {
|
||||
for _, c := range cases {
|
||||
t.Run(c.name, func(t *testing.T) {
|
||||
clusterClient := clusterfake.NewSimpleClientset(c.clusters...)
|
||||
aboutClusterClient := aboutclusterfake.NewSimpleClientset()
|
||||
clusterPropertyInformerFactory := aboutinformers.NewSharedInformerFactory(aboutClusterClient, time.Minute*10)
|
||||
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(clusterClient, time.Minute*10)
|
||||
clusterStore := clusterInformerFactory.Cluster().V1().ManagedClusters().Informer().GetStore()
|
||||
for _, cluster := range c.clusters {
|
||||
@@ -328,6 +332,7 @@ func TestHealthCheck(t *testing.T) {
|
||||
clusterInformerFactory.Cluster().V1().ManagedClusters(),
|
||||
discoveryClient,
|
||||
clusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
|
||||
clusterPropertyInformerFactory.About().V1alpha1().ClusterProperties(),
|
||||
kubeInformerFactory.Core().V1().Nodes(),
|
||||
20,
|
||||
[]string{},
|
||||
|
||||
@@ -14,13 +14,17 @@ import (
|
||||
"k8s.io/client-go/discovery"
|
||||
corev1informers "k8s.io/client-go/informers/core/v1"
|
||||
kevents "k8s.io/client-go/tools/events"
|
||||
aboutv1alpha1informer "sigs.k8s.io/about-api/pkg/generated/informers/externalversions/apis/v1alpha1"
|
||||
|
||||
clientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
|
||||
clusterv1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1"
|
||||
clusterv1alpha1informer "open-cluster-management.io/api/client/cluster/informers/externalversions/cluster/v1alpha1"
|
||||
clusterv1listers "open-cluster-management.io/api/client/cluster/listers/cluster/v1"
|
||||
clusterv1 "open-cluster-management.io/api/cluster/v1"
|
||||
ocmfeature "open-cluster-management.io/api/feature"
|
||||
"open-cluster-management.io/sdk-go/pkg/patcher"
|
||||
|
||||
"open-cluster-management.io/ocm/pkg/features"
|
||||
)
|
||||
|
||||
// managedClusterStatusController checks the kube-apiserver health on managed cluster to determine it whether is available
|
||||
@@ -52,6 +56,7 @@ func NewManagedClusterStatusController(
|
||||
hubClusterInformer clusterv1informer.ManagedClusterInformer,
|
||||
managedClusterDiscoveryClient discovery.DiscoveryInterface,
|
||||
claimInformer clusterv1alpha1informer.ClusterClaimInformer,
|
||||
propertyInformer aboutv1alpha1informer.ClusterPropertyInformer,
|
||||
nodeInformer corev1informers.NodeInformer,
|
||||
maxCustomClusterClaims int,
|
||||
reservedClusterClaimSuffixes []string,
|
||||
@@ -64,6 +69,7 @@ func NewManagedClusterStatusController(
|
||||
hubClusterInformer,
|
||||
managedClusterDiscoveryClient,
|
||||
claimInformer,
|
||||
propertyInformer,
|
||||
nodeInformer,
|
||||
maxCustomClusterClaims,
|
||||
reservedClusterClaimSuffixes,
|
||||
@@ -71,11 +77,18 @@ func NewManagedClusterStatusController(
|
||||
hubEventRecorder,
|
||||
)
|
||||
|
||||
return factory.New().
|
||||
WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer(), claimInformer.Informer()).
|
||||
WithSync(c.sync).
|
||||
ResyncEvery(resyncInterval).
|
||||
ToController("ManagedClusterStatusController", recorder)
|
||||
controllerFactory := factory.New().
|
||||
WithInformers(hubClusterInformer.Informer(), nodeInformer.Informer()).
|
||||
WithSync(c.sync).ResyncEvery(resyncInterval)
|
||||
|
||||
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.ClusterClaim) {
|
||||
controllerFactory = controllerFactory.WithInformers(claimInformer.Informer())
|
||||
}
|
||||
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.ClusterProperty) {
|
||||
controllerFactory = controllerFactory.WithInformers(propertyInformer.Informer())
|
||||
}
|
||||
|
||||
return controllerFactory.ToController("ManagedClusterStatusController", recorder)
|
||||
}
|
||||
|
||||
func newManagedClusterStatusController(
|
||||
@@ -84,6 +97,7 @@ func newManagedClusterStatusController(
|
||||
hubClusterInformer clusterv1informer.ManagedClusterInformer,
|
||||
managedClusterDiscoveryClient discovery.DiscoveryInterface,
|
||||
claimInformer clusterv1alpha1informer.ClusterClaimInformer,
|
||||
propertyInformer aboutv1alpha1informer.ClusterPropertyInformer,
|
||||
nodeInformer corev1informers.NodeInformer,
|
||||
maxCustomClusterClaims int,
|
||||
reservedClusterClaimSuffixes []string,
|
||||
@@ -98,7 +112,10 @@ func newManagedClusterStatusController(
|
||||
&joiningReconcile{recorder: recorder},
|
||||
&resoureReconcile{managedClusterDiscoveryClient: managedClusterDiscoveryClient, nodeLister: nodeInformer.Lister()},
|
||||
&claimReconcile{claimLister: claimInformer.Lister(), recorder: recorder,
|
||||
maxCustomClusterClaims: maxCustomClusterClaims, reservedClusterClaimSuffixes: reservedClusterClaimSuffixes},
|
||||
maxCustomClusterClaims: maxCustomClusterClaims,
|
||||
reservedClusterClaimSuffixes: reservedClusterClaimSuffixes,
|
||||
aboutLister: propertyInformer.Lister(),
|
||||
},
|
||||
},
|
||||
hubClusterLister: hubClusterInformer.Lister(),
|
||||
hubEventRecorder: hubEventRecorder,
|
||||
@@ -131,7 +148,7 @@ func (c *managedClusterStatusController) sync(ctx context.Context, syncCtx facto
|
||||
outOfSynced := meta.IsStatusConditionFalse(newCluster.Status.Conditions, clusterv1.ManagedClusterConditionClockSynced)
|
||||
if outOfSynced {
|
||||
c.recorder.Eventf("ClockOutOfSync", "The managed cluster's clock is out of sync, the agent will not be able to update the status of managed cluster.")
|
||||
return fmt.Errorf("the managed cluster's clock is out of sync, the agent will not be able to update the status of managed cluster.")
|
||||
return fmt.Errorf("the managed cluster's clock is out of sync, the agent will not be able to update the status of managed cluster")
|
||||
}
|
||||
|
||||
changed, err := c.patcher.PatchStatus(ctx, newCluster, newCluster.Status, cluster.Status)
|
||||
@@ -168,5 +185,4 @@ func (c *managedClusterStatusController) sendAvailableConditionEvent(
|
||||
c.hubEventRecorder.Eventf(newCluster, nil, corev1.EventTypeWarning, "Unavailable", "Unavailable",
|
||||
"The %s is successfully imported. However, its Kube API server is unavailable", cluster.Name)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -15,6 +15,8 @@ import (
|
||||
"k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/klog/v2"
|
||||
aboutclient "sigs.k8s.io/about-api/pkg/generated/clientset/versioned"
|
||||
aboutinformers "sigs.k8s.io/about-api/pkg/generated/informers/externalversions"
|
||||
|
||||
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
|
||||
clusterscheme "open-cluster-management.io/api/client/cluster/clientset/versioned/scheme"
|
||||
@@ -129,6 +131,11 @@ func (o *SpokeAgentConfig) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
return err
|
||||
}
|
||||
|
||||
aboutClusterClient, err := aboutclient.NewForConfig(spokeClientConfig)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return o.RunSpokeAgentWithSpokeInformers(
|
||||
ctx,
|
||||
kubeConfig,
|
||||
@@ -136,6 +143,7 @@ func (o *SpokeAgentConfig) RunSpokeAgent(ctx context.Context, controllerContext
|
||||
spokeKubeClient,
|
||||
informers.NewSharedInformerFactory(spokeKubeClient, 10*time.Minute),
|
||||
clusterv1informers.NewSharedInformerFactory(spokeClusterClient, 10*time.Minute),
|
||||
aboutinformers.NewSharedInformerFactory(aboutClusterClient, 10*time.Minute),
|
||||
controllerContext.EventRecorder,
|
||||
)
|
||||
}
|
||||
@@ -145,6 +153,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
spokeKubeClient kubernetes.Interface,
|
||||
spokeKubeInformerFactory informers.SharedInformerFactory,
|
||||
spokeClusterInformerFactory clusterv1informers.SharedInformerFactory,
|
||||
aboutInformers aboutinformers.SharedInformerFactory,
|
||||
recorder events.Recorder) error {
|
||||
logger := klog.FromContext(ctx)
|
||||
logger.Info("Cluster name and agent ID", "clusterName", o.agentOptions.SpokeClusterName, "agentID", o.agentOptions.AgentID)
|
||||
@@ -356,6 +365,7 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
hubClient.ClusterInformer,
|
||||
spokeKubeClient.Discovery(),
|
||||
spokeClusterInformerFactory.Cluster().V1alpha1().ClusterClaims(),
|
||||
aboutInformers.About().V1alpha1().ClusterProperties(),
|
||||
spokeKubeInformerFactory.Core().V1().Nodes(),
|
||||
o.registrationOption.MaxCustomClusterClaims,
|
||||
o.registrationOption.ReservedClusterClaimSuffixes,
|
||||
@@ -431,6 +441,10 @@ func (o *SpokeAgentConfig) RunSpokeAgentWithSpokeInformers(ctx context.Context,
|
||||
go spokeClusterInformerFactory.Start(ctx.Done())
|
||||
}
|
||||
|
||||
if features.SpokeMutableFeatureGate.Enabled(ocmfeature.ClusterProperty) {
|
||||
go aboutInformers.Start(ctx.Done())
|
||||
}
|
||||
|
||||
go secretController.Run(ctx, 1)
|
||||
go managedClusterLeaseController.Run(ctx, 1)
|
||||
go managedClusterHealthCheckController.Run(ctx, 1)
|
||||
|
||||
Reference in New Issue
Block a user