Add watch-based feedback with dynamic informer lifecycle management (#1350)

* Add watch-based feedback with dynamic informer lifecycle management

Implements dynamic informer registration and cleanup for resources
configured with watch-based status feedback (FeedbackScrapeType=Watch).
This enables real-time status updates for watched resources while
efficiently managing resource lifecycle.

Features:
- Automatically register informers for resources with FeedbackWatchType
- Skip informer registration for FeedbackPollType or when not configured
- Clean up informers when resources are removed from manifestwork
- Clean up informers during applied manifestwork finalization
- Clean up informers when feedback type changes from watch to poll

Implementation:
- Refactored ObjectReader into an interface for better modularity
- Added UnRegisterInformerFromAppliedManifestWork helper for bulk cleanup
- Enhanced AvailableStatusController to conditionally register informers
- Updated finalization controllers to unregister informers on cleanup
- Added nil safety checks to prevent panics during cleanup

Testing:
- Unit tests for informer registration based on feedback type
- Unit tests for bulk unregistration and nil safety
- Integration test for end-to-end watch-based feedback workflow
- Integration test for informer cleanup on manifestwork deletion
- All existing tests updated and passing

This feature improves performance by using watch-based updates for
real-time status feedback while maintaining efficient resource cleanup.
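In outline, the agent decides per manifest whether to watch or poll. A minimal sketch of that decision, reusing names that appear in the diff below (illustrative, not the verbatim controller code):

```go
// Per-manifest informer lifecycle driven by FeedbackScrapeType.
for _, manifest := range work.Status.ResourceStatus.Manifests {
	option := helper.FindManifestConfiguration(manifest.ResourceMeta, work.Spec.ManifestConfigs)
	if option != nil && option.FeedbackScrapeType == workapiv1.FeedbackWatchType {
		// Watch requested: ensure a dynamic informer and event handler exist.
		_ = objectReader.RegisterInformer(ctx, work.Name, manifest.ResourceMeta, queue)
	} else {
		// Poll or unset: drop any earlier registration (covers watch -> poll changes).
		_ = objectReader.UnRegisterInformer(work.Name, manifest.ResourceMeta)
	}
}
```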

Signed-off-by: Jian Qiu <jqiu@redhat.com>

* Fall back to get from client when the informer is not synced

Signed-off-by: Jian Qiu <jqiu@redhat.com>

---------

Signed-off-by: Jian Qiu <jqiu@redhat.com>
Jian Qiu authored on 2026-01-29 14:46:21 +08:00, committed by GitHub
parent f2aa5d4d6a
commit 63d9574ca2
14 changed files with 830 additions and 112 deletions

View File

@@ -400,6 +400,7 @@ func FindManifestConfiguration(resourceMeta workapiv1.ManifestResourceMeta,
if rstOption.UpdateStrategy == nil && option.UpdateStrategy != nil {
rstOption.UpdateStrategy = option.UpdateStrategy
}
rstOption.FeedbackScrapeType = option.FeedbackScrapeType
}
if len(rstOption.FeedbackRules) == 0 && len(rstOption.ConditionRules) == 0 && rstOption.UpdateStrategy == nil {
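For reference, this is how a resource opts into watch-based feedback; a sketch of a ManifestConfigOption using the same fields exercised by the tests later in this diff:

```go
config := workapiv1.ManifestConfigOption{
	ResourceIdentifier: workapiv1.ResourceIdentifier{
		Group:     "apps",
		Resource:  "deployments",
		Namespace: "default",
		Name:      "test-deployment",
	},
	FeedbackRules: []workapiv1.FeedbackRule{
		{Type: workapiv1.JSONPathsType},
	},
	// Watch instead of the default poll-based scraping.
	FeedbackScrapeType: workapiv1.FeedbackWatchType,
}
```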

View File

@@ -21,6 +21,7 @@ import (
commonhelper "open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/common/queue"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/objectreader"
)
const appliedManifestWorkFinalizer = "AppliedManifestWorkFinalizer"
@@ -31,6 +32,7 @@ type AppliedManifestWorkFinalizeController struct {
patcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]
appliedManifestWorkLister worklister.AppliedManifestWorkLister
spokeDynamicClient dynamic.Interface
objectReader objectreader.ObjectReader
rateLimiter workqueue.TypedRateLimiter[string]
}
@@ -38,6 +40,7 @@ func NewAppliedManifestWorkFinalizeController(
spokeDynamicClient dynamic.Interface,
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer,
objectReader objectreader.ObjectReader,
agentID string,
) factory.Controller {
@@ -47,6 +50,7 @@ func NewAppliedManifestWorkFinalizeController(
appliedManifestWorkClient),
appliedManifestWorkLister: appliedManifestWorkInformer.Lister(),
spokeDynamicClient: spokeDynamicClient,
objectReader: objectReader,
// After 11 retries (approximately 1 minute), the delay reaches the maximum of 60 seconds.
rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](50*time.Millisecond, 60*time.Second),
}
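The retry cadence in that comment follows directly from the limiter's parameters; a small standalone check (it only assumes client-go's workqueue package, which this diff already uses):

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same parameters as the controller above: base delay 50ms, capped at 60s.
	rl := workqueue.NewTypedItemExponentialFailureRateLimiter[string](50*time.Millisecond, 60*time.Second)
	for i := 1; i <= 12; i++ {
		// Delays double per retry: 50ms, 100ms, ... The 11th is 51.2s
		// (roughly one minute) and the 12th hits the 60s cap.
		fmt.Printf("retry %2d: wait %v\n", i, rl.When("some-work"))
	}
}
```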
@@ -93,6 +97,10 @@ func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx cont
owner := helper.NewAppliedManifestWorkOwner(appliedManifestWork)
// unregister from objectReader
objectreader.UnRegisterInformerFromAppliedManifestWork(
ctx, m.objectReader, appliedManifestWork.Spec.ManifestWorkName, appliedManifestWork.Status.AppliedResources)
// Work is deleting, we remove its related resources on spoke cluster
// We still need to run delete for every resource even with ownerref on it, since ownerref does not handle
// cluster-scoped resources correctly.

View File

@@ -17,11 +17,13 @@ import (
"k8s.io/client-go/util/workqueue"
fakeworkclient "open-cluster-management.io/api/client/work/clientset/versioned/fake"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/patcher"
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/objectreader"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
)
@@ -186,16 +188,24 @@ func TestFinalize(t *testing.T) {
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...)
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 0)
r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
if err != nil {
t.Fatal(err)
}
controller := AppliedManifestWorkFinalizeController{
patcher: patcher.NewPatcher[
*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus](
fakeClient.WorkV1().AppliedManifestWorks()),
spokeDynamicClient: fakeDynamicClient,
objectReader: r,
rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](0, 1*time.Second),
}
controllerContext := testingcommon.NewFakeSyncContext(t, testingWork.Name)
err := controller.syncAppliedManifestWork(context.TODO(), controllerContext, testingWork)
err = controller.syncAppliedManifestWork(context.TODO(), controllerContext, testingWork)
if err != nil {
t.Fatal(err)
}

View File

@@ -20,10 +20,12 @@ import (
commonhelper "open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/objectreader"
)
type appliedManifestWorkReconciler struct {
spokeDynamicClient dynamic.Interface
objectReader objectreader.ObjectReader
rateLimiter workqueue.TypedRateLimiter[string]
}
@@ -89,6 +91,10 @@ func (m *appliedManifestWorkReconciler) reconcile(
reason := fmt.Sprintf("it is no longer maintained by manifestwork %s", manifestWork.Name)
// unregister from objectReader
objectreader.UnRegisterInformerFromAppliedManifestWork(
ctx, m.objectReader, appliedManifestWork.Spec.ManifestWorkName, noLongerMaintainedResources)
resourcesPendingFinalization, errs := helper.DeleteAppliedResources(
ctx, noLongerMaintainedResources, reason, m.spokeDynamicClient, *owner)
if len(errs) != 0 {

View File

@@ -24,6 +24,7 @@ import (
testingcommon "open-cluster-management.io/ocm/pkg/common/testing"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/objectreader"
"open-cluster-management.io/ocm/pkg/work/spoke/spoketesting"
)
@@ -276,6 +277,11 @@ func TestSyncManifestWork(t *testing.T) {
t.Fatal(err)
}
r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
if err != nil {
t.Fatal(err)
}
controller := ManifestWorkController{
manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("cluster1"),
manifestWorkPatcher: patcher.NewPatcher[
@@ -291,6 +297,7 @@ func TestSyncManifestWork(t *testing.T) {
},
&appliedManifestWorkReconciler{
spokeDynamicClient: fakeDynamicClient,
objectReader: r,
rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](0, 1*time.Second),
},
},
@@ -298,7 +305,7 @@ func TestSyncManifestWork(t *testing.T) {
}
controllerContext := testingcommon.NewFakeSyncContext(t, testingWork.Name)
err := controller.sync(context.TODO(), controllerContext, testingWork.Name)
err = controller.sync(context.TODO(), controllerContext, testingWork.Name)
if err != nil {
t.Fatal(err)
}

View File

@@ -31,6 +31,7 @@ import (
commonhelper "open-cluster-management.io/ocm/pkg/common/helpers"
"open-cluster-management.io/ocm/pkg/work/spoke/apply"
"open-cluster-management.io/ocm/pkg/work/spoke/auth"
"open-cluster-management.io/ocm/pkg/work/spoke/objectreader"
)
var (
@@ -72,6 +73,7 @@ func NewManifestWorkController(
manifestWorkLister worklister.ManifestWorkNamespaceLister,
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface,
appliedManifestWorkInformer workinformer.AppliedManifestWorkInformer,
objectReader objectreader.ObjectReader,
hubHash, agentID string,
restMapper meta.RESTMapper,
validator auth.ExecutorValidator) factory.Controller {
@@ -98,6 +100,7 @@ func NewManifestWorkController(
},
&appliedManifestWorkReconciler{
spokeDynamicClient: spokeDynamicClient,
objectReader: objectReader,
rateLimiter: workqueue.NewTypedItemExponentialFailureRateLimiter[string](5*time.Millisecond, 1000*time.Second),
},
},

View File

@@ -11,8 +11,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/klog/v2"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
@@ -44,7 +44,7 @@ const (
type AvailableStatusController struct {
patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
manifestWorkLister worklister.ManifestWorkNamespaceLister
objectReader *objectreader.ObjectReader
objectReader objectreader.ObjectReader
statusReader *statusfeedback.StatusReader
conditionReader *conditions.ConditionReader
syncInterval time.Duration
@@ -52,23 +52,14 @@ type AvailableStatusController struct {
// NewAvailableStatusController returns an AvailableStatusController
func NewAvailableStatusController(
spokeDynamicClient dynamic.Interface,
manifestWorkClient workv1client.ManifestWorkInterface,
manifestWorkInformer workinformer.ManifestWorkInformer,
manifestWorkLister worklister.ManifestWorkNamespaceLister,
conditionReader *conditions.ConditionReader,
objectReader objectreader.ObjectReader,
maxJSONRawLength int32,
syncInterval time.Duration,
) (factory.Controller, error) {
conditionReader, err := conditions.NewConditionReader()
if err != nil {
return nil, err
}
objectReader, err := objectreader.NewObjectReader(spokeDynamicClient, manifestWorkInformer)
if err != nil {
return nil, err
}
) factory.Controller {
controller := &AvailableStatusController{
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
@@ -82,7 +73,7 @@ func NewAvailableStatusController(
return factory.New().
WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, manifestWorkInformer.Informer()).
WithSync(controller.sync).ToController(controllerName), nil
WithSync(controller.sync).ToController(controllerName)
}
func (c *AvailableStatusController) sync(ctx context.Context, controllerContext factory.SyncContext, manifestWorkName string) error {
@@ -103,7 +94,7 @@ func (c *AvailableStatusController) sync(ctx context.Context, controllerContext
logger = logging.SetLogTracingByObject(logger, manifestWork)
ctx = klog.NewContext(ctx, logger)
err = c.syncManifestWork(ctx, manifestWork)
err = c.syncManifestWork(ctx, controllerContext, manifestWork)
if err != nil {
return fmt.Errorf("unable to sync manifestwork %q: %w", manifestWork.Name, err)
}
@@ -113,7 +104,7 @@ func (c *AvailableStatusController) sync(ctx context.Context, controllerContext
return nil
}
func (c *AvailableStatusController) syncManifestWork(ctx context.Context, originalManifestWork *workapiv1.ManifestWork) error {
func (c *AvailableStatusController) syncManifestWork(ctx context.Context, controllerContext factory.SyncContext, originalManifestWork *workapiv1.ManifestWork) error {
manifestWork := originalManifestWork.DeepCopy()
// do nothing when finalizer is not added.
@@ -138,6 +129,15 @@ func (c *AvailableStatusController) syncManifestWork(ctx context.Context, origin
}
option := helper.FindManifestConfiguration(manifest.ResourceMeta, manifestWork.Spec.ManifestConfigs)
if option != nil && option.FeedbackScrapeType == workapiv1.FeedbackWatchType {
if err := c.objectReader.RegisterInformer(ctx, manifestWork.Name, manifest.ResourceMeta, controllerContext.Queue()); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "failed to register informer")
}
} else {
if err := c.objectReader.UnRegisterInformer(manifestWork.Name, manifest.ResourceMeta); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "failed to unregister informer")
}
}
// Read status of the resource according to feedback rules.
values, statusFeedbackCondition := c.getFeedbackValues(obj, option)

View File

@@ -210,11 +210,12 @@ func TestSyncManifestWork(t *testing.T) {
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...)
informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 0)
r, err := objectreader.NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
if err != nil {
t.Fatal(err)
}
syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace)
controller := AvailableStatusController{
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
@@ -222,7 +223,7 @@ func TestSyncManifestWork(t *testing.T) {
objectReader: r,
}
err = controller.syncManifestWork(context.TODO(), testingWork)
err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork)
if err != nil {
t.Fatal(err)
}
@@ -554,11 +555,12 @@ func TestStatusFeedback(t *testing.T) {
fakeClient := fakeworkclient.NewSimpleClientset(testingWork)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(runtime.NewScheme(), c.existingResources...)
informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 0)
r, err := objectreader.NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
if err != nil {
t.Fatal(err)
}
syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace)
controller := AvailableStatusController{
statusReader: statusfeedback.NewStatusReader(),
patcher: patcher.NewPatcher[
@@ -567,7 +569,7 @@ func TestStatusFeedback(t *testing.T) {
objectReader: r,
}
err = controller.syncManifestWork(context.TODO(), testingWork)
err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork)
if err != nil {
t.Fatal(err)
}
@@ -882,7 +884,7 @@ func TestConditionRules(t *testing.T) {
t.Fatal(err)
}
r, err := objectreader.NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
if err != nil {
t.Fatal(err)
}
@@ -896,7 +898,8 @@ func TestConditionRules(t *testing.T) {
objectReader: r,
}
err = controller.syncManifestWork(context.TODO(), testingWork)
syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace)
err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork)
if err != nil {
t.Fatal(err)
}
@@ -983,3 +986,142 @@ func hasStatusCondition(conditions []metav1.Condition, conditionType string, sta
return false
}
func TestRegisterUnregisterInformersBasedOnFeedbackType(t *testing.T) {
deployment := testingcommon.NewUnstructuredWithContent("apps/v1", "Deployment", "default", "test-deployment",
map[string]interface{}{
"spec": map[string]interface{}{
"replicas": int64(3),
},
"status": map[string]interface{}{
"availableReplicas": int64(3),
},
})
cases := []struct {
name string
manifestConfigs []workapiv1.ManifestConfigOption
manifests []workapiv1.Manifest
}{
{
name: "Register informer for FeedbackWatchType",
manifestConfigs: []workapiv1.ManifestConfigOption{
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "apps",
Resource: "deployments",
Name: "test-deployment",
Namespace: "default",
},
FeedbackRules: []workapiv1.FeedbackRule{
{Type: workapiv1.JSONPathsType},
},
FeedbackScrapeType: workapiv1.FeedbackWatchType,
},
},
manifests: []workapiv1.Manifest{
util.ToManifest(deployment),
},
},
{
name: "Do not register informer for FeedbackPollType",
manifestConfigs: []workapiv1.ManifestConfigOption{
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "apps",
Resource: "deployments",
Name: "test-deployment",
Namespace: "default",
},
FeedbackRules: []workapiv1.FeedbackRule{
{Type: workapiv1.JSONPathsType},
},
FeedbackScrapeType: workapiv1.FeedbackPollType,
},
},
manifests: []workapiv1.Manifest{
util.ToManifest(deployment),
},
},
{
name: "Do not register informer when FeedbackScrapeType is empty",
manifestConfigs: []workapiv1.ManifestConfigOption{
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "apps",
Resource: "deployments",
Name: "test-deployment",
Namespace: "default",
},
FeedbackRules: []workapiv1.FeedbackRule{
{Type: workapiv1.JSONPathsType},
},
},
},
manifests: []workapiv1.Manifest{
util.ToManifest(deployment),
},
},
}
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
scheme := runtime.NewScheme()
utilruntime.Must(workapiv1.Install(scheme))
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, deployment)
fakeClient := fakeworkclient.NewSimpleClientset()
informerFactory := workinformers.NewSharedInformerFactory(fakeClient, 10*time.Minute)
r, err := objectreader.NewOptions().NewObjectReader(fakeDynamicClient, informerFactory.Work().V1().ManifestWorks())
if err != nil {
t.Fatal(err)
}
conditionReader, err := conditions.NewConditionReader()
if err != nil {
t.Fatal(err)
}
testingWork, _ := spoketesting.NewManifestWork(0)
testingWork.Finalizers = []string{workapiv1.ManifestWorkFinalizer}
testingWork.Spec.Workload.Manifests = c.manifests
testingWork.Spec.ManifestConfigs = c.manifestConfigs
testingWork.Status.ResourceStatus.Manifests = []workapiv1.ManifestCondition{
{
ResourceMeta: workapiv1.ManifestResourceMeta{
Group: "apps",
Version: "v1",
Resource: "deployments",
Namespace: "default",
Name: "test-deployment",
},
Conditions: []metav1.Condition{
{
Type: workapiv1.ManifestApplied,
Status: metav1.ConditionTrue,
},
},
},
}
syncCtx := testingcommon.NewFakeSyncContext(t, testingWork.Namespace)
controller := AvailableStatusController{
patcher: patcher.NewPatcher[
*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus](
fakeClient.WorkV1().ManifestWorks(testingWork.Namespace)),
objectReader: r,
conditionReader: conditionReader,
statusReader: statusfeedback.NewStatusReader(),
}
err = controller.syncManifestWork(context.TODO(), syncCtx, testingWork)
if err != nil {
t.Fatal(err)
}
// Note: We cannot directly verify internal informer state from tests
// since objectReader is now private. The test verifies that the sync
// completes without error, which indirectly validates the registration logic.
})
}
}

View File

@@ -0,0 +1,39 @@
package objectreader
import (
"github.com/spf13/pflag"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
)
type Options struct {
MaxFeedbackWatch int32
}
func NewOptions() *Options {
return &Options{
MaxFeedbackWatch: 50,
}
}
func (o *Options) AddFlags(fs *pflag.FlagSet) {
fs.Int32Var(&o.MaxFeedbackWatch, "max-feedback-watch",
o.MaxFeedbackWatch, "The maximum number of watch for feedback results")
}
func (o *Options) NewObjectReader(dynamicClient dynamic.Interface, workInformer workinformers.ManifestWorkInformer) (ObjectReader, error) {
if err := workInformer.Informer().AddIndexers(map[string]cache.IndexFunc{
byWorkIndex: indexWorkByResource,
}); err != nil {
return nil, err
}
return &objectReader{
dynamicClient: dynamicClient,
informers: map[informerKey]*informerWithCancel{},
indexer: workInformer.Informer().GetIndexer(),
maxWatch: o.MaxFeedbackWatch,
}, nil
}
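Wiring these options might look as follows; a sketch that assumes dynamicClient and workInformer are constructed elsewhere (the real call site is in the agent wiring at the end of this diff):

```go
opts := objectreader.NewOptions()
fs := pflag.NewFlagSet("work-agent", pflag.ContinueOnError)
opts.AddFlags(fs) // exposes --max-feedback-watch (default 50)
if err := fs.Parse([]string{"--max-feedback-watch=100"}); err != nil {
	return err
}

reader, err := opts.NewObjectReader(dynamicClient, workInformer)
if err != nil {
	return err // e.g., adding the byWorkIndex indexer failed
}
_ = reader
```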

View File

@@ -17,15 +17,29 @@ import (
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
workapiv1 "open-cluster-management.io/api/work/v1"
)
const byWorkIndex = "byWorkIndex"
type ObjectReader interface {
// Get returns an object based on resourceMeta
Get(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error)
// RegisterInformer registers an informer to the ObjectReader
RegisterInformer(
ctx context.Context, workName string,
resourceMeta workapiv1.ManifestResourceMeta,
queue workqueue.TypedRateLimitingInterface[string]) error
// UnRegisterInformer unregisters the informer from the ObjectReader
UnRegisterInformer(workName string, resourceMeta workapiv1.ManifestResourceMeta) error
}
// ObjectReader reads spoke resources using informer-based caching or direct dynamic client calls.
type ObjectReader struct {
type objectReader struct {
sync.RWMutex
dynamicClient dynamic.Interface
@@ -33,6 +47,8 @@ type ObjectReader struct {
informers map[informerKey]*informerWithCancel
indexer cache.Indexer
maxWatch int32
}
type informerWithCancel struct {
@@ -57,21 +73,7 @@ type registrationKey struct {
workName string
}
func NewObjectReader(dynamicClient dynamic.Interface, workInformer workinformers.ManifestWorkInformer) (*ObjectReader, error) {
if err := workInformer.Informer().AddIndexers(map[string]cache.IndexFunc{
byWorkIndex: indexWorkByResource,
}); err != nil {
return nil, err
}
return &ObjectReader{
dynamicClient: dynamicClient,
informers: map[informerKey]*informerWithCancel{},
indexer: workInformer.Informer().GetIndexer(),
}, nil
}
func (o *ObjectReader) Get(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error) {
func (o *objectReader) Get(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error) {
if len(resourceMeta.Resource) == 0 || len(resourceMeta.Version) == 0 || len(resourceMeta.Name) == 0 {
return nil, metav1.Condition{
Type: workapiv1.ManifestAvailable,
@@ -107,7 +109,7 @@ func (o *ObjectReader) Get(ctx context.Context, resourceMeta workapiv1.ManifestR
}, nil
}
func (o *ObjectReader) getObject(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, error) {
func (o *objectReader) getObject(ctx context.Context, resourceMeta workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, error) {
gvr := schema.GroupVersionResource{
Group: resourceMeta.Group,
Version: resourceMeta.Version,
@@ -118,28 +120,40 @@ func (o *ObjectReader) getObject(ctx context.Context, resourceMeta workapiv1.Man
o.RLock()
i, found := o.informers[key]
o.RUnlock()
if !found {
return o.dynamicClient.Resource(gvr).Namespace(resourceMeta.Namespace).Get(ctx, resourceMeta.Name, metav1.GetOptions{})
// Use the informer cache only if it exists and has synced.
// If the informer is not synced (e.g., watch permission denied, initial sync in progress),
// fall back to a direct client.Get(), which only requires GET permission.
if found && i.informer.HasSynced() {
var runObj runtime.Object
var err error
// For cluster-scoped resources (empty namespace), use Get() directly;
// ByNamespace("") does not work for cluster-scoped resources.
if resourceMeta.Namespace == "" {
runObj, err = i.lister.Get(resourceMeta.Name)
} else {
runObj, err = i.lister.ByNamespace(resourceMeta.Namespace).Get(resourceMeta.Name)
}
if err != nil {
return nil, err
}
obj, ok := runObj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("unexpected type from lister: %T", runObj)
}
return obj, nil
}
var runObj runtime.Object
runObj, err := i.lister.ByNamespace(resourceMeta.Namespace).Get(resourceMeta.Name)
if err != nil {
return nil, err
}
obj, ok := runObj.(*unstructured.Unstructured)
if !ok {
return nil, fmt.Errorf("unexpected type from lister: %T", runObj)
}
return obj, nil
return o.dynamicClient.Resource(gvr).Namespace(resourceMeta.Namespace).Get(ctx, resourceMeta.Name, metav1.GetOptions{})
}
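Distilled, the read path is now: use the lister only when the informer exists and has synced; otherwise fall back to a direct GET, which works even when the agent lacks WATCH permission. A self-contained sketch of that pattern (the helper name readObject is hypothetical):

```go
package sketch

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
	"k8s.io/client-go/tools/cache"
)

func readObject(ctx context.Context, informer cache.SharedIndexInformer, lister cache.GenericLister,
	client dynamic.Interface, gvr schema.GroupVersionResource, namespace, name string) (*unstructured.Unstructured, error) {
	if informer != nil && informer.HasSynced() {
		var obj runtime.Object
		var err error
		if namespace == "" {
			obj, err = lister.Get(name) // cluster-scoped: ByNamespace("") would miss it
		} else {
			obj, err = lister.ByNamespace(namespace).Get(name)
		}
		if err != nil {
			return nil, err
		}
		u, ok := obj.(*unstructured.Unstructured)
		if !ok {
			return nil, fmt.Errorf("unexpected type from lister: %T", obj)
		}
		return u, nil
	}
	// Informer missing or not yet synced: a direct GET only needs get permission.
	return client.Resource(gvr).Namespace(namespace).Get(ctx, name, metav1.GetOptions{})
}
```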
// RegisterInformer checks if there is an informer and if the event handler has been registered to the informer.
// This is called each time a resource needs to be watched. It is idempotent.
func (o *ObjectReader) RegisterInformer(
func (o *objectReader) RegisterInformer(
ctx context.Context, workName string,
resourceMeta workapiv1.ManifestResourceMeta,
queue workqueue.TypedRateLimitingInterface[string]) error {
logger := klog.FromContext(ctx)
o.Lock()
defer o.Unlock()
@@ -158,6 +172,11 @@ func (o *ObjectReader) RegisterInformer(
}
informer, found := o.informers[key]
if !found {
if len(o.informers) >= int(o.maxWatch) {
logger.Info("The number of registered informers has reached the maximum limit, fallback to feedback with poll")
return nil
}
resourceInformer := dynamicinformer.NewFilteredDynamicInformer(
o.dynamicClient, gvr, resourceMeta.Namespace, 24*time.Hour,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, nil)
@@ -169,6 +188,7 @@ func (o *ObjectReader) RegisterInformer(
registrations: map[registrationKey]cache.ResourceEventHandlerRegistration{},
}
o.informers[key] = informer
logger.V(4).Info("Registered informer for objecr reader", "informerKey", key)
go resourceInformer.Informer().Run(informerCtx.Done())
}
@@ -176,6 +196,8 @@ func (o *ObjectReader) RegisterInformer(
if _, registrationFound := informer.registrations[regKey]; registrationFound {
return nil
}
logger.V(4).Info("Add event handler of informer for objecr reader", "informerKey", key, "resourceKey", regKey)
// Add event handler into the informer so it can trigger work reconcile
registration, err := informer.informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: o.queueWorkByResourceFunc(ctx, gvr, queue),
@@ -188,11 +210,12 @@ func (o *ObjectReader) RegisterInformer(
}
// record the event handler registration
informer.registrations[regKey] = registration
return nil
}
// UnRegisterInformer is called when a resource no longer needs to be watched.
func (o *ObjectReader) UnRegisterInformer(workName string, resourceMeta workapiv1.ManifestResourceMeta) error {
func (o *objectReader) UnRegisterInformer(workName string, resourceMeta workapiv1.ManifestResourceMeta) error {
o.Lock()
defer o.Unlock()
@@ -233,8 +256,9 @@ func (o *ObjectReader) UnRegisterInformer(workName string, resourceMeta workapiv
return nil
}
func (o *ObjectReader) queueWorkByResourceFunc(ctx context.Context, gvr schema.GroupVersionResource, queue workqueue.TypedRateLimitingInterface[string]) func(object interface{}) {
func (o *objectReader) queueWorkByResourceFunc(ctx context.Context, gvr schema.GroupVersionResource, queue workqueue.TypedRateLimitingInterface[string]) func(object interface{}) {
return func(object interface{}) {
logger := klog.FromContext(ctx)
accessor, err := meta.Accessor(object)
if err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "failed to access object")
@@ -250,6 +274,7 @@ func (o *ObjectReader) queueWorkByResourceFunc(ctx context.Context, gvr schema.G
for _, obj := range objects {
work := obj.(*workapiv1.ManifestWork)
logger.V(4).Info("enqueue work by resource", "resourceKey", key)
queue.Add(work.Name)
}
}
@@ -268,3 +293,21 @@ func indexWorkByResource(obj interface{}) ([]string, error) {
}
return keys, nil
}
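The body of indexWorkByResource is elided in this view; a plausible shape for such an index function, reconstructed from how the index is consumed above (hypothetical — the real key format may differ):

```go
package sketch

import (
	"fmt"

	workapiv1 "open-cluster-management.io/api/work/v1"
)

func indexWorkByResource(obj interface{}) ([]string, error) {
	work, ok := obj.(*workapiv1.ManifestWork)
	if !ok {
		return nil, fmt.Errorf("obj is not a ManifestWork: %T", obj)
	}
	var keys []string
	for _, m := range work.Status.ResourceStatus.Manifests {
		rm := m.ResourceMeta
		// One key per resource the work manages, so a change to that resource
		// can be mapped back to its owning ManifestWork via ByIndex.
		keys = append(keys, fmt.Sprintf("%s/%s/%s/%s/%s",
			rm.Group, rm.Version, rm.Resource, rm.Namespace, rm.Name))
	}
	return keys, nil
}
```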
func UnRegisterInformerFromAppliedManifestWork(ctx context.Context, o ObjectReader, workName string, appliedResources []workapiv1.AppliedManifestResourceMeta) {
if o == nil {
return
}
for _, r := range appliedResources {
resourceMeta := workapiv1.ManifestResourceMeta{
Group: r.Group,
Version: r.Version,
Resource: r.Resource,
Name: r.Name,
Namespace: r.Namespace,
}
if err := o.UnRegisterInformer(workName, resourceMeta); err != nil {
utilruntime.HandleErrorWithContext(ctx, err, "failed to unregister informer")
}
}
}
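Since ObjectReader is now an interface, callers can be tested without real informers; a minimal hypothetical stub satisfying the interface above:

```go
package objectreadertest

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/client-go/util/workqueue"
	workapiv1 "open-cluster-management.io/api/work/v1"
)

type fakeReader struct {
	objs map[string]*unstructured.Unstructured // keyed by namespace/name
}

func (f *fakeReader) Get(ctx context.Context, rm workapiv1.ManifestResourceMeta) (*unstructured.Unstructured, metav1.Condition, error) {
	if obj, ok := f.objs[rm.Namespace+"/"+rm.Name]; ok {
		return obj, metav1.Condition{Type: workapiv1.ManifestAvailable, Status: metav1.ConditionTrue}, nil
	}
	return nil, metav1.Condition{Type: workapiv1.ManifestAvailable, Status: metav1.ConditionFalse}, nil
}

func (f *fakeReader) RegisterInformer(ctx context.Context, workName string,
	rm workapiv1.ManifestResourceMeta, queue workqueue.TypedRateLimitingInterface[string]) error {
	return nil // no-op: no real watches in tests
}

func (f *fakeReader) UnRegisterInformer(workName string, rm workapiv1.ManifestResourceMeta) error {
	return nil
}
```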

View File

@@ -5,6 +5,8 @@ import (
"testing"
"time"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@@ -20,7 +22,7 @@ import (
func TestNewObjectReader(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
@@ -28,7 +30,7 @@ func TestNewObjectReader(t *testing.T) {
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -37,16 +39,16 @@ func TestNewObjectReader(t *testing.T) {
t.Fatal("Expected ObjectReader to be created, but got nil")
}
if reader.dynamicClient != fakeDynamicClient {
t.Error("Expected dynamicClient to be set correctly")
}
if reader.informers == nil {
t.Error("Expected informers map to be initialized")
}
if reader.indexer == nil {
t.Error("Expected indexer to be set correctly")
// Verify ObjectReader is functional by calling Get
_, _, err = reader.Get(context.TODO(), workapiv1.ManifestResourceMeta{
Group: "",
Version: "v1",
Resource: "secrets",
Namespace: "default",
Name: "test",
})
if err == nil {
t.Error("Expected error for non-existent resource")
}
}
@@ -83,13 +85,13 @@ func TestGet_IncompleteResourceMeta(t *testing.T) {
}
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
workInformer := workInformerFactory.Work().V1().ManifestWorks()
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -127,13 +129,13 @@ func TestGet_IncompleteResourceMeta(t *testing.T) {
func TestGet_ResourceNotFound(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
workInformer := workInformerFactory.Work().V1().ManifestWorks()
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -175,7 +177,7 @@ func TestGet_ResourceNotFound(t *testing.T) {
func TestGet_ResourceFound(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
secret := &unstructured.Unstructured{
Object: map[string]any{
@@ -196,7 +198,7 @@ func TestGet_ResourceFound(t *testing.T) {
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
workInformer := workInformerFactory.Work().V1().ManifestWorks()
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -246,7 +248,7 @@ func TestGet_ResourceFound(t *testing.T) {
func TestRegisterInformer(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
@@ -254,7 +256,7 @@ func TestRegisterInformer(t *testing.T) {
workInformer := workInformerFactory.Work().V1().ManifestWorks()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -283,9 +285,11 @@ func TestRegisterInformer(t *testing.T) {
}
key := informerKey{GroupVersionResource: gvr, namespace: resourceMeta.Namespace}
reader.RLock()
informer, found := reader.informers[key]
reader.RUnlock()
// Cast to concrete type to access internal fields in tests
concreteReader := reader.(*objectReader)
concreteReader.RLock()
informer, found := concreteReader.informers[key]
concreteReader.RUnlock()
if !found {
t.Fatal("Expected informer to be registered")
@@ -313,9 +317,9 @@ func TestRegisterInformer(t *testing.T) {
t.Fatalf("Expected no error on second registration, got %v", err)
}
reader.RLock()
informer, found = reader.informers[key]
reader.RUnlock()
concreteReader.RLock()
informer, found = concreteReader.informers[key]
concreteReader.RUnlock()
if !found {
t.Fatal("Expected informer to still be registered")
@@ -328,7 +332,7 @@ func TestRegisterInformer(t *testing.T) {
func TestRegisterInformer_MultipleResources(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
@@ -336,7 +340,7 @@ func TestRegisterInformer_MultipleResources(t *testing.T) {
workInformer := workInformerFactory.Work().V1().ManifestWorks()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -378,9 +382,11 @@ func TestRegisterInformer_MultipleResources(t *testing.T) {
}
key := informerKey{GroupVersionResource: gvr, namespace: "default"}
reader.RLock()
informer, found := reader.informers[key]
reader.RUnlock()
// Cast to concrete type to access internal fields in tests
concreteReader := reader.(*objectReader)
concreteReader.RLock()
informer, found := concreteReader.informers[key]
concreteReader.RUnlock()
if !found {
t.Fatal("Expected informer to be registered")
@@ -394,7 +400,7 @@ func TestRegisterInformer_MultipleResources(t *testing.T) {
func TestUnRegisterInformer(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
@@ -402,7 +408,7 @@ func TestUnRegisterInformer(t *testing.T) {
workInformer := workInformerFactory.Work().V1().ManifestWorks()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -431,9 +437,10 @@ func TestUnRegisterInformer(t *testing.T) {
key := informerKey{GroupVersionResource: gvr, namespace: resourceMeta.Namespace}
// Verify informer exists
reader.RLock()
_, found := reader.informers[key]
reader.RUnlock()
concreteReader := reader.(*objectReader)
concreteReader.RLock()
_, found := concreteReader.informers[key]
concreteReader.RUnlock()
if !found {
t.Fatal("Expected informer to be registered")
}
@@ -445,9 +452,9 @@ func TestUnRegisterInformer(t *testing.T) {
}
// Verify informer was removed (since it was the last registration)
reader.RLock()
_, found = reader.informers[key]
reader.RUnlock()
concreteReader.RLock()
_, found = concreteReader.informers[key]
concreteReader.RUnlock()
if found {
t.Error("Expected informer to be removed")
}
@@ -455,7 +462,7 @@ func TestUnRegisterInformer(t *testing.T) {
func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
@@ -463,7 +470,7 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) {
workInformer := workInformerFactory.Work().V1().ManifestWorks()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -511,9 +518,10 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) {
}
// Verify informer still exists (since second resource is still registered)
reader.RLock()
informer, found := reader.informers[key]
reader.RUnlock()
concreteReader := reader.(*objectReader)
concreteReader.RLock()
informer, found := concreteReader.informers[key]
concreteReader.RUnlock()
if !found {
t.Fatal("Expected informer to still be registered")
}
@@ -529,9 +537,9 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) {
}
// Verify informer was removed (since it was the last registration)
reader.RLock()
_, found = reader.informers[key]
reader.RUnlock()
concreteReader.RLock()
_, found = concreteReader.informers[key]
concreteReader.RUnlock()
if found {
t.Error("Expected informer to be removed after unregistering all resources")
}
@@ -539,14 +547,14 @@ func TestUnRegisterInformer_MultipleRegistrations(t *testing.T) {
func TestUnRegisterInformer_NotRegistered(t *testing.T) {
scheme := runtime.NewScheme()
_ = workapiv1.Install(scheme)
_ = corev1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
workInformer := workInformerFactory.Work().V1().ManifestWorks()
reader, err := NewObjectReader(fakeDynamicClient, workInformer)
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
@@ -566,6 +574,204 @@ func TestUnRegisterInformer_NotRegistered(t *testing.T) {
}
}
func TestUnRegisterInformerFromAppliedManifestWork(t *testing.T) {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
_ = appsv1.AddToScheme(scheme)
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
workInformer := workInformerFactory.Work().V1().ManifestWorks()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
ctx := t.Context()
// Register informers for multiple resources
resources := []workapiv1.ManifestResourceMeta{
{
Group: "",
Version: "v1",
Resource: "secrets",
Namespace: "default",
Name: "secret1",
},
{
Group: "apps",
Version: "v1",
Resource: "deployments",
Namespace: "kube-system",
Name: "deployment1",
},
{
Group: "",
Version: "v1",
Resource: "configmaps",
Namespace: "default",
Name: "cm1",
},
}
for _, resource := range resources {
err = reader.RegisterInformer(ctx, "test-work", resource, queue)
if err != nil {
t.Fatalf("Expected no error registering informer, got %v", err)
}
}
// Verify all informers are registered
concreteReader := reader.(*objectReader)
concreteReader.RLock()
initialCount := len(concreteReader.informers)
concreteReader.RUnlock()
if initialCount != 3 {
t.Errorf("Expected 3 informers to be registered, got %d", initialCount)
}
// Create applied resources from the manifest resources
appliedResources := []workapiv1.AppliedManifestResourceMeta{
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "",
Resource: "secrets",
Namespace: "default",
Name: "secret1",
},
Version: "v1",
},
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "apps",
Resource: "deployments",
Namespace: "kube-system",
Name: "deployment1",
},
Version: "v1",
},
}
// Unregister using the helper function
UnRegisterInformerFromAppliedManifestWork(context.TODO(), reader, "test-work", appliedResources)
// Verify that 2 informers were removed
concreteReader.RLock()
finalCount := len(concreteReader.informers)
concreteReader.RUnlock()
if finalCount != 1 {
t.Errorf("Expected 1 informer to remain after unregistering 2 resources, got %d", finalCount)
}
}
func TestUnRegisterInformerFromAppliedManifestWork_NilObjectReader(t *testing.T) {
// This test ensures the function handles nil objectReader gracefully
appliedResources := []workapiv1.AppliedManifestResourceMeta{
{
ResourceIdentifier: workapiv1.ResourceIdentifier{
Group: "",
Resource: "secrets",
Namespace: "default",
Name: "secret1",
},
Version: "v1",
},
}
// Should not panic when objectReader is nil
UnRegisterInformerFromAppliedManifestWork(context.TODO(), nil, "test-work", appliedResources)
}
func TestGet_InformerFallbackToClient(t *testing.T) {
scheme := runtime.NewScheme()
_ = corev1.AddToScheme(scheme)
secret := &unstructured.Unstructured{
Object: map[string]any{
"apiVersion": "v1",
"kind": "Secret",
"metadata": map[string]any{
"name": "test-secret",
"namespace": "default",
},
"data": map[string]any{
"key": "dmFsdWU=",
},
},
}
// Create dynamic client with the secret - object exists in client
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, secret)
fakeWorkClient := fakeworkclient.NewSimpleClientset()
workInformerFactory := workinformers.NewSharedInformerFactory(fakeWorkClient, 10*time.Minute)
workInformer := workInformerFactory.Work().V1().ManifestWorks()
queue := workqueue.NewTypedRateLimitingQueue(workqueue.DefaultTypedControllerRateLimiter[string]())
reader, err := NewOptions().NewObjectReader(fakeDynamicClient, workInformer)
if err != nil {
t.Fatal(err)
}
resourceMeta := workapiv1.ManifestResourceMeta{
Group: "",
Version: "v1",
Resource: "secrets",
Namespace: "default",
Name: "test-secret",
}
ctx := t.Context()
// Register informer - this creates an informer but it is not synced yet.
// This simulates scenarios where:
// 1. Watch permission is denied - informer starts but can't sync
// 2. Informer is still performing initial cache sync
// In both cases, HasSynced() returns false and Get() should fall back to client.Get()
// which only requires GET permission (not WATCH permission).
err = reader.RegisterInformer(ctx, "test-work", resourceMeta, queue)
if err != nil {
t.Fatalf("Expected no error registering informer, got %v", err)
}
// Get should fall back to the dynamic client when the informer is not synced.
// This ensures that even without WATCH permission, resources can still be retrieved
// using GET permission via the dynamic client.
obj, condition, err := reader.Get(ctx, resourceMeta)
if err != nil {
t.Errorf("Expected no error, got %v", err)
}
if obj == nil {
t.Fatal("Expected object to be returned from fallback to client, got nil")
}
if obj.GetName() != "test-secret" {
t.Errorf("Expected object name test-secret, got %s", obj.GetName())
}
if obj.GetNamespace() != "default" {
t.Errorf("Expected object namespace default, got %s", obj.GetNamespace())
}
if condition.Type != workapiv1.ManifestAvailable {
t.Errorf("Expected condition type %s, got %s", workapiv1.ManifestAvailable, condition.Type)
}
if condition.Status != metav1.ConditionTrue {
t.Errorf("Expected condition status %s, got %s", metav1.ConditionTrue, condition.Status)
}
if condition.Reason != "ResourceAvailable" {
t.Errorf("Expected reason ResourceAvailable, got %s", condition.Reason)
}
}
func TestIndexWorkByResource(t *testing.T) {
cases := []struct {
name string

View File

@@ -4,6 +4,8 @@ import (
"time"
"github.com/spf13/pflag"
"open-cluster-management.io/ocm/pkg/work/spoke/objectreader"
)
const (
@@ -20,6 +22,8 @@ type WorkloadAgentOptions struct {
CloudEventsClientID string
CloudEventsClientCodecs []string
DefaultUserAgent string
ObjectReaderOption *objectreader.Options
}
// NewWorkloadAgentOptions returns the flags with default value set
@@ -31,6 +35,7 @@ func NewWorkloadAgentOptions() *WorkloadAgentOptions {
WorkloadSourceDriver: "kube",
WorkloadSourceConfig: "/spoke/hub-kubeconfig/kubeconfig",
DefaultUserAgent: defaultUserAgent,
ObjectReaderOption: objectreader.NewOptions(),
}
}
@@ -50,4 +55,6 @@ func (o *WorkloadAgentOptions) AddFlags(fs *pflag.FlagSet) {
o.CloudEventsClientID, "The ID of the cloudevents client when the workload source is based on cloudevents")
fs.StringSliceVar(&o.CloudEventsClientCodecs, "cloudevents-client-codecs", o.CloudEventsClientCodecs,
"The codecs for cloudevents client when workload source source is based on cloudevents, the valid codecs: manifest or manifestbundle")
o.ObjectReaderOption.AddFlags(fs)
}

View File

@@ -29,6 +29,7 @@ import (
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/work/helper"
"open-cluster-management.io/ocm/pkg/work/spoke/auth"
"open-cluster-management.io/ocm/pkg/work/spoke/conditions"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/finalizercontroller"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/manifestcontroller"
"open-cluster-management.io/ocm/pkg/work/spoke/controllers/statuscontroller"
@@ -132,6 +133,16 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
restMapper,
).NewExecutorValidator(ctx, features.SpokeMutableFeatureGate.Enabled(ocmfeature.ExecutorValidatingCaches))
conditionReader, err := conditions.NewConditionReader()
if err != nil {
return err
}
objectReader, err := o.workOptions.ObjectReaderOption.NewObjectReader(spokeDynamicClient, hubWorkInformer)
if err != nil {
return err
}
manifestWorkController := manifestcontroller.NewManifestWorkController(
spokeDynamicClient,
spokeKubeClient,
@@ -141,6 +152,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
objectReader,
hubHash, agentID,
restMapper,
validator,
@@ -154,6 +166,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
spokeDynamicClient,
spokeWorkClient.WorkV1().AppliedManifestWorks(),
spokeWorkInformerFactory.Work().V1().AppliedManifestWorks(),
objectReader,
agentID,
)
manifestWorkFinalizeController := finalizercontroller.NewManifestWorkFinalizeController(
@@ -172,17 +185,15 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
o.workOptions.AppliedManifestWorkEvictionGracePeriod,
hubHash, agentID,
)
availableStatusController, err := statuscontroller.NewAvailableStatusController(
spokeDynamicClient,
availableStatusController := statuscontroller.NewAvailableStatusController(
hubWorkClient,
hubWorkInformer,
hubWorkInformer.Lister().ManifestWorks(o.agentOptions.SpokeClusterName),
conditionReader,
objectReader,
o.workOptions.MaxJSONRawLength,
o.workOptions.StatusSyncInterval,
)
if err != nil {
return err
}
go spokeWorkInformerFactory.Start(ctx.Done())
go hubWorkInformer.Informer().Run(ctx.Done())