Mirror of https://github.com/open-cluster-management-io/ocm.git (synced 2026-02-14)
🌱 Add contextual logging for work agent (#1242)
* Add contextual logging for work agent
* Resolve comments

Signed-off-by: Jian Qiu <jqiu@redhat.com>
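The pattern behind this change: the agent builds one base logr.Logger at startup, attaches it to the context with klog.NewContext, and each controller or helper retrieves it with klog.FromContext and adds its own WithName/WithValues, instead of calling klog.Infof or an events.Recorder directly. A minimal, self-contained sketch of that flow (the WidgetController name and reconcileWidget function are illustrative only, not part of this commit):

```go
package main

import (
	"context"

	"k8s.io/klog/v2"
)

// reconcileWidget stands in for a controller sync or helper function: it reads
// the logger that the caller stored in ctx instead of logging via klog.Infof.
func reconcileWidget(ctx context.Context, name string) {
	logger := klog.FromContext(ctx)
	logger.Info("Reconciling widget", "widgetName", name)
}

func main() {
	// Build a base logger once at process start and enrich it with
	// process-level key/value pairs (the work agent uses POD_NAME this way).
	logger := klog.NewKlogr().WithValues("podName", "example-pod")
	ctx := klog.NewContext(context.Background(), logger)

	// A controller derives a named, scoped logger and stores it back into the
	// context so everything it calls inherits the same fields.
	ctx = klog.NewContext(ctx, klog.FromContext(ctx).WithName("WidgetController"))
	reconcileWidget(ctx, "demo")
}
```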
@@ -8,7 +8,6 @@ import (
"testing"

"github.com/google/go-cmp/cmp"
-"github.com/openshift/library-go/pkg/operator/events/eventstesting"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -342,7 +341,7 @@ func TestDeleteAppliedResourcess(t *testing.T) {
for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
fakeDynamicClient := fakedynamic.NewSimpleDynamicClient(scheme, c.existingResources...)
-actual, err := DeleteAppliedResources(context.TODO(), c.resourcesToRemove, "testing", fakeDynamicClient, eventstesting.NewTestingEventRecorder(t), c.owner)
+actual, err := DeleteAppliedResources(context.TODO(), c.resourcesToRemove, "testing", fakeDynamicClient, c.owner)
if err != nil {
t.Errorf("unexpected err: %v", err)
}
@@ -11,7 +11,6 @@ import (
"time"

"github.com/openshift/library-go/pkg/controller/factory"
-"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourcehelper"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
@@ -169,11 +168,12 @@ func DeleteAppliedResources(
resources []workapiv1.AppliedManifestResourceMeta,
reason string,
dynamicClient dynamic.Interface,
-recorder events.Recorder,
owner metav1.OwnerReference) ([]workapiv1.AppliedManifestResourceMeta, []error) {
var resourcesPendingFinalization []workapiv1.AppliedManifestResourceMeta
var errs []error

+logger := klog.FromContext(ctx)
+
// set owner to be removed
ownerCopy := owner.DeepCopy()
ownerCopy.UID = types.UID(fmt.Sprintf("%s-", owner.UID))
@@ -190,7 +190,8 @@ func DeleteAppliedResources(
Namespace(resource.Namespace).
Get(ctx, resource.Name, metav1.GetOptions{})
if errors.IsNotFound(err) {
-klog.Infof("Resource %v with key %s/%s is removed Successfully", gvr, resource.Namespace, resource.Name)
+logger.Info("Resource is removed successfully",
+"gvr", gvr.String(), "resourceNamespace", resource.Namespace, "resourceName", resource.Name)
continue
}
@@ -256,7 +257,8 @@ func DeleteAppliedResources(
}

resourcesPendingFinalization = append(resourcesPendingFinalization, resource)
-recorder.Eventf("ResourceDeleted", "Deleted resource %v with key %s/%s because %s.", gvr, resource.Namespace, resource.Name, reason)
+logger.Info("Deleted resource",
+"gvr", gvr.String(), "resourceNamespace", resource.Namespace, "resourceName", resource.Name, "reason", reason)
}

return resourcesPendingFinalization, errs
@@ -409,6 +411,8 @@ func FindManifestCondition(resourceMeta workapiv1.ManifestResourceMeta, manifest

func ApplyOwnerReferences(ctx context.Context, dynamicClient dynamic.Interface, gvr schema.GroupVersionResource,
existing runtime.Object, requiredOwner metav1.OwnerReference) error {
+logger := klog.FromContext(ctx)
+
accessor, err := meta.Accessor(existing)
if err != nil {
return fmt.Errorf("type %t cannot be accessed: %v", existing, err)
@@ -432,7 +436,8 @@ func ApplyOwnerReferences(ctx context.Context, dynamicClient dynamic.Interface,
return err
}

-klog.V(2).Infof("Patching resource %v %s/%s with patch %s", gvr, accessor.GetNamespace(), accessor.GetName(), string(patchData))
+logger.V(2).Info("Patching resource",
+"gvr", gvr.String(), "resourceNamespace", accessor.GetNamespace(), "resourceName", accessor.GetName(), "patch", string(patchData))
_, err = dynamicClient.Resource(gvr).Namespace(accessor.GetNamespace()).Patch(ctx, accessor.GetName(), types.MergePatchType, patchData, metav1.PatchOptions{})
return err
}
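Because DeleteAppliedResources and ApplyOwnerReferences now log through the context-supplied logger rather than an events.Recorder, message visibility is governed by klog verbosity instead of Kubernetes events. A small, self-contained sketch of that logging style (the deleteResource helper below is illustrative, not the OCM function itself):

```go
package main

import (
	"context"
	"flag"

	"k8s.io/klog/v2"
)

// deleteResource is an illustrative stand-in for a helper that used to take an
// events.Recorder; it pulls the logger from ctx and emits structured key/values.
func deleteResource(ctx context.Context, namespace, name, reason string) {
	logger := klog.FromContext(ctx)
	// V(2) output only appears when the process runs with -v=2 or higher.
	logger.V(2).Info("Deleting resource",
		"resourceNamespace", namespace, "resourceName", name, "reason", reason)
}

func main() {
	// Register klog's flags (-v, ...) so verbosity can be tuned at startup.
	klog.InitFlags(nil)
	_ = flag.Set("v", "2")
	flag.Parse()

	ctx := klog.NewContext(context.Background(), klog.NewKlogr().WithName("helper-demo"))
	deleteResource(ctx, "default", "example-cm", "manifestwork is terminating")
}
```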
@@ -2,7 +2,6 @@ package apply

import (
"context"
-"fmt"

"github.com/openshift/library-go/pkg/operator/events"
"github.com/openshift/library-go/pkg/operator/resource/resourcemerge"
@@ -12,6 +11,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
+"k8s.io/klog/v2"

workapiv1 "open-cluster-management.io/api/work/v1"
@@ -31,8 +31,9 @@ func (c *CreateOnlyApply) Apply(ctx context.Context,
required *unstructured.Unstructured,
owner metav1.OwnerReference,
_ *workapiv1.ManifestConfigOption,
-recorder events.Recorder) (runtime.Object, error) {
+_ events.Recorder) (runtime.Object, error) {

+logger := klog.FromContext(ctx)
obj, err := c.client.
Resource(gvr).
Namespace(required.GetNamespace()).
@@ -42,8 +43,8 @@ func (c *CreateOnlyApply) Apply(ctx context.Context,
obj, err = c.client.Resource(gvr).Namespace(required.GetNamespace()).Create(
ctx, resourcemerge.WithCleanLabelsAndAnnotations(required).(*unstructured.Unstructured), metav1.CreateOptions{})
if err != nil {
-recorder.Eventf(fmt.Sprintf(
-"%s Created", required.GetKind()), "Created %s/%s because it was missing", required.GetNamespace(), required.GetName())
+logger.Info("Resource created because of missing",
+"gvr", gvr.String(), "resourceNamespace", required.GetNamespace(), "resourceName", required.GetName())
}
}
@@ -2,13 +2,13 @@ package apply

import (
"context"
-"fmt"

"github.com/openshift/library-go/pkg/operator/events"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
+"k8s.io/klog/v2"

workapiv1 "open-cluster-management.io/api/work/v1"
)
@@ -20,13 +20,13 @@ func NewReadOnlyApply() *ReadOnlyApply {
}

func (c *ReadOnlyApply) Apply(ctx context.Context,
-_ schema.GroupVersionResource,
+gvr schema.GroupVersionResource,
required *unstructured.Unstructured,
_ metav1.OwnerReference,
_ *workapiv1.ManifestConfigOption,
-recorder events.Recorder) (runtime.Object, error) {
-
-recorder.Eventf(fmt.Sprintf(
-"%s noop", required.GetKind()), "Noop for %s/%s because its read-only", required.GetNamespace(), required.GetName())
+_ events.Recorder) (runtime.Object, error) {
+logger := klog.FromContext(ctx)
+logger.Info("Noop because its read-only",
+"gvr", gvr.String(), "resourceNamespace", required.GetNamespace(), "resourceName", required.GetName())
return required, nil
}
@@ -14,7 +14,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
-"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/jsonpath"
"k8s.io/klog/v2"

@@ -45,7 +44,7 @@ func (c *ServerSideApply) Apply(
requiredOriginal *unstructured.Unstructured,
owner metav1.OwnerReference,
applyOption *workapiv1.ManifestConfigOption,
-recorder events.Recorder) (runtime.Object, error) {
+_ events.Recorder) (runtime.Object, error) {
+logger := klog.FromContext(ctx)
// Currently, if the required object has zero creationTime in metadata, it will cause
// kube-apiserver to increment generation even if nothing else changes. more details see:
@@ -114,10 +113,9 @@ func (c *ServerSideApply) Apply(
Resource(gvr).
Namespace(required.GetNamespace()).
Apply(ctx, required.GetName(), required, metav1.ApplyOptions{FieldManager: fieldManager, Force: force})
-resourceKey, _ := cache.MetaNamespaceKeyFunc(required)
-recorder.Eventf(fmt.Sprintf(
-"Server Side Applied %s %s", required.GetKind(), resourceKey),
-"Patched with field manager %s, err %v", fieldManager, err)
+logger.Info("Server side applied",
+"gvr", gvr.String(), "resourceNamespace", required.GetNamespace(),
+"resourceName", required.GetName(), "fieldManager", fieldManager)

if errors.IsConflict(err) {
return obj, &ServerSideApplyConflictError{ssaErr: err}
@@ -2,7 +2,6 @@ package apply

import (
"context"
"fmt"
"reflect"
"strings"

@@ -18,6 +17,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
+"k8s.io/klog/v2"
"k8s.io/utils/pointer"

workapiv1 "open-cluster-management.io/api/work/v1"
@@ -48,7 +48,6 @@ func (c *UpdateApply) Apply(
owner metav1.OwnerReference,
_ *workapiv1.ManifestConfigOption,
recorder events.Recorder) (runtime.Object, error) {
-
clientHolder := resourceapply.NewClientHolder().
WithAPIExtensionsClient(c.apiExtensionClient).
WithKubernetes(c.kubeclient).
@@ -84,6 +83,7 @@ func (c *UpdateApply) applyUnstructured(
gvr schema.GroupVersionResource,
recorder events.Recorder,
cache resourceapply.ResourceCache) (*unstructured.Unstructured, bool, error) {
+logger := klog.FromContext(ctx)
existing, err := c.dynamicClient.
Resource(gvr).
Namespace(required.GetNamespace()).
@@ -91,8 +91,8 @@ func (c *UpdateApply) applyUnstructured(
if apierrors.IsNotFound(err) {
actual, err := c.dynamicClient.Resource(gvr).Namespace(required.GetNamespace()).Create(
ctx, resourcemerge.WithCleanLabelsAndAnnotations(required).(*unstructured.Unstructured), metav1.CreateOptions{})
-recorder.Eventf(fmt.Sprintf(
-"%s Created", required.GetKind()), "Created %s/%s because it was missing", required.GetNamespace(), required.GetName())
+logger.Info("Created resource because it was missing",
+"gvr", gvr.String(), "resourceNamespace", required.GetNamespace(), "resourceName", required.GetName())
cache.UpdateCachedResourceMetadata(required, actual)
return actual, true, err
}
@@ -130,8 +130,8 @@ func (c *UpdateApply) applyUnstructured(
required.SetResourceVersion(existing.GetResourceVersion())
actual, err := c.dynamicClient.Resource(gvr).Namespace(required.GetNamespace()).Update(
ctx, required, metav1.UpdateOptions{})
-recorder.Eventf(fmt.Sprintf(
-"%s Updated", required.GetKind()), "Updated %s/%s", required.GetNamespace(), required.GetName())
+logger.Info("Updated resource",
+"gvr", gvr.String(), "resourceNamespace", required.GetNamespace(), "resourceName", required.GetName())
cache.UpdateCachedResourceMetadata(required, actual)
return actual, true, err
}
@@ -134,7 +134,7 @@ func (v *SarValidator) CheckSubjectAccessReviews(ctx context.Context, sa *workap
// CheckEscalation checks whether the sa is escalated to operate the gvr(RBAC) resources.
func (v *SarValidator) CheckEscalation(ctx context.Context, sa *workapiv1.ManifestWorkSubjectServiceAccount,
gvr schema.GroupVersionResource, namespace, name string, obj *unstructured.Unstructured) error {
-
+logger := klog.FromContext(ctx)
if gvr.Group != "rbac.authorization.k8s.io" {
return nil
}
@@ -152,8 +152,8 @@ func (v *SarValidator) CheckEscalation(ctx context.Context, sa *workapiv1.Manife
DryRun: []string{"All"},
})
if apierrors.IsForbidden(err) {
-klog.Infof("not allowed to apply the resource %s %s, %s %s, error: %s",
-gvr.Group, gvr.Resource, namespace, name, err.Error())
+logger.Info("not allowed to apply the resource",
+"gvr", gvr.String(), "resourceNamespace", namespace, "resourceName", name, "error", err)
return &NotAllowedError{
Err: fmt.Errorf("not allowed to apply the resource %s %s, %s %s, error: permission escalation",
gvr.Group, gvr.Resource, namespace, name),
pkg/work/spoke/auth/cache/auth.go
@@ -101,6 +101,7 @@ func (v *sarCacheValidator) Start(ctx context.Context) {
func (v *sarCacheValidator) Validate(ctx context.Context, executor *workapiv1.ManifestWorkExecutor,
gvr schema.GroupVersionResource, namespace, name string,
ownedByTheWork bool, obj *unstructured.Unstructured) error {
+logger := klog.FromContext(ctx)
if executor == nil {
return nil
}
@@ -128,7 +129,8 @@ func (v *sarCacheValidator) Validate(ctx context.Context, executor *workapiv1.Ma
return err
}
} else {
-klog.V(4).Infof("Get auth from cache executor %s, dimension: %+v allow: %v", executorKey, dimension, *allowed)
+logger.V(4).Info("Get auth from cache executor",
+"executorKey", executorKey, "dimension", dimension, "allowed", *allowed)
if !*allowed {
return &basic.NotAllowedError{
Err: fmt.Errorf("not allowed to apply the resource %s %s, %s %s",
@@ -17,6 +17,8 @@ import (
"open-cluster-management.io/ocm/pkg/common/queue"
)

+const manifestWorkAddFinalizerController = "ManifestWorkAddFinalizerController"
+
// AddFinalizerController is to add the cluster.open-cluster-management.io/manifest-work-cleanup finalizer to manifestworks.
type AddFinalizerController struct {
patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
@@ -40,12 +42,14 @@ func NewAddFinalizerController(

return factory.New().
WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, manifestWorkInformer.Informer()).
-WithSync(controller.sync).ToController("ManifestWorkAddFinalizerController", recorder)
+WithSync(controller.sync).ToController(manifestWorkAddFinalizerController, recorder)
}

func (m *AddFinalizerController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
manifestWorkName := controllerContext.QueueKey()
-klog.V(5).Infof("Reconciling ManifestWork %q", manifestWorkName)
+logger := klog.FromContext(ctx).WithName(manifestWorkAddFinalizerController).
+WithValues("manifestWorkName", manifestWorkName)
+logger.V(5).Info("Reconciling ManifestWork")

manifestWork, err := m.manifestWorkLister.Get(manifestWorkName)
if errors.IsNotFound(err) {
@@ -24,6 +24,8 @@ import (
"open-cluster-management.io/ocm/pkg/work/helper"
)

+const appliedManifestWorkFinalizer = "AppliedManifestWorkFinalizer"
+
// AppliedManifestWorkFinalizeController handles cleanup of appliedmanifestwork resources before deletion is allowed.
// It should handle all appliedmanifestworks belonging to this agent identified by the agentID.
type AppliedManifestWorkFinalizeController struct {
@@ -54,12 +56,15 @@ func NewAppliedManifestWorkFinalizeController(
return factory.New().
WithFilteredEventsInformersQueueKeysFunc(queue.QueueKeyByMetaName,
helper.AppliedManifestworkAgentIDFilter(agentID), appliedManifestWorkInformer.Informer()).
-WithSync(controller.sync).ToController("AppliedManifestWorkFinalizer", recorder)
+WithSync(controller.sync).ToController(appliedManifestWorkFinalizer, recorder)
}

func (m *AppliedManifestWorkFinalizeController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
appliedManifestWorkName := controllerContext.QueueKey()
-klog.V(5).Infof("Reconciling AppliedManifestWork %q", appliedManifestWorkName)
+logger := klog.FromContext(ctx).WithName(appliedManifestWorkFinalizer).
+WithValues("appliedManifestWorkName", appliedManifestWorkName)
+logger.V(5).Info("Reconciling AppliedManifestWork")
+ctx = klog.NewContext(ctx, logger)

appliedManifestWork, err := m.appliedManifestWorkLister.Get(appliedManifestWorkName)
if errors.IsNotFound(err) {
@@ -77,6 +82,7 @@ func (m *AppliedManifestWorkFinalizeController) sync(ctx context.Context, contro
// before removing finalizer from appliedmanifestwork
func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx context.Context,
controllerContext factory.SyncContext, originalManifestWork *workapiv1.AppliedManifestWork) error {
+logger := klog.FromContext(ctx)
appliedManifestWork := originalManifestWork.DeepCopy()

// no work to do until we're deleted
@@ -96,7 +102,7 @@ func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx cont
// scoped resource correctly.
reason := fmt.Sprintf("manifestwork %s is terminating", appliedManifestWork.Spec.ManifestWorkName)
resourcesPendingFinalization, errs := helper.DeleteAppliedResources(
-ctx, appliedManifestWork.Status.AppliedResources, reason, m.spokeDynamicClient, controllerContext.Recorder(), *owner)
+ctx, appliedManifestWork.Status.AppliedResources, reason, m.spokeDynamicClient, *owner)
appliedManifestWork.Status.AppliedResources = resourcesPendingFinalization
updatedAppliedManifestWork, err := m.patcher.PatchStatus(ctx, appliedManifestWork, appliedManifestWork.Status, originalManifestWork.Status)
if err != nil {
@@ -111,7 +117,7 @@ func (m *AppliedManifestWorkFinalizeController) syncAppliedManifestWork(ctx cont

// requeue the work until all applied resources are deleted and finalized if the appliedmanifestwork itself is not updated
if len(resourcesPendingFinalization) != 0 {
-klog.V(4).Infof("%d resources pending deletions in %s", len(resourcesPendingFinalization), appliedManifestWork.Name)
+logger.V(4).Info("resources pending deletions", "numOfResources", len(resourcesPendingFinalization))
controllerContext.Queue().AddAfter(appliedManifestWork.Name, m.rateLimiter.When(appliedManifestWork.Name))
return nil
}
@@ -23,6 +23,8 @@ import (
"open-cluster-management.io/ocm/pkg/work/helper"
)

+const manifestWorkFinalizer = "ManifestWorkFinalizer"
+
// ManifestWorkFinalizeController handles cleanup of manifestwork resources before deletion is allowed.
type ManifestWorkFinalizeController struct {
patcher patcher.Patcher[*workapiv1.ManifestWork, workapiv1.ManifestWorkSpec, workapiv1.ManifestWorkStatus]
@@ -60,13 +62,18 @@ func NewManifestWorkFinalizeController(
helper.AppliedManifestworkQueueKeyFunc(hubHash),
helper.AppliedManifestworkHubHashFilter(hubHash),
appliedManifestWorkInformer.Informer()).
-WithSync(controller.sync).ToController("ManifestWorkFinalizer", recorder)
+WithSync(controller.sync).ToController(manifestWorkFinalizer, recorder)
}

func (m *ManifestWorkFinalizeController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
manifestWorkName := controllerContext.QueueKey()
appliedManifestWorkName := fmt.Sprintf("%s-%s", m.hubHash, manifestWorkName)
-klog.V(5).Infof("Reconciling ManifestWork %q", manifestWorkName)
+
+logger := klog.FromContext(ctx).WithName(appliedManifestWorkFinalizer).
+WithValues("appliedManifestWorkName", appliedManifestWorkName, "manifestWorkName", manifestWorkName)
+ctx = klog.NewContext(ctx, logger)
+
+logger.V(5).Info("Reconciling ManifestWork")

manifestWork, err := m.manifestWorkLister.Get(manifestWorkName)
@@ -30,10 +30,11 @@ const (
// If the AppliedManifestWork eviction grace period is set with a value that is larger than or equal to
// the bound, the eviction feature will be disabled.
EvictionGracePeriodBound = 100 * 365 * 24 * time.Hour
+
+unManagedAppliedManifestWork = "UnManagedAppliedManifestWork"
)

type unmanagedAppliedWorkController struct {
-recorder events.Recorder
manifestWorkLister worklister.ManifestWorkNamespaceLister
appliedManifestWorkClient workv1client.AppliedManifestWorkInterface
patcher patcher.Patcher[*workapiv1.AppliedManifestWork, workapiv1.AppliedManifestWorkSpec, workapiv1.AppliedManifestWorkStatus]
@@ -63,7 +64,6 @@ func NewUnManagedAppliedWorkController(
hubHash, agentID string,
) factory.Controller {
controller := &unmanagedAppliedWorkController{
-recorder: recorder,
manifestWorkLister: manifestWorkLister,
appliedManifestWorkClient: appliedManifestWorkClient,
patcher: patcher.NewPatcher[
@@ -84,12 +84,17 @@ func NewUnManagedAppliedWorkController(
WithFilteredEventsInformersQueueKeysFunc(
queue.QueueKeyByMetaName,
helper.AppliedManifestworkAgentIDFilter(agentID), appliedManifestWorkInformer.Informer()).
-WithSync(controller.sync).ToController("UnManagedAppliedManifestWork", recorder)
+WithSync(controller.sync).ToController(unManagedAppliedManifestWork, recorder)
}

func (m *unmanagedAppliedWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
appliedManifestWorkName := controllerContext.QueueKey()
-klog.V(5).Infof("Reconciling AppliedManifestWork %q", appliedManifestWorkName)
+
+logger := klog.FromContext(ctx).WithName(appliedManifestWorkFinalizer).
+WithValues("appliedManifestWorkName", appliedManifestWorkName)
+ctx = klog.NewContext(ctx, logger)
+
+logger.V(5).Info("Reconciling AppliedManifestWork")

appliedManifestWork, err := m.appliedManifestWorkLister.Get(appliedManifestWorkName)
if errors.IsNotFound(err) {
@@ -133,6 +138,8 @@ func (m *unmanagedAppliedWorkController) evictAppliedManifestWork(ctx context.Co
controllerContext factory.SyncContext, appliedManifestWork *workapiv1.AppliedManifestWork) error {
now := time.Now()

+logger := klog.FromContext(ctx)
+
evictionStartTime := appliedManifestWork.Status.EvictionStartTime
if evictionStartTime == nil {
return m.patchEvictionStartTime(ctx, appliedManifestWork, &metav1.Time{Time: now})
@@ -147,8 +154,7 @@ func (m *unmanagedAppliedWorkController) evictAppliedManifestWork(ctx context.Co
if err != nil {
return err
}
-m.recorder.Eventf("AppliedManifestWorkEvicted",
-"AppliedManifestWork %s evicted by agent %s after eviction grace period", appliedManifestWork.Name, m.agentID)
+logger.Info("AppliedManifestWork evicted after eviction grace period", "agentID", m.agentID)
return nil
}
@@ -5,7 +5,6 @@ import (
"testing"
"time"

-"github.com/openshift/library-go/pkg/operator/events/eventstesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
clienttesting "k8s.io/client-go/testing"
@@ -275,7 +274,6 @@ func TestSyncUnamanagedAppliedWork(t *testing.T) {
}

controller := &unmanagedAppliedWorkController{
-recorder: eventstesting.NewTestingEventRecorder(t),
manifestWorkLister: informerFactory.Work().V1().ManifestWorks().Lister().ManifestWorks("test"),
appliedManifestWorkClient: fakeClient.WorkV1().AppliedManifestWorks(),
patcher: patcher.NewPatcher[
@@ -58,7 +58,7 @@ func (m *appliedManifestWorkReconciler) reconcile(
case errors.IsNotFound(err):
logger.V(2).Info(
"Resource with key does not exist",
-"namespace", result.resourceMeta.Namespace, "name", result.resourceMeta.Name)
+"resourceNamespace", result.resourceMeta.Namespace, "resourceName", result.resourceMeta.Name)
continue
case err != nil:
errs = append(errs, fmt.Errorf(
@@ -90,7 +90,7 @@ func (m *appliedManifestWorkReconciler) reconcile(
reason := fmt.Sprintf("it is no longer maintained by manifestwork %s", manifestWork.Name)

resourcesPendingFinalization, errs := helper.DeleteAppliedResources(
-ctx, noLongerMaintainedResources, reason, m.spokeDynamicClient, controllerContext.Recorder(), *owner)
+ctx, noLongerMaintainedResources, reason, m.spokeDynamicClient, *owner)
if len(errs) != 0 {
return manifestWork, appliedManifestWork, results, utilerrors.NewAggregate(errs)
}
@@ -39,6 +39,8 @@ var (
ResyncInterval = 4 * time.Minute
)

+const controllerName = "ManifestWorkController"
+
type workReconcile interface {
reconcile(ctx context.Context,
controllerContext factory.SyncContext,
@@ -115,17 +117,17 @@ func NewManifestWorkController(
appliedManifestWorkInformer.Informer(),
).
WithSyncContext(syncCtx).
-WithSync(controller.sync).ToController("ManifestWorkAgent", recorder)
+WithSync(controller.sync).ToController(controllerName, recorder)
}

// sync is the main reconcile loop for manifest work. It is triggered in two scenarios
// 1. ManifestWork API changes
// 2. Resources defined in manifest changed on spoke
func (m *ManifestWorkController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
-logger := klog.FromContext(ctx).WithName("ManifestWorkController")
-
manifestWorkName := controllerContext.QueueKey()
-logger.V(4).Info("Reconciling ManifestWork", "name", manifestWorkName)
+logger := klog.FromContext(ctx).WithName(controllerName).WithValues("manifestWorkName", manifestWorkName)
+logger.V(5).Info("Reconciling ManifestWork")
+ctx = klog.NewContext(ctx, logger)

oldManifestWork, err := m.manifestWorkLister.Get(manifestWorkName)
if apierrors.IsNotFound(err) {
@@ -192,7 +194,7 @@ func (m *ManifestWorkController) sync(ctx context.Context, controllerContext fac
return utilerrors.NewAggregate(errs)
}

-logger.V(2).Info("Requeue manifestwork", "name", manifestWork.Name, "requeue time", requeueTime)
+logger.V(2).Info("Requeue manifestwork", "requeue time", requeueTime)
controllerContext.Queue().AddAfter(manifestWorkName, requeueTime)

return nil
@@ -85,7 +85,7 @@ func (m *manifestworkReconciler) reconcile(
// and requeue the item
var authError *basic.NotAllowedError
if errors.As(result.Error, &authError) {
-logger.V(2).Info("apply work failed", "name", manifestWork.Name, "error", result.Error)
+logger.V(2).Info("apply work failed", "error", result.Error)
result.Error = nil

if authError.RequeueTime < requeueTime {
@@ -31,7 +31,11 @@ import (
"open-cluster-management.io/ocm/pkg/work/spoke/statusfeedback"
)

-const statusFeedbackConditionType = "StatusFeedbackSynced"
+const (
+statusFeedbackConditionType = "StatusFeedbackSynced"
+
+controllerName = "AvailableStatusController"
+)

// AvailableStatusController is to update the available status conditions of both manifests and manifestworks.
// It is also used to get the status value based on status feedback configuration and get condition values
@@ -74,11 +78,16 @@ func NewAvailableStatusController(

return factory.New().
WithInformersQueueKeysFunc(queue.QueueKeyByMetaName, manifestWorkInformer.Informer()).
-WithSync(controller.sync).ToController("AvailableStatusController", recorder), nil
+WithSync(controller.sync).ToController(controllerName, recorder), nil
}

func (c *AvailableStatusController) sync(ctx context.Context, controllerContext factory.SyncContext) error {
manifestWorkName := controllerContext.QueueKey()

+logger := klog.FromContext(ctx).WithName(controllerName).WithValues("manifestWorkName", manifestWorkName)
+logger.V(4).Info("Reconciling ManifestWork")
+ctx = klog.NewContext(ctx, logger)
+
// sync a particular manifestwork
manifestWork, err := c.manifestWorkLister.Get(manifestWorkName)
if errors.IsNotFound(err) {
@@ -100,7 +109,6 @@ func (c *AvailableStatusController) sync(ctx context.Context, controllerContext
}

func (c *AvailableStatusController) syncManifestWork(ctx context.Context, originalManifestWork *workapiv1.ManifestWork) error {
-klog.V(5).Infof("Reconciling ManifestWork %q", originalManifestWork.Name)
manifestWork := originalManifestWork.DeepCopy()

// do nothing when finalizer is not added.
@@ -2,15 +2,16 @@ package spoke

import (
"context"
+"os"
"time"

"github.com/openshift/library-go/pkg/controller/controllercmd"
apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"

workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
@@ -61,6 +62,14 @@ func NewWorkAgentConfig(commonOpts *options.AgentOptions, opts *WorkloadAgentOpt

// RunWorkloadAgent starts the controllers on agent to process work from hub.
func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
+// setting up contextual logger
+logger := klog.NewKlogr()
+podName := os.Getenv("POD_NAME")
+if podName != "" {
+logger = logger.WithValues("podName", podName)
+}
+ctx = klog.NewContext(ctx, logger)
+
// load spoke client config and create spoke clients,
// the work agent may not running in the spoke/managed cluster.
spokeRestConfig, err := o.agentOptions.SpokeKubeConfig(controllerContext.KubeConfig)
@@ -103,7 +112,7 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
return err
}

-hubHost, hubWorkClient, hubWorkInformer, err := o.newWorkClientAndInformer(ctx, restMapper)
+hubHost, hubWorkClient, hubWorkInformer, err := o.newWorkClientAndInformer(ctx)
if err != nil {
return err
}
@@ -199,7 +208,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex

func (o *WorkAgentConfig) newWorkClientAndInformer(
ctx context.Context,
-restMapper meta.RESTMapper,
) (string, workv1client.ManifestWorkInterface, workv1informers.ManifestWorkInformer, error) {
var workClient workclientset.Interface
var watcherStore *store.AgentInformerWatcherStore