use crd manager to update and clean crds (#297)

* use crd manager to update clean crds

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Update tests

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Rebase crd manager

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* build client builder to make code clearer

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Add ut for crdmanager

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Compare crd spec when update

Signed-off-by: Jian Qiu <jqiu@redhat.com>

Signed-off-by: Jian Qiu <jqiu@redhat.com>
This commit is contained in:
Jian Qiu
2022-12-16 09:40:46 +08:00
committed by GitHub
parent 4264c7d22c
commit c3baabeec2
25 changed files with 1289 additions and 1160 deletions

View File

@@ -21,15 +21,12 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
admissionclient "k8s.io/client-go/kubernetes/typed/admissionregistration/v1"
coreclientv1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/rest"
@@ -426,48 +423,14 @@ func ApplyDirectly(
client kubernetes.Interface,
apiExtensionClient apiextensionsclient.Interface,
apiRegistrationClient apiregistrationclient.APIServicesGetter,
dynamicClient dynamic.Interface,
recorder events.Recorder,
cache resourceapply.ResourceCache,
manifests resourceapply.AssetFunc,
files ...string) []resourceapply.ApplyResult {
ret := []resourceapply.ApplyResult{}
noCRDV1beta1Files := []string{}
for _, file := range files {
result := resourceapply.ApplyResult{File: file}
objBytes, err := manifests(file)
if err != nil {
result.Error = fmt.Errorf("missing %q: %v", file, err)
ret = append(ret, result)
continue
}
// apply v1beta1 crd if the object is crd v1beta1, we need to do this by using dynamic client
// since the v1beta1 schema has been removed in kube 1.23.
// TODO remove this logic after we do not support spoke with version lowner than 1.16
requiredObj, _, err := scheme.Codecs.UniversalDecoder().Decode(objBytes, nil, &unstructured.Unstructured{})
if err != nil {
result.Error = fmt.Errorf("cannot unmarshal %q: %v", file, err)
ret = append(ret, result)
continue
}
unstructuredObj := requiredObj.(*unstructured.Unstructured)
if unstructuredObj.GetKind() != "CustomResourceDefinition" {
noCRDV1beta1Files = append(noCRDV1beta1Files, file)
continue
}
if unstructuredObj.GetAPIVersion() != "apiextensions.k8s.io/v1beta1" {
noCRDV1beta1Files = append(noCRDV1beta1Files, file)
continue
}
result.Result, result.Changed, result.Error = applyCRDv1beta1(ctx, dynamicClient, recorder, unstructuredObj)
ret = append(ret, result)
}
genericApplyFiles := []string{}
for _, file := range noCRDV1beta1Files {
for _, file := range files {
result := resourceapply.ApplyResult{File: file}
objBytes, err := manifests(file)
if err != nil {
@@ -523,36 +486,6 @@ func ApplyDirectly(
return ret
}
func applyCRDv1beta1(ctx context.Context, client dynamic.Interface, recorder events.Recorder, required *unstructured.Unstructured) (runtime.Object, bool, error) {
gvr := schema.GroupVersionResource{
Group: "apiextensions.k8s.io",
Version: "v1beta1",
Resource: "customresourcedefinitions",
}
existing, err := client.Resource(gvr).Get(ctx, required.GetName(), metav1.GetOptions{})
if errors.IsNotFound(err) {
actual, createdErr := client.Resource(gvr).Create(ctx, required, metav1.CreateOptions{})
return actual, true, createdErr
}
if err != nil {
return nil, false, err
}
requiredSpec, _, _ := unstructured.NestedMap(required.UnstructuredContent(), "spec")
existingSpec, _, _ := unstructured.NestedMap(existing.UnstructuredContent(), "spec")
if equality.Semantic.DeepEqual(requiredSpec, existingSpec) {
return existing, false, nil
}
required.SetResourceVersion(existing.GetResourceVersion())
actual, err := client.Resource(gvr).Update(ctx, required, metav1.UpdateOptions{})
return actual, true, err
}
// NumOfUnavailablePod is to check if a deployment is in degraded state.
func NumOfUnavailablePod(deployment *appsv1.Deployment) int32 {
desiredReplicas := int32(1)

View File

@@ -25,7 +25,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/diff"
"k8s.io/apimachinery/pkg/util/version"
fakedynamic "k8s.io/client-go/dynamic/fake"
fakekube "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
clienttesting "k8s.io/client-go/testing"
@@ -454,14 +453,6 @@ func TestApplyDirectly(t *testing.T) {
applyFileNames: []string{"crd"},
expectErr: false,
},
{
name: "Apply v1beta1 CRD",
applyFiles: map[string]runtime.Object{
"crd": newUnstructured("apiextensions.k8s.io/v1beta1", "CustomResourceDefinition", "", "", map[string]interface{}{}),
},
applyFileNames: []string{"crd"},
expectErr: false,
},
{
name: "Apply CRD with nil apiExtensionClient",
applyFiles: map[string]runtime.Object{
@@ -494,9 +485,6 @@ func TestApplyDirectly(t *testing.T) {
return json.Marshal(c.applyFiles[name])
}
scheme := runtime.NewScheme()
dynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
cache := resourceapply.NewResourceCache()
var results []resourceapply.ApplyResult
switch {
@@ -504,7 +492,6 @@ func TestApplyDirectly(t *testing.T) {
results = ApplyDirectly(
context.TODO(),
fakeKubeClient, nil, nil,
dynamicClient,
eventstesting.NewTestingEventRecorder(t),
cache,
fakeApplyFunc,
@@ -514,7 +501,6 @@ func TestApplyDirectly(t *testing.T) {
results = ApplyDirectly(
context.TODO(),
fakeKubeClient, nil, fakeResgistrationClient,
dynamicClient,
eventstesting.NewTestingEventRecorder(t),
cache,
fakeApplyFunc,
@@ -524,7 +510,6 @@ func TestApplyDirectly(t *testing.T) {
results = ApplyDirectly(
context.TODO(),
fakeKubeClient, fakeExtensionClient, nil,
dynamicClient,
eventstesting.NewTestingEventRecorder(t),
cache,
fakeApplyFunc,
@@ -534,7 +519,6 @@ func TestApplyDirectly(t *testing.T) {
results = ApplyDirectly(
context.TODO(),
fakeKubeClient, fakeExtensionClient, fakeResgistrationClient,
dynamicClient,
eventstesting.NewTestingEventRecorder(t),
cache,
fakeApplyFunc,

View File

@@ -244,6 +244,11 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
// Update status
var conds []metav1.Condition = []metav1.Condition{featureGateCondition}
if cond := meta.FindStatusCondition(clusterManager.Status.Conditions, clusterManagerProgressing); cond != nil {
conds = append(conds, *cond)
}
if len(errs) == 0 {
conds = append(conds, metav1.Condition{
Type: clusterManagerApplied,
@@ -251,12 +256,26 @@ func (n *clusterManagerController) sync(ctx context.Context, controllerContext f
Reason: "ClusterManagerApplied",
Message: "Components of cluster manager are applied",
})
} else if cond := meta.FindStatusCondition(clusterManager.Status.Conditions, clusterManagerApplied); cond != nil {
conds = append(conds, *cond)
}
} else {
if cond := meta.FindStatusCondition(clusterManager.Status.Conditions, clusterManagerApplied); cond != nil {
conds = append(conds, *cond)
}
if cond := meta.FindStatusCondition(clusterManager.Status.Conditions, clusterManagerProgressing); cond != nil {
conds = append(conds, *cond)
// When appliedCondition is false, we should not update related resources and resource generations
_, updated, updatedErr := helpers.UpdateClusterManagerStatus(
ctx, n.clusterManagerClient, clusterManager.Name,
helpers.UpdateClusterManagerConditionFn(conds...),
func(oldStatus *operatorapiv1.ClusterManagerStatus) error {
oldStatus.ObservedGeneration = clusterManager.Generation
return nil
},
)
if updated {
return updatedErr
}
return utilerrors.NewAggregate(errs)
}
_, _, updatedErr := helpers.UpdateClusterManagerStatus(

View File

@@ -376,7 +376,7 @@ func TestDeleteCRD(t *testing.T) {
clusterManager.ObjectMeta.SetDeletionTimestamp(&now)
crd := &apiextensionsv1.CustomResourceDefinition{
ObjectMeta: metav1.ObjectMeta{
Name: crdNames[0],
Name: "clustermanagementaddons.addon.open-cluster-management.io",
},
}
@@ -391,7 +391,7 @@ func TestDeleteCRD(t *testing.T) {
return true, crd, nil
}
return true, &apiextensionsv1.CustomResourceDefinition{}, errors.NewNotFound(
apiextensionsv1.Resource("customresourcedefinitions"), crdNames[0])
apiextensionsv1.Resource("customresourcedefinitions"), "clustermanagementaddons.addon.open-cluster-management.io")
})
syncContext := testinghelper.NewFakeSyncContext(t, "testhub")

View File

@@ -10,19 +10,19 @@ import (
"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
operatorapiv1 "open-cluster-management.io/api/operator/v1"
"open-cluster-management.io/registration-operator/manifests"
"open-cluster-management.io/registration-operator/pkg/helpers"
"open-cluster-management.io/registration-operator/pkg/operators/clustermanager/controllers/migrationcontroller"
"open-cluster-management.io/registration-operator/pkg/operators/crdmanager"
"reflect"
migrationclient "sigs.k8s.io/kube-storage-version-migrator/pkg/clients/clientset/typed/migration/v1alpha1"
"time"
)
var (
@@ -79,15 +79,12 @@ func (c *crdReconcile) reconcile(ctx context.Context, cm *operatorapiv1.ClusterM
return cm, reconcileStop, err
}
var appliedErrs []error
resourceResults := helpers.ApplyDirectly(
ctx,
nil,
c.hubAPIExtensionClient,
nil,
nil,
c.recorder,
c.cache,
crdManager := crdmanager.NewManager[*apiextensionsv1.CustomResourceDefinition](
c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions(),
crdmanager.EqualV1,
)
if err := crdManager.Apply(ctx,
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
@@ -97,60 +94,48 @@ func (c *crdReconcile) reconcile(ctx context.Context, cm *operatorapiv1.ClusterM
helpers.SetRelatedResourcesStatusesWithObj(&cm.Status.RelatedResources, objData)
return objData, nil
},
hubCRDResourceFiles...,
)
for _, result := range resourceResults {
if result.Error != nil {
appliedErrs = append(appliedErrs, fmt.Errorf("%q (%T): %v", result.File, result.Type, result.Error))
}
}
if len(appliedErrs) > 0 {
hubCRDResourceFiles...); err != nil {
meta.SetStatusCondition(&cm.Status.Conditions, metav1.Condition{
Type: clusterManagerApplied,
Status: metav1.ConditionFalse,
Reason: "CRDApplyFaild",
Message: fmt.Sprintf("Failed to apply crd: %v", utilerrors.NewAggregate(appliedErrs)),
Message: fmt.Sprintf("Failed to apply crd: %v", err),
})
return cm, reconcileStop, utilerrors.NewAggregate(appliedErrs)
return cm, reconcileStop, err
}
return cm, reconcileContinue, nil
}
func (c *crdReconcile) clean(ctx context.Context, cm *operatorapiv1.ClusterManager, config manifests.HubConfig) (*operatorapiv1.ClusterManager, reconcileState, error) {
if c.skipRemoveCRDs {
return cm, reconcileContinue, nil
}
// Remove crd
crdManager := crdmanager.NewManager[*apiextensionsv1.CustomResourceDefinition](
c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions(),
crdmanager.EqualV1,
)
// Remove crds in order at first
for _, name := range crdNames {
err := c.removeCRD(ctx, name)
if err != nil {
// TODO add condition
if err := crdManager.CleanOne(ctx, name, c.skipRemoveCRDs); err != nil {
return cm, reconcileStop, err
}
c.recorder.Eventf("CRDDeleted", "crd %s is deleted", name)
}
if c.skipRemoveCRDs {
return cm, reconcileContinue, nil
}
for _, file := range hubCRDResourceFiles {
err := helpers.CleanUpStaticObject(
ctx,
nil,
c.hubAPIExtensionClient,
nil,
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
return assets.MustCreateAssetFromTemplate(name, template, config).Data, nil
},
file,
)
if err != nil {
// TODO add condition
return cm, reconcileContinue, err
}
if err := crdManager.Clean(ctx, c.skipRemoveCRDs,
func(name string) ([]byte, error) {
template, err := manifests.ClusterManagerManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
objData := assets.MustCreateAssetFromTemplate(name, template, config).Data
helpers.SetRelatedResourcesStatusesWithObj(&cm.Status.RelatedResources, objData)
return objData, nil
},
hubCRDResourceFiles...); err != nil {
return cm, reconcileStop, err
}
return cm, reconcileContinue, nil
@@ -200,26 +185,3 @@ func (c *crdReconcile) updateStoredVersion(ctx context.Context) error {
return nil
}
// removeCRD removes crd, and check if crd resource is removed. Since the related cr is still being deleted,
// it will check the crd existence after deletion, and only return nil when crd is not found.
func (c *crdReconcile) removeCRD(ctx context.Context, name string) error {
err := c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Delete(
ctx, name, metav1.DeleteOptions{})
switch {
case errors.IsNotFound(err):
return nil
case err != nil:
return err
}
_, err = c.hubAPIExtensionClient.ApiextensionsV1().CustomResourceDefinitions().Get(ctx, name, metav1.GetOptions{})
switch {
case errors.IsNotFound(err):
return nil
case err != nil:
return err
}
return helpers.NewRequeueError(fmt.Sprintf("crd %s is still deleting", name), 3*time.Second)
}

View File

@@ -81,7 +81,6 @@ func (c *hubReoncile) reconcile(ctx context.Context, cm *operatorapiv1.ClusterMa
c.hubKubeClient,
nil,
nil,
nil,
c.recorder,
c.cache,
func(name string) ([]byte, error) {

View File

@@ -68,7 +68,7 @@ func (c *runtimeReconcile) reconcile(ctx context.Context, cm *operatorapiv1.Clus
var appliedErrs []error
resourceResults := helpers.ApplyDirectly(
ctx,
c.kubeClient, nil, nil, nil,
c.kubeClient, nil, nil,
c.recorder,
c.cache,
func(name string) ([]byte, error) {

View File

@@ -56,7 +56,6 @@ func (c *webhookReconcile) reconcile(ctx context.Context, cm *operatorapiv1.Clus
c.hubKubeClient,
nil,
nil,
nil,
c.recorder,
c.cache,
func(name string) ([]byte, error) {

View File

@@ -0,0 +1,292 @@
package crdmanager
import (
"context"
"errors"
"fmt"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
"k8s.io/utils/pointer"
"strings"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
versionutil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/klog/v2"
"open-cluster-management.io/registration-operator/pkg/version"
)
// versionAnnotationKey is an annotation key on crd resources to mark the ocm version of the crds.
const (
versionAnnotationKey = "operator.open-cluster-management.io/version"
// defaultVersion is set if gitVersion cannot be obtained. It is the lownest version so crd is updated as long
// as it has a higher version. It also ensures the crd spec is still compared
// for update when version is not obtained.
defaultVersion = "0.0.0"
)
var (
genericScheme = runtime.NewScheme()
genericCodecs = serializer.NewCodecFactory(genericScheme)
genericCodec = genericCodecs.UniversalDeserializer()
)
func init() {
utilruntime.Must(apiextensionsv1.AddToScheme(genericScheme))
utilruntime.Must(apiextensionsv1beta1.AddToScheme(genericScheme))
}
type CRD interface {
*apiextensionsv1.CustomResourceDefinition | *apiextensionsv1beta1.CustomResourceDefinition
}
type Manager[T CRD] struct {
client crdClient[T]
equal func(old, new T) bool
version *versionutil.Version
}
type crdClient[T CRD] interface {
Get(ctx context.Context, name string, opt metav1.GetOptions) (T, error)
Create(ctx context.Context, obj T, opt metav1.CreateOptions) (T, error)
Update(ctx context.Context, obj T, opt metav1.UpdateOptions) (T, error)
Delete(ctx context.Context, name string, opt metav1.DeleteOptions) error
}
type RemainingCRDError struct {
RemainingCRDs []string
}
func (r *RemainingCRDError) Error() string {
return fmt.Sprintf("Thera are still reaming CRDs: %s", strings.Join(r.RemainingCRDs, ","))
}
func NewManager[T CRD](client crdClient[T], equalFunc func(old, new T) bool) *Manager[T] {
gitVersion := version.Get().GitVersion
if len(gitVersion) == 0 {
gitVersion = defaultVersion
}
v, err := versionutil.ParseGeneric(gitVersion)
if err != nil {
utilruntime.HandleError(err)
}
manager := &Manager[T]{
client: client,
equal: equalFunc,
version: v,
}
return manager
}
func (m *Manager[T]) CleanOne(ctx context.Context, name string, skip bool) error {
// remove version annotation if skip clean
if skip {
existing, err := m.client.Get(ctx, name, metav1.GetOptions{})
switch {
case apierrors.IsNotFound(err):
return nil
case err != nil:
return err
}
accessor, err := meta.Accessor(existing)
if err != nil {
return err
}
annotations := accessor.GetAnnotations()
if annotations == nil {
return nil
}
v, ok := annotations[versionAnnotationKey]
if !ok {
return nil
}
cnt, err := m.version.Compare(v)
if err != nil {
return err
}
if cnt != 0 {
return nil
}
delete(annotations, versionAnnotationKey)
accessor.SetAnnotations(annotations)
_, err = m.client.Update(ctx, existing, metav1.UpdateOptions{})
return err
}
err := m.client.Delete(ctx, name, metav1.DeleteOptions{})
switch {
case apierrors.IsNotFound(err):
return nil
case err == nil:
return &RemainingCRDError{RemainingCRDs: []string{name}}
}
return err
}
func (m *Manager[T]) Clean(ctx context.Context, skip bool, manifests resourceapply.AssetFunc, files ...string) error {
var errs []error
var remainingCRDs []string
for _, file := range files {
objBytes, err := manifests(file)
if err != nil {
errs = append(errs, err)
continue
}
requiredObj, _, err := genericCodec.Decode(objBytes, nil, nil)
if err != nil {
errs = append(errs, err)
continue
}
accessor, err := meta.Accessor(requiredObj)
if err != nil {
return err
}
err = m.CleanOne(ctx, accessor.GetName(), skip)
var remainingErr *RemainingCRDError
switch {
case errors.As(err, &remainingErr):
remainingCRDs = append(remainingCRDs, accessor.GetName())
case err != nil:
errs = append(errs, err)
}
}
if len(errs) > 0 {
return utilerrors.NewAggregate(errs)
}
if len(remainingCRDs) > 0 {
return &RemainingCRDError{RemainingCRDs: remainingCRDs}
}
return nil
}
func (m *Manager[T]) Apply(ctx context.Context, manifests resourceapply.AssetFunc, files ...string) error {
var errs []error
for _, file := range files {
objBytes, err := manifests(file)
if err != nil {
errs = append(errs, err)
continue
}
requiredObj, _, err := genericCodec.Decode(objBytes, nil, nil)
if err != nil {
errs = append(errs, err)
continue
}
err = m.applyOne(ctx, requiredObj.(T))
if err != nil {
errs = append(errs, err)
}
}
return utilerrors.NewAggregate(errs)
}
func (m *Manager[T]) applyOne(ctx context.Context, required T) error {
accessor, err := meta.Accessor(required)
if err != nil {
return err
}
existing, err := m.client.Get(ctx, accessor.GetName(), metav1.GetOptions{})
if apierrors.IsNotFound(err) {
_, err := m.client.Create(ctx, required, metav1.CreateOptions{})
klog.Infof("crd %s is created", accessor.GetName())
return err
}
if err != nil {
return err
}
ok, err := m.shouldUpdate(existing, required)
if err != nil {
return err
}
if !ok {
return nil
}
existingAccessor, err := meta.Accessor(existing)
if err != nil {
return err
}
annotations := accessor.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[versionAnnotationKey] = m.version.String()
accessor.SetAnnotations(annotations)
accessor.SetResourceVersion(existingAccessor.GetResourceVersion())
_, err = m.client.Update(ctx, required, metav1.UpdateOptions{})
if err != nil {
return err
}
klog.Infof("crd %s is updated to version %s", accessor.GetName(), m.version.String())
return nil
}
func (m *Manager[T]) shouldUpdate(old, new T) (bool, error) {
// if existingVersion is higher than the required version, do not update crd.
accessor, err := meta.Accessor(old)
if err != nil {
return false, err
}
var existingVersion string
if accessor.GetAnnotations() != nil {
existingVersion = accessor.GetAnnotations()[versionAnnotationKey]
}
// alwasy update if existing doest not have version annotation
if len(existingVersion) == 0 {
return true, nil
}
cnt, err := m.version.Compare(existingVersion)
if err != nil {
return false, err
}
// if the version are the same, compare the spec
if cnt == 0 {
return !m.equal(old, new), nil
}
// do not update when version is higher
return cnt > 0, nil
}
func EqualV1(old, new *apiextensionsv1.CustomResourceDefinition) bool {
modified := pointer.Bool(false)
resourcemerge.EnsureCustomResourceDefinitionV1(modified, old, *new)
return !*modified
}
func EqualV1Beta1(old, new *apiextensionsv1beta1.CustomResourceDefinition) bool {
modified := pointer.Bool(false)
resourcemerge.EnsureCustomResourceDefinitionV1Beta1(modified, old, *new)
return !*modified
}

View File

@@ -0,0 +1,351 @@
/*
* Copyright 2022 Contributors to the Open Cluster Management project
*/
package crdmanager
import (
"context"
"encoding/json"
"fmt"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
fakeapiextensions "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
versionutil "k8s.io/apimachinery/pkg/util/version"
clienttesting "k8s.io/client-go/testing"
testinghelpers "open-cluster-management.io/registration-operator/pkg/helpers/testing"
"strconv"
"testing"
)
func TestApplyV1CRD(t *testing.T) {
cases := []struct {
name string
desiredVersion string
requiredCRDs []runtime.Object
existingCRDs []runtime.Object
verify func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "create crd",
desiredVersion: "v0.9.0",
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "create")
},
},
{
name: "update crd",
desiredVersion: "v0.9.0-16-g889bd8b",
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "v0.8.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "update")
obj := actions[1].(clienttesting.UpdateActionImpl).Object
assertCRDVersion(t, obj, "0.9.0-16-g889bd8b")
},
},
{
name: "update crd from none",
desiredVersion: "v0.9.0-16-g889bd8b",
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "update")
obj := actions[1].(clienttesting.UpdateActionImpl).Object
assertCRDVersion(t, obj, "0.9.0-16-g889bd8b")
},
},
{
name: "noop crd",
desiredVersion: "v0.8.0-16-g889bd8b",
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "v0.9.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[0], "get")
},
},
{
name: "crd version equals",
desiredVersion: "0.0.0",
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "0.0.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[0], "get")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
client := fakeapiextensions.NewSimpleClientset(c.existingCRDs...)
manager := NewManager[*apiextensionsv1.CustomResourceDefinition](client.ApiextensionsV1().CustomResourceDefinitions(), EqualV1)
v, _ := versionutil.ParseSemantic(c.desiredVersion)
manager.version = v
var indices []string
for i := range c.requiredCRDs {
indices = append(indices, fmt.Sprintf("%d", i))
}
err := manager.Apply(context.TODO(), func(index string) ([]byte, error) {
i, _ := strconv.Atoi(index)
return json.Marshal(c.requiredCRDs[i])
}, indices...)
if err != nil {
t.Errorf("apply error: %v", err)
}
c.verify(t, client.Actions())
})
}
}
func TestApplyV1Beta1CRD(t *testing.T) {
cases := []struct {
name string
desiredVersion string
requiredCRDs []runtime.Object
existingCRDs []runtime.Object
verify func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "create crd",
desiredVersion: "v0.9.0",
requiredCRDs: []runtime.Object{newV1Beta1CRD("foo", "")},
existingCRDs: []runtime.Object{},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "create")
},
},
{
name: "update crd",
desiredVersion: "v0.9.0-16-g889bd8b",
requiredCRDs: []runtime.Object{newV1Beta1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1Beta1CRD("foo", "v0.8.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "update")
obj := actions[1].(clienttesting.UpdateActionImpl).Object
assertCRDVersion(t, obj, "0.9.0-16-g889bd8b")
},
},
{
name: "update crd from none",
desiredVersion: "v0.9.0-16-g889bd8b",
requiredCRDs: []runtime.Object{newV1Beta1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1Beta1CRD("foo", "")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "update")
obj := actions[1].(clienttesting.UpdateActionImpl).Object
assertCRDVersion(t, obj, "0.9.0-16-g889bd8b")
},
},
{
name: "noop crd",
desiredVersion: "v0.8.0-16-g889bd8b",
requiredCRDs: []runtime.Object{newV1Beta1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1Beta1CRD("foo", "v0.9.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[0], "get")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
client := fakeapiextensions.NewSimpleClientset(c.existingCRDs...)
manager := NewManager[*apiextensionsv1beta1.CustomResourceDefinition](client.ApiextensionsV1beta1().CustomResourceDefinitions(), EqualV1Beta1)
v, _ := versionutil.ParseSemantic(c.desiredVersion)
manager.version = v
var indices []string
for i := range c.requiredCRDs {
indices = append(indices, fmt.Sprintf("%d", i))
}
err := manager.Apply(context.TODO(), func(index string) ([]byte, error) {
i, _ := strconv.Atoi(index)
return json.Marshal(c.requiredCRDs[i])
}, indices...)
if err != nil {
t.Errorf("apply error: %v", err)
}
c.verify(t, client.Actions())
})
}
}
func TestClean(t *testing.T) {
cases := []struct {
name string
desiredVersion string
skip bool
expectErr bool
requiredCRDs []runtime.Object
existingCRDs []runtime.Object
verify func(t *testing.T, actions []clienttesting.Action)
}{
{
name: "delete crd",
desiredVersion: "v0.9.0",
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[0], "delete")
},
},
{
name: "delete existing crd",
desiredVersion: "v0.9.0",
expectErr: true,
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[0], "delete")
},
},
{
name: "skip delete existing crd",
desiredVersion: "v0.9.0",
skip: true,
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "0.9.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 2 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[1], "update")
obj := actions[1].(clienttesting.UpdateActionImpl).Object
accessor, _ := meta.Accessor(obj)
if len(accessor.GetAnnotations()) != 0 {
t.Errorf("annotation should be cleaned")
}
},
},
{
name: "skip delete existing crd not owned",
desiredVersion: "v0.9.0",
skip: true,
requiredCRDs: []runtime.Object{newV1CRD("foo", "")},
existingCRDs: []runtime.Object{newV1CRD("foo", "0.10.0")},
verify: func(t *testing.T, actions []clienttesting.Action) {
if len(actions) != 1 {
t.Fatalf("actions are not expected: %v", actions)
}
testinghelpers.AssertAction(t, actions[0], "get")
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
client := fakeapiextensions.NewSimpleClientset(c.existingCRDs...)
manager := NewManager[*apiextensionsv1.CustomResourceDefinition](client.ApiextensionsV1().CustomResourceDefinitions(), EqualV1)
v, _ := versionutil.ParseSemantic(c.desiredVersion)
manager.version = v
var indices []string
for i := range c.requiredCRDs {
indices = append(indices, fmt.Sprintf("%d", i))
}
err := manager.Clean(context.TODO(), c.skip, func(index string) ([]byte, error) {
i, _ := strconv.Atoi(index)
return json.Marshal(c.requiredCRDs[i])
}, indices...)
if c.expectErr && err == nil {
t.Errorf("should have err")
}
if !c.expectErr && err != nil {
t.Errorf("apply error: %v", err)
}
c.verify(t, client.Actions())
})
}
}
func newV1Beta1CRD(name, version string) *apiextensionsv1beta1.CustomResourceDefinition {
crd := &apiextensionsv1beta1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1beta1",
Kind: "CustomResourceDefinition",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
}
if len(version) > 0 {
crd.Annotations = map[string]string{versionAnnotationKey: version}
}
return crd
}
func newV1CRD(name, version string) *apiextensionsv1.CustomResourceDefinition {
crd := &apiextensionsv1.CustomResourceDefinition{
TypeMeta: metav1.TypeMeta{
APIVersion: "apiextensions.k8s.io/v1",
Kind: "CustomResourceDefinition",
},
ObjectMeta: metav1.ObjectMeta{
Name: name,
},
Spec: apiextensionsv1.CustomResourceDefinitionSpec{
Conversion: &apiextensionsv1.CustomResourceConversion{
Strategy: apiextensionsv1.NoneConverter,
},
},
}
if len(version) > 0 {
crd.Annotations = map[string]string{versionAnnotationKey: version}
}
return crd
}
func assertCRDVersion(t *testing.T, obj interface{}, version string) {
accessor, _ := meta.Accessor(obj)
annotation := accessor.GetAnnotations()
if len(annotation) == 0 {
t.Fatalf("Expect a version annotation but got none")
}
if annotation[versionAnnotationKey] != version {
t.Errorf("Expect version %s, but got %s", version, annotation[versionAnnotationKey])
}
}

View File

@@ -0,0 +1,103 @@
/*
* Copyright 2022 Contributors to the Open Cluster Management project
*/
package klusterletcontroller
import (
"context"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
operatorapiv1 "open-cluster-management.io/api/operator/v1"
)
type managedClusterClientsBuilderInterface interface {
withMode(mode operatorapiv1.InstallMode) managedClusterClientsBuilderInterface
withKubeConfigSecret(namespace, name string) managedClusterClientsBuilderInterface
build(ctx context.Context) (*managedClusterClients, error)
}
// managedClusterClients holds variety of kube client for managed cluster
type managedClusterClients struct {
kubeClient kubernetes.Interface
apiExtensionClient apiextensionsclient.Interface
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
// Only used for Hosted mode to generate managed cluster kubeconfig
// with minimum permission for registration and work.
kubeconfig *rest.Config
}
type managedClusterClientsBuilder struct {
kubeClient kubernetes.Interface
apiExtensionClient apiextensionsclient.Interface
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
mode operatorapiv1.InstallMode
secretNamespace string
secretName string
}
func newManagedClusterClientsBuilder(
kubeClient kubernetes.Interface,
apiExtensionClient apiextensionsclient.Interface,
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
) *managedClusterClientsBuilder {
return &managedClusterClientsBuilder{
kubeClient: kubeClient,
apiExtensionClient: apiExtensionClient,
appliedManifestWorkClient: appliedManifestWorkClient,
}
}
func (m *managedClusterClientsBuilder) withMode(mode operatorapiv1.InstallMode) managedClusterClientsBuilderInterface {
m.mode = mode
return m
}
func (m *managedClusterClientsBuilder) withKubeConfigSecret(namespace, name string) managedClusterClientsBuilderInterface {
m.secretNamespace = namespace
m.secretName = name
return m
}
func (m *managedClusterClientsBuilder) build(ctx context.Context) (*managedClusterClients, error) {
if m.mode != operatorapiv1.InstallModeHosted {
return &managedClusterClients{
kubeClient: m.kubeClient,
apiExtensionClient: m.apiExtensionClient,
appliedManifestWorkClient: m.appliedManifestWorkClient,
}, nil
}
// Ensure the agent namespace for users to create the external-managed-kubeconfig secret in this
// namespace, so that in the next reconcile loop the controller can get the secret successfully after
// the secret was created.
if err := ensureAgentNamespace(ctx, m.kubeClient, m.secretNamespace); err != nil {
return nil, err
}
managedKubeConfig, err := getManagedKubeConfig(ctx, m.kubeClient, m.secretNamespace, m.secretName)
if err != nil {
return nil, err
}
clients := &managedClusterClients{
kubeconfig: managedKubeConfig,
}
if clients.kubeClient, err = kubernetes.NewForConfig(managedKubeConfig); err != nil {
return nil, err
}
if clients.apiExtensionClient, err = apiextensionsclient.NewForConfig(managedKubeConfig); err != nil {
return nil, err
}
workClient, err := workclientset.NewForConfig(managedKubeConfig)
if err != nil {
return nil, err
}
clients.appliedManifestWorkClient = workClient.WorkV1().AppliedManifestWorks()
return clients, nil
}

View File

@@ -2,29 +2,23 @@ package klusterletcontroller
import (
"context"
"crypto/sha256"
"fmt"
"github.com/openshift/library-go/pkg/assets"
"open-cluster-management.io/registration-operator/manifests"
"reflect"
"strings"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/dynamic"
appsinformer "k8s.io/client-go/informers/apps/v1"
coreinformer "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/klog/v2"
"open-cluster-management.io/registration-operator/manifests"
"reflect"
"github.com/openshift/library-go/pkg/controller/factory"
"github.com/openshift/library-go/pkg/operator/events"
operatorhelpers "github.com/openshift/library-go/pkg/operator/v1helpers"
operatorv1client "open-cluster-management.io/api/client/operator/clientset/versioned/typed/operator/v1"
operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions/operator/v1"
operatorlister "open-cluster-management.io/api/client/operator/listers/operator/v1"
@@ -34,28 +28,18 @@ import (
)
type klusterletCleanupController struct {
klusterletClient operatorv1client.KlusterletInterface
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
apiExtensionClient apiextensionsclient.Interface
dynamicClient dynamic.Interface
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
kubeVersion *version.Version
operatorNamespace string
// buildManagedClusterClientsHostedMode build clients for the managed cluster in hosted mode,
// this can be overridden for testing
buildManagedClusterClientsHostedMode func(
ctx context.Context,
kubeClient kubernetes.Interface,
namespace, secret string) (*managedClusterClients, error)
klusterletClient operatorv1client.KlusterletInterface
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
kubeVersion *version.Version
operatorNamespace string
managedClusterClientsBuilder managedClusterClientsBuilderInterface
}
// NewKlusterletCleanupController construct klusterlet cleanup controller
func NewKlusterletCleanupController(
kubeClient kubernetes.Interface,
apiExtensionClient apiextensionsclient.Interface,
dynamicClient dynamic.Interface,
klusterletClient operatorv1client.KlusterletInterface,
klusterletInformer operatorinformer.KlusterletInformer,
secretInformer coreinformer.SecretInformer,
@@ -65,15 +49,12 @@ func NewKlusterletCleanupController(
operatorNamespace string,
recorder events.Recorder) factory.Controller {
controller := &klusterletCleanupController{
kubeClient: kubeClient,
apiExtensionClient: apiExtensionClient,
dynamicClient: dynamicClient,
klusterletClient: klusterletClient,
klusterletLister: klusterletInformer.Lister(),
appliedManifestWorkClient: appliedManifestWorkClient,
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
buildManagedClusterClientsHostedMode: buildManagedClusterClientsFromSecret,
kubeClient: kubeClient,
klusterletClient: klusterletClient,
klusterletLister: klusterletInformer.Lister(),
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient),
}
return factory.New().WithSync(controller.sync).
@@ -98,14 +79,13 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
return err
}
klusterlet = klusterlet.DeepCopy()
installMode := klusterlet.Spec.DeployOption.Mode
if klusterlet.DeletionTimestamp.IsZero() {
if !hasFinalizer(klusterlet, klusterletFinalizer) {
return n.addFinalizer(ctx, klusterlet, klusterletFinalizer)
}
if !hasFinalizer(klusterlet, klusterletHostedFinalizer) && readyToAddHostedFinalizer(klusterlet, installMode) {
if !hasFinalizer(klusterlet, klusterletHostedFinalizer) && readyToAddHostedFinalizer(klusterlet, klusterlet.Spec.DeployOption.Mode) {
// the external managed kubeconfig secret is ready, there will be some resources applied on the managed
// cluster, add hosted finalizer here to indicate these resources should be cleaned up when deleting the
// klusterlet
@@ -116,13 +96,6 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
}
// Klusterlet is deleting, we remove its related resources on managed and management cluster
skip := skipCleanupManagedClusterResources(klusterlet, installMode)
if !skip && !readyToOperateManagedClusterResources(klusterlet, installMode) {
// wait for the external managed kubeconfig to exist to clean up resources on the managed cluster
return nil
}
config := klusterletConfig{
KlusterletName: klusterlet.Name,
KlusterletNamespace: helpers.KlusterletNamespace(klusterlet),
@@ -139,151 +112,77 @@ func (n *klusterletCleanupController) sync(ctx context.Context, controllerContex
ExternalManagedKubeConfigSecret: helpers.ExternalManagedKubeConfig,
ExternalManagedKubeConfigRegistrationSecret: helpers.ExternalManagedKubeConfigRegistration,
ExternalManagedKubeConfigWorkSecret: helpers.ExternalManagedKubeConfigWork,
InstallMode: installMode,
InstallMode: klusterlet.Spec.DeployOption.Mode,
HubApiServerHostAlias: klusterlet.Spec.HubApiServerHostAlias,
}
managedClusterClients := &managedClusterClients{
kubeClient: n.kubeClient,
apiExtensionClient: n.apiExtensionClient,
dynamicClient: n.dynamicClient,
appliedManifestWorkClient: n.appliedManifestWorkClient,
reconcilers := []klusterletReconcile{
&runtimeReconcile{
kubeClient: n.kubeClient,
recorder: controllerContext.Recorder(),
},
}
if installMode == operatorapiv1.InstallModeHosted && !skip {
managedClusterClients, err = n.buildManagedClusterClientsHostedMode(ctx,
n.kubeClient, config.AgentNamespace, config.ExternalManagedKubeConfigSecret)
// Add other reconcilers only when managed cluster is ready to manage.
// we should clean managedcluster resource when
// 1. install mode is not hosted
// 2. install mode is hosted and some resources has been applied on managed cluster (if hosted finalizer exists)
if config.InstallMode != operatorapiv1.InstallModeHosted || hasFinalizer(klusterlet, klusterletHostedFinalizer) {
managedClusterClients, err := n.managedClusterClientsBuilder.
withMode(config.InstallMode).
withKubeConfigSecret(config.AgentNamespace, config.ExternalManagedKubeConfigSecret).
build(ctx)
// stop when hosted kubeconfig is not found. the klustelet controller will monitor the secret and retrigger
// reconcilation of cleanup controller when secret is created again.
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
reconcilers = append(reconcilers,
&crdReconcile{
managedClusterClients: managedClusterClients,
kubeVersion: n.kubeVersion,
recorder: controllerContext.Recorder(),
},
&managedReconcile{
managedClusterClients: managedClusterClients,
kubeClient: n.kubeClient,
kubeVersion: n.kubeVersion,
opratorNamespace: n.operatorNamespace,
recorder: controllerContext.Recorder(),
},
)
}
// managementReconcile should be added as the last one, since we finally need to remove agent namespace.
reconcilers = append(reconcilers, &managementReconcile{
kubeClient: n.kubeClient,
operatorNamespace: n.operatorNamespace,
recorder: controllerContext.Recorder(),
})
var errs []error
for _, reconciler := range reconcilers {
var state reconcileState
klusterlet, state, err = reconciler.clean(ctx, klusterlet, config)
if err != nil {
errs = append(errs, err)
}
if state == reconcileStop {
break
}
}
if err := n.cleanUp(ctx, controllerContext, managedClusterClients, config, skip); err != nil {
return err
if len(errs) > 0 {
return utilerrors.NewAggregate(errs)
}
return n.removeKlusterletFinalizers(ctx, klusterlet)
}
func (n *klusterletCleanupController) cleanUp(
ctx context.Context,
controllerContext factory.SyncContext,
managedClients *managedClusterClients,
config klusterletConfig,
skipCleanupResourcesOnManagedCluster bool) error {
// Remove deployment
deployments := []string{
fmt.Sprintf("%s-registration-agent", config.KlusterletName),
fmt.Sprintf("%s-work-agent", config.KlusterletName),
}
for _, deployment := range deployments {
err := n.kubeClient.AppsV1().Deployments(config.AgentNamespace).Delete(ctx, deployment, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
controllerContext.Recorder().Eventf("DeploymentDeleted", "deployment %s is deleted", deployment)
}
if !skipCleanupResourcesOnManagedCluster {
// get hub host from bootstrap kubeconfig
var hubHost string
bootstrapKubeConfigSecret, err := n.kubeClient.CoreV1().Secrets(config.AgentNamespace).Get(ctx, config.BootStrapKubeConfigSecret, metav1.GetOptions{})
switch {
case err == nil:
restConfig, err := helpers.LoadClientConfigFromSecret(bootstrapKubeConfigSecret)
if err != nil {
return fmt.Errorf("unable to load kubeconfig from secret %q %q: %w", config.AgentNamespace, config.BootStrapKubeConfigSecret, err)
}
hubHost = restConfig.Host
case !errors.IsNotFound(err):
return err
}
err = n.cleanUpManagedClusterResources(ctx, managedClients, config, hubHost)
if err != nil {
return err
}
}
// Remove secrets
secrets := []string{config.HubKubeConfigSecret}
if config.InstallMode == operatorapiv1.InstallModeHosted {
// In Hosted mod, also need to remove the external-managed-kubeconfig-registration and external-managed-kubeconfig-work
secrets = append(secrets, []string{config.ExternalManagedKubeConfigRegistrationSecret, config.ExternalManagedKubeConfigWorkSecret}...)
}
for _, secret := range secrets {
err := n.kubeClient.CoreV1().Secrets(config.AgentNamespace).Delete(ctx, secret, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
controllerContext.Recorder().Eventf("SecretDeleted", "secret %s is deleted", secret)
}
// remove static file on the management cluster
err := removeStaticResources(ctx, n.kubeClient, n.apiExtensionClient, managementStaticResourceFiles, config)
if err != nil {
return err
}
// The agent namespace on the management cluster should be removed **at the end**. Otherwise if any failure occurred,
// the managed-external-kubeconfig secret would be removed and the next reconcile will fail due to can not build the
// managed cluster clients.
if config.InstallMode == operatorapiv1.InstallModeHosted {
// remove the agent namespace on the management cluster
err = n.kubeClient.CoreV1().Namespaces().Delete(ctx, config.AgentNamespace, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
}
return nil
}
func (n *klusterletCleanupController) cleanUpManagedClusterResources(
ctx context.Context,
managedClients *managedClusterClients,
config klusterletConfig,
hubHost string) error {
// remove finalizer from AppliedManifestWorks, should be executed **before** "remove hub kubeconfig secret".
if len(hubHost) > 0 {
if err := n.cleanUpAppliedManifestWorks(ctx, managedClients.appliedManifestWorkClient, hubHost); err != nil {
return err
}
}
// remove static file on the managed cluster
err := removeStaticResources(ctx, managedClients.kubeClient, managedClients.apiExtensionClient,
managedStaticResourceFiles, config)
if err != nil {
return err
}
// TODO remove this when we do not support kube 1.11 any longer
cnt, err := n.kubeVersion.Compare("v1.12.0")
klog.Infof("comapare version %d, %v", cnt, err)
if cnt, err := n.kubeVersion.Compare("v1.12.0"); err == nil && cnt < 0 {
err = removeStaticResources(ctx, managedClients.kubeClient, managedClients.apiExtensionClient,
kube111StaticResourceFiles, config)
if err != nil {
return err
}
}
// remove the klusterlet namespace and klusterlet addon namespace on the managed cluster
// For now, whether in Default or Hosted mode, the addons could be deployed on the managed cluster.
namespaces := []string{config.KlusterletNamespace, fmt.Sprintf("%s-addon", config.KlusterletNamespace)}
for _, namespace := range namespaces {
err = managedClients.kubeClient.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return err
}
}
// no longer remove the CRDs (AppliedManifestWork & ClusterClaim), because they might be shared
// by multiple klusterlets. Consequently, the CRs of those CRDs will not be deleted as well when deleting a klusterlet.
return nil
}
func (n *klusterletCleanupController) removeKlusterletFinalizers(ctx context.Context, deploy *operatorapiv1.Klusterlet) error {
// reload klusterlet
deploy, err := n.klusterletClient.Get(ctx, deploy.Name, metav1.GetOptions{})
@@ -293,7 +192,7 @@ func (n *klusterletCleanupController) removeKlusterletFinalizers(ctx context.Con
if err != nil {
return err
}
copiedFinalizers := []string{}
var copiedFinalizers []string
for i := range deploy.Finalizers {
if deploy.Finalizers[i] == klusterletFinalizer || deploy.Finalizers[i] == klusterletHostedFinalizer {
continue
@@ -309,34 +208,8 @@ func (n *klusterletCleanupController) removeKlusterletFinalizers(ctx context.Con
return nil
}
// cleanUpAppliedManifestWorks removes finalizer from the AppliedManifestWorks whose name starts with
// the hash of the given hub host.
func (n *klusterletCleanupController) cleanUpAppliedManifestWorks(ctx context.Context, appliedManifestWorkClient workv1client.AppliedManifestWorkInterface, hubHost string) error {
appliedManifestWorks, err := appliedManifestWorkClient.List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("unable to list AppliedManifestWorks: %w", err)
}
errs := []error{}
prefix := fmt.Sprintf("%s-", fmt.Sprintf("%x", sha256.Sum256([]byte(hubHost))))
for _, appliedManifestWork := range appliedManifestWorks.Items {
// ignore AppliedManifestWork for other klusterlet
if !strings.HasPrefix(appliedManifestWork.Name, prefix) {
continue
}
// remove finalizer if exists
if mutated := removeFinalizer(&appliedManifestWork, appliedManifestWorkFinalizer); !mutated {
continue
}
_, err := appliedManifestWorkClient.Update(ctx, &appliedManifestWork, metav1.UpdateOptions{})
if err != nil && !errors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("unable to remove finalizer from AppliedManifestWork %q: %w", appliedManifestWork.Name, err))
}
}
return operatorhelpers.NewMultiLineAggregate(errs)
}
// readyToAddHostedFinalizer checkes whether the hosted finalizer should be added.
// It is only added when mode is hosted, and some resources have been applied to the managed cluster.
func readyToAddHostedFinalizer(klusterlet *operatorapiv1.Klusterlet, mode operatorapiv1.InstallMode) bool {
if mode != operatorapiv1.InstallModeHosted {
return false
@@ -345,14 +218,6 @@ func readyToAddHostedFinalizer(klusterlet *operatorapiv1.Klusterlet, mode operat
return meta.IsStatusConditionTrue(klusterlet.Status.Conditions, klusterletReadyToApply)
}
func skipCleanupManagedClusterResources(klusterlet *operatorapiv1.Klusterlet, mode operatorapiv1.InstallMode) bool {
if mode != operatorapiv1.InstallModeHosted {
return false
}
return !hasFinalizer(klusterlet, klusterletHostedFinalizer)
}
func (n *klusterletCleanupController) addFinalizer(ctx context.Context, k *operatorapiv1.Klusterlet, finalizer string) error {
k.Finalizers = append(k.Finalizers, finalizer)
_, err := n.klusterletClient.Update(ctx, k, metav1.UpdateOptions{})
@@ -374,7 +239,7 @@ func removeFinalizer(obj runtime.Object, finalizerName string) bool {
return false
}
newFinalizers := []string{}
var newFinalizers []string
accessor, _ := meta.Accessor(obj)
found := false
for _, finalizer := range accessor.GetFinalizers() {

View File

@@ -35,7 +35,7 @@ func TestSyncDelete(t *testing.T) {
t.Errorf("Expected non error when sync, %v", err)
}
deleteActions := []clienttesting.DeleteActionImpl{}
var deleteActions []clienttesting.DeleteActionImpl
kubeActions := controller.kubeClient.Actions()
for _, action := range kubeActions {
if action.GetVerb() == "delete" {
@@ -50,7 +50,7 @@ func TestSyncDelete(t *testing.T) {
t.Errorf("Expected 27 delete actions, but got %d", len(deleteActions))
}
updateWorkActions := []clienttesting.UpdateActionImpl{}
var updateWorkActions []clienttesting.UpdateActionImpl
workActions := controller.workClient.Actions()
for _, action := range workActions {
if action.GetVerb() == "update" {
@@ -93,7 +93,7 @@ func TestSyncDeleteHosted(t *testing.T) {
t.Errorf("Expected non error when sync, %v", err)
}
deleteActionsManagement := []clienttesting.DeleteActionImpl{}
var deleteActionsManagement []clienttesting.DeleteActionImpl
kubeActions := controller.kubeClient.Actions()
for _, action := range kubeActions {
if action.GetVerb() == "delete" {
@@ -109,7 +109,7 @@ func TestSyncDeleteHosted(t *testing.T) {
t.Errorf("Expected 17 delete actions, but got %d", len(deleteActionsManagement))
}
deleteActionsManaged := []clienttesting.DeleteActionImpl{}
var deleteActionsManaged []clienttesting.DeleteActionImpl
for _, action := range controller.managedKubeClient.Actions() {
if action.GetVerb() == "delete" {
deleteAction := action.(clienttesting.DeleteActionImpl)
@@ -123,7 +123,7 @@ func TestSyncDeleteHosted(t *testing.T) {
t.Errorf("Expected 13 delete actions, but got %d", len(deleteActionsManaged))
}
updateWorkActions := []clienttesting.UpdateActionImpl{}
var updateWorkActions []clienttesting.UpdateActionImpl
workActions := controller.managedWorkClient.Actions()
for _, action := range workActions {
if action.GetVerb() == "update" {
@@ -149,8 +149,7 @@ func TestSyncDeleteHostedDeleteAgentNamespace(t *testing.T) {
})
now := metav1.Now()
klusterlet.ObjectMeta.SetDeletionTimestamp(&now)
controller := newTestControllerHosted(t, klusterlet, nil).
setBuildManagedClusterClientsHostedModeFunc(buildManagedClusterClientsFromSecret)
controller := newTestControllerHosted(t, klusterlet, nil).setDefaultManagedClusterClientsBuilder()
syncContext := testinghelper.NewFakeSyncContext(t, "klusterlet")
err := controller.cleanupController.sync(context.TODO(), syncContext)
@@ -167,8 +166,7 @@ func TestSyncDeleteHostedDeleteWaitKubeconfig(t *testing.T) {
klusterlet := newKlusterletHosted("klusterlet", "testns", "cluster1")
now := metav1.Now()
klusterlet.ObjectMeta.SetDeletionTimestamp(&now)
controller := newTestControllerHosted(t, klusterlet, nil).
setBuildManagedClusterClientsHostedModeFunc(buildManagedClusterClientsFromSecret)
controller := newTestControllerHosted(t, klusterlet, nil).setDefaultManagedClusterClientsBuilder()
syncContext := testinghelper.NewFakeSyncContext(t, "klusterlet")
err := controller.cleanupController.sync(context.TODO(), syncContext)

View File

@@ -3,6 +3,7 @@ package klusterletcontroller
import (
"context"
"fmt"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"strings"
corev1 "k8s.io/api/core/v1"
@@ -12,7 +13,6 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/dynamic"
appsinformer "k8s.io/client-go/informers/apps/v1"
coreinformer "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
@@ -25,7 +25,6 @@ import (
operatorv1client "open-cluster-management.io/api/client/operator/clientset/versioned/typed/operator/v1"
operatorinformer "open-cluster-management.io/api/client/operator/informers/externalversions/operator/v1"
operatorlister "open-cluster-management.io/api/client/operator/listers/operator/v1"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
operatorapiv1 "open-cluster-management.io/api/operator/v1"
"open-cluster-management.io/registration-operator/pkg/helpers"
@@ -45,27 +44,19 @@ const (
)
type klusterletController struct {
klusterletClient operatorv1client.KlusterletInterface
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
apiExtensionClient apiextensionsclient.Interface
dynamicClient dynamic.Interface
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
kubeVersion *version.Version
operatorNamespace string
skipHubSecretPlaceholder bool
cache resourceapply.ResourceCache
// buildManagedClusterClientsHostedMode build clients for the managed cluster in hosted mode,
// this can be overridden for testing
buildManagedClusterClientsHostedMode func(
ctx context.Context,
kubeClient kubernetes.Interface,
namespace, secret string) (*managedClusterClients, error)
klusterletClient operatorv1client.KlusterletInterface
klusterletLister operatorlister.KlusterletLister
kubeClient kubernetes.Interface
kubeVersion *version.Version
operatorNamespace string
skipHubSecretPlaceholder bool
cache resourceapply.ResourceCache
managedClusterClientsBuilder managedClusterClientsBuilderInterface
}
type klusterletReconcile interface {
reconcile(ctx context.Context, cm *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error)
clean(ctx context.Context, cm *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error)
}
type reconcileState int64
@@ -79,7 +70,6 @@ const (
func NewKlusterletController(
kubeClient kubernetes.Interface,
apiExtensionClient apiextensionsclient.Interface,
dynamicClient dynamic.Interface,
klusterletClient operatorv1client.KlusterletInterface,
klusterletInformer operatorinformer.KlusterletInformer,
secretInformer coreinformer.SecretInformer,
@@ -90,17 +80,14 @@ func NewKlusterletController(
recorder events.Recorder,
skipHubSecretPlaceholder bool) factory.Controller {
controller := &klusterletController{
kubeClient: kubeClient,
apiExtensionClient: apiExtensionClient,
dynamicClient: dynamicClient,
klusterletClient: klusterletClient,
klusterletLister: klusterletInformer.Lister(),
appliedManifestWorkClient: appliedManifestWorkClient,
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
buildManagedClusterClientsHostedMode: buildManagedClusterClientsFromSecret,
skipHubSecretPlaceholder: skipHubSecretPlaceholder,
cache: resourceapply.NewResourceCache(),
kubeClient: kubeClient,
klusterletClient: klusterletClient,
klusterletLister: klusterletInformer.Lister(),
kubeVersion: kubeVersion,
operatorNamespace: operatorNamespace,
skipHubSecretPlaceholder: skipHubSecretPlaceholder,
cache: resourceapply.NewResourceCache(),
managedClusterClientsBuilder: newManagedClusterClientsBuilder(kubeClient, apiExtensionClient, appliedManifestWorkClient),
}
return factory.New().WithSync(controller.sync).
@@ -149,17 +136,6 @@ type klusterletConfig struct {
HubApiServerHostAlias *operatorapiv1.HubApiServerHostAlias
}
// managedClusterClients holds variety of kube client for managed cluster
type managedClusterClients struct {
kubeClient kubernetes.Interface
apiExtensionClient apiextensionsclient.Interface
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
dynamicClient dynamic.Interface
// Only used for Hosted mode to generate managed cluster kubeconfig
// with minimum permission for registration and work.
kubeconfig *rest.Config
}
func (n *klusterletController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
klusterletName := controllerContext.QueueKey()
klog.V(4).Infof("Reconciling Klusterlet %q", klusterletName)
@@ -194,35 +170,36 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
HubApiServerHostAlias: klusterlet.Spec.HubApiServerHostAlias,
}
managedClusterClients := &managedClusterClients{
kubeClient: n.kubeClient,
apiExtensionClient: n.apiExtensionClient,
dynamicClient: n.dynamicClient,
appliedManifestWorkClient: n.appliedManifestWorkClient,
}
managedClusterClients, err := n.managedClusterClientsBuilder.
withMode(config.InstallMode).
withKubeConfigSecret(config.AgentNamespace, config.ExternalManagedKubeConfigSecret).
build(ctx)
// update klusterletReadyToApply condition at first in hosted mode
// this conditions should be updated even when klusterlet is in deleteing state.
if config.InstallMode == operatorapiv1.InstallModeHosted {
managedClusterClients, err = n.buildManagedClusterClientsHostedMode(ctx,
n.kubeClient, config.AgentNamespace, config.ExternalManagedKubeConfigSecret)
cond := metav1.Condition{
Type: klusterletReadyToApply, Status: metav1.ConditionTrue, Reason: "KlusterletPrepared",
Message: "Klusterlet is ready to apply",
}
if err != nil {
_, _, _ = helpers.UpdateKlusterletStatus(ctx, n.klusterletClient, klusterletName,
helpers.UpdateKlusterletConditionFn(metav1.Condition{
Type: klusterletReadyToApply, Status: metav1.ConditionFalse, Reason: "KlusterletPrepareFailed",
Message: fmt.Sprintf("Failed to build managed cluster clients: %v", err),
}))
return err
cond = metav1.Condition{
Type: klusterletReadyToApply, Status: metav1.ConditionFalse, Reason: "KlusterletPrepareFailed",
Message: fmt.Sprintf("Failed to build managed cluster clients: %v", err),
}
}
_, updated, updateErr := helpers.UpdateKlusterletStatus(ctx, n.klusterletClient, klusterletName,
helpers.UpdateKlusterletConditionFn(metav1.Condition{
Type: klusterletReadyToApply, Status: metav1.ConditionTrue, Reason: "KlusterletPrepared",
Message: "Klusterlet is ready to apply",
}))
helpers.UpdateKlusterletConditionFn(cond))
if updated {
return updateErr
}
}
if err != nil {
return err
}
if !klusterlet.DeletionTimestamp.IsZero() {
// The work of klusterlet cleanup will be handled by klusterlet cleanup controller
return nil
@@ -233,11 +210,6 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
return nil
}
if !readyToOperateManagedClusterResources(klusterlet, config.InstallMode) {
// wait for the external managed kubeconfig to exist to apply resources on the manged cluster
return nil
}
var featureGateCondition metav1.Condition
// If there are some invalid feature gates of registration or work, will output condition `ValidFeatureGates`
// False in Klusterlet.
@@ -291,14 +263,31 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
appliedCondition = &metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionTrue, Reason: "KlusterletApplied",
Message: "Klusterlet Component Applied"}
} else if appliedCondition == nil {
appliedCondition = &metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionFalse, Reason: "KlusterletApplyFailed",
Message: "Klusterlet Component Apply failed"}
} else {
if appliedCondition == nil {
appliedCondition = &metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionFalse, Reason: "KlusterletApplyFailed",
Message: "Klusterlet Component Apply failed"}
}
// When appliedCondition is false, we should not update related resources and resource generations
_, updated, err := helpers.UpdateKlusterletStatus(ctx, n.klusterletClient, klusterletName,
helpers.UpdateKlusterletConditionFn(featureGateCondition, *appliedCondition),
func(oldStatus *operatorapiv1.KlusterletStatus) error {
oldStatus.ObservedGeneration = klusterlet.Generation
return nil
},
)
if updated {
return err
}
return utilerrors.NewAggregate(errs)
}
// If we get here, we have successfully applied everything and should indicate that
_, _, _ = helpers.UpdateKlusterletStatus(ctx, n.klusterletClient, klusterletName,
// If we get here, we have successfully applied everything.
_, _, err = helpers.UpdateKlusterletStatus(ctx, n.klusterletClient, klusterletName,
helpers.UpdateKlusterletConditionFn(featureGateCondition, *appliedCondition),
helpers.UpdateKlusterletGenerationsFn(klusterlet.Status.Generations...),
helpers.UpdateKlusterletRelatedResourcesFn(klusterlet.Status.RelatedResources...),
@@ -307,15 +296,7 @@ func (n *klusterletController) sync(ctx context.Context, controllerContext facto
return nil
},
)
return nil
}
func readyToOperateManagedClusterResources(klusterlet *operatorapiv1.Klusterlet, mode operatorapiv1.InstallMode) bool {
if mode != operatorapiv1.InstallModeHosted {
return true
}
return meta.IsStatusConditionTrue(klusterlet.Status.Conditions, klusterletReadyToApply) && hasFinalizer(klusterlet, klusterletHostedFinalizer)
return err
}
// TODO also read CABundle from ExternalServerURLs and set into registration deployment
@@ -341,48 +322,6 @@ func getManagedKubeConfig(ctx context.Context, kubeClient kubernetes.Interface,
return helpers.LoadClientConfigFromSecret(managedKubeconfigSecret)
}
// buildManagedClusterClientsFromSecret builds variety of clients for managed cluster from managed cluster kubeconfig secret.
func buildManagedClusterClientsFromSecret(ctx context.Context, client kubernetes.Interface, agentNamespace, secretName string) (
*managedClusterClients, error) {
// Ensure the agent namespace for users to create the external-managed-kubeconfig secret in this
// namespace, so that in the next reconcile loop the controller can get the secret successfully after
// the secret was created.
err := ensureAgentNamespace(ctx, client, agentNamespace)
if err != nil {
return nil, err
}
managedKubeConfig, err := getManagedKubeConfig(ctx, client, agentNamespace, secretName)
if err != nil {
return nil, err
}
kubeClient, err := kubernetes.NewForConfig(managedKubeConfig)
if err != nil {
return nil, err
}
apiExtensionClient, err := apiextensionsclient.NewForConfig(managedKubeConfig)
if err != nil {
return nil, err
}
dynamicClient, err := dynamic.NewForConfig(managedKubeConfig)
if err != nil {
return nil, err
}
workClient, err := workclientset.NewForConfig(managedKubeConfig)
if err != nil {
return nil, err
}
return &managedClusterClients{
kubeClient: kubeClient,
apiExtensionClient: apiExtensionClient,
appliedManifestWorkClient: workClient.WorkV1().AppliedManifestWorks(),
dynamicClient: dynamicClient,
kubeconfig: managedKubeConfig}, nil
}
// ensureAgentNamespace create agent namespace if it is not exist
func ensureAgentNamespace(ctx context.Context, kubeClient kubernetes.Interface, namespace string) error {
_, err := kubeClient.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
@@ -395,9 +334,7 @@ func ensureAgentNamespace(ctx context.Context, kubeClient kubernetes.Interface,
},
},
}, metav1.CreateOptions{})
if createErr != nil {
return createErr
}
return createErr
}
return err
}
@@ -437,12 +374,11 @@ func ensureNamespace(ctx context.Context, kubeClient kubernetes.Interface, klust
},
},
}, metav1.CreateOptions{})
if createErr != nil {
meta.SetStatusCondition(&klusterlet.Status.Conditions, metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionFalse, Reason: "KlusterletApplyFailed",
Message: fmt.Sprintf("Failed to create namespace %q: %v", namespace, createErr)})
return createErr
}
meta.SetStatusCondition(&klusterlet.Status.Conditions, metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionFalse, Reason: "KlusterletApplyFailed",
Message: fmt.Sprintf("Failed to create namespace %q: %v", namespace, createErr)})
return createErr
case err != nil:
meta.SetStatusCondition(&klusterlet.Status.Conditions, metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionFalse, Reason: "KlusterletApplyFailed",

View File

@@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"fmt"
"k8s.io/client-go/rest"
"strings"
"testing"
"time"
@@ -19,10 +20,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/rand"
"k8s.io/apimachinery/pkg/util/version"
fakedynamic "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes"
fakekube "k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/rest"
clienttesting "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
@@ -44,7 +42,6 @@ type testController struct {
kubeClient *fakekube.Clientset
apiExtensionClient *fakeapiextensions.Clientset
operatorClient *fakeoperatorclient.Clientset
dynamicClient *fakedynamic.FakeDynamicClient
workClient *fakeworkclient.Clientset
operatorStore cache.Store
@@ -159,30 +156,23 @@ func newTestController(t *testing.T, klusterlet *operatorapiv1.Klusterlet, appli
operatorInformers := operatorinformers.NewSharedInformerFactory(fakeOperatorClient, 5*time.Minute)
kubeVersion, _ := version.ParseGeneric("v1.18.0")
scheme := runtime.NewScheme()
dynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
hubController := &klusterletController{
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
apiExtensionClient: fakeAPIExtensionClient,
dynamicClient: dynamicClient,
appliedManifestWorkClient: fakeWorkClient.WorkV1().AppliedManifestWorks(),
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
cache: resourceapply.NewResourceCache(),
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
cache: resourceapply.NewResourceCache(),
managedClusterClientsBuilder: newManagedClusterClientsBuilder(fakeKubeClient, fakeAPIExtensionClient, fakeWorkClient.WorkV1().AppliedManifestWorks()),
}
cleanupController := &klusterletCleanupController{
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
apiExtensionClient: fakeAPIExtensionClient,
dynamicClient: dynamicClient,
appliedManifestWorkClient: fakeWorkClient.WorkV1().AppliedManifestWorks(),
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
managedClusterClientsBuilder: newManagedClusterClientsBuilder(fakeKubeClient, fakeAPIExtensionClient, fakeWorkClient.WorkV1().AppliedManifestWorks()),
}
store := operatorInformers.Operator().V1().Klusterlets().Informer().GetStore()
@@ -195,7 +185,6 @@ func newTestController(t *testing.T, klusterlet *operatorapiv1.Klusterlet, appli
cleanupController: cleanupController,
kubeClient: fakeKubeClient,
apiExtensionClient: fakeAPIExtensionClient,
dynamicClient: dynamicClient,
operatorClient: fakeOperatorClient,
workClient: fakeWorkClient,
operatorStore: store,
@@ -256,43 +245,30 @@ func newTestControllerHosted(t *testing.T, klusterlet *operatorapiv1.Klusterlet,
fakeManagedAPIExtensionClient := fakeapiextensions.NewSimpleClientset()
fakeManagedWorkClient := fakeworkclient.NewSimpleClientset(appliedManifestWorks...)
defaultBuildManagedClusterClientsHostedModeFn := func(
ctx context.Context,
kubeClient kubernetes.Interface,
namespace,
secret string) (*managedClusterClients, error) {
return &managedClusterClients{
kubeClient: fakeManagedKubeClient,
apiExtensionClient: fakeManagedAPIExtensionClient,
appliedManifestWorkClient: fakeManagedWorkClient.WorkV1().AppliedManifestWorks(),
kubeconfig: &rest.Config{
Host: "testhost",
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte("test"),
},
},
}, nil
}
hubController := &klusterletController{
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
apiExtensionClient: fakeAPIExtensionClient,
appliedManifestWorkClient: fakeWorkClient.WorkV1().AppliedManifestWorks(),
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
cache: resourceapply.NewResourceCache(),
buildManagedClusterClientsHostedMode: defaultBuildManagedClusterClientsHostedModeFn,
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
cache: resourceapply.NewResourceCache(),
managedClusterClientsBuilder: &fakeManagedClusterBuilder{
fakeWorkClient: fakeManagedWorkClient,
fakeAPIExtensionClient: fakeManagedAPIExtensionClient,
fakeKubeClient: fakeManagedKubeClient,
},
}
cleanupController := &klusterletCleanupController{
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
apiExtensionClient: fakeAPIExtensionClient,
appliedManifestWorkClient: fakeWorkClient.WorkV1().AppliedManifestWorks(),
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
buildManagedClusterClientsHostedMode: defaultBuildManagedClusterClientsHostedModeFn,
klusterletClient: fakeOperatorClient.OperatorV1().Klusterlets(),
kubeClient: fakeKubeClient,
klusterletLister: operatorInformers.Operator().V1().Klusterlets().Lister(),
kubeVersion: kubeVersion,
operatorNamespace: "open-cluster-management",
managedClusterClientsBuilder: &fakeManagedClusterBuilder{
fakeWorkClient: fakeManagedWorkClient,
fakeAPIExtensionClient: fakeManagedAPIExtensionClient,
fakeKubeClient: fakeManagedKubeClient,
},
}
store := operatorInformers.Operator().V1().Klusterlets().Informer().GetStore()
@@ -315,10 +291,17 @@ func newTestControllerHosted(t *testing.T, klusterlet *operatorapiv1.Klusterlet,
}
}
func (c *testController) setBuildManagedClusterClientsHostedModeFunc(
f func(ctx context.Context, kubeClient kubernetes.Interface, namespace, secret string) (
*managedClusterClients, error)) *testController {
c.controller.buildManagedClusterClientsHostedMode = f
func (c *testController) setDefaultManagedClusterClientsBuilder() *testController {
c.controller.managedClusterClientsBuilder = newManagedClusterClientsBuilder(
c.kubeClient,
c.apiExtensionClient,
c.workClient.WorkV1().AppliedManifestWorks(),
)
c.cleanupController.managedClusterClientsBuilder = newManagedClusterClientsBuilder(
c.kubeClient,
c.apiExtensionClient,
c.workClient.WorkV1().AppliedManifestWorks(),
)
return c
}
@@ -626,8 +609,11 @@ func TestSyncDeployHosted(t *testing.T) {
func TestSyncDeployHostedCreateAgentNamespace(t *testing.T) {
klusterlet := newKlusterletHosted("klusterlet", "testns", "cluster1")
controller := newTestControllerHosted(t, klusterlet, nil).
setBuildManagedClusterClientsHostedModeFunc(buildManagedClusterClientsFromSecret)
meta.SetStatusCondition(&klusterlet.Status.Conditions, metav1.Condition{
Type: klusterletReadyToApply, Status: metav1.ConditionFalse, Reason: "KlusterletPrepareFailed",
Message: fmt.Sprintf("Failed to build managed cluster clients: secrets \"external-managed-kubeconfig\" not found"),
})
controller := newTestControllerHosted(t, klusterlet, nil).setDefaultManagedClusterClientsBuilder()
syncContext := testinghelper.NewFakeSyncContext(t, "klusterlet")
err := controller.controller.sync(context.TODO(), syncContext)
@@ -899,15 +885,6 @@ func TestDeployOnKube111(t *testing.T) {
}
}
dynamicAction := controller.dynamicClient.Actions()
createCRDObjects := []runtime.Object{}
for _, action := range dynamicAction {
if action.GetVerb() == "create" {
object := action.(clienttesting.CreateActionImpl).Object
createCRDObjects = append(createCRDObjects, object)
}
}
// Check if resources are created as expected
// 11 managed static manifests + 11 management static manifests - 2 duplicated service account manifests + 1 addon namespace + 2 deployments + 2 kube111 clusterrolebindings
if len(createObjects) != 25 {
@@ -916,9 +893,6 @@ func TestDeployOnKube111(t *testing.T) {
for _, object := range createObjects {
ensureObject(t, object, klusterlet)
}
if len(createCRDObjects) != 2 {
t.Errorf("Expect 2 v1beta1 crd objects created in the sync loop, actual %d", len(createCRDObjects))
}
operatorAction := controller.operatorClient.Actions()
if len(operatorAction) != 2 {
@@ -989,3 +963,31 @@ func newAppliedManifestWorks(host string, finalizers []string, terminated bool)
return w
}
type fakeManagedClusterBuilder struct {
fakeKubeClient *fakekube.Clientset
fakeAPIExtensionClient *fakeapiextensions.Clientset
fakeWorkClient *fakeworkclient.Clientset
}
func (f *fakeManagedClusterBuilder) withMode(mode operatorapiv1.InstallMode) managedClusterClientsBuilderInterface {
return f
}
func (f *fakeManagedClusterBuilder) withKubeConfigSecret(namespace, name string) managedClusterClientsBuilderInterface {
return f
}
func (m *fakeManagedClusterBuilder) build(ctx context.Context) (*managedClusterClients, error) {
return &managedClusterClients{
kubeClient: m.fakeKubeClient,
apiExtensionClient: m.fakeAPIExtensionClient,
appliedManifestWorkClient: m.fakeWorkClient.WorkV1().AppliedManifestWorks(),
kubeconfig: &rest.Config{
Host: "testhost",
TLSClientConfig: rest.TLSClientConfig{
CAData: []byte("test"),
},
},
}, nil
}

View File

@@ -6,17 +6,18 @@ package klusterletcontroller
import (
"context"
"fmt"
"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/version"
operatorapiv1 "open-cluster-management.io/api/operator/v1"
"open-cluster-management.io/registration-operator/manifests"
"open-cluster-management.io/registration-operator/pkg/helpers"
"open-cluster-management.io/registration-operator/pkg/operators/crdmanager"
)
var (
@@ -40,46 +41,98 @@ type crdReconcile struct {
}
func (r *crdReconcile) reconcile(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error) {
// CRD v1beta1 was deprecated from k8s 1.16.0 and will be removed in k8s 1.22
crdFiles := crdV1StaticFiles
var applyErr error
if cnt, err := r.kubeVersion.Compare("v1.16.0"); err == nil && cnt < 0 {
crdFiles = crdV1beta1StaticFiles
crdManager := crdmanager.NewManager[*apiextensionsv1beta1.CustomResourceDefinition](
r.managedClusterClients.apiExtensionClient.ApiextensionsV1beta1().CustomResourceDefinitions(),
crdmanager.EqualV1Beta1,
)
applyErr = crdManager.Apply(ctx,
func(name string) ([]byte, error) {
template, err := manifests.KlusterletManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
objData := assets.MustCreateAssetFromTemplate(name, template, config).Data
helpers.SetRelatedResourcesStatusesWithObj(&klusterlet.Status.RelatedResources, objData)
return objData, nil
},
crdV1beta1StaticFiles...,
)
} else {
crdManager := crdmanager.NewManager[*apiextensionsv1.CustomResourceDefinition](
r.managedClusterClients.apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions(),
crdmanager.EqualV1,
)
applyErr = crdManager.Apply(ctx,
func(name string) ([]byte, error) {
template, err := manifests.KlusterletManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
objData := assets.MustCreateAssetFromTemplate(name, template, config).Data
helpers.SetRelatedResourcesStatusesWithObj(&klusterlet.Status.RelatedResources, objData)
return objData, nil
},
crdV1StaticFiles...,
)
}
resourceResults := helpers.ApplyDirectly(
ctx,
nil,
r.managedClusterClients.apiExtensionClient,
nil,
r.managedClusterClients.dynamicClient,
r.recorder,
r.cache,
func(name string) ([]byte, error) {
template, err := manifests.KlusterletManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
objData := assets.MustCreateAssetFromTemplate(name, template, config).Data
helpers.SetRelatedResourcesStatusesWithObj(&klusterlet.Status.RelatedResources, objData)
return objData, nil
},
crdFiles...,
)
var errs []error
for _, result := range resourceResults {
if result.Error != nil {
errs = append(errs, fmt.Errorf("%q (%T): %v", result.File, result.Type, result.Error))
}
}
if len(errs) > 0 {
applyErrors := utilerrors.NewAggregate(errs)
if applyErr != nil {
meta.SetStatusCondition(&klusterlet.Status.Conditions, metav1.Condition{
Type: klusterletApplied, Status: metav1.ConditionFalse, Reason: "CRDApplyFailed",
Message: applyErrors.Error(),
Message: applyErr.Error(),
})
return klusterlet, reconcileStop, applyErrors
return klusterlet, reconcileStop, applyErr
}
return klusterlet, reconcileContinue, nil
}
// no longer remove the CRDs (AppliedManifestWork & ClusterClaim), because they might be shared
// by multiple klusterlets. Consequently, the CRs of those CRDs will not be deleted as well when deleting a klusterlet.
// Only clean the version label on crds, so another klusterlet can update crds later.
func (r *crdReconcile) clean(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error) {
var deleteErr error
if cnt, err := r.kubeVersion.Compare("v1.16.0"); err == nil && cnt < 0 {
crdManager := crdmanager.NewManager[*apiextensionsv1beta1.CustomResourceDefinition](
r.managedClusterClients.apiExtensionClient.ApiextensionsV1beta1().CustomResourceDefinitions(),
crdmanager.EqualV1Beta1,
)
deleteErr = crdManager.Clean(ctx, true,
func(name string) ([]byte, error) {
template, err := manifests.KlusterletManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
objData := assets.MustCreateAssetFromTemplate(name, template, config).Data
helpers.SetRelatedResourcesStatusesWithObj(&klusterlet.Status.RelatedResources, objData)
return objData, nil
},
crdV1beta1StaticFiles...,
)
} else {
crdManager := crdmanager.NewManager[*apiextensionsv1.CustomResourceDefinition](
r.managedClusterClients.apiExtensionClient.ApiextensionsV1().CustomResourceDefinitions(),
crdmanager.EqualV1,
)
deleteErr = crdManager.Clean(ctx, true,
func(name string) ([]byte, error) {
template, err := manifests.KlusterletManifestFiles.ReadFile(name)
if err != nil {
return nil, err
}
objData := assets.MustCreateAssetFromTemplate(name, template, config).Data
helpers.SetRelatedResourcesStatusesWithObj(&klusterlet.Status.RelatedResources, objData)
return objData, nil
},
crdV1StaticFiles...,
)
}
if deleteErr != nil {
return klusterlet, reconcileStop, deleteErr
}
return klusterlet, reconcileContinue, nil

View File

@@ -6,10 +6,12 @@ package klusterletcontroller
import (
"context"
"crypto/sha256"
"fmt"
"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -18,6 +20,7 @@ import (
operatorapiv1 "open-cluster-management.io/api/operator/v1"
"open-cluster-management.io/registration-operator/manifests"
"open-cluster-management.io/registration-operator/pkg/helpers"
"strings"
)
var (
@@ -91,7 +94,6 @@ func (r *managedReconcile) reconcile(ctx context.Context, klusterlet *operatorap
r.managedClusterClients.kubeClient,
r.managedClusterClients.apiExtensionClient,
nil,
r.managedClusterClients.dynamicClient,
r.recorder,
r.cache,
func(name string) ([]byte, error) {
@@ -124,3 +126,82 @@ func (r *managedReconcile) reconcile(ctx context.Context, klusterlet *operatorap
return klusterlet, reconcileContinue, nil
}
func (r *managedReconcile) clean(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error) {
// nothing should be done when deploy mode is hosted and hosted finalizer is not added.
if klusterlet.Spec.DeployOption.Mode == operatorapiv1.InstallModeHosted && !hasFinalizer(klusterlet, klusterletHostedFinalizer) {
return klusterlet, reconcileContinue, nil
}
if err := r.cleanUpAppliedManifestWorks(ctx, klusterlet, config); err != nil {
return klusterlet, reconcileStop, err
}
if err := removeStaticResources(ctx, r.managedClusterClients.kubeClient, r.managedClusterClients.apiExtensionClient,
managedStaticResourceFiles, config); err != nil {
return klusterlet, reconcileStop, err
}
if cnt, err := r.kubeVersion.Compare("v1.12.0"); err == nil && cnt < 0 {
err = removeStaticResources(ctx, r.managedClusterClients.kubeClient, r.managedClusterClients.apiExtensionClient,
kube111StaticResourceFiles, config)
if err != nil {
return klusterlet, reconcileStop, err
}
}
// remove the klusterlet namespace and klusterlet addon namespace on the managed cluster
// For now, whether in Default or Hosted mode, the addons could be deployed on the managed cluster.
namespaces := []string{config.KlusterletNamespace, fmt.Sprintf("%s-addon", config.KlusterletNamespace)}
for _, namespace := range namespaces {
if err := r.managedClusterClients.kubeClient.CoreV1().Namespaces().Delete(
ctx, namespace, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
return klusterlet, reconcileStop, err
}
}
return klusterlet, reconcileContinue, nil
}
// cleanUpAppliedManifestWorks removes finalizer from the AppliedManifestWorks whose name starts with
// the hash of the given hub host.
func (r *managedReconcile) cleanUpAppliedManifestWorks(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, config klusterletConfig) error {
appliedManifestWorks, err := r.managedClusterClients.appliedManifestWorkClient.List(ctx, metav1.ListOptions{})
if err != nil {
return fmt.Errorf("unable to list AppliedManifestWorks: %w", err)
}
if len(appliedManifestWorks.Items) == 0 {
return nil
}
bootstrapKubeConfigSecret, err := r.kubeClient.CoreV1().Secrets(config.AgentNamespace).Get(ctx, config.BootStrapKubeConfigSecret, metav1.GetOptions{})
if err != nil {
return err
}
restConfig, err := helpers.LoadClientConfigFromSecret(bootstrapKubeConfigSecret)
if err != nil {
return fmt.Errorf("unable to load kubeconfig from secret %q %q: %w", config.AgentNamespace, config.BootStrapKubeConfigSecret, err)
}
var errs []error
prefix := fmt.Sprintf("%s-", fmt.Sprintf("%x", sha256.Sum256([]byte(restConfig.Host))))
for _, appliedManifestWork := range appliedManifestWorks.Items {
// ignore AppliedManifestWork for other klusterlet
// TODO we should not need to filter AppliedManifestWork using hubhost in the next release.
if string(klusterlet.UID) != appliedManifestWork.Spec.AgentID || !strings.HasPrefix(appliedManifestWork.Name, prefix) {
continue
}
// remove finalizer if exists
if mutated := removeFinalizer(&appliedManifestWork, appliedManifestWorkFinalizer); !mutated {
continue
}
_, err := r.managedClusterClients.appliedManifestWorkClient.Update(ctx, &appliedManifestWork, metav1.UpdateOptions{})
if err != nil && !errors.IsNotFound(err) {
errs = append(errs, fmt.Errorf("unable to remove finalizer from AppliedManifestWork %q: %w", appliedManifestWork.Name, err))
}
}
return utilerrors.NewAggregate(errs)
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
@@ -59,7 +60,6 @@ func (r *managementReconcile) reconcile(ctx context.Context, klusterlet *operato
r.kubeClient,
nil,
nil,
nil,
r.recorder,
r.cache,
func(name string) ([]byte, error) {
@@ -92,3 +92,38 @@ func (r *managementReconcile) reconcile(ctx context.Context, klusterlet *operato
return klusterlet, reconcileContinue, nil
}
func (r *managementReconcile) clean(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error) {
// Remove secrets
secrets := []string{config.HubKubeConfigSecret}
if config.InstallMode == operatorapiv1.InstallModeHosted {
// In Hosted mod, also need to remove the external-managed-kubeconfig-registration and external-managed-kubeconfig-work
secrets = append(secrets, []string{config.ExternalManagedKubeConfigRegistrationSecret, config.ExternalManagedKubeConfigWorkSecret}...)
}
for _, secret := range secrets {
err := r.kubeClient.CoreV1().Secrets(config.AgentNamespace).Delete(ctx, secret, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return klusterlet, reconcileStop, err
}
r.recorder.Eventf("SecretDeleted", "secret %s is deleted", secret)
}
// remove static file on the management cluster
err := removeStaticResources(ctx, r.kubeClient, nil, managementStaticResourceFiles, config)
if err != nil {
return klusterlet, reconcileStop, err
}
// The agent namespace on the management cluster should be removed **at the end**. Otherwise if any failure occurred,
// the managed-external-kubeconfig secret would be removed and the next reconcile will fail due to can not build the
// managed cluster clients.
if config.InstallMode == operatorapiv1.InstallModeHosted {
// remove the agent namespace on the management cluster
err = r.kubeClient.CoreV1().Namespaces().Delete(ctx, config.AgentNamespace, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return klusterlet, reconcileStop, err
}
}
return klusterlet, reconcileContinue, nil
}

View File

@@ -10,6 +10,7 @@ import (
"github.com/openshift/library-go/pkg/assets"
"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourceapply"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
@@ -153,6 +154,22 @@ func (r *runtimeReconcile) getClusterNameFromHubKubeConfigSecret(ctx context.Con
return string(clusterName), nil
}
func (r *runtimeReconcile) clean(ctx context.Context, klusterlet *operatorapiv1.Klusterlet, config klusterletConfig) (*operatorapiv1.Klusterlet, reconcileState, error) {
deployments := []string{
fmt.Sprintf("%s-registration-agent", config.KlusterletName),
fmt.Sprintf("%s-work-agent", config.KlusterletName),
}
for _, deployment := range deployments {
err := r.kubeClient.AppsV1().Deployments(config.AgentNamespace).Delete(ctx, deployment, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
return klusterlet, reconcileStop, err
}
r.recorder.Eventf("DeploymentDeleted", "deployment %s is deleted", deployment)
}
return klusterlet, reconcileContinue, nil
}
// registrationServiceAccountName splices the name of registration service account
func registrationServiceAccountName(klusterletName string) string {
return fmt.Sprintf("%s-registration-sa", klusterletName)

View File

@@ -8,7 +8,6 @@ import (
"github.com/openshift/library-go/pkg/controller/controllercmd"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
versionutil "k8s.io/apimachinery/pkg/util/version"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
@@ -40,10 +39,6 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
if err != nil {
return err
}
dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return err
}
version, err := kubeClient.ServerVersion()
if err != nil {
@@ -78,7 +73,6 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
klusterletController := klusterletcontroller.NewKlusterletController(
kubeClient,
apiExtensionClient,
dynamicClient,
operatorClient.OperatorV1().Klusterlets(),
operatorInformer.Operator().V1().Klusterlets(),
kubeInformer.Core().V1().Secrets(),
@@ -92,7 +86,6 @@ func (o *Options) RunKlusterletOperator(ctx context.Context, controllerContext *
klusterletCleanupController := klusterletcontroller.NewKlusterletCleanupController(
kubeClient,
apiExtensionClient,
dynamicClient,
operatorClient.OperatorV1().Klusterlets(),
operatorInformer.Operator().V1().Klusterlets(),
kubeInformer.Core().V1().Secrets(),

View File

@@ -32,34 +32,32 @@ var _ = ginkgo.Describe("Create v1beta2 managedclusterset", func() {
},
},
}
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
_, err := t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Create(context.Background(), managedClusterSet, metav1.CreateOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
return err
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed())
ginkgo.By("Get v1beta2 ManagedClusterSet using v1beta2 client")
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
_, err := t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Get(context.Background(), managedClusterSetName, metav1.GetOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
return err
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed())
ginkgo.By("Update v1beta2 ManagedClusterSet using v1beta2 client")
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
managedClusterSet, err := t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Get(context.Background(), managedClusterSetName, metav1.GetOptions{})
if err != nil {
return false
return err
}
updateManagedClusterSet := managedClusterSet.DeepCopy()
updateManagedClusterSet.Spec.ClusterSelector.LabelSelector.MatchLabels = nil
_, err = t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Update(context.Background(), updateManagedClusterSet, metav1.UpdateOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
return err
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed())
ginkgo.By("Delete v1beta2 ManagedClusterSet using v1beta2 client")
gomega.Eventually(func() bool {
err := t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Delete(context.Background(), managedClusterSetName, metav1.DeleteOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
err := t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Delete(context.Background(), managedClusterSetName, metav1.DeleteOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
})
ginkgo.It("Create a v1beta2 labelselector based ManagedClusterSet and get/update/delete with v1beta1 client", func() {
ginkgo.By("Create a v1beta2 ManagedClusterSet")
@@ -80,10 +78,10 @@ var _ = ginkgo.Describe("Create v1beta2 managedclusterset", func() {
},
},
}
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
_, err := t.ClusterClient.ClusterV1beta2().ManagedClusterSets().Create(context.Background(), managedClusterSet, metav1.CreateOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
return err
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed())
ginkgo.By("Get v1beta2 ManagedClusterSet using v1beta1 client")
gomega.Eventually(func() bool {
@@ -101,20 +99,18 @@ var _ = ginkgo.Describe("Create v1beta2 managedclusterset", func() {
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
ginkgo.By("Update v1beta2 ManagedClusterSet using v1beta1 client")
gomega.Eventually(func() bool {
gomega.Eventually(func() error {
updateManagedClusterSet, err := t.ClusterClient.ClusterV1beta1().ManagedClusterSets().Get(context.Background(), managedClusterSetName, metav1.GetOptions{})
if err != nil {
return false
return err
}
updateManagedClusterSet.Spec.ClusterSelector.LabelSelector.MatchLabels = nil
_, err = t.ClusterClient.ClusterV1beta1().ManagedClusterSets().Update(context.Background(), updateManagedClusterSet, metav1.UpdateOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
return err
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.Succeed())
ginkgo.By("Delete v1beta2 ManagedClusterSet using v1beta1 client")
gomega.Eventually(func() bool {
err := t.ClusterClient.ClusterV1beta1().ManagedClusterSets().Delete(context.Background(), managedClusterSetName, metav1.DeleteOptions{})
return err == nil
}, t.EventuallyTimeout*5, t.EventuallyInterval*5).Should(gomega.BeTrue())
err := t.ClusterClient.ClusterV1beta1().ManagedClusterSets().Delete(context.Background(), managedClusterSetName, metav1.DeleteOptions{})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
})
})

View File

@@ -536,12 +536,15 @@ func (t *Tester) CheckClusterManagerStatus() error {
return fmt.Errorf("ClusterManager not found")
}
cm := cms.Items[0]
if meta.IsStatusConditionTrue(cm.Status.Conditions, "HubRegistrationDegraded") {
if !meta.IsStatusConditionFalse(cm.Status.Conditions, "HubRegistrationDegraded") {
return fmt.Errorf("HubRegistration is degraded")
}
if meta.IsStatusConditionTrue(cm.Status.Conditions, "HubPlacementDegraded") {
if !meta.IsStatusConditionFalse(cm.Status.Conditions, "HubPlacementDegraded") {
return fmt.Errorf("HubPlacement is degraded")
}
if !meta.IsStatusConditionFalse(cm.Status.Conditions, "Progressing") {
return fmt.Errorf("ClusterManager is still progressing")
}
return nil
}

View File

@@ -112,6 +112,8 @@ var _ = ginkgo.Describe("Klusterlet Hosted mode", func() {
return err
}
fmt.Printf("related resources are %v\n", actual.Status.RelatedResources)
// 11 managed static manifests + 11 management static manifests + 2CRDs + 2 deployments(2 duplicated CRDs, but status also recorded in the klusterlet's status)
if len(actual.Status.RelatedResources) != 26 {
return fmt.Errorf("should get 26 relatedResources, actual got %v", len(actual.Status.RelatedResources))

View File

@@ -1,493 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package fake
import (
"context"
"fmt"
"strings"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/testing"
)
func NewSimpleDynamicClient(scheme *runtime.Scheme, objects ...runtime.Object) *FakeDynamicClient {
unstructuredScheme := runtime.NewScheme()
for gvk := range scheme.AllKnownTypes() {
if unstructuredScheme.Recognizes(gvk) {
continue
}
if strings.HasSuffix(gvk.Kind, "List") {
unstructuredScheme.AddKnownTypeWithName(gvk, &unstructured.UnstructuredList{})
continue
}
unstructuredScheme.AddKnownTypeWithName(gvk, &unstructured.Unstructured{})
}
objects, err := convertObjectsToUnstructured(scheme, objects)
if err != nil {
panic(err)
}
for _, obj := range objects {
gvk := obj.GetObjectKind().GroupVersionKind()
if !unstructuredScheme.Recognizes(gvk) {
unstructuredScheme.AddKnownTypeWithName(gvk, &unstructured.Unstructured{})
}
gvk.Kind += "List"
if !unstructuredScheme.Recognizes(gvk) {
unstructuredScheme.AddKnownTypeWithName(gvk, &unstructured.UnstructuredList{})
}
}
return NewSimpleDynamicClientWithCustomListKinds(unstructuredScheme, nil, objects...)
}
// NewSimpleDynamicClientWithCustomListKinds try not to use this. In general you want to have the scheme have the List types registered
// and allow the default guessing for resources match. Sometimes that doesn't work, so you can specify a custom mapping here.
func NewSimpleDynamicClientWithCustomListKinds(scheme *runtime.Scheme, gvrToListKind map[schema.GroupVersionResource]string, objects ...runtime.Object) *FakeDynamicClient {
// In order to use List with this client, you have to have your lists registered so that the object tracker will find them
// in the scheme to support the t.scheme.New(listGVK) call when it's building the return value.
// Since the base fake client needs the listGVK passed through the action (in cases where there are no instances, it
// cannot look up the actual hits), we need to know a mapping of GVR to listGVK here. For GETs and other types of calls,
// there is no return value that contains a GVK, so it doesn't have to know the mapping in advance.
// first we attempt to invert known List types from the scheme to auto guess the resource with unsafe guesses
// this covers common usage of registering types in scheme and passing them
completeGVRToListKind := map[schema.GroupVersionResource]string{}
for listGVK := range scheme.AllKnownTypes() {
if !strings.HasSuffix(listGVK.Kind, "List") {
continue
}
nonListGVK := listGVK.GroupVersion().WithKind(listGVK.Kind[:len(listGVK.Kind)-4])
plural, _ := meta.UnsafeGuessKindToResource(nonListGVK)
completeGVRToListKind[plural] = listGVK.Kind
}
for gvr, listKind := range gvrToListKind {
if !strings.HasSuffix(listKind, "List") {
panic("coding error, listGVK must end in List or this fake client doesn't work right")
}
listGVK := gvr.GroupVersion().WithKind(listKind)
// if we already have this type registered, just skip it
if _, err := scheme.New(listGVK); err == nil {
completeGVRToListKind[gvr] = listKind
continue
}
scheme.AddKnownTypeWithName(listGVK, &unstructured.UnstructuredList{})
completeGVRToListKind[gvr] = listKind
}
codecs := serializer.NewCodecFactory(scheme)
o := testing.NewObjectTracker(scheme, codecs.UniversalDecoder())
for _, obj := range objects {
if err := o.Add(obj); err != nil {
panic(err)
}
}
cs := &FakeDynamicClient{scheme: scheme, gvrToListKind: completeGVRToListKind, tracker: o}
cs.AddReactor("*", "*", testing.ObjectReaction(o))
cs.AddWatchReactor("*", func(action testing.Action) (handled bool, ret watch.Interface, err error) {
gvr := action.GetResource()
ns := action.GetNamespace()
watch, err := o.Watch(gvr, ns)
if err != nil {
return false, nil, err
}
return true, watch, nil
})
return cs
}
// Clientset implements clientset.Interface. Meant to be embedded into a
// struct to get a default implementation. This makes faking out just the method
// you want to test easier.
type FakeDynamicClient struct {
testing.Fake
scheme *runtime.Scheme
gvrToListKind map[schema.GroupVersionResource]string
tracker testing.ObjectTracker
}
type dynamicResourceClient struct {
client *FakeDynamicClient
namespace string
resource schema.GroupVersionResource
listKind string
}
var (
_ dynamic.Interface = &FakeDynamicClient{}
_ testing.FakeClient = &FakeDynamicClient{}
)
func (c *FakeDynamicClient) Tracker() testing.ObjectTracker {
return c.tracker
}
func (c *FakeDynamicClient) Resource(resource schema.GroupVersionResource) dynamic.NamespaceableResourceInterface {
return &dynamicResourceClient{client: c, resource: resource, listKind: c.gvrToListKind[resource]}
}
func (c *dynamicResourceClient) Namespace(ns string) dynamic.ResourceInterface {
ret := *c
ret.namespace = ns
return &ret
}
func (c *dynamicResourceClient) Create(ctx context.Context, obj *unstructured.Unstructured, opts metav1.CreateOptions, subresources ...string) (*unstructured.Unstructured, error) {
var uncastRet runtime.Object
var err error
switch {
case len(c.namespace) == 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootCreateAction(c.resource, obj), obj)
case len(c.namespace) == 0 && len(subresources) > 0:
var accessor metav1.Object // avoid shadowing err
accessor, err = meta.Accessor(obj)
if err != nil {
return nil, err
}
name := accessor.GetName()
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootCreateSubresourceAction(c.resource, name, strings.Join(subresources, "/"), obj), obj)
case len(c.namespace) > 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewCreateAction(c.resource, c.namespace, obj), obj)
case len(c.namespace) > 0 && len(subresources) > 0:
var accessor metav1.Object // avoid shadowing err
accessor, err = meta.Accessor(obj)
if err != nil {
return nil, err
}
name := accessor.GetName()
uncastRet, err = c.client.Fake.
Invokes(testing.NewCreateSubresourceAction(c.resource, name, strings.Join(subresources, "/"), c.namespace, obj), obj)
}
if err != nil {
return nil, err
}
if uncastRet == nil {
return nil, err
}
ret := &unstructured.Unstructured{}
if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil {
return nil, err
}
return ret, err
}
func (c *dynamicResourceClient) Update(ctx context.Context, obj *unstructured.Unstructured, opts metav1.UpdateOptions, subresources ...string) (*unstructured.Unstructured, error) {
var uncastRet runtime.Object
var err error
switch {
case len(c.namespace) == 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootUpdateAction(c.resource, obj), obj)
case len(c.namespace) == 0 && len(subresources) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootUpdateSubresourceAction(c.resource, strings.Join(subresources, "/"), obj), obj)
case len(c.namespace) > 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewUpdateAction(c.resource, c.namespace, obj), obj)
case len(c.namespace) > 0 && len(subresources) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewUpdateSubresourceAction(c.resource, strings.Join(subresources, "/"), c.namespace, obj), obj)
}
if err != nil {
return nil, err
}
if uncastRet == nil {
return nil, err
}
ret := &unstructured.Unstructured{}
if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil {
return nil, err
}
return ret, err
}
func (c *dynamicResourceClient) UpdateStatus(ctx context.Context, obj *unstructured.Unstructured, opts metav1.UpdateOptions) (*unstructured.Unstructured, error) {
var uncastRet runtime.Object
var err error
switch {
case len(c.namespace) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootUpdateSubresourceAction(c.resource, "status", obj), obj)
case len(c.namespace) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewUpdateSubresourceAction(c.resource, "status", c.namespace, obj), obj)
}
if err != nil {
return nil, err
}
if uncastRet == nil {
return nil, err
}
ret := &unstructured.Unstructured{}
if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil {
return nil, err
}
return ret, err
}
func (c *dynamicResourceClient) Delete(ctx context.Context, name string, opts metav1.DeleteOptions, subresources ...string) error {
var err error
switch {
case len(c.namespace) == 0 && len(subresources) == 0:
_, err = c.client.Fake.
Invokes(testing.NewRootDeleteAction(c.resource, name), &metav1.Status{Status: "dynamic delete fail"})
case len(c.namespace) == 0 && len(subresources) > 0:
_, err = c.client.Fake.
Invokes(testing.NewRootDeleteSubresourceAction(c.resource, strings.Join(subresources, "/"), name), &metav1.Status{Status: "dynamic delete fail"})
case len(c.namespace) > 0 && len(subresources) == 0:
_, err = c.client.Fake.
Invokes(testing.NewDeleteAction(c.resource, c.namespace, name), &metav1.Status{Status: "dynamic delete fail"})
case len(c.namespace) > 0 && len(subresources) > 0:
_, err = c.client.Fake.
Invokes(testing.NewDeleteSubresourceAction(c.resource, strings.Join(subresources, "/"), c.namespace, name), &metav1.Status{Status: "dynamic delete fail"})
}
return err
}
func (c *dynamicResourceClient) DeleteCollection(ctx context.Context, opts metav1.DeleteOptions, listOptions metav1.ListOptions) error {
var err error
switch {
case len(c.namespace) == 0:
action := testing.NewRootDeleteCollectionAction(c.resource, listOptions)
_, err = c.client.Fake.Invokes(action, &metav1.Status{Status: "dynamic deletecollection fail"})
case len(c.namespace) > 0:
action := testing.NewDeleteCollectionAction(c.resource, c.namespace, listOptions)
_, err = c.client.Fake.Invokes(action, &metav1.Status{Status: "dynamic deletecollection fail"})
}
return err
}
func (c *dynamicResourceClient) Get(ctx context.Context, name string, opts metav1.GetOptions, subresources ...string) (*unstructured.Unstructured, error) {
var uncastRet runtime.Object
var err error
switch {
case len(c.namespace) == 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootGetAction(c.resource, name), &metav1.Status{Status: "dynamic get fail"})
case len(c.namespace) == 0 && len(subresources) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootGetSubresourceAction(c.resource, strings.Join(subresources, "/"), name), &metav1.Status{Status: "dynamic get fail"})
case len(c.namespace) > 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewGetAction(c.resource, c.namespace, name), &metav1.Status{Status: "dynamic get fail"})
case len(c.namespace) > 0 && len(subresources) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewGetSubresourceAction(c.resource, c.namespace, strings.Join(subresources, "/"), name), &metav1.Status{Status: "dynamic get fail"})
}
if err != nil {
return nil, err
}
if uncastRet == nil {
return nil, err
}
ret := &unstructured.Unstructured{}
if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil {
return nil, err
}
return ret, err
}
func (c *dynamicResourceClient) List(ctx context.Context, opts metav1.ListOptions) (*unstructured.UnstructuredList, error) {
if len(c.listKind) == 0 {
panic(fmt.Sprintf("coding error: you must register resource to list kind for every resource you're going to LIST when creating the client. See NewSimpleDynamicClientWithCustomListKinds or register the list into the scheme: %v out of %v", c.resource, c.client.gvrToListKind))
}
listGVK := c.resource.GroupVersion().WithKind(c.listKind)
listForFakeClientGVK := c.resource.GroupVersion().WithKind(c.listKind[:len(c.listKind)-4]) /*base library appends List*/
var obj runtime.Object
var err error
switch {
case len(c.namespace) == 0:
obj, err = c.client.Fake.
Invokes(testing.NewRootListAction(c.resource, listForFakeClientGVK, opts), &metav1.Status{Status: "dynamic list fail"})
case len(c.namespace) > 0:
obj, err = c.client.Fake.
Invokes(testing.NewListAction(c.resource, listForFakeClientGVK, c.namespace, opts), &metav1.Status{Status: "dynamic list fail"})
}
if obj == nil {
return nil, err
}
label, _, _ := testing.ExtractFromListOptions(opts)
if label == nil {
label = labels.Everything()
}
retUnstructured := &unstructured.Unstructured{}
if err := c.client.scheme.Convert(obj, retUnstructured, nil); err != nil {
return nil, err
}
entireList, err := retUnstructured.ToList()
if err != nil {
return nil, err
}
list := &unstructured.UnstructuredList{}
list.SetResourceVersion(entireList.GetResourceVersion())
list.GetObjectKind().SetGroupVersionKind(listGVK)
for i := range entireList.Items {
item := &entireList.Items[i]
metadata, err := meta.Accessor(item)
if err != nil {
return nil, err
}
if label.Matches(labels.Set(metadata.GetLabels())) {
list.Items = append(list.Items, *item)
}
}
return list, nil
}
func (c *dynamicResourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
switch {
case len(c.namespace) == 0:
return c.client.Fake.
InvokesWatch(testing.NewRootWatchAction(c.resource, opts))
case len(c.namespace) > 0:
return c.client.Fake.
InvokesWatch(testing.NewWatchAction(c.resource, c.namespace, opts))
}
panic("math broke")
}
// TODO: opts are currently ignored.
func (c *dynamicResourceClient) Patch(ctx context.Context, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*unstructured.Unstructured, error) {
var uncastRet runtime.Object
var err error
switch {
case len(c.namespace) == 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootPatchAction(c.resource, name, pt, data), &metav1.Status{Status: "dynamic patch fail"})
case len(c.namespace) == 0 && len(subresources) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewRootPatchSubresourceAction(c.resource, name, pt, data, subresources...), &metav1.Status{Status: "dynamic patch fail"})
case len(c.namespace) > 0 && len(subresources) == 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewPatchAction(c.resource, c.namespace, name, pt, data), &metav1.Status{Status: "dynamic patch fail"})
case len(c.namespace) > 0 && len(subresources) > 0:
uncastRet, err = c.client.Fake.
Invokes(testing.NewPatchSubresourceAction(c.resource, c.namespace, name, pt, data, subresources...), &metav1.Status{Status: "dynamic patch fail"})
}
if err != nil {
return nil, err
}
if uncastRet == nil {
return nil, err
}
ret := &unstructured.Unstructured{}
if err := c.client.scheme.Convert(uncastRet, ret, nil); err != nil {
return nil, err
}
return ret, err
}
func convertObjectsToUnstructured(s *runtime.Scheme, objs []runtime.Object) ([]runtime.Object, error) {
ul := make([]runtime.Object, 0, len(objs))
for _, obj := range objs {
u, err := convertToUnstructured(s, obj)
if err != nil {
return nil, err
}
ul = append(ul, u)
}
return ul, nil
}
func convertToUnstructured(s *runtime.Scheme, obj runtime.Object) (runtime.Object, error) {
var (
err error
u unstructured.Unstructured
)
u.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, fmt.Errorf("failed to convert to unstructured: %w", err)
}
gvk := u.GroupVersionKind()
if gvk.Group == "" || gvk.Kind == "" {
gvks, _, err := s.ObjectKinds(obj)
if err != nil {
return nil, fmt.Errorf("failed to convert to unstructured - unable to get GVK %w", err)
}
apiv, k := gvks[0].ToAPIVersionAndKind()
u.SetAPIVersion(apiv)
u.SetKind(k)
}
return &u, nil
}

1
vendor/modules.txt vendored
View File

@@ -873,7 +873,6 @@ k8s.io/client-go/applyconfigurations/storage/v1beta1
k8s.io/client-go/discovery
k8s.io/client-go/discovery/fake
k8s.io/client-go/dynamic
k8s.io/client-go/dynamic/fake
k8s.io/client-go/informers
k8s.io/client-go/informers/admissionregistration
k8s.io/client-go/informers/admissionregistration/v1