Mirror of https://github.com/open-cluster-management-io/ocm.git (synced 2026-05-06 01:07:03 +00:00)
go.mod (2 lines changed)

@@ -42,7 +42,7 @@ require (
 	k8s.io/utils v0.0.0-20251002143259-bc988d571ff4
 	open-cluster-management.io/addon-framework v1.2.1-0.20260306083712-b6f9759b7b6d
 	open-cluster-management.io/api v1.2.1-0.20260305152611-5bfebdbc3fdf
-	open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a
+	open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177
 	sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03
 	sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a
 	sigs.k8s.io/controller-runtime v0.23.3
go.sum (4 lines changed)

@@ -589,8 +589,8 @@ open-cluster-management.io/addon-framework v1.2.1-0.20260306083712-b6f9759b7b6d
 open-cluster-management.io/addon-framework v1.2.1-0.20260306083712-b6f9759b7b6d/go.mod h1:Bpw37w4GANroADMDR3F/ZUFoEuTKV9GIn4ijwICcK6E=
 open-cluster-management.io/api v1.2.1-0.20260305152611-5bfebdbc3fdf h1:SnLaZD2QHz+Ep2SfVKx9y5WIMmyLnQcEv/ySW8k/NXc=
 open-cluster-management.io/api v1.2.1-0.20260305152611-5bfebdbc3fdf/go.mod h1:ZpXs1bFTIIqKstMHdLO9IY0NFlbCvZgEtByvvNSmab0=
-open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a h1:vP9e4d65jPQbHCeuUNEnVRzJ8mxAlhLc8o+PTbDL++k=
-open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a/go.mod h1:lDef+5BvifXww0S7cseux+Wi8melkH29bAf33OZ0ZVg=
+open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177 h1:8YzKbl+PuWIvTyU7C1pVuPUIY4cdbccv5BXZySQfDYI=
+open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177/go.mod h1:lDef+5BvifXww0S7cseux+Wi8melkH29bAf33OZ0ZVg=
 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U=
 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU=
 sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=
vendor/modules.txt (vendored, 2 lines changed)

@@ -1973,7 +1973,7 @@ open-cluster-management.io/api/operator/v1
 open-cluster-management.io/api/utils/work/v1/workapplier
 open-cluster-management.io/api/work/v1
 open-cluster-management.io/api/work/v1alpha1
-# open-cluster-management.io/sdk-go v1.2.1-0.20260306024852-c0938d15158a
+# open-cluster-management.io/sdk-go v1.2.1-0.20260310072111-3041045c0177
 ## explicit; go 1.25.0
 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1
vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go (generated, vendored, 46 lines changed)

@@ -15,6 +15,7 @@ type Watcher struct {
 	result  chan watch.Event
 	done    chan struct{}
 	stopped bool
+	wg      sync.WaitGroup
 }

 var _ watch.Interface = &Watcher{}
@@ -42,35 +43,44 @@ func (w *Watcher) ResultChan() <-chan watch.Event {
 func (w *Watcher) Stop() {
 	// Call Close() exactly once by locking and setting a flag.
 	w.Lock()
-	defer w.Unlock()
-	// closing a closed channel always panics, therefore check before closing
-	select {
-	case <-w.done:
-		close(w.result)
-	default:
-		w.stopped = true
-		close(w.done)
+
+	if w.stopped {
+		w.Unlock()
+		return
 	}
+
+	w.stopped = true
+	close(w.done)
+	w.Unlock()
+
+	// Wait for all Receive() calls to complete before closing the result channel
+	w.wg.Wait()
+	close(w.result)
 }

 // Receive a event from the work client and sends down the result channel.
 func (w *Watcher) Receive(evt watch.Event) {
-	if w.isStopped() {
-		// this watcher is stopped, do nothing.
+	// Atomically check if stopped and add to WaitGroup
+	w.RLock()
+	if w.stopped {
+		w.RUnlock()
 		return
 	}
+	w.wg.Add(1)
+	w.RUnlock()
+
+	defer w.wg.Done()

 	if klog.V(4).Enabled() {
 		obj, _ := meta.Accessor(evt.Object)
 		klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName())
 	}

-	w.result <- evt
-}
-
-func (w *Watcher) isStopped() bool {
-	w.RLock()
-	defer w.RUnlock()
-
-	return w.stopped
+	select {
+	case <-w.done:
+		// watcher is stopped, do nothing
+		return
+	case w.result <- evt:
+		// event sent successfully
+	}
 }
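The rewritten Stop/Receive pair is a common Go pattern for closing a channel that has concurrent senders: each sender registers with a sync.WaitGroup while holding the read lock, Stop flips the stopped flag and closes done under the write lock, waits for in-flight senders, and only then closes the result channel, while the select on done lets a blocked sender bail out instead of deadlocking. A minimal, self-contained sketch of that pattern, with illustrative names rather than the vendored types:

package main

import (
	"fmt"
	"sync"
)

// safeEvents mirrors the coordination used above: senders register with a
// WaitGroup under a read lock, Stop flips a flag under the write lock, closes
// done, waits for in-flight sends, and only then closes the result channel.
type safeEvents struct {
	sync.RWMutex
	wg      sync.WaitGroup
	done    chan struct{}
	result  chan string
	stopped bool
}

func newSafeEvents() *safeEvents {
	return &safeEvents{done: make(chan struct{}), result: make(chan string, 1)}
}

func (s *safeEvents) Send(v string) {
	s.RLock()
	if s.stopped {
		s.RUnlock()
		return
	}
	s.wg.Add(1) // registered before the lock is released, so Stop cannot miss this sender
	s.RUnlock()
	defer s.wg.Done()

	select {
	case <-s.done: // Stop was called; never send on a channel that is about to be closed
	case s.result <- v:
	}
}

func (s *safeEvents) Stop() {
	s.Lock()
	if s.stopped {
		s.Unlock()
		return
	}
	s.stopped = true
	close(s.done)
	s.Unlock()

	s.wg.Wait()     // every in-flight Send has returned
	close(s.result) // closing can no longer race with a send
}

func main() {
	s := newSafeEvents()
	s.Send("event-1")
	s.Stop()
	for v := range s.result {
		fmt.Println(v)
	}
}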
@@ -6,12 +6,14 @@ import (
 	"net/http"
 	"strconv"
 	"sync"
+	"time"

 	"k8s.io/apimachinery/pkg/api/meta"

 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubetypes "k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/klog/v2"

@@ -22,11 +24,17 @@ import (
 	cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/metrics"
 	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
 )

+const (
+	// workDeletionCheckInterval defines how often to check for works that need deletion
+	workDeletionCheckInterval = 2 * time.Second
+)
+
 // ManifestWorkAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by
 // CloudEventAgentClient.
 type ManifestWorkAgentClient struct {
@@ -42,14 +50,39 @@ type ManifestWorkAgentClient struct {
 var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{}

 func NewManifestWorkAgentClient(
-	_ string,
+	ctx context.Context,
+	clusterName string,
 	watcherStore store.ClientWatcherStore[*workv1.ManifestWork],
 	cloudEventsClient generic.CloudEventsClient[*workv1.ManifestWork],
 ) *ManifestWorkAgentClient {
-	return &ManifestWorkAgentClient{
+	client := &ManifestWorkAgentClient{
 		cloudEventsClient: cloudEventsClient,
 		watcherStore:      watcherStore,
 	}
+
+	// Start a background goroutine to periodically check for works that need deletion.
+	// This ensures that works with deletion timestamps and no finalizers are properly
+	// cleaned up and their deletion status is sent back to the source.
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
+		logger := klog.FromContext(ctx)
+
+		// List all works and check if any need to be deleted
+		works, err := watcherStore.List(ctx, clusterName, metav1.ListOptions{})
+		if err != nil {
+			logger.Error(err, "failed to list all works for deletion check")
+			return
+		}
+
+		// Process each work for potential deletion
+		for _, work := range works.Items {
+			if err := client.deleteWork(ctx, work); err != nil {
+				logger.Error(err, "failed to delete work", "namespace", work.Namespace, "name", work.Name)
+			}
+		}
+	}, workDeletionCheckInterval)
+
+	return client
 }

 func (c *ManifestWorkAgentClient) SetNamespace(namespace string) {
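The constructor now starts a periodic background check whose lifetime is tied to the caller's context. The loop is built on wait.UntilWithContext from k8s.io/apimachinery, which invokes the given function once per period until the context is cancelled. A small sketch of that primitive in isolation (the interval and messages are illustrative, not the vendored code):

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	// The caller's context bounds the loop: once it is cancelled, UntilWithContext
	// returns and the background goroutine that runs it can exit.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	checkInterval := 2 * time.Second // an illustrative period, like the deletion check above

	wait.UntilWithContext(ctx, func(ctx context.Context) {
		// In the real client this body would list works and hand deletion
		// candidates to deleteWork; here it only logs.
		fmt.Println("periodic check ran")
	}, checkInterval)

	fmt.Println("context cancelled, loop exited")
}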
@@ -187,18 +220,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub

 	newWork := patchedWork.DeepCopy()

-	isDeleted := !newWork.DeletionTimestamp.IsZero() && len(newWork.Finalizers) == 0
-
-	if utils.IsStatusPatch(subresources) || isDeleted {
-		if isDeleted {
-			meta.SetStatusCondition(&newWork.Status.Conditions, metav1.Condition{
-				Type:    common.ResourceDeleted,
-				Status:  metav1.ConditionTrue,
-				Reason:  "ManifestsDeleted",
-				Message: fmt.Sprintf("The manifests are deleted from the cluster %s", newWork.Namespace),
-			})
-		}
-
+	if utils.IsStatusPatch(subresources) {
 		// Set work's resource version to remote resource version for publishing
 		workToPublish := newWork.DeepCopy()
 		workToPublish.ResourceVersion = ""
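The predicate removed here (and reintroduced as deleteWork further down) follows the usual Kubernetes deletion convention: an object is only finalized once its deletionTimestamp is set and its finalizer list is empty. A tiny illustration of that check, assuming only the ManifestWork type from open-cluster-management.io/api; the finalizer string is made up:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	workv1 "open-cluster-management.io/api/work/v1"
)

// readyForCleanup is the same shape of check: deletion proceeds only once the
// object is marked for deletion and no finalizer blocks it.
func readyForCleanup(work *workv1.ManifestWork) bool {
	return !work.DeletionTimestamp.IsZero() && len(work.Finalizers) == 0
}

func main() {
	now := metav1.Now()

	blocked := &workv1.ManifestWork{}
	blocked.DeletionTimestamp = &now
	blocked.Finalizers = []string{"example.com/still-cleaning-up"} // hypothetical finalizer

	unblocked := &workv1.ManifestWork{}
	unblocked.DeletionTimestamp = &now

	fmt.Println(readyForCleanup(blocked))   // false: a finalizer still blocks deletion
	fmt.Println(readyForCleanup(unblocked)) // true: cleanup can run
}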
@@ -211,19 +233,6 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub
 		}
 	}

-	// the finalizers of a deleting manifestwork are removed, marking the manifestwork status to deleted and sending
-	// it back to source
-	if isDeleted {
-		if err := c.watcherStore.Delete(newWork); err != nil {
-			returnErr := errors.NewInternalError(err)
-			metrics.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason))
-			return nil, returnErr
-		}
-
-		metrics.IncreaseWorkProcessedCounter("delete", metav1.StatusSuccess)
-		return newWork, nil
-	}
-
 	// Fetch the latest work from the store and verify the resource version to avoid updating the store
 	// with outdated work. Return a conflict error if the resource version is outdated.
 	// Due to the lack of read-modify-write guarantees in the store, race conditions may occur between
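The comment above describes plain optimistic concurrency: the incoming object's resourceVersion is compared with the stored one and stale writes are rejected with a Conflict error, so callers must re-read and retry. A rough sketch of such a guard using the apimachinery errors helpers; this is illustrative and not the vendored versionCompare:

package main

import (
	"fmt"
	"strconv"

	"k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
)

// checkResourceVersion rejects writes based on an outdated copy with a
// Conflict error, the same failure mode the comment above describes.
func checkResourceVersion(gr schema.GroupResource, name, incoming, stored string) error {
	if incoming == "" {
		// No precondition supplied by the caller.
		return nil
	}
	newRV, err := strconv.ParseInt(incoming, 10, 64)
	if err != nil {
		return errors.NewBadRequest(fmt.Sprintf("invalid resourceVersion %q", incoming))
	}
	oldRV, err := strconv.ParseInt(stored, 10, 64)
	if err != nil {
		return errors.NewBadRequest(fmt.Sprintf("invalid resourceVersion %q", stored))
	}
	if newRV < oldRV {
		// The caller must re-read the object and retry with the newer version.
		return errors.NewConflict(gr, name, fmt.Errorf("resourceVersion %s is older than %s", incoming, stored))
	}
	return nil
}

func main() {
	gr := schema.GroupResource{Group: "work.open-cluster-management.io", Resource: "manifestworks"}
	fmt.Println(checkResourceVersion(gr, "example-work", "3", "5")) // conflict
	fmt.Println(checkResourceVersion(gr, "example-work", "6", "5")) // nil
}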
@@ -248,6 +257,46 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub
 	return newWork, nil
 }

+// deleteWork handles the cleanup of a manifestwork that is being deleted. It checks if the work
+// has a deletion timestamp and all finalizers have been removed. If so, it marks the manifestwork
+// status as deleted, publishes the deletion event to the source, and removes the work from the cache.
+func (c *ManifestWorkAgentClient) deleteWork(ctx context.Context, work *workv1.ManifestWork) error {
+	if work.DeletionTimestamp.IsZero() || len(work.Finalizers) != 0 {
+		// not ready for deletion (has finalizers or no deletion timestamp)
+		return nil
+	}
+
+	eventType := types.CloudEventsType{
+		CloudEventsDataType: payload.ManifestBundleEventDataType,
+		SubResource:         types.SubResourceStatus,
+		Action:              types.UpdateRequestAction,
+	}
+
+	workToPublish := work.DeepCopy()
+	workToPublish.ResourceVersion = ""
+	meta.SetStatusCondition(&workToPublish.Status.Conditions, metav1.Condition{
+		Type:    common.ResourceDeleted,
+		Status:  metav1.ConditionTrue,
+		Reason:  "ManifestsDeleted",
+		Message: fmt.Sprintf("The manifests are deleted from the cluster %s", work.Namespace),
+	})
+
+	if err := c.cloudEventsClient.Publish(ctx, eventType, workToPublish); err != nil {
+		return cloudeventserrors.ToStatusError(common.ManifestWorkGR, work.Name, err)
+	}
+
+	c.Lock()
+	defer c.Unlock()
+	if err := c.watcherStore.Delete(work); err != nil {
+		returnErr := errors.NewInternalError(err)
+		metrics.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason))
+		return returnErr
+	}
+
+	metrics.IncreaseWorkProcessedCounter("delete", metav1.StatusSuccess)
+	return nil
+}
+
 func versionCompare(new, old *workv1.ManifestWork) *errors.StatusError {
 	// Resource version 0 means force conflict.
 	if new.GetResourceVersion() == "0" {
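deleteWork marks the work as deleted through meta.SetStatusCondition from k8s.io/apimachinery, which appends the condition on first use and updates the existing entry of the same Type on later calls instead of growing the slice. A standalone illustration of that helper; the condition fields are examples, not the exact values used by the client:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func main() {
	var conditions []metav1.Condition

	// First call appends a new condition.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    "Deleted",
		Status:  metav1.ConditionTrue,
		Reason:  "ManifestsDeleted",
		Message: "The manifests are deleted from the cluster cluster1",
	})

	// A later call with the same Type updates the existing entry in place
	// rather than appending a duplicate.
	meta.SetStatusCondition(&conditions, metav1.Condition{
		Type:    "Deleted",
		Status:  metav1.ConditionTrue,
		Reason:  "ManifestsDeleted",
		Message: "updated message",
	})

	fmt.Println(len(conditions), conditions[0].Message) // 1 updated message
}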
@@ -52,7 +52,7 @@ func NewAgentClientHolder(ctx context.Context, opt *options.GenericClientOptions
 		return nil, err
 	}

-	manifestWorkClient := agentclient.NewManifestWorkAgentClient(opt.ClusterName(), opt.WatcherStore(), agentClient)
+	manifestWorkClient := agentclient.NewManifestWorkAgentClient(ctx, opt.ClusterName(), opt.WatcherStore(), agentClient)
 	workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
 	workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
 	return &ClientHolder{workClientSet: workClientSet}, nil