diff --git a/pkg/operators/clustermanager/controllers/clustermanagercontroller/clustermanager_crd_reconcile.go b/pkg/operators/clustermanager/controllers/clustermanagercontroller/clustermanager_crd_reconcile.go index 5510c741a..77118bb87 100644 --- a/pkg/operators/clustermanager/controllers/clustermanagercontroller/clustermanager_crd_reconcile.go +++ b/pkg/operators/clustermanager/controllers/clustermanagercontroller/clustermanager_crd_reconcile.go @@ -7,21 +7,17 @@ package clustermanagercontroller import ( "context" "fmt" - "reflect" "github.com/openshift/library-go/pkg/assets" "github.com/openshift/library-go/pkg/operator/events" "github.com/openshift/library-go/pkg/operator/resource/resourceapply" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" - "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/klog/v2" operatorapiv1 "open-cluster-management.io/api/operator/v1" "open-cluster-management.io/registration-operator/manifests" "open-cluster-management.io/registration-operator/pkg/helpers" - "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/migrationcontroller" "open-cluster-management.io/registration-operator/pkg/operators/crdmanager" migrationclient "sigs.k8s.io/kube-storage-version-migrator/pkg/clients/clientset/typed/migration/v1alpha1" ) @@ -49,9 +45,6 @@ var ( "cluster-manager/hub/0000_03_clusters.open-cluster-management.io_placementdecisions.crd.yaml", "cluster-manager/hub/0000_05_clusters.open-cluster-management.io_addonplacementscores.crd.yaml", } - - // removed CRD StoredVersions - removedCRDStoredVersions = map[string]string{} ) type crdReconcile struct { @@ -64,17 +57,6 @@ type crdReconcile struct { } func (c *crdReconcile) reconcile(ctx context.Context, cm *operatorapiv1.ClusterManager, config manifests.HubConfig) (*operatorapiv1.ClusterManager, reconcileState, error) { - // update CRD StoredVersion - if err := c.updateStoredVersion(ctx); err != nil { - meta.SetStatusCondition(&cm.Status.Conditions, metav1.Condition{ - Type: clusterManagerApplied, - Status: metav1.ConditionFalse, - Reason: "CRDStoredVersionUpdateFailed", - Message: fmt.Sprintf("Failed to update crd stored version: %v", err), - }) - return cm, reconcileStop, err - } - crdManager := crdmanager.NewManager[*apiextensionsv1.CustomResourceDefinition]( c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions(), crdmanager.EqualV1, @@ -136,48 +118,3 @@ func (c *crdReconcile) clean(ctx context.Context, cm *operatorapiv1.ClusterManag return cm, reconcileContinue, nil } - -// updateStoredVersion update(remove) deleted api version from CRD status.StoredVersions -func (c *crdReconcile) updateStoredVersion(ctx context.Context) error { - for name, version := range removedCRDStoredVersions { - // Check migration status before update CRD stored version - // If CRD's StorageVersionMigration is not found, it means that the previous or the current release CRD doesn't need migration, and can contiue to update the CRD's stored version. - // If CRD's StorageVersionMigration is found and the status is success, it means that the current CRs were migrated successfully, and can contiue to update the CRD's stored version. - // Other cases, for example, the migration failed, we should not contiue to update the stored version, that will caused the stored old version CRs inconsistent with latest CRD. - svmStatus, err := migrationcontroller.IsStorageVersionMigrationSucceeded(c.hubMigrationClient, name) - if svmStatus == false && !errors.IsNotFound(err) { - return fmt.Errorf("failed to updateStoredVersion as StorageVersionMigrations %v: %v", name, err) - } - - // retrieve CRD - crd, err := c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{}) - if errors.IsNotFound(err) { - continue - } - if err != nil { - klog.Warningf("faield to get CRD %v: %v", crd.Name, err) - continue - } - - // remove old versions from its status - oldStoredVersions := crd.Status.StoredVersions - newStoredVersions := make([]string, 0, len(oldStoredVersions)) - for _, stored := range oldStoredVersions { - if stored != version { - newStoredVersions = append(newStoredVersions, stored) - } - } - - if !reflect.DeepEqual(oldStoredVersions, newStoredVersions) { - crd.Status.StoredVersions = newStoredVersions - // update the status sub-resource - crd, err = c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().UpdateStatus(ctx, crd, metav1.UpdateOptions{}) - if err != nil { - return err - } - klog.V(4).Infof("updated CRD %v status storedVersions: %v", crd.Name, crd.Status.StoredVersions) - } - } - - return nil -} diff --git a/pkg/operators/clustermanager/controllers/crdstatuccontroller/crd_status_controller.go b/pkg/operators/clustermanager/controllers/crdstatuccontroller/crd_status_controller.go new file mode 100644 index 000000000..34efa26c7 --- /dev/null +++ b/pkg/operators/clustermanager/controllers/crdstatuccontroller/crd_status_controller.go @@ -0,0 +1,139 @@ +/* + * Copyright 2022 Contributors to the Open Cluster Management project + */ + +package crdstatuccontroller + +import ( + "context" + "reflect" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "github.com/openshift/library-go/pkg/controller/factory" + "github.com/openshift/library-go/pkg/operator/events" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/runtime" + operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions/operator/v1" + operatorlister "open-cluster-management.io/api/client/operator/listers/operator/v1" + + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/klog/v2" + "open-cluster-management.io/registration-operator/pkg/helpers" + "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/migrationcontroller" +) + +var ( + + // CRD StoredVersions + desiredCRDStoredVersions = map[string][]string{ + "managedclustersets.cluster.open-cluster-management.io": {"v1beta2"}, + "managedclustersetbindings.cluster.open-cluster-management.io": {"v1beta2"}, + } +) + +type crdStatusController struct { + kubeconfig *rest.Config + kubeClient kubernetes.Interface + clusterManagerLister operatorlister.ClusterManagerLister + generateHubClusterClients func(hubConfig *rest.Config) (apiextensionsclient.Interface, error) +} + +// NewClusterManagerController construct cluster manager hub controller +func NewCRDStatusController( + kubeconfig *rest.Config, + kubeClient kubernetes.Interface, + clusterManagerInformer operatorinformer.ClusterManagerInformer, + recorder events.Recorder) factory.Controller { + controller := &crdStatusController{ + kubeconfig: kubeconfig, + kubeClient: kubeClient, + clusterManagerLister: clusterManagerInformer.Lister(), + generateHubClusterClients: generateHubClients, + } + + return factory.New().WithSync(controller.sync). + WithInformersQueueKeyFunc(func(obj runtime.Object) string { + accessor, _ := meta.Accessor(obj) + return accessor.GetName() + }, clusterManagerInformer.Informer()). + ToController("CRDStatusController", recorder) +} + +func (c *crdStatusController) sync(ctx context.Context, controllerContext factory.SyncContext) error { + clusterManagerName := controllerContext.QueueKey() + klog.V(4).Infof("Reconciling ClusterManager %q", clusterManagerName) + + clusterManager, err := c.clusterManagerLister.Get(clusterManagerName) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + + // ClusterManager is deleting + if !clusterManager.DeletionTimestamp.IsZero() { + return nil + } + + // need to wait storage version migrations succeed. + if succeeded := meta.IsStatusConditionTrue(clusterManager.Status.Conditions, migrationcontroller.MigrationSucceeded); !succeeded { + controllerContext.Queue().AddRateLimited(clusterManagerName) + klog.V(4).Info("Wait storage version migration succeed.") + return nil + } + + // If mode is default, then config is management kubeconfig, else it would use management kubeconfig to find the hub + hubKubeconfig, err := helpers.GetHubKubeconfig(ctx, c.kubeconfig, c.kubeClient, clusterManager.Name, clusterManager.Spec.DeployOption.Mode) + if err != nil { + return err + } + + apiExtensionClient, err := c.generateHubClusterClients(hubKubeconfig) + if err != nil { + return err + } + + err = updateStoredVersion(ctx, apiExtensionClient) + if err != nil { + return err + } + return nil +} + +// updateStoredVersion update(remove) deleted api version from CRD status.StoredVersions +func updateStoredVersion(ctx context.Context, hubAPIExtensionClient apiextensionsclient.Interface) error { + for name, desiredVersions := range desiredCRDStoredVersions { + // retrieve CRD + crd, err := hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{}) + if err != nil { + return err + } + if !reflect.DeepEqual(crd.Status.StoredVersions, desiredVersions) { + crd.Status.StoredVersions = desiredVersions + // update the status sub-resource + klog.V(4).Infof("Need update stored versions. oldStoredVersions: %v, newStoredVersions: %v", crd.Status.StoredVersions, desiredVersions) + crd, err = hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().UpdateStatus(ctx, crd, metav1.UpdateOptions{}) + if err != nil { + klog.Errorf("Failed to update storedversion:%v", err) + return err + } + klog.V(4).Infof("updated CRD %v status storedVersions: %v", crd.Name, crd.Status.StoredVersions) + } + } + + return nil +} + +func generateHubClients(hubKubeConfig *rest.Config) (apiextensionsclient.Interface, error) { + hubApiExtensionClient, err := apiextensionsclient.NewForConfig(hubKubeConfig) + if err != nil { + return nil, err + } + + return hubApiExtensionClient, nil +} diff --git a/pkg/operators/clustermanager/controllers/crdstatuccontroller/crd_status_controller_test.go b/pkg/operators/clustermanager/controllers/crdstatuccontroller/crd_status_controller_test.go new file mode 100644 index 000000000..2ae04cc3f --- /dev/null +++ b/pkg/operators/clustermanager/controllers/crdstatuccontroller/crd_status_controller_test.go @@ -0,0 +1,112 @@ +/* + * Copyright 2022 Contributors to the Open Cluster Management project + */ + +package crdstatuccontroller + +import ( + "context" + "testing" + "time" + + apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" + fakeoperatorlient "open-cluster-management.io/api/client/operator/clientset/versioned/fake" + operatorinformers "open-cluster-management.io/api/client/operator/informers/externalversions" + operatorapiv1 "open-cluster-management.io/api/operator/v1" + testinghelper "open-cluster-management.io/registration-operator/pkg/helpers/testing" + "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/migrationcontroller" +) + +func TestSync(t *testing.T) { + clusterManager := newClusterManager("testhub") + tc := newTestController(t, clusterManager) + + syncContext := testinghelper.NewFakeSyncContext(t, "testhub") + //Do not support migration + err := tc.sync(context.Background(), syncContext) + if err != nil { + t.Fatalf("Expected no error when sync, %v", err) + } + + // migration succeed + clusterManager.Status.Conditions = []metav1.Condition{ + { + Type: migrationcontroller.MigrationSucceeded, + Status: metav1.ConditionTrue, + }, + } + crds := newCrds() + tc = newTestController(t, clusterManager, crds...) + err = tc.sync(context.Background(), syncContext) + if err != nil { + t.Fatalf("Expected no error when sync, %v", err) + } + +} + +func newTestController(t *testing.T, clustermanager *operatorapiv1.ClusterManager, crds ...runtime.Object) *crdStatusController { + fakeOperatorClient := fakeoperatorlient.NewSimpleClientset(clustermanager) + operatorInformers := operatorinformers.NewSharedInformerFactory(fakeOperatorClient, 5*time.Minute) + fakeAPIExtensionClient := fakeapiextensions.NewSimpleClientset(crds...) + + crdStatusController := &crdStatusController{ + clusterManagerLister: operatorInformers.Operator().V1().ClusterManagers().Lister(), + } + crdStatusController.generateHubClusterClients = func(hubKubeConfig *rest.Config) (apiextensionsclient.Interface, error) { + return fakeAPIExtensionClient, nil + } + store := operatorInformers.Operator().V1().ClusterManagers().Informer().GetStore() + if err := store.Add(clustermanager); err != nil { + t.Fatal(err) + } + + return crdStatusController +} + +func newClusterManager(name string) *operatorapiv1.ClusterManager { + return &operatorapiv1.ClusterManager{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: operatorapiv1.ClusterManagerSpec{ + RegistrationImagePullSpec: "testregistration", + DeployOption: operatorapiv1.ClusterManagerDeployOption{ + Mode: operatorapiv1.InstallModeDefault, + }, + AddOnManagerConfiguration: &operatorapiv1.AddOnManagerConfiguration{ + Mode: operatorapiv1.ComponentModeTypeEnable, + }, + }, + } +} + +func newCrds() []runtime.Object { + return []runtime.Object{ + &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersets.cluster.open-cluster-management.io", + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + StoredVersions: []string{ + "v1beta1", + "v1beta2", + }, + }, + }, + &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersetbindings.cluster.open-cluster-management.io", + }, + Status: apiextensionsv1.CustomResourceDefinitionStatus{ + StoredVersions: []string{ + "v1beta1", + }, + }, + }, + } +} diff --git a/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller.go b/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller.go index f25f36ab9..a657074f0 100644 --- a/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller.go +++ b/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller.go @@ -3,7 +3,6 @@ package migrationcontroller import ( "context" "fmt" - "time" apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" "k8s.io/apimachinery/pkg/api/equality" @@ -24,6 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + operatorv1client "open-cluster-management.io/api/client/operator/clientset/versioned/typed/operator/v1" operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions/operator/v1" operatorlister "open-cluster-management.io/api/client/operator/listers/operator/v1" "open-cluster-management.io/registration-operator/manifests" @@ -51,26 +51,35 @@ var ( ) const ( - clusterManagerApplied = "Applied" + clusterManagerApplied = "Applied" + MigrationSucceeded = "MigrationSucceeded" + migrationRequestCRDName = "storageversionmigrations.migration.k8s.io" ) type crdMigrationController struct { - kubeconfig *rest.Config - kubeClient kubernetes.Interface - clusterManagerLister operatorlister.ClusterManagerLister + kubeconfig *rest.Config + kubeClient kubernetes.Interface + clusterManagerClient operatorv1client.ClusterManagerInterface + clusterManagerLister operatorlister.ClusterManagerLister + recorder events.Recorder + generateHubClusterClients func(hubConfig *rest.Config) (apiextensionsclient.Interface, migrationv1alpha1client.StorageVersionMigrationsGetter, error) } // NewClusterManagerController construct cluster manager hub controller func NewCRDMigrationController( kubeconfig *rest.Config, kubeClient kubernetes.Interface, + clusterManagerClient operatorv1client.ClusterManagerInterface, clusterManagerInformer operatorinformer.ClusterManagerInformer, recorder events.Recorder) factory.Controller { controller := &crdMigrationController{ - kubeconfig: kubeconfig, - kubeClient: kubeClient, - clusterManagerLister: clusterManagerInformer.Lister(), + kubeconfig: kubeconfig, + kubeClient: kubeClient, + clusterManagerClient: clusterManagerClient, + clusterManagerLister: clusterManagerInformer.Lister(), + recorder: recorder, + generateHubClusterClients: generateHubClients, } return factory.New().WithSync(controller.sync). @@ -103,36 +112,67 @@ func (c *crdMigrationController) sync(ctx context.Context, controllerContext fac if err != nil { return err } - apiExtensionClient, err := apiextensionsclient.NewForConfig(hubKubeconfig) + apiExtensionClient, migrationClient, err := c.generateHubClusterClients(hubKubeconfig) if err != nil { return err } - migrationClient, err := migrationv1alpha1client.NewForConfig(hubKubeconfig) - if err != nil { - return err - } - - // apply storage version migrations if it is supported - supported, err := supportStorageVersionMigration(ctx, apiExtensionClient) - if err != nil { - return err - } - if !supported { - return nil - } // ClusterManager is deleting, we remove its related resources on hub if !clusterManager.DeletionTimestamp.IsZero() { return removeStorageVersionMigrations(ctx, migrationClient) } + // apply storage version migrations if it is supported + supported, err := supportStorageVersionMigration(ctx, apiExtensionClient) + if err != nil { + return err + } + + if !supported { + migrationCond := metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + Reason: "StorageVersionMigrationFailed", + Message: fmt.Sprintf("Do not support StorageVersionMigration"), + } + _, _, err = helpers.UpdateClusterManagerStatus(ctx, c.clusterManagerClient, clusterManagerName, + helpers.UpdateClusterManagerConditionFn(migrationCond), + ) + return err + } + // do not apply storage version migrations until other resources are applied if applied := meta.IsStatusConditionTrue(clusterManager.Status.Conditions, clusterManagerApplied); !applied { - controllerContext.Queue().AddAfter(clusterManagerName, 5*time.Second) + controllerContext.Queue().AddRateLimited(clusterManagerName) return nil } - return applyStorageVersionMigrations(ctx, migrationClient, controllerContext.Recorder()) + err = applyStorageVersionMigrations(ctx, migrationClient, c.recorder) + if err != nil { + klog.Errorf("Failed to apply StorageVersionMigrations. %v", err) + return err + } + + migrationCond, err := syncStorageVersionMigrationsCondition(ctx, migrationClient) + if err != nil { + klog.Errorf("Failed to sync StorageVersionMigrations condition. %v", err) + return err + } + + _, _, err = helpers.UpdateClusterManagerStatus(ctx, c.clusterManagerClient, clusterManagerName, + helpers.UpdateClusterManagerConditionFn(migrationCond), + ) + if err != nil { + return err + } + + //If migration not succeed, wait for all StorageVersionMigrations succeed. + if migrationCond.Status != metav1.ConditionTrue { + klog.V(4).Infof("Wait all StorageVersionMigrations succeed. migrationCond: %v. error: %v", migrationCond, err) + controllerContext.Queue().AddRateLimited(clusterManagerName) + } + + return nil } // supportStorageVersionMigration returns ture if StorageVersionMigration CRD exists; otherwise returns false. @@ -174,8 +214,7 @@ func removeStorageVersionMigrations( func applyStorageVersionMigrations(ctx context.Context, migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter, recorder events.Recorder) error { errs := []error{} for _, file := range migrationRequestFiles { - _, _, err := applyStorageVersionMigration( - migrationClient, + required, err := parseStorageVersionMigrationFile( func(name string) ([]byte, error) { template, err := manifests.ClusterManagerManifestFiles.ReadFile(name) if err != nil { @@ -183,35 +222,88 @@ func applyStorageVersionMigrations(ctx context.Context, migrationClient migratio } return assets.MustCreateAssetFromTemplate(name, template, struct{}{}).Data, nil }, - recorder, file) if err != nil { errs = append(errs, err) + continue + } + + _, _, err = applyStorageVersionMigration(migrationClient, required, recorder) + if err != nil { + errs = append(errs, err) + continue } } return operatorhelpers.NewMultiLineAggregate(errs) } +// syncStorageVersionMigrationsCondition sync the migration condition based on all the StorageVersionMigrations status +// 1. migrationSucceeded is true only when all the StorageVersionMigrations resources succeed. +// 2. migrationSucceeded is false when any of the StorageVersionMigrations resources failed or running +func syncStorageVersionMigrationsCondition(ctx context.Context, migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter) (metav1.Condition, error) { + for _, file := range migrationRequestFiles { + required, err := parseStorageVersionMigrationFile( + func(name string) ([]byte, error) { + template, err := manifests.ClusterManagerManifestFiles.ReadFile(name) + if err != nil { + return nil, err + } + return assets.MustCreateAssetFromTemplate(name, template, struct{}{}).Data, nil + }, + file) + if err != nil { + return metav1.Condition{}, err + } + existing, err := migrationClient.StorageVersionMigrations().Get(ctx, required.Name, metav1.GetOptions{}) + if err != nil { + return metav1.Condition{}, err + } + migrationStatusCondition := getStorageVersionMigrationStatusCondition(existing) + if migrationStatusCondition == nil { + return metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + Reason: "StorageVersionMigrationProcessing", + Message: fmt.Sprintf("Wait StorageVersionMigration %v succeed.", existing.Name), + }, nil + } + switch migrationStatusCondition.Type { + case migrationv1alpha1.MigrationSucceeded: + continue + case migrationv1alpha1.MigrationFailed: + return metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + Reason: fmt.Sprintf("StorageVersionMigration Failed. %v", migrationStatusCondition.Reason), + Message: fmt.Sprintf("Failed to wait StorageVersionMigration %v succeed. %v", existing.Name, migrationStatusCondition.Message), + }, nil + case migrationv1alpha1.MigrationRunning: + return metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + Reason: fmt.Sprintf("StorageVersionMigration Running. %v", migrationStatusCondition.Reason), + Message: fmt.Sprintf("Wait StorageVersionMigration %v succeed. %v", existing.Name, migrationStatusCondition.Message), + }, nil + } + } + return metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionTrue, + Reason: "StorageVersionMigrationSucceed", + Message: fmt.Sprintf("All StorageVersionMigrations Succeed"), + }, nil +} + func removeStorageVersionMigration( ctx context.Context, migrationClient migrationv1alpha1client.StorageVersionMigrationsGetter, manifests resourceapply.AssetFunc, file string) error { - objectRaw, err := manifests(file) + required, err := parseStorageVersionMigrationFile(manifests, file) if err != nil { return err } - object, _, err := genericCodec.Decode(objectRaw, nil, nil) - if err != nil { - return err - } - - required, ok := object.(*migrationv1alpha1.StorageVersionMigration) - if !ok { - return fmt.Errorf("invalid StorageVersionMigration in file %q: %v", file, object) - } - err = migrationClient.StorageVersionMigrations().Delete(ctx, required.Name, metav1.DeleteOptions{}) if errors.IsNotFound(err) { return nil @@ -219,25 +311,35 @@ func removeStorageVersionMigration( return err } -func applyStorageVersionMigration( - client migrationv1alpha1client.StorageVersionMigrationsGetter, +func parseStorageVersionMigrationFile( manifests resourceapply.AssetFunc, - recorder events.Recorder, - file string) (*migrationv1alpha1.StorageVersionMigration, bool, error) { + file string, +) (*migrationv1alpha1.StorageVersionMigration, error) { objBytes, err := manifests(file) if err != nil { - return nil, false, fmt.Errorf("missing %q: %v", file, err) + return nil, fmt.Errorf("missing %q: %v", file, err) } - requiredObj, _, err := genericCodec.Decode(objBytes, nil, nil) + svmObj, _, err := genericCodec.Decode(objBytes, nil, nil) if err != nil { - return nil, false, fmt.Errorf("cannot decode %q: %v", file, err) + return nil, fmt.Errorf("cannot decode %q: %v", file, err) } - required, ok := requiredObj.(*migrationv1alpha1.StorageVersionMigration) + svm, ok := svmObj.(*migrationv1alpha1.StorageVersionMigration) if !ok { - return nil, false, fmt.Errorf("invalid StorageVersionMigration in file %q: %v", file, requiredObj) + return nil, fmt.Errorf("invalid StorageVersionMigration in file %q: %v", file, svmObj) } + return svm, nil +} + +func applyStorageVersionMigration( + client migrationv1alpha1client.StorageVersionMigrationsGetter, + required *migrationv1alpha1.StorageVersionMigration, + recorder events.Recorder, +) (*migrationv1alpha1.StorageVersionMigration, bool, error) { + if required == nil { + return nil, false, fmt.Errorf("required StorageVersionMigration is nil") + } existing, err := client.StorageVersionMigrations().Get(context.TODO(), required.Name, metav1.GetOptions{}) if errors.IsNotFound(err) { actual, err := client.StorageVersionMigrations().Create(context.TODO(), required, metav1.CreateOptions{}) @@ -273,24 +375,39 @@ func applyStorageVersionMigration( return actual, true, nil } -func IsStorageVersionMigrationSucceeded(client migrationv1alpha1client.StorageVersionMigrationsGetter, name string) (bool, error) { - svmcr, err := client.StorageVersionMigrations().Get(context.TODO(), name, metav1.GetOptions{}) - if err != nil { - return false, err - } - +func getStorageVersionMigrationStatusCondition(svmcr *migrationv1alpha1.StorageVersionMigration) *migrationv1alpha1.MigrationCondition { + var runningCon *migrationv1alpha1.MigrationCondition for _, c := range svmcr.Status.Conditions { switch c.Type { case migrationv1alpha1.MigrationSucceeded: if c.Status == corev1.ConditionTrue { - return true, nil + return &c } + continue case migrationv1alpha1.MigrationFailed: if c.Status == corev1.ConditionTrue { - return false, nil + return &c } + continue + case migrationv1alpha1.MigrationRunning: + if c.Status == corev1.ConditionTrue { + runningCon = &c + } + continue } } - - return false, nil + return runningCon +} + +func generateHubClients(hubKubeConfig *rest.Config) (apiextensionsclient.Interface, migrationv1alpha1client.StorageVersionMigrationsGetter, error) { + hubApiExtensionClient, err := apiextensionsclient.NewForConfig(hubKubeConfig) + if err != nil { + return nil, nil, err + } + + hubMigrationClient, err := migrationv1alpha1client.NewForConfig(hubKubeConfig) + if err != nil { + return nil, nil, err + } + return hubApiExtensionClient, hubMigrationClient, nil } diff --git a/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller_test.go b/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller_test.go index 26a63a9bf..0d6ef73ab 100644 --- a/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller_test.go +++ b/pkg/operators/clustermanager/controllers/migrationcontroller/migration_controller_test.go @@ -2,17 +2,30 @@ package migrationcontroller import ( "context" + "reflect" "testing" + "time" + + testinghelper "open-cluster-management.io/registration-operator/pkg/helpers/testing" + + fakeoperatorlient "open-cluster-management.io/api/client/operator/clientset/versioned/fake" + operatorinformers "open-cluster-management.io/api/client/operator/informers/externalversions" + operatorapiv1 "open-cluster-management.io/api/operator/v1" "github.com/openshift/library-go/pkg/operator/events/eventstesting" + v1 "k8s.io/api/core/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" + apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake" "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/rest" clienttesting "k8s.io/client-go/testing" migrationv1alpha1 "sigs.k8s.io/kube-storage-version-migrator/pkg/apis/migration/v1alpha1" fakemigrationclient "sigs.k8s.io/kube-storage-version-migrator/pkg/clients/clientset/fake" + migrationv1alpha1client "sigs.k8s.io/kube-storage-version-migrator/pkg/clients/clientset/typed/migration/v1alpha1" ) func TestSupportStorageVersionMigration(t *testing.T) { @@ -28,11 +41,7 @@ func TestSupportStorageVersionMigration(t *testing.T) { { name: "support", existingObjects: []runtime.Object{ - &apiextensionsv1.CustomResourceDefinition{ - ObjectMeta: metav1.ObjectMeta{ - Name: migrationRequestCRDName, - }, - }, + newCrd(migrationRequestCRDName), }, supported: true, }, @@ -52,6 +61,14 @@ func TestSupportStorageVersionMigration(t *testing.T) { } } +func newCrd(name string) runtime.Object { + return &apiextensionsv1.CustomResourceDefinition{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } +} + func TestApplyStorageVersionMigrations(t *testing.T) { cases := []struct { name string @@ -145,8 +162,7 @@ func TestRemoveStorageVersionMigrations(t *testing.T) { for _, c := range cases { t.Run(c.name, func(t *testing.T) { fakeMigrationClient := fakemigrationclient.NewSimpleClientset(c.existingObjects...) - - err := applyStorageVersionMigrations(context.TODO(), fakeMigrationClient.MigrationV1alpha1(), eventstesting.NewTestingEventRecorder(t)) + err := removeStorageVersionMigrations(context.TODO(), fakeMigrationClient.MigrationV1alpha1()) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -185,3 +201,272 @@ func assertStorageVersionMigration(t *testing.T, name string, object runtime.Obj t.Errorf("expected migration name %q but got %q", name, migration.Name) } } + +func Test_syncStorageVersionMigrationsCondition(t *testing.T) { + + tests := []struct { + name string + existingObjects []runtime.Object + want metav1.Condition + wantErr bool + }{ + { + name: "empty condition", + existingObjects: []runtime.Object{ + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersetbindings.cluster.open-cluster-management.io", + }, + }, + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersets.cluster.open-cluster-management.io", + }, + }, + }, + wantErr: false, + want: metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + }, + }, + { + name: "all migration running condition", + existingObjects: []runtime.Object{ + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersetbindings.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationRunning, + Status: v1.ConditionTrue, + }, + }, + }, + }, + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersets.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationRunning, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + wantErr: false, + want: metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + }, + }, + { + name: "one migration running, one succeed", + existingObjects: []runtime.Object{ + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersetbindings.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationSucceeded, + Status: v1.ConditionTrue, + }, + }, + }, + }, + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersets.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationRunning, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + wantErr: false, + want: metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + }, + }, + { + name: "one migration failed, one succeed", + existingObjects: []runtime.Object{ + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersetbindings.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationFailed, + Status: v1.ConditionTrue, + }, + }, + }, + }, + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersets.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationSucceeded, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + wantErr: false, + want: metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionFalse, + }, + }, + { + name: "all migration succeed", + existingObjects: []runtime.Object{ + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersetbindings.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationSucceeded, + Status: v1.ConditionTrue, + }, + }, + }, + }, + &migrationv1alpha1.StorageVersionMigration{ + ObjectMeta: metav1.ObjectMeta{ + Name: "managedclustersets.cluster.open-cluster-management.io", + }, + Status: migrationv1alpha1.StorageVersionMigrationStatus{ + Conditions: []migrationv1alpha1.MigrationCondition{ + { + Type: migrationv1alpha1.MigrationSucceeded, + Status: v1.ConditionTrue, + }, + }, + }, + }, + }, + wantErr: false, + want: metav1.Condition{ + Type: MigrationSucceeded, + Status: metav1.ConditionTrue, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeMigrationClient := fakemigrationclient.NewSimpleClientset(tt.existingObjects...) + + got, err := syncStorageVersionMigrationsCondition(context.Background(), fakeMigrationClient.MigrationV1alpha1()) + if (err != nil) != tt.wantErr { + t.Errorf("syncStorageVersionMigrationsCondition() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got.Type, tt.want.Type) || !reflect.DeepEqual(got.Status, tt.want.Status) { + t.Errorf("syncStorageVersionMigrationsCondition() = %v, want %v", got, tt.want) + } + }) + } +} + +func TestSync(t *testing.T) { + clusterManager := newClusterManager("testhub") + tc := newTestController(t, clusterManager) + + syncContext := testinghelper.NewFakeSyncContext(t, "testhub") + //Do not support migration + err := tc.sync(context.Background(), syncContext) + if err != nil { + t.Fatalf("Expected no error when sync, %v", err) + } + + clusterManager, err = tc.clusterManagerClient.Get(context.Background(), "testhub", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Expected no error when sync, %v", err) + } + + if notsucceeded := meta.IsStatusConditionFalse(clusterManager.Status.Conditions, MigrationSucceeded); !notsucceeded { + t.Errorf("Error to sync clusterManager.Status.Conditions %v", clusterManager.Status.Conditions) + } + // all resources applied + clusterManager.Status.Conditions = []metav1.Condition{ + { + Type: clusterManagerApplied, + Status: metav1.ConditionTrue, + }, + } + migrateCrd := newCrd(migrationRequestCRDName) + tc = newTestController(t, clusterManager, migrateCrd) + err = tc.sync(context.Background(), syncContext) + if err != nil { + t.Fatalf("Expected no error when sync, %v", err) + } + clusterManager, err = tc.clusterManagerClient.Get(context.Background(), "testhub", metav1.GetOptions{}) + if err != nil { + t.Fatalf("Expected no error when sync, %v", err) + } + if notsucceeded := meta.IsStatusConditionFalse(clusterManager.Status.Conditions, MigrationSucceeded); !notsucceeded { + t.Errorf("Error to sync clusterManager.Status.Conditions %v", clusterManager.Status.Conditions) + } +} + +func newTestController(t *testing.T, clustermanager *operatorapiv1.ClusterManager, crds ...runtime.Object) *crdMigrationController { + fakeOperatorClient := fakeoperatorlient.NewSimpleClientset(clustermanager) + operatorInformers := operatorinformers.NewSharedInformerFactory(fakeOperatorClient, 5*time.Minute) + fakeAPIExtensionClient := fakeapiextensions.NewSimpleClientset(crds...) + fakeMigrationClient := fakemigrationclient.NewSimpleClientset() + + crdMigrationController := &crdMigrationController{ + clusterManagerClient: fakeOperatorClient.OperatorV1().ClusterManagers(), + clusterManagerLister: operatorInformers.Operator().V1().ClusterManagers().Lister(), + recorder: eventstesting.NewTestingEventRecorder(t), + } + crdMigrationController.generateHubClusterClients = func(hubKubeConfig *rest.Config) (apiextensionsclient.Interface, migrationv1alpha1client.StorageVersionMigrationsGetter, error) { + return fakeAPIExtensionClient, fakeMigrationClient.MigrationV1alpha1(), nil + } + store := operatorInformers.Operator().V1().ClusterManagers().Informer().GetStore() + if err := store.Add(clustermanager); err != nil { + t.Fatal(err) + } + + return crdMigrationController +} + +func newClusterManager(name string) *operatorapiv1.ClusterManager { + return &operatorapiv1.ClusterManager{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: operatorapiv1.ClusterManagerSpec{ + RegistrationImagePullSpec: "testregistration", + DeployOption: operatorapiv1.ClusterManagerDeployOption{ + Mode: operatorapiv1.InstallModeDefault, + }, + AddOnManagerConfiguration: &operatorapiv1.AddOnManagerConfiguration{ + Mode: operatorapiv1.ComponentModeTypeEnable, + }, + }, + } +} diff --git a/pkg/operators/clustermanager/options.go b/pkg/operators/clustermanager/options.go index a656048c0..76a077ba6 100644 --- a/pkg/operators/clustermanager/options.go +++ b/pkg/operators/clustermanager/options.go @@ -11,6 +11,7 @@ import ( operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions" "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/certrotationcontroller" "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/clustermanagercontroller" + "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/crdstatuccontroller" "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/migrationcontroller" clustermanagerstatuscontroller "open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/statuscontroller" ) @@ -64,6 +65,13 @@ func (o *Options) RunClusterManagerOperator(ctx context.Context, controllerConte controllerContext.EventRecorder) crdMigrationController := migrationcontroller.NewCRDMigrationController( + controllerContext.KubeConfig, + kubeClient, + operatorClient.OperatorV1().ClusterManagers(), + operatorInformer.Operator().V1().ClusterManagers(), + controllerContext.EventRecorder) + + crdStatusController := crdstatuccontroller.NewCRDStatusController( controllerContext.KubeConfig, kubeClient, operatorInformer.Operator().V1().ClusterManagers(), @@ -75,7 +83,7 @@ func (o *Options) RunClusterManagerOperator(ctx context.Context, controllerConte go statusController.Run(ctx, 1) go certRotationController.Run(ctx, 1) go crdMigrationController.Run(ctx, 1) - + go crdStatusController.Run(ctx, 1) <-ctx.Done() return nil } diff --git a/test/e2e/clusterset_test.go b/test/e2e/clusterset_test.go index 56d294625..8f44d5097 100644 --- a/test/e2e/clusterset_test.go +++ b/test/e2e/clusterset_test.go @@ -12,6 +12,11 @@ import ( clusterv1beta2 "open-cluster-management.io/api/cluster/v1beta2" ) +const ( + clustersetCrdName = "managedclustersets.cluster.open-cluster-management.io" + clustersetBindingCrdName = "managedclustersetbindings.cluster.open-cluster-management.io" +) + var _ = ginkgo.Describe("Create v1beta2 managedclusterset", func() { ginkgo.It("Create a v1beta2 labelselector based ManagedClusterSet and get/update/delete with v1beta2 client", func() { ginkgo.By("Create a v1beta2 ManagedClusterSet") @@ -113,4 +118,35 @@ var _ = ginkgo.Describe("Create v1beta2 managedclusterset", func() { err := t.ClusterClient.ClusterV1beta1().ManagedClusterSets().Delete(context.Background(), managedClusterSetName, metav1.DeleteOptions{}) gomega.Expect(err).ToNot(gomega.HaveOccurred()) }) + ginkgo.It("Check if the v1beta1 storageversion is removed from clusterset crd", func() { + gomega.Eventually(func() error { + clustersetCrd, err := t.HubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), clustersetCrdName, metav1.GetOptions{}) + if err != nil { + return err + } + if len(clustersetCrd.Status.StoredVersions) != 1 { + return fmt.Errorf("clustersetCrd.Status.StoredVersions should be v1beta2, but got:%v", clustersetCrd.Status.StoredVersions) + } + if clustersetCrd.Status.StoredVersions[0] != "v1beta2" { + return fmt.Errorf("clustersetCrd.Status.StoredVersions should be v1beta2, but got:%v", clustersetCrd.Status.StoredVersions) + } + return nil + }, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed()) + }) + ginkgo.It("Check if the v1beta1 storageversion is removed from clustersetbinding crd", func() { + gomega.Eventually(func() error { + clustersetBindingCrd, err := t.HubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(context.Background(), clustersetBindingCrdName, metav1.GetOptions{}) + if err != nil { + return err + } + if len(clustersetBindingCrd.Status.StoredVersions) != 1 { + return fmt.Errorf("clustersetBindingCrd.Status.StoredVersions should be v1beta2, but got:%v", clustersetBindingCrd.Status.StoredVersions) + } + if clustersetBindingCrd.Status.StoredVersions[0] != "v1beta2" { + return fmt.Errorf("clustersetBindingCrd.Status.StoredVersions should be v1beta2, but got:%v", clustersetBindingCrd.Status.StoredVersions) + } + return nil + }, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed()) + + }) })