diff --git a/pkg/work/helper/helpers.go b/pkg/work/helper/helpers.go index 54cfd8b41..f1af3e763 100644 --- a/pkg/work/helper/helpers.go +++ b/pkg/work/helper/helpers.go @@ -400,6 +400,7 @@ func FindManifestConfiguration(resourceMeta workapiv1.ManifestResourceMeta, if rstOption.UpdateStrategy == nil && option.UpdateStrategy != nil { rstOption.UpdateStrategy = option.UpdateStrategy } + rstOption.FeedbackScrapeType = option.FeedbackScrapeType } if len(rstOption.FeedbackRules) == 0 && len(rstOption.ConditionRules) == 0 && rstOption.UpdateStrategy == nil { diff --git a/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller.go b/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller.go index 02f49fc33..34220f3f0 100644 --- a/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller.go +++ b/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller.go @@ -21,6 +21,7 @@ import ( commonhelper "open-cluster-management.io/ocm/pkg/common/helpers" "open-cluster-management.io/ocm/pkg/common/queue" "open-cluster-management.io/ocm/pkg/work/helper" + "open-cluster-management.io/ocm/pkg/work/spoke/objectreader" ) const appliedManifestWorkFinalizer = "AppliedManifestWorkFinalizer" @@ -31,6 +32,7 @@ type AppliedManifestWorkFinalizeController struct { patcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus] appliedManifestWorkLister worklister.AppliedManifestWorkLister spokeDynamicClient dynamic.Interface + objectReader objectreader.ObjectReader rateLimiter workqueue.TypedRateLimiter[string] } @@ -38,6 +40,7 @@ func NewAppliedManifestWorkFinalizeController( spokeDynamicClient dynamic.Interface, appliedManifestWorkClient workv1client.AppliedManifestWorkInterface, appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer, + objectReader objectreader.ObjectReader, agentID string, ) factory.Controller { @@ -47,6 +50,7 @@ func NewAppliedManifestWorkFinalizeController( appliedManifestWorkClient), appliedManifestWorkLister: appliedManifestWorkInformer.Lister(), spokeDynamicClient: spokeDynamicClient, + objectReader: objectReader, // After 11 retries (approximately 1 mins), the delay reaches the maximum of 60 seconds. rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](50*time.Millisecond, 60*time.Second), } @@ -93,6 +97,10 @@ func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx cont owner := helper.NewAppliedManifestWorkOwner(appliedManifestWork) + // unregister from objectReader + objectreader.UnRegisterInformerFromAppliedManifestWork( + ctx, m.objectReader, appliedManifestWork.Spec.ManifestWorkName, appliedManifestWork.Status.AppliedResources) + // Work is deleting, we remove its related resources on spoke cluster // We still need to run delete for every resource even with ownerref on it, since ownerref does not handle cluster // scoped resource correctly. 
diff --git a/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller_test.go b/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller_test.go index f1e2471ca..4fa0a108a 100644 --- a/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller_test.go +++ b/pkg/work/spoke/controllers/finalizercontroller/appliedmanifestwork_finalize_controller_test.go @@ -17,11 +17,13 @@ import ( "k8s.io/client-go/util/workqueue" fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake" + workinformers "open-cluster-management.io/api/client/work/informers/externalversions" workapiv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/patcher" testingcommon "open-cluster-management.io/ocm/pkg/common/testing" "open-cluster-management.io/ocm/pkg/work/helper" + "open-cluster-management.io/ocm/pkg/work/spoke/objectreader" "open-cluster-management.io/ocm/pkg/work/spoke/spoketesting" ) @@ -186,16 +188,24 @@ func TestFinalize(t *testing.T) { fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...) fakeClient := fakeworkclient.NewSimpleClientset(testingWork) + informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 0) + + r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) + if err != nil { + t.Fatal(err) + } + controller := AppliedManifestWorkFinalizeController{ patcher: patcher.NewPatcher[ *workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]( fakeClient.WorkV1().AppliedManifestWorks()), spokeDynamicClient: fakeDynamicClient, + objectReader: r, rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](0, 1*time.Second), } controllerContext := testingcommon.NewFakeSyncContext(t, testingWork.Name) - err := controller.syncAppliedManifestWork(context.TODO(), controllerContext, testingWork) + err = controller.syncAppliedManifestWork(context.TODO(), controllerContext, testingWork) if err != nil { t.Fatal(err) } diff --git a/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler.go b/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler.go index d51a036f6..bf3e90d27 100644 --- a/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler.go +++ b/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler.go @@ -20,10 +20,12 @@ import ( commonhelper "open-cluster-management.io/ocm/pkg/common/helpers" "open-cluster-management.io/ocm/pkg/work/helper" + "open-cluster-management.io/ocm/pkg/work/spoke/objectreader" ) type appliedManifestWorkReconciler struct { spokeDynamicClient dynamic.Interface + objectReader objectreader.ObjectReader rateLimiter workqueue.TypedRateLimiter[string] } @@ -89,6 +91,10 @@ func (m *appliedManifestWorkReconciler) reconcile( reason := fmt.Sprintf("it is no longer maintained by manifestwork %s", manifestWork.Name) + // unregister from objectReader + objectreader.UnRegisterInformerFromAppliedManifestWork( + ctx, m.objectReader, appliedManifestWork.Spec.ManifestWorkName, noLongerMaintainedResources) + resourcesPendingFinalization, errs := helper.DeleteAppliedResources( ctx, noLongerMaintainedResources, reason, m.spokeDynamicClient, *owner) if len(errs) != 0 { diff --git a/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler_test.go 
b/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler_test.go index a990e7cf2..c498542c7 100644 --- a/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler_test.go +++ b/pkg/work/spoke/controllers/manifestcontroller/appliedmanifestwork_reconciler_test.go @@ -24,6 +24,7 @@ import ( testingcommon "open-cluster-management.io/ocm/pkg/common/testing" "open-cluster-management.io/ocm/pkg/work/helper" + "open-cluster-management.io/ocm/pkg/work/spoke/objectreader" "open-cluster-management.io/ocm/pkg/work/spoke/spoketesting" ) @@ -276,6 +277,11 @@ func TestSyncManifestWork(t *testing.T) { t.Fatal(err) } + r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) + if err != nil { + t.Fatal(err) + } + controller := ManifestWorkController{ manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"), manifestWorkPatcher: patcher.NewPatcher[ @@ -291,6 +297,7 @@ func TestSyncManifestWork(t *testing.T) { }, &appliedManifestWorkReconciler{ spokeDynamicClient: fakeDynamicClient, + objectReader: r, rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](0, 1*time.Second), }, }, @@ -298,7 +305,7 @@ func TestSyncManifestWork(t *testing.T) { } controllerContext := testingcommon.NewFakeSyncContext(t, testingWork.Name) - err := controller.sync(context.TODO(), controllerContext, testingWork.Name) + err = controller.sync(context.TODO(), controllerContext, testingWork.Name) if err != nil { t.Fatal(err) } diff --git a/pkg/work/spoke/controllers/manifestcontroller/manifestwork_controller.go b/pkg/work/spoke/controllers/manifestcontroller/manifestwork_controller.go index d99b8cb6a..9e08248d1 100644 --- a/pkg/work/spoke/controllers/manifestcontroller/manifestwork_controller.go +++ b/pkg/work/spoke/controllers/manifestcontroller/manifestwork_controller.go @@ -31,6 +31,7 @@ import ( commonhelper "open-cluster-management.io/ocm/pkg/common/helpers" "open-cluster-management.io/ocm/pkg/work/spoke/apply" "open-cluster-management.io/ocm/pkg/work/spoke/auth" + "open-cluster-management.io/ocm/pkg/work/spoke/objectreader" ) var ( @@ -72,6 +73,7 @@ func NewManifestWorkController( manifestWorkLister worklister.ManifestWorkNamespaceLister, appliedManifestWorkClient workv1client.AppliedManifestWorkInterface, appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer, + objectReader objectreader.ObjectReader, hubHash, agentID string, restMapper meta.RESTMapper, validator auth.ExecutorValidator) factory.Controller { @@ -98,6 +100,7 @@ func NewManifestWorkController( }, &appliedManifestWorkReconciler{ spokeDynamicClient: spokeDynamicClient, + objectReader: objectReader, rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second), }, }, diff --git a/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go b/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go index edeeafd65..812e43a54 100644 --- a/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go +++ b/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller.go @@ -11,8 +11,8 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" utilerrors "k8s.io/apimachinery/pkg/util/errors" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/dynamic" "k8s.io/klog/v2" workv1client 
"open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" @@ -44,7 +44,7 @@ const ( type AvailableStatusController struct { patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus] manifestWorkLister worklister.ManifestWorkNamespaceLister - objectReader *objectreader.ObjectReader + objectReader objectreader.ObjectReader statusReader *statusfeedback.StatusReader conditionReader *conditions.ConditionReader syncInterval time.Duration @@ -52,23 +52,14 @@ type AvailableStatusController struct { // NewAvailableStatusController returns a AvailableStatusController func NewAvailableStatusController( - spokeDynamicClient dynamic.Interface, manifestWorkClient workv1client.ManifestWorkInterface, manifestWorkInformer workinformer.ManifestWorkInformer, manifestWorkLister worklister.ManifestWorkNamespaceLister, + conditionReader *conditions.ConditionReader, + objectReader objectreader.ObjectReader, maxJSONRawLength int32, syncInterval time.Duration, -) (factory.Controller, error) { - conditionReader, err := conditions.NewConditionReader() - if err != nil { - return nil, err - } - - objectReader, err := objectreader.NewObjectReader(spokeDynamicClient, manifestWorkInformer) - if err != nil { - return nil, err - } - +) factory.Controller { controller := &AvailableStatusController{ patcher: patcher.NewPatcher[ *workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]( @@ -82,7 +73,7 @@ func NewAvailableStatusController( return factory.New(). WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, manifestWorkInformer.Informer()). - WithSync(controller.sync).ToController(controllerName), nil + WithSync(controller.sync).ToController(controllerName) } func (c *AvailableStatusController) sync(ctx context.Context, controllerContext factory.SyncContext, manifestWorkName string) error { @@ -103,7 +94,7 @@ func (c *AvailableStatusController) sync(ctx context.Context, controllerContext logger = logging.SetLogTracingByObject(logger, manifestWork) ctx = klog.NewContext(ctx, logger) - err = c.syncManifestWork(ctx, manifestWork) + err = c.syncManifestWork(ctx, controllerContext, manifestWork) if err != nil { return fmt.Errorf("unable to sync manifestwork %q: %w", manifestWork.Name, err) } @@ -113,7 +104,7 @@ func (c *AvailableStatusController) sync(ctx context.Context, controllerContext return nil } -func (c *AvailableStatusController) syncManifestWork(ctx context.Context, originalManifestWork *workapiv1.ManifestWork) error { +func (c *AvailableStatusController) syncManifestWork(ctx context.Context, controllerContext factory.SyncContext, originalManifestWork *workapiv1.ManifestWork) error { manifestWork := originalManifestWork.DeepCopy() // do nothing when finalizer is not added. @@ -138,6 +129,15 @@ func (c *AvailableStatusController) syncManifestWork(ctx context.Context, origin } option := helper.FindManifestConfiguration(manifest.ResourceMeta, manifestWork.Spec.ManifestConfigs) + if option != nil && option.FeedbackScrapeType == workapiv1.FeedbackWatchType { + if err := c.objectReader.RegisterInformer(ctx, manifestWork.Name, manifest.ResourceMeta, controllerContext.Queue()); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to register informer") + } + } else { + if err := c.objectReader.UnRegisterInformer(manifestWork.Name, manifest.ResourceMeta); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to unregister informer") + } + } // Read status of the resource according to feedback rules. 
values, statusFeedbackCondition := c.getFeedbackValues(obj, option) diff --git a/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller_test.go b/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller_test.go index 408d61408..a707b90ae 100644 --- a/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller_test.go +++ b/pkg/work/spoke/controllers/statuscontroller/availablestatus_controller_test.go @@ -210,11 +210,12 @@ func TestSyncManifestWork(t *testing.T) { fakeClient := fakeworkclient.NewSimpleClientset(testingWork) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...) informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 0) - r, err := objectreader.NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) + r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) if err != nil { t.Fatal(err) } + syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace) controller := AvailableStatusController{ patcher: patcher.NewPatcher[ *workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]( @@ -222,7 +223,7 @@ func TestSyncManifestWork(t *testing.T) { objectReader: r, } - err = controller.syncManifestWork(context.TODO(), testingWork) + err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork) if err != nil { t.Fatal(err) } @@ -554,11 +555,12 @@ func TestStatusFeedback(t *testing.T) { fakeClient := fakeworkclient.NewSimpleClientset(testingWork) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...) informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 0) - r, err := objectreader.NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) + r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) if err != nil { t.Fatal(err) } + syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace) controller := AvailableStatusController{ statusReader: statusfeedback.NewStatusReader(), patcher: patcher.NewPatcher[ @@ -567,7 +569,7 @@ func TestStatusFeedback(t *testing.T) { objectReader: r, } - err = controller.syncManifestWork(context.TODO(), testingWork) + err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork) if err != nil { t.Fatal(err) } @@ -882,7 +884,7 @@ func TestConditionRules(t *testing.T) { t.Fatal(err) } - r, err := objectreader.NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) + r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) if err != nil { t.Fatal(err) } @@ -896,7 +898,8 @@ func TestConditionRules(t *testing.T) { objectReader: r, } - err = controller.syncManifestWork(context.TODO(), testingWork) + syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace) + err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork) if err != nil { t.Fatal(err) } @@ -983,3 +986,142 @@ func hasStatusCondition(conditions []metav1.Condition, conditionType string, sta return false } + +func TestRegisterUnregisterInformersBasedOnFeedbackType(t *testing.T) { + deployment := testingcommon.NewUnstructuredWithContent("apps/v1", "Deployment", "default", "test-deployment", + map[string]interface{}{ + "spec": map[string]interface{}{ + "replicas": int64(3), + }, + "status": map[string]interface{}{ + 
"availableReplicas": int64(3), + }, + }) + + cases := []struct { + name string + manifestConfigs []workapiv1.ManifestConfigOption + manifests []workapiv1.Manifest + }{ + { + name: "Register informer for FeedbackWatchType", + manifestConfigs: []workapiv1.ManifestConfigOption{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "apps", + Resource: "deployments", + Name: "test-deployment", + Namespace: "default", + }, + FeedbackRules: []workapiv1.FeedbackRule{ + {Type: workapiv1.JSONPathsType}, + }, + FeedbackScrapeType: workapiv1.FeedbackWatchType, + }, + }, + manifests: []workapiv1.Manifest{ + util.ToManifest(deployment), + }, + }, + { + name: "Do not register informer for FeedbackPollType", + manifestConfigs: []workapiv1.ManifestConfigOption{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "apps", + Resource: "deployments", + Name: "test-deployment", + Namespace: "default", + }, + FeedbackRules: []workapiv1.FeedbackRule{ + {Type: workapiv1.JSONPathsType}, + }, + FeedbackScrapeType: workapiv1.FeedbackPollType, + }, + }, + manifests: []workapiv1.Manifest{ + util.ToManifest(deployment), + }, + }, + { + name: "Do not register informer when FeedbackScrapeType is empty", + manifestConfigs: []workapiv1.ManifestConfigOption{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "apps", + Resource: "deployments", + Name: "test-deployment", + Namespace: "default", + }, + FeedbackRules: []workapiv1.FeedbackRule{ + {Type: workapiv1.JSONPathsType}, + }, + }, + }, + manifests: []workapiv1.Manifest{ + util.ToManifest(deployment), + }, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + scheme := runtime.NewScheme() + utilruntime.Must(workapiv1.Install(scheme)) + fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, deployment) + fakeClient := fakeworkclient.NewSimpleClientset() + informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 10*time.Minute) + + r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks()) + if err != nil { + t.Fatal(err) + } + + conditionReader, err := conditions.NewConditionReader() + if err != nil { + t.Fatal(err) + } + + testingWork, _ := spoketesting.NewManifestWork(0) + testingWork.Finalizers = []string{workapiv1.ManifestWorkFinalizer} + testingWork.Spec.Workload.Manifests = c.manifests + testingWork.Spec.ManifestConfigs = c.manifestConfigs + testingWork.Status.ResourceStatus.Manifests = []workapiv1.ManifestCondition{ + { + ResourceMeta: workapiv1.ManifestResourceMeta{ + Group: "apps", + Version: "v1", + Resource: "deployments", + Namespace: "default", + Name: "test-deployment", + }, + Conditions: []metav1.Condition{ + { + Type: workapiv1.ManifestApplied, + Status: metav1.ConditionTrue, + }, + }, + }, + } + + syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace) + controller := AvailableStatusController{ + patcher: patcher.NewPatcher[ + *workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]( + fakeClient.WorkV1().ManifestWorks(testingWork.Namespace)), + objectReader: r, + conditionReader: conditionReader, + statusReader: statusfeedback.NewStatusReader(), + } + + err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork) + if err != nil { + t.Fatal(err) + } + + // Note: We cannot directly verify internal informer state from tests + // since objectReader is now private. 
The test verifies that the sync + // completes without error, which indirectly validates the registration logic. + }) + } +} diff --git a/pkg/work/spoke/objectreader/options.go b/pkg/work/spoke/objectreader/options.go new file mode 100644 index 000000000..a553c8fed --- /dev/null +++ b/pkg/work/spoke/objectreader/options.go @@ -0,0 +1,39 @@ +package objectreader + +import ( + "github.com/spf13/pflag" + "k8s.io/client-go/dynamic" + "k8s.io/client-go/tools/cache" + + workinformers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" +) + +type Options struct { + MaxFeedbackWatch int32 +} + +func NewOptions() *Options { + return &Options{ + MaxFeedbackWatch: 50, + } +} + +func (o *Options) AddFlags(fs *pflag.FlagSet) { + fs.Int32Var(&o.MaxFeedbackWatch, "max-feedback-watch", + o.MaxFeedbackWatch, "The maximum number of watches for feedback results") +} + +func (o *Options) NewObjectReader(dynamicClient dynamic.Interface, workInformer workinformers.ManifestWorkInformer) (ObjectReader, error) { + if err := workInformer.Informer().AddIndexers(map[string]cache.IndexFunc{ + byWorkIndex: indexWorkByResource, + }); err != nil { + return nil, err + } + + return &objectReader{ + dynamicClient: dynamicClient, + informers: map[informerKey]*informerWithCancel{}, + indexer: workInformer.Informer().GetIndexer(), + maxWatch: o.MaxFeedbackWatch, + }, nil +} diff --git a/pkg/work/spoke/objectreader/reader.go b/pkg/work/spoke/objectreader/reader.go index 5c97fe272..a188c4e3b 100644 --- a/pkg/work/spoke/objectreader/reader.go +++ b/pkg/work/spoke/objectreader/reader.go @@ -17,15 +17,29 @@ import ( "k8s.io/client-go/dynamic/dynamicinformer" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" - workinformers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" workapiv1 "open-cluster-management.io/api/work/v1" ) const byWorkIndex = "byWorkIndex" +type ObjectReader interface { + // Get returns an object based on resourceMeta + Get(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error) + + // RegisterInformer registers an informer with the ObjectReader + RegisterInformer( + ctx context.Context, workName string, + resourceMeta workapiv1.ManifestResourceMeta, + queue workqueue.TypedRateLimitingInterface[string]) error + + // UnRegisterInformer unregisters the informer from the ObjectReader + UnRegisterInformer(workName string, resourceMeta workapiv1.ManifestResourceMeta) error +} + // ObjectReader reads spoke resources using informer-based caching or direct dynamic client calls.
-type ObjectReader struct { +type objectReader struct { sync.RWMutex dynamicClient dynamic.Interface @@ -33,6 +47,8 @@ type ObjectReader struct { informers map[informerKey]*informerWithCancel indexer cache.Indexer + + maxWatch int32 } type informerWithCancel struct { @@ -57,21 +73,7 @@ type registrationKey struct { workName string } -func NewObjectReader(dynamicClient dynamic.Interface, workInformer workinformers.ManifestWorkInformer) (*ObjectReader, error) { - if err := workInformer.Informer().AddIndexers(map[string]cache.IndexFunc{ - byWorkIndex: indexWorkByResource, - }); err != nil { - return nil, err - } - - return &ObjectReader{ - dynamicClient: dynamicClient, - informers: map[informerKey]*informerWithCancel{}, - indexer: workInformer.Informer().GetIndexer(), - }, nil -} - -func (o *ObjectReader) Get(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error) { +func (o *objectReader) Get(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error) { if len(resourceMeta.Resource) == 0 || len(resourceMeta.Version) == 0 || len(resourceMeta.Name) == 0 { return nil, metav1.Condition{ Type: workapiv1.ManifestAvailable, @@ -107,7 +109,7 @@ func (o *ObjectReader) Get(ctx context.Context, resourceMeta workapiv1.ManifestR }, nil } -func (o *ObjectReader) getObject(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, error) { +func (o *objectReader) getObject(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, error) { gvr := schema.GroupVersionResource{ Group: resourceMeta.Group, Version: resourceMeta.Version, @@ -118,28 +120,40 @@ func (o *ObjectReader) getObject(ctx context.Context, resourceMeta workapiv1.Man o.RLock() i, found := o.informers[key] o.RUnlock() - if !found { - return o.dynamicClient.Resource(gvr).Namespace(resourceMeta.Namespace).Get(ctx, resourceMeta.Name, metav1.GetOptions{}) + + // Use informer cache only if it exists and has synced. + // If informer is not synced (e.g., watch permission denied, initial sync in progress), + // fallback to direct client.Get() which only requires GET permission. + if found && i.informer.HasSynced() { + var runObj runtime.Object + var err error + // For cluster-scoped resources (empty namespace), use Get() directly + // ByNamespace("") doesn't work for cluster-scoped resources + if resourceMeta.Namespace == "" { + runObj, err = i.lister.Get(resourceMeta.Name) + } else { + runObj, err = i.lister.ByNamespace(resourceMeta.Namespace).Get(resourceMeta.Name) + } + if err != nil { + return nil, err + } + obj, ok := runObj.(*unstructured.Unstructured) + if !ok { + return nil, fmt.Errorf("unexpected type from lister: %T", runObj) + } + return obj, nil } - var runObj runtime.Object - runObj, err := i.lister.ByNamespace(resourceMeta.Namespace).Get(resourceMeta.Name) - if err != nil { - return nil, err - } - obj, ok := runObj.(*unstructured.Unstructured) - if !ok { - return nil, fmt.Errorf("unexpected type from lister: %T", runObj) - } - return obj, nil + return o.dynamicClient.Resource(gvr).Namespace(resourceMeta.Namespace).Get(ctx, resourceMeta.Name, metav1.GetOptions{}) } // RegisterInformer checks if there is an informer and if the event handler has been registered to the informer. // this is called each time a resource needs to be watched. It is idempotent. 
-func (o *ObjectReader) RegisterInformer( +func (o *objectReader) RegisterInformer( ctx context.Context, workName string, resourceMeta workapiv1.ManifestResourceMeta, queue workqueue.TypedRateLimitingInterface[string]) error { + logger := klog.FromContext(ctx) o.Lock() defer o.Unlock() @@ -158,6 +172,11 @@ func (o *ObjectReader) RegisterInformer( } informer, found := o.informers[key] if !found { + if len(o.informers) >= int(o.maxWatch) { + logger.Info("The number of registered informers has reached the maximum limit, falling back to poll-based feedback") + return nil + } + resourceInformer := dynamicinformer.NewFilteredDynamicInformer( o.dynamicClient, gvr, resourceMeta.Namespace, 24*time.Hour, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil) @@ -169,6 +188,7 @@ func (o *ObjectReader) RegisterInformer( registrations: map[registrationKey]cache.ResourceEventHandlerRegistration{}, } o.informers[key] = informer + logger.V(4).Info("Registered informer for object reader", "informerKey", key) go resourceInformer.Informer().Run(informerCtx.Done()) } @@ -176,6 +196,8 @@ func (o *ObjectReader) RegisterInformer( if _, registrationFound := informer.registrations[regKey]; registrationFound { return nil } + + logger.V(4).Info("Added event handler to informer for object reader", "informerKey", key, "resourceKey", regKey) // Add event handler into the informer so it can trigger work reconcile registration, err := informer.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: o.queueWorkByResourceFunc(ctx, gvr, queue), @@ -188,11 +210,12 @@ func (o *ObjectReader) RegisterInformer( } // record the event handler registration informer.registrations[regKey] = registration + return nil } // UnRegisterInformer is called each time a resource is not watched.
-func (o *ObjectReader) UnRegisterInformer(workName string, resourceMeta workapiv1.ManifestResourceMeta) error { +func (o *objectReader) UnRegisterInformer(workName string, resourceMeta workapiv1.ManifestResourceMeta) error { o.Lock() defer o.Unlock() @@ -233,8 +256,9 @@ func (o *ObjectReader) UnRegisterInformer(workName string, resourceMeta workapiv return nil } -func (o *ObjectReader) queueWorkByResourceFunc(ctx context.Context, gvr schema.GroupVersionResource, queue workqueue.TypedRateLimitingInterface[string]) func(object interface{}) { +func (o *objectReader) queueWorkByResourceFunc(ctx context.Context, gvr schema.GroupVersionResource, queue workqueue.TypedRateLimitingInterface[string]) func(object interface{}) { return func(object interface{}) { + logger := klog.FromContext(ctx) accessor, err := meta.Accessor(object) if err != nil { utilruntime.HandleErrorWithContext(ctx, err, "failed to access object") @@ -250,6 +274,7 @@ func (o *ObjectReader) queueWorkByResourceFunc(ctx context.Context, gvr schema.G for _, obj := range objects { work := obj.(*workapiv1.ManifestWork) + logger.V(4).Info("enqueue work by resource", "resourceKey", key) queue.Add(work.Name) } } @@ -268,3 +293,21 @@ func indexWorkByResource(obj interface{}) ([]string, error) { } return keys, nil } + +func UnRegisterInformerFromAppliedManifestWork(ctx context.Context, o ObjectReader, workName string, appliedResources []workapiv1.AppliedManifestResourceMeta) { + if o == nil { + return + } + for _, r := range appliedResources { + resourceMeta := workapiv1.ManifestResourceMeta{ + Group: r.Group, + Version: r.Version, + Resource: r.Resource, + Name: r.Name, + Namespace: r.Namespace, + } + if err := o.UnRegisterInformer(workName, resourceMeta); err != nil { + utilruntime.HandleErrorWithContext(ctx, err, "failed to unregister informer") + } + } +} diff --git a/pkg/work/spoke/objectreader/reader_test.go b/pkg/work/spoke/objectreader/reader_test.go index 8c7686a8b..6c287f48c 100644 --- a/pkg/work/spoke/objectreader/reader_test.go +++ b/pkg/work/spoke/objectreader/reader_test.go @@ -5,6 +5,8 @@ import ( "testing" "time" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" @@ -20,7 +22,7 @@ import ( func TestNewObjectReader(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) @@ -28,7 +30,7 @@ func TestNewObjectReader(t *testing.T) { fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -37,16 +39,16 @@ func TestNewObjectReader(t *testing.T) { t.Fatal("Expected ObjectReader to be created, but got nil") } - if reader.dynamicClient != fakeDynamicClient { - t.Error("Expected dynamicClient to be set correctly") - } - - if reader.informers == nil { - t.Error("Expected informers map to be initialized") - } - - if reader.indexer == nil { - t.Error("Expected indexer to be set correctly") + // Verify ObjectReader is functional by calling Get + _, _, err = reader.Get(context.TODO(), workapiv1.ManifestResourceMeta{ + Group: "", + Version: "v1", + Resource: "secrets", + Namespace: "default", + Name: "test", + }) + 
if err == nil { + t.Error("Expected error for non-existent resource") } } @@ -83,13 +85,13 @@ func TestGet_IncompleteResourceMeta(t *testing.T) { } scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) workInformer := workInformerFactory.Work().V1().ManifestWorks() - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -127,13 +129,13 @@ func TestGet_IncompleteResourceMeta(t *testing.T) { func TestGet_ResourceNotFound(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) workInformer := workInformerFactory.Work().V1().ManifestWorks() - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -175,7 +177,7 @@ func TestGet_ResourceNotFound(t *testing.T) { func TestGet_ResourceFound(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) secret := &unstructured.Unstructured{ Object: map[string]any{ @@ -196,7 +198,7 @@ func TestGet_ResourceFound(t *testing.T) { workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) workInformer := workInformerFactory.Work().V1().ManifestWorks() - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -246,7 +248,7 @@ func TestGet_ResourceFound(t *testing.T) { func TestRegisterInformer(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() @@ -254,7 +256,7 @@ func TestRegisterInformer(t *testing.T) { workInformer := workInformerFactory.Work().V1().ManifestWorks() queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -283,9 +285,11 @@ func TestRegisterInformer(t *testing.T) { } key := informerKey{GroupVersionResource: gvr, namespace: resourceMeta.Namespace} - reader.RLock() - informer, found := reader.informers[key] - reader.RUnlock() + // Cast to concrete type to access internal fields in tests + concreteReader := reader.(*objectReader) + concreteReader.RLock() + informer, found := concreteReader.informers[key] + concreteReader.RUnlock() if !found { t.Fatal("Expected informer to be registered") @@ -313,9 +317,9 @@ func TestRegisterInformer(t *testing.T) { t.Fatalf("Expected no error on second registration, got %v", err) } - reader.RLock() - informer, found = reader.informers[key] - reader.RUnlock() + concreteReader.RLock() + informer, found = concreteReader.informers[key] + concreteReader.RUnlock() if 
!found { t.Fatal("Expected informer to still be registered") @@ -328,7 +332,7 @@ func TestRegisterInformer(t *testing.T) { func TestRegisterInformer_MultipleResources(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() @@ -336,7 +340,7 @@ func TestRegisterInformer_MultipleResources(t *testing.T) { workInformer := workInformerFactory.Work().V1().ManifestWorks() queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -378,9 +382,11 @@ func TestRegisterInformer_MultipleResources(t *testing.T) { } key := informerKey{GroupVersionResource: gvr, namespace: "default"} - reader.RLock() - informer, found := reader.informers[key] - reader.RUnlock() + // Cast to concrete type to access internal fields in tests + concreteReader := reader.(*objectReader) + concreteReader.RLock() + informer, found := concreteReader.informers[key] + concreteReader.RUnlock() if !found { t.Fatal("Expected informer to be registered") @@ -394,7 +400,7 @@ func TestRegisterInformer_MultipleResources(t *testing.T) { func TestUnRegisterInformer(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() @@ -402,7 +408,7 @@ func TestUnRegisterInformer(t *testing.T) { workInformer := workInformerFactory.Work().V1().ManifestWorks() queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -431,9 +437,10 @@ func TestUnRegisterInformer(t *testing.T) { key := informerKey{GroupVersionResource: gvr, namespace: resourceMeta.Namespace} // Verify informer exists - reader.RLock() - _, found := reader.informers[key] - reader.RUnlock() + concreteReader := reader.(*objectReader) + concreteReader.RLock() + _, found := concreteReader.informers[key] + concreteReader.RUnlock() if !found { t.Fatal("Expected informer to be registered") } @@ -445,9 +452,9 @@ func TestUnRegisterInformer(t *testing.T) { } // Verify informer was removed (since it was the last registration) - reader.RLock() - _, found = reader.informers[key] - reader.RUnlock() + concreteReader.RLock() + _, found = concreteReader.informers[key] + concreteReader.RUnlock() if found { t.Error("Expected informer to be removed") } @@ -455,7 +462,7 @@ func TestUnRegisterInformer(t *testing.T) { func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() @@ -463,7 +470,7 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) { workInformer := workInformerFactory.Work().V1().ManifestWorks() queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := 
NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -511,9 +518,10 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) { } // Verify informer still exists (since second resource is still registered) - reader.RLock() - informer, found := reader.informers[key] - reader.RUnlock() + concreteReader := reader.(*objectReader) + concreteReader.RLock() + informer, found := concreteReader.informers[key] + concreteReader.RUnlock() if !found { t.Fatal("Expected informer to still be registered") } @@ -529,9 +537,9 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) { } // Verify informer was removed (since it was the last registration) - reader.RLock() - _, found = reader.informers[key] - reader.RUnlock() + concreteReader.RLock() + _, found = concreteReader.informers[key] + concreteReader.RUnlock() if found { t.Error("Expected informer to be removed after unregistering all resources") } @@ -539,14 +547,14 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) { func TestUnRegisterInformer_NotRegistered(t *testing.T) { scheme := runtime.NewScheme() - _ = workapiv1.Install(scheme) + _ = corev1.AddToScheme(scheme) fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) fakeWorkClient := fakeworkclient.NewSimpleClientset() workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) workInformer := workInformerFactory.Work().V1().ManifestWorks() - reader, err := NewObjectReader(fakeDynamicClient, workInformer) + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) if err != nil { t.Fatal(err) } @@ -566,6 +574,204 @@ func TestUnRegisterInformer_NotRegistered(t *testing.T) { } } +func TestUnRegisterInformerFromAppliedManifestWork(t *testing.T) { + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + _ = appsv1.AddToScheme(scheme) + + fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme) + fakeWorkClient := fakeworkclient.NewSimpleClientset() + workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) + workInformer := workInformerFactory.Work().V1().ManifestWorks() + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) + + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) + if err != nil { + t.Fatal(err) + } + + ctx := t.Context() + + // Register informers for multiple resources + resources := []workapiv1.ManifestResourceMeta{ + { + Group: "", + Version: "v1", + Resource: "secrets", + Namespace: "default", + Name: "secret1", + }, + { + Group: "apps", + Version: "v1", + Resource: "deployments", + Namespace: "kube-system", + Name: "deployment1", + }, + { + Group: "", + Version: "v1", + Resource: "configmaps", + Namespace: "default", + Name: "cm1", + }, + } + + for _, resource := range resources { + err = reader.RegisterInformer(ctx, "test-work", resource, queue) + if err != nil { + t.Fatalf("Expected no error registering informer, got %v", err) + } + } + + // Verify all informers are registered + concreteReader := reader.(*objectReader) + concreteReader.RLock() + initialCount := len(concreteReader.informers) + concreteReader.RUnlock() + + if initialCount != 3 { + t.Errorf("Expected 3 informers to be registered, got %d", initialCount) + } + + // Create applied resources from the manifest resources + appliedResources := []workapiv1.AppliedManifestResourceMeta{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "", + 
Resource: "secrets", + Namespace: "default", + Name: "secret1", + }, + Version: "v1", + }, + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "apps", + Resource: "deployments", + Namespace: "kube-system", + Name: "deployment1", + }, + Version: "v1", + }, + } + + // Unregister using the helper function + UnRegisterInformerFromAppliedManifestWork(context.TODO(), reader, "test-work", appliedResources) + + // Verify that 2 informers were removed + concreteReader.RLock() + finalCount := len(concreteReader.informers) + concreteReader.RUnlock() + + if finalCount != 1 { + t.Errorf("Expected 1 informer to remain after unregistering 2 resources, got %d", finalCount) + } +} + +func TestUnRegisterInformerFromAppliedManifestWork_NilObjectReader(t *testing.T) { + // This test ensures the function handles nil objectReader gracefully + appliedResources := []workapiv1.AppliedManifestResourceMeta{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "", + Resource: "secrets", + Namespace: "default", + Name: "secret1", + }, + Version: "v1", + }, + } + + // Should not panic when objectReader is nil + UnRegisterInformerFromAppliedManifestWork(context.TODO(), nil, "test-work", appliedResources) +} + +func TestGet_InformerFallbackToClient(t *testing.T) { + scheme := runtime.NewScheme() + _ = corev1.AddToScheme(scheme) + + secret := &unstructured.Unstructured{ + Object: map[string]any{ + "apiVersion": "v1", + "kind": "Secret", + "metadata": map[string]any{ + "name": "test-secret", + "namespace": "default", + }, + "data": map[string]any{ + "key": "dmFsdWU=", + }, + }, + } + + // Create dynamic client with the secret - object exists in client + fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, secret) + fakeWorkClient := fakeworkclient.NewSimpleClientset() + workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute) + workInformer := workInformerFactory.Work().V1().ManifestWorks() + queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]()) + + reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer) + if err != nil { + t.Fatal(err) + } + + resourceMeta := workapiv1.ManifestResourceMeta{ + Group: "", + Version: "v1", + Resource: "secrets", + Namespace: "default", + Name: "test-secret", + } + + ctx := t.Context() + + // Register informer - this creates an informer but it is not synced yet. + // This simulates scenarios where: + // 1. Watch permission is denied - informer starts but can't sync + // 2. Informer is still performing initial cache sync + // In both cases, HasSynced() returns false and Get() should fallback to client.Get() + // which only requires GET permission (not WATCH permission). + err = reader.RegisterInformer(ctx, "test-work", resourceMeta, queue) + if err != nil { + t.Fatalf("Expected no error registering informer, got %v", err) + } + + // Get should fallback to dynamic client when informer is not synced. + // This ensures that even without WATCH permission, resources can still be retrieved + // using GET permission via the dynamic client. 
+ obj, condition, err := reader.Get(ctx, resourceMeta) + + if err != nil { + t.Errorf("Expected no error, got %v", err) + } + + if obj == nil { + t.Fatal("Expected object to be returned from fallback to client, got nil") + } + + if obj.GetName() != "test-secret" { + t.Errorf("Expected object name test-secret, got %s", obj.GetName()) + } + + if obj.GetNamespace() != "default" { + t.Errorf("Expected object namespace default, got %s", obj.GetNamespace()) + } + + if condition.Type != workapiv1.ManifestAvailable { + t.Errorf("Expected condition type %s, got %s", workapiv1.ManifestAvailable, condition.Type) + } + + if condition.Status != metav1.ConditionTrue { + t.Errorf("Expected condition status %s, got %s", metav1.ConditionTrue, condition.Status) + } + + if condition.Reason != "ResourceAvailable" { + t.Errorf("Expected reason ResourceAvailable, got %s", condition.Reason) + } +} + func TestIndexWorkByResource(t *testing.T) { cases := []struct { name string diff --git a/pkg/work/spoke/options.go b/pkg/work/spoke/options.go index 64d3d4afa..e2e7c7a77 100644 --- a/pkg/work/spoke/options.go +++ b/pkg/work/spoke/options.go @@ -4,6 +4,8 @@ import ( "time" "github.com/spf13/pflag" + + "open-cluster-management.io/ocm/pkg/work/spoke/objectreader" ) const ( @@ -20,6 +22,8 @@ type WorkloadAgentOptions struct { CloudEventsClientID string CloudEventsClientCodecs []string DefaultUserAgent string + + ObjectReaderOption *objectreader.Options } // NewWorkloadAgentOptions returns the flags with default value set @@ -31,6 +35,7 @@ func NewWorkloadAgentOptions() *WorkloadAgentOptions { WorkloadSourceDriver: "kube", WorkloadSourceConfig: "/spoke/hub-kubeconfig/kubeconfig", DefaultUserAgent: defaultUserAgent, + ObjectReaderOption: objectreader.NewOptions(), } } @@ -50,4 +55,6 @@ func (o *WorkloadAgentOptions) AddFlags(fs *pflag.FlagSet) { o.CloudEventsClientID, "The ID of the cloudevents client when workload source source is based on cloudevents") fs.StringSliceVar(&o.CloudEventsClientCodecs, "cloudevents-client-codecs", o.CloudEventsClientCodecs, "The codecs for cloudevents client when workload source source is based on cloudevents, the valid codecs: manifest or manifestbundle") + + o.ObjectReaderOption.AddFlags(fs) } diff --git a/pkg/work/spoke/spokeagent.go b/pkg/work/spoke/spokeagent.go index 83d3014a6..ae0ca0da7 100644 --- a/pkg/work/spoke/spokeagent.go +++ b/pkg/work/spoke/spokeagent.go @@ -29,6 +29,7 @@ import ( "open-cluster-management.io/ocm/pkg/features" "open-cluster-management.io/ocm/pkg/work/helper" "open-cluster-management.io/ocm/pkg/work/spoke/auth" + "open-cluster-management.io/ocm/pkg/work/spoke/conditions" "open-cluster-management.io/ocm/pkg/work/spoke/controllers/finalizercontroller" "open-cluster-management.io/ocm/pkg/work/spoke/controllers/manifestcontroller" "open-cluster-management.io/ocm/pkg/work/spoke/controllers/statuscontroller" @@ -132,6 +133,16 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex restMapper, ).NewExecutorValidator(ctx, features.SpokeMutableFeatureGate.Enabled(ocmfeature.ExecutorValidatingCaches)) + conditionReader, err := conditions.NewConditionReader() + if err != nil { + return err + } + + objectReader, err := o.workOptions.ObjectReaderOption.NewObjectReader(spokeDynamicClient, hubWorkInformer) + if err != nil { + return err + } + manifestWorkController := manifestcontroller.NewManifestWorkController( spokeDynamicClient, spokeKubeClient, @@ -141,6 +152,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex 
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName), spokeWorkClient.WorkV1().AppliedManifestWorks(), spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(), + objectReader, hubHash, agentID, restMapper, validator, @@ -154,6 +166,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex spokeDynamicClient, spokeWorkClient.WorkV1().AppliedManifestWorks(), spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(), + objectReader, agentID, ) manifestWorkFinalizeController := finalizercontroller.NewManifestWorkFinalizeController( @@ -172,17 +185,15 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex o.workOptions.AppliedManifestWorkEvictionGracePeriod, hubHash, agentID, ) - availableStatusController, err := statuscontroller.NewAvailableStatusController( - spokeDynamicClient, + availableStatusController := statuscontroller.NewAvailableStatusController( hubWorkClient, hubWorkInformer, hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName), + conditionReader, + objectReader, o.workOptions.MaxJSONRawLength, o.workOptions.StatusSyncInterval, ) - if err != nil { - return err - } go spokeWorkInformerFactory.Start(ctx.Done()) go hubWorkInformer.Informer().Run(ctx.Done()) diff --git a/test/integration/work/statusfeedback_test.go b/test/integration/work/statusfeedback_test.go index 6b725c475..a3570ddf7 100644 --- a/test/integration/work/statusfeedback_test.go +++ b/test/integration/work/statusfeedback_test.go @@ -3,6 +3,7 @@ package work import ( "context" "fmt" + "time" "github.com/onsi/ginkgo/v2" "github.com/onsi/gomega" @@ -16,7 +17,9 @@ import ( ocmfeature "open-cluster-management.io/api/feature" workapiv1 "open-cluster-management.io/api/work/v1" + commonoptions "open-cluster-management.io/ocm/pkg/common/options" "open-cluster-management.io/ocm/pkg/features" + "open-cluster-management.io/ocm/pkg/work/spoke" "open-cluster-management.io/ocm/test/integration/util" ) @@ -1017,4 +1020,236 @@ var _ = ginkgo.Describe("ManifestWork Status Feedback", func() { }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) }) }) + + ginkgo.Context("Watch-based Status Feedback", func() { + ginkgo.BeforeEach(func() { + u, _, err := util.NewDeployment(clusterName, "deploy-watch", "sa") + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + manifests = append(manifests, util.ToManifest(u)) + + var ctx context.Context + ctx, cancel = context.WithCancel(context.Background()) + // increase normal sync interval so we can validate watch based status feedback + syncIntervalOptionDecorator := func( + opt *spoke.WorkloadAgentOptions, + commonOpt *commonoptions.AgentOptions) (*spoke.WorkloadAgentOptions, *commonoptions.AgentOptions) { + opt.StatusSyncInterval = 30 * time.Second + return opt, commonOpt + } + go startWorkAgent(ctx, clusterName, syncIntervalOptionDecorator) + }) + + ginkgo.AfterEach(func() { + if cancel != nil { + cancel() + } + }) + + ginkgo.It("should register informer and watch resource status changes", func() { + // Create ManifestWork with watch-based feedback + work.Spec.ManifestConfigs = []workapiv1.ManifestConfigOption{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "apps", + Resource: "deployments", + Namespace: clusterName, + Name: "deploy-watch", + }, + FeedbackRules: []workapiv1.FeedbackRule{ + { + Type: workapiv1.JSONPathsType, + JsonPaths: []workapiv1.JsonPath{ + { + Name: "replicas", + Path: ".spec.replicas", + }, + { + Name: "availableReplicas", + Path: 
".status.availableReplicas", + }, + }, + }, + }, + FeedbackScrapeType: workapiv1.FeedbackWatchType, + }, + } + + work, err = hubWorkClient.WorkV1().ManifestWorks(clusterName).Create(context.Background(), work, metav1.CreateOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + + // Wait for work to be applied + util.AssertWorkCondition(work.Namespace, work.Name, hubWorkClient, + workapiv1.WorkApplied, metav1.ConditionTrue, []metav1.ConditionStatus{metav1.ConditionTrue}, + eventuallyTimeout, eventuallyInterval) + util.AssertWorkCondition(work.Namespace, work.Name, hubWorkClient, + workapiv1.WorkAvailable, metav1.ConditionTrue, []metav1.ConditionStatus{metav1.ConditionTrue}, + eventuallyTimeout, eventuallyInterval) + + // Update Deployment status on spoke + gomega.Eventually(func() error { + deploy, err := spokeKubeClient.AppsV1().Deployments(clusterName). + Get(context.Background(), "deploy-watch", metav1.GetOptions{}) + if err != nil { + return err + } + + deploy.Status.Replicas = 3 + deploy.Status.ReadyReplicas = 3 + deploy.Status.AvailableReplicas = 3 + _, err = spokeKubeClient.AppsV1().Deployments(clusterName). + UpdateStatus(context.Background(), deploy, metav1.UpdateOptions{}) + return err + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + + // Verify feedback values are updated via watch + gomega.Eventually(func() error { + work, err = hubWorkClient.WorkV1().ManifestWorks(clusterName). + Get(context.Background(), work.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + if len(work.Status.ResourceStatus.Manifests) != 1 { + return fmt.Errorf("expected 1 manifest status, got %d", + len(work.Status.ResourceStatus.Manifests)) + } + + values := work.Status.ResourceStatus.Manifests[0].StatusFeedbacks.Values + if len(values) != 2 { + return fmt.Errorf("expected 2 feedback values, got %d", len(values)) + } + + for _, v := range values { + if v.Name == "availableReplicas" { + if v.Value.Integer == nil || *v.Value.Integer != 3 { + return fmt.Errorf("expected availableReplicas to be 3, got %v", v.Value.Integer) + } + } + } + + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + }) + + ginkgo.It("should watch cluster-scope resources like namespaces", func() { + // Create a namespace manifest + testNsName := fmt.Sprintf("test-ns-%s", rand.String(5)) + namespaceManifest := util.ToManifest(&corev1.Namespace{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "Namespace", + }, + ObjectMeta: metav1.ObjectMeta{ + Name: testNsName, + Labels: map[string]string{ + "test": "watch-feedback", + }, + }, + }) + + // Create ManifestWork with watch-based feedback for cluster-scope resource + work.Spec.Workload.Manifests = []workapiv1.Manifest{namespaceManifest} + work.Spec.ManifestConfigs = []workapiv1.ManifestConfigOption{ + { + ResourceIdentifier: workapiv1.ResourceIdentifier{ + Group: "", + Resource: "namespaces", + Name: testNsName, + // Note: No Namespace field for cluster-scoped resources + }, + FeedbackRules: []workapiv1.FeedbackRule{ + { + Type: workapiv1.JSONPathsType, + JsonPaths: []workapiv1.JsonPath{ + { + Name: "phase", + Path: ".status.phase", + }, + { + Name: "name", + Path: ".metadata.name", + }, + }, + }, + }, + FeedbackScrapeType: workapiv1.FeedbackWatchType, + }, + } + + work, err = hubWorkClient.WorkV1().ManifestWorks(clusterName).Create(context.Background(), work, metav1.CreateOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + + // Wait for work to be applied + 
util.AssertWorkCondition(work.Namespace, work.Name, hubWorkClient, + workapiv1.WorkApplied, metav1.ConditionTrue, []metav1.ConditionStatus{metav1.ConditionTrue}, + eventuallyTimeout, eventuallyInterval) + util.AssertWorkCondition(work.Namespace, work.Name, hubWorkClient, + workapiv1.WorkAvailable, metav1.ConditionTrue, []metav1.ConditionStatus{metav1.ConditionTrue}, + eventuallyTimeout, eventuallyInterval) + + // Update namespace status - Kubernetes sets phase to Active automatically, + // but we'll verify it's being watched + gomega.Eventually(func() error { + ns, err := spokeKubeClient.CoreV1().Namespaces().Get(context.Background(), testNsName, metav1.GetOptions{}) + if err != nil { + return err + } + + // Kubernetes automatically sets the phase to Active for new namespaces + // We just need to ensure it has a phase set + if ns.Status.Phase == "" { + ns.Status.Phase = corev1.NamespaceActive + _, err = spokeKubeClient.CoreV1().Namespaces().UpdateStatus(context.Background(), ns, metav1.UpdateOptions{}) + return err + } + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + + // Verify feedback values are updated via watch + gomega.Eventually(func() error { + work, err = hubWorkClient.WorkV1().ManifestWorks(clusterName). + Get(context.Background(), work.Name, metav1.GetOptions{}) + if err != nil { + return err + } + + if len(work.Status.ResourceStatus.Manifests) != 1 { + return fmt.Errorf("expected 1 manifest status, got %d", + len(work.Status.ResourceStatus.Manifests)) + } + + values := work.Status.ResourceStatus.Manifests[0].StatusFeedbacks.Values + if len(values) != 2 { + return fmt.Errorf("expected 2 feedback values, got %d", len(values)) + } + + var foundPhase, foundName bool + for _, v := range values { + if v.Name == "phase" { + if v.Value.String == nil || *v.Value.String != string(corev1.NamespaceActive) { + return fmt.Errorf("expected phase to be Active, got %v", v.Value.String) + } + foundPhase = true + } + if v.Name == "name" { + if v.Value.String == nil || *v.Value.String != testNsName { + return fmt.Errorf("expected name to be %s, got %v", testNsName, v.Value.String) + } + foundName = true + } + } + + if !foundPhase || !foundName { + return fmt.Errorf("missing expected feedback values: phase=%v, name=%v", foundPhase, foundName) + } + + if !util.HaveManifestCondition(work.Status.ResourceStatus.Manifests, + "StatusFeedbackSynced", []metav1.ConditionStatus{metav1.ConditionTrue}) { + return fmt.Errorf("status sync condition should be True") + } + + return nil + }, eventuallyTimeout, eventuallyInterval).ShouldNot(gomega.HaveOccurred()) + }) + }) })
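For context, a minimal sketch (not part of this diff) of a ManifestWork manifestConfig entry that opts into the new watch-based feedback path; it uses only the API identifiers exercised by the tests above, and the deployment name/namespace are placeholders:

package example

import (
	workapiv1 "open-cluster-management.io/api/work/v1"
)

// watchFeedbackConfig opts a Deployment into watch-based status feedback.
// FeedbackScrapeType and FeedbackWatchType are the workapiv1 additions this
// change builds on; when the agent's --max-feedback-watch limit is reached,
// the agent falls back to poll-based feedback.
var watchFeedbackConfig = workapiv1.ManifestConfigOption{
	ResourceIdentifier: workapiv1.ResourceIdentifier{
		Group:     "apps",
		Resource:  "deployments",
		Namespace: "default",       // placeholder
		Name:      "my-deployment", // placeholder
	},
	FeedbackRules: []workapiv1.FeedbackRule{
		{
			Type: workapiv1.JSONPathsType,
			JsonPaths: []workapiv1.JsonPath{
				{Name: "availableReplicas", Path: ".status.availableReplicas"},
			},
		},
	},
	FeedbackScrapeType: workapiv1.FeedbackWatchType,
}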