diff --git a/go.mod b/go.mod index 2eb44629b..da125b231 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( k8s.io/utils v0.0.0-20251002143259-bc988d571ff4 open-cluster-management.io/addon-framework v1.2.1-0.20260306083712-b6f9759b7b6d open-cluster-management.io/api v1.2.1-0.20260305152611-5bfebdbc3fdf - open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a + open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a sigs.k8s.io/controller-runtime v0.23.3 diff --git a/go.sum b/go.sum index 8d8f076f4..4e6ccb838 100644 --- a/go.sum +++ b/go.sum @@ -589,8 +589,8 @@ open-cluster-management.io/addon-framework v1.2.1-0.20260306083712-b6f9759b7b6d open-cluster-management.io/addon-framework v1.2.1-0.20260306083712-b6f9759b7b6d/go.mod h1:Bpw37w4GANroADMDR3F/ZUFoEuTKV9GIn4ijwICcK6E= open-cluster-management.io/api v1.2.1-0.20260305152611-5bfebdbc3fdf h1:SnLaZD2QHz+Ep2SfVKx9y5WIMmyLnQcEv/ySW8k/NXc= open-cluster-management.io/api v1.2.1-0.20260305152611-5bfebdbc3fdf/go.mod h1:ZpXs1bFTIIqKstMHdLO9IY0NFlbCvZgEtByvvNSmab0= -open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a h1:vP9e4d65jPQbHCeuUNEnVRzJ8mxAlhLc8o+PTbDL++k= -open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a/go.mod h1:lDef+5BvifXww0S7cseux+Wi8melkH29bAf33OZ0ZVg= +open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177 h1:8YzKbl+PuWIvTyU7C1pVuPUIY4cdbccv5BXZySQfDYI= +open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177/go.mod h1:lDef+5BvifXww0S7cseux+Wi8melkH29bAf33OZ0ZVg= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM= diff --git a/vendor/modules.txt b/vendor/modules.txt index 159c4a55f..9086a3cec 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1973,7 +1973,7 @@ open-cluster-management.io/api/operator/v1 open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a +# open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177 ## explicit; go 1.25.0 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go index 93cf44d6e..c1293d3a1 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go @@ -15,6 +15,7 @@ type Watcher struct { result chan watch.Event done chan struct{} stopped bool + wg sync.WaitGroup } var _ watch.Interface = &Watcher{} @@ -42,35 +43,44 @@ func (w *Watcher) ResultChan() <-chan watch.Event { func (w *Watcher) Stop() { // Call Close() exactly once by locking and setting a flag. w.Lock() - defer w.Unlock() - // closing a closed channel always panics, therefore check before closing - select { - case <-w.done: - close(w.result) - default: - w.stopped = true - close(w.done) + + if w.stopped { + w.Unlock() + return } + + w.stopped = true + close(w.done) + w.Unlock() + + // Wait for all Receive() calls to complete before closing the result channel + w.wg.Wait() + close(w.result) } // Receive a event from the work client and sends down the result channel. func (w *Watcher) Receive(evt watch.Event) { - if w.isStopped() { - // this watcher is stopped, do nothing. + // Atomically check if stopped and add to WaitGroup + w.RLock() + if w.stopped { + w.RUnlock() return } + w.wg.Add(1) + w.RUnlock() + + defer w.wg.Done() if klog.V(4).Enabled() { obj, _ := meta.Accessor(evt.Object) klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName()) } - w.result <- evt -} - -func (w *Watcher) isStopped() bool { - w.RLock() - defer w.RUnlock() - - return w.stopped + select { + case <-w.done: + // watcher is stopped, do nothing + return + case w.result <- evt: + // event sent successfully + } } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go index 46aade3bc..5c24bc547 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go @@ -6,12 +6,14 @@ import ( "net/http" "strconv" "sync" + "time" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" @@ -22,11 +24,17 @@ import ( cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors" "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/metrics" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) +const ( + // workDeletionCheckInterval defines how often to check for works that need deletion + workDeletionCheckInterval = 2 * time.Second +) + // ManifestWorkAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by // CloudEventAgentClient. type ManifestWorkAgentClient struct { @@ -42,14 +50,39 @@ type ManifestWorkAgentClient struct { var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{} func NewManifestWorkAgentClient( - _ string, + ctx context.Context, + clusterName string, watcherStore store.ClientWatcherStore[*workv1.ManifestWork], cloudEventsClient generic.CloudEventsClient[*workv1.ManifestWork], ) *ManifestWorkAgentClient { - return &ManifestWorkAgentClient{ + + client := &ManifestWorkAgentClient{ cloudEventsClient: cloudEventsClient, watcherStore: watcherStore, } + + // Start a background goroutine to periodically check for works that need deletion. + // This ensures that works with deletion timestamps and no finalizers are properly + // cleaned up and their deletion status is sent back to the source. + go wait.UntilWithContext(ctx, func(ctx context.Context) { + logger := klog.FromContext(ctx) + + // List all works and check if any need to be deleted + works, err := watcherStore.List(ctx, clusterName, metav1.ListOptions{}) + if err != nil { + logger.Error(err, "failed to list all works for deletion check") + return + } + + // Process each work for potential deletion + for _, work := range works.Items { + if err := client.deleteWork(ctx, work); err != nil { + logger.Error(err, "failed to delete work", "namespace", work.Namespace, "name", work.Name) + } + } + }, workDeletionCheckInterval) + + return client } func (c *ManifestWorkAgentClient) SetNamespace(namespace string) { @@ -187,18 +220,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub newWork := patchedWork.DeepCopy() - isDeleted := !newWork.DeletionTimestamp.IsZero() && len(newWork.Finalizers) == 0 - - if utils.IsStatusPatch(subresources) || isDeleted { - if isDeleted { - meta.SetStatusCondition(&newWork.Status.Conditions, metav1.Condition{ - Type: common.ResourceDeleted, - Status: metav1.ConditionTrue, - Reason: "ManifestsDeleted", - Message: fmt.Sprintf("The manifests are deleted from the cluster %s", newWork.Namespace), - }) - } - + if utils.IsStatusPatch(subresources) { // Set work's resource version to remote resource version for publishing workToPublish := newWork.DeepCopy() workToPublish.ResourceVersion = "" @@ -211,19 +233,6 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub } } - // the finalizers of a deleting manifestwork are removed, marking the manifestwork status to deleted and sending - // it back to source - if isDeleted { - if err := c.watcherStore.Delete(newWork); err != nil { - returnErr := errors.NewInternalError(err) - metrics.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) - return nil, returnErr - } - - metrics.IncreaseWorkProcessedCounter("delete", metav1.StatusSuccess) - return newWork, nil - } - // Fetch the latest work from the store and verify the resource version to avoid updating the store // with outdated work. Return a conflict error if the resource version is outdated. // Due to the lack of read-modify-write guarantees in the store, race conditions may occur between @@ -248,6 +257,46 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub return newWork, nil } +// deleteWork handles the cleanup of a manifestwork that is being deleted. It checks if the work +// has a deletion timestamp and all finalizers have been removed. If so, it marks the manifestwork +// status as deleted, publishes the deletion event to the source, and removes the work from the cache. +func (c *ManifestWorkAgentClient) deleteWork(ctx context.Context, work *workv1.ManifestWork) error { + if work.DeletionTimestamp.IsZero() || len(work.Finalizers) != 0 { + // not ready for deletion (has finalizers or no deletion timestamp) + return nil + } + + eventType := types.CloudEventsType{ + CloudEventsDataType: payload.ManifestBundleEventDataType, + SubResource: types.SubResourceStatus, + Action: types.UpdateRequestAction, + } + + workToPublish := work.DeepCopy() + workToPublish.ResourceVersion = "" + meta.SetStatusCondition(&workToPublish.Status.Conditions, metav1.Condition{ + Type: common.ResourceDeleted, + Status: metav1.ConditionTrue, + Reason: "ManifestsDeleted", + Message: fmt.Sprintf("The manifests are deleted from the cluster %s", work.Namespace), + }) + + if err := c.cloudEventsClient.Publish(ctx, eventType, workToPublish); err != nil { + return cloudeventserrors.ToStatusError(common.ManifestWorkGR, work.Name, err) + } + + c.Lock() + defer c.Unlock() + if err := c.watcherStore.Delete(work); err != nil { + returnErr := errors.NewInternalError(err) + metrics.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) + return returnErr + } + + metrics.IncreaseWorkProcessedCounter("delete", metav1.StatusSuccess) + return nil +} + func versionCompare(new, old *workv1.ManifestWork) *errors.StatusError { // Resource version 0 means force conflict. if new.GetResourceVersion() == "0" { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go index 51a36e8cb..19906bb56 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go @@ -52,7 +52,7 @@ func NewAgentClientHolder(ctx context.Context, opt *options.GenericClientOptions return nil, err } - manifestWorkClient := agentclient.NewManifestWorkAgentClient(opt.ClusterName(), opt.WatcherStore(), agentClient) + manifestWorkClient := agentclient.NewManifestWorkAgentClient(ctx, opt.ClusterName(), opt.WatcherStore(), agentClient) workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} return &ClientHolder{workClientSet: workClientSet}, nil