From 0c5377c34b9cef298c078044cac14586d559aeaa Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Thu, 27 Mar 2025 15:06:09 +0800 Subject: [PATCH] upgrade go-sdk (#914) Signed-off-by: Wei Liu --- go.mod | 2 +- go.sum | 4 +- pkg/work/hub/manager.go | 16 +- pkg/work/spoke/spokeagent.go | 16 +- test/integration/work/suite_test.go | 19 +- test/integration/work/work_test.go | 4 +- vendor/modules.txt | 30 +-- .../{work => clients}/common/common.go | 22 +- .../{work => clients}/errors/errors.go | 2 +- .../cloudevents/clients/options/generic.go | 226 ++++++++++++++++ .../clients/statushash/statushash.go | 33 +++ .../pkg/cloudevents/clients/store/base.go | 75 ++++++ .../pkg/cloudevents/clients/store/informer.go | 133 +++++++++ .../{work => clients}/store/interface.go | 39 +-- .../pkg/cloudevents/clients/store/lister.go | 59 ++++ .../pkg/cloudevents/clients/store/watcher.go | 76 ++++++ .../pkg/cloudevents/clients/utils/utils.go | 222 +++++++++++++++ .../pkg/cloudevents/clients/utils/work.go | 50 ++++ .../work/agent/client/manifestwork.go | 27 +- .../work/agent/codec/manifestbundle.go | 8 +- .../cloudevents/clients/work/clientholder.go | 59 ++++ .../{ => clients}/work/internal/clientset.go | 4 +- .../work/payload/manifestbundle.go | 1 + .../work/source/client/manifestwork.go | 35 +-- .../work/source/codec/manifestbundle.go | 5 +- .../{ => clients}/work/store/base.go | 170 +----------- .../{ => clients}/work/store/informer.go | 68 ++--- .../{ => clients}/work/store/local.go | 69 +++-- .../generic/options/cert/cert_rotation.go | 32 ++- .../generic/options/grpc/options.go | 25 +- .../cloudevents/work/agent/lister/lister.go | 50 ---- .../pkg/cloudevents/work/clientbuilder.go | 253 ------------------ .../cloudevents/work/source/lister/lister.go | 42 --- .../cloudevents/work/statushash/statushash.go | 18 -- .../pkg/cloudevents/work/utils/utils.go | 208 -------------- 35 files changed, 1173 insertions(+), 929 deletions(-) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{work => clients}/common/common.go (55%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{work => clients}/errors/errors.go (90%) create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options/generic.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash/statushash.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/base.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/informer.go rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{work => clients}/store/interface.go (52%) create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/lister.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/utils.go create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/work.go rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/agent/client/manifestwork.go (92%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/agent/codec/manifestbundle.go (95%) create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/internal/clientset.go (97%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/payload/manifestbundle.go (99%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/source/client/manifestwork.go (90%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/source/codec/manifestbundle.go (97%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/store/base.go (56%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/store/informer.go (65%) rename vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/{ => clients}/work/store/local.go (74%) delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go delete mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go diff --git a/go.mod b/go.mod index 3ae062bdc..7146ab7fe 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 open-cluster-management.io/addon-framework v0.12.0 open-cluster-management.io/api v0.16.1 - open-cluster-management.io/sdk-go v0.16.0 + open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 sigs.k8s.io/controller-runtime v0.19.3 sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 diff --git a/go.sum b/go.sum index 0ed7e3865..2f2ac5699 100644 --- a/go.sum +++ b/go.sum @@ -491,8 +491,8 @@ open-cluster-management.io/addon-framework v0.12.0 h1:5j7mpyk2ij0SLUZkwWk0KkNTWt open-cluster-management.io/addon-framework v0.12.0/go.mod h1:eReMWXrEHqtilwz5wzEpUrWw9Vfz0HJCH9pi3gOTZns= open-cluster-management.io/api v0.16.1 h1:mS+4UGxHLPQd7CRM0gdFQdVaz139Lo2bkLfqSE0CDNU= open-cluster-management.io/api v0.16.1/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM= -open-cluster-management.io/sdk-go v0.16.0 h1:Ui1jerkeLaNaJPu47xjOJ3lh+rJQgeJHD25ViQMzAMs= -open-cluster-management.io/sdk-go v0.16.0/go.mod h1:TyOjZC5YxyM5BRNgwTmLuTbHXX6xXqzYBXllrfoVp9w= +open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b h1:MjK6LuPi6kPMG40kMfzV2c7lffmwscUmspXmNGyezV4= +open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b/go.mod h1:TyOjZC5YxyM5BRNgwTmLuTbHXX6xXqzYBXllrfoVp9w= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 h1:WYPi2PdQyZwZkHG648v2jQl6deyCgyjJ0fkLYgUJ618= diff --git a/pkg/work/hub/manager.go b/pkg/work/hub/manager.go index 195cac660..2a339d9cf 100644 --- a/pkg/work/hub/manager.go +++ b/pkg/work/hub/manager.go @@ -14,10 +14,11 @@ import ( workinformers "open-cluster-management.io/api/client/work/informers/externalversions" workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" "open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller" ) @@ -96,12 +97,11 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller return err } - clientHolder, err := work.NewClientHolderBuilder(config). - WithClientID(c.workOptions.CloudEventsClientID). + clientOptions := options.NewGenericClientOptions( + config, codec.NewManifestBundleCodec(), c.workOptions.CloudEventsClientID). WithSourceID(sourceID). - WithCodec(codec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(watcherStore). - NewSourceClientHolder(ctx) + WithClientWatcherStore(watcherStore) + clientHolder, err := work.NewSourceClientHolder(ctx, clientOptions) if err != nil { return err } diff --git a/pkg/work/spoke/spokeagent.go b/pkg/work/spoke/spokeagent.go index 2eae363dc..4ffbc367c 100644 --- a/pkg/work/spoke/spokeagent.go +++ b/pkg/work/spoke/spokeagent.go @@ -18,10 +18,11 @@ import ( workinformers "open-cluster-management.io/api/client/work/informers/externalversions" workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" ocmfeature "open-cluster-management.io/api/feature" + cloudeventsoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options" + cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" "open-cluster-management.io/ocm/pkg/common/options" "open-cluster-management.io/ocm/pkg/features" @@ -226,12 +227,11 @@ func (o *WorkAgentConfig) newWorkClientAndInformer( return "", nil, nil, err } - clientHolder, err := cloudeventswork.NewClientHolderBuilder(config). - WithClientID(o.workOptions.CloudEventsClientID). + clientOptions := cloudeventsoptions.NewGenericClientOptions( + config, codec.NewManifestBundleCodec(), o.workOptions.CloudEventsClientID). WithClusterName(o.agentOptions.SpokeClusterName). - WithCodec(codec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(watcherStore). - NewAgentClientHolder(ctx) + WithClientWatcherStore(watcherStore) + clientHolder, err := cloudeventswork.NewAgentClientHolder(ctx, clientOptions) if err != nil { return "", nil, nil, err } diff --git a/test/integration/work/suite_test.go b/test/integration/work/suite_test.go index 3d5a06d1f..19efcd181 100644 --- a/test/integration/work/suite_test.go +++ b/test/integration/work/suite_test.go @@ -24,9 +24,10 @@ import ( workclientset "open-cluster-management.io/api/client/work/clientset/versioned" ocmfeature "open-cluster-management.io/api/feature" workapiv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work" - sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec" - workstore "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work" + sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec" + workstore "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store" "open-cluster-management.io/ocm/pkg/features" "open-cluster-management.io/ocm/pkg/work/helper" @@ -143,12 +144,12 @@ var _ = ginkgo.BeforeSuite(func() { }) gomega.Expect(err).ToNot(gomega.HaveOccurred()) - sourceClient, err := work.NewClientHolderBuilder(util.NewMQTTSourceOptions(sourceID)). - WithClientID(fmt.Sprintf("%s-%s", sourceID, rand.String(5))). - WithSourceID(sourceID). - WithCodec(sourcecodec.NewManifestBundleCodec()). - WithWorkClientWatcherStore(watcherStore). - NewSourceClientHolder(envCtx) + clientOptions := options.NewGenericClientOptions( + util.NewMQTTSourceOptions(sourceID), + sourcecodec.NewManifestBundleCodec(), + fmt.Sprintf("%s-%s", sourceID, rand.String(5)), + ).WithSourceID(sourceID).WithClientWatcherStore(watcherStore) + sourceClient, err := work.NewSourceClientHolder(envCtx, clientOptions) gomega.Expect(err).ToNot(gomega.HaveOccurred()) hubWorkClient = sourceClient.WorkInterface() diff --git a/test/integration/work/work_test.go b/test/integration/work/work_test.go index c6b15570e..8d842f1c8 100644 --- a/test/integration/work/work_test.go +++ b/test/integration/work/work_test.go @@ -19,7 +19,7 @@ import ( "k8s.io/client-go/util/retry" workapiv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" commonoptions "open-cluster-management.io/ocm/pkg/common/options" "open-cluster-management.io/ocm/pkg/work/spoke" @@ -59,7 +59,7 @@ var _ = ginkgo.Describe("ManifestWork", func() { o.WorkloadSourceDriver = sourceDriver o.WorkloadSourceConfig = sourceConfigFileName if sourceDriver != util.KubeDriver { - expectedFinalizer = store.ManifestWorkFinalizer + expectedFinalizer = common.ResourceFinalizer o.CloudEventsClientID = fmt.Sprintf("%s-work-agent", clusterName) o.CloudEventsClientCodecs = []string{"manifestbundle"} } diff --git a/vendor/modules.txt b/vendor/modules.txt index 611745f1b..2de51d5c1 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1777,7 +1777,7 @@ open-cluster-management.io/api/operator/v1 open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.16.0 +# open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b ## explicit; go 1.22.0 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 @@ -1788,6 +1788,20 @@ open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator open-cluster-management.io/sdk-go/pkg/basecontroller/events open-cluster-management.io/sdk-go/pkg/basecontroller/factory open-cluster-management.io/sdk-go/pkg/certrotation +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec +open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store open-cluster-management.io/sdk-go/pkg/cloudevents/constants open-cluster-management.io/sdk-go/pkg/cloudevents/generic open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options @@ -1799,20 +1813,6 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt open-cluster-management.io/sdk-go/pkg/cloudevents/generic/payload open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types -open-cluster-management.io/sdk-go/pkg/cloudevents/work -open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client -open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec -open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister -open-cluster-management.io/sdk-go/pkg/cloudevents/work/common -open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors -open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal -open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload -open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client -open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec -open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister -open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash -open-cluster-management.io/sdk-go/pkg/cloudevents/work/store -open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils open-cluster-management.io/sdk-go/pkg/helpers open-cluster-management.io/sdk-go/pkg/patcher # sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common/common.go similarity index 55% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common/common.go index 72594d3f1..994ddc8e8 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/common/common.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common/common.go @@ -1,8 +1,10 @@ package common import ( + certificatev1 "k8s.io/api/certificates/v1" "k8s.io/apimachinery/pkg/runtime/schema" + clusterv1 "open-cluster-management.io/api/cluster/v1" workv1 "open-cluster-management.io/api/work/v1" ) @@ -10,10 +12,10 @@ const ( // CloudEventsDataTypeAnnotationKey is the key of the cloudevents data type annotation. CloudEventsDataTypeAnnotationKey = "cloudevents.open-cluster-management.io/datatype" - // CloudEventsResourceVersionAnnotationKey is the key of the manifestwork resourceversion annotation. + // CloudEventsResourceVersionAnnotationKey is the key of the resource resourceversion annotation. // - // This annotation is used for tracing the ManifestWork specific changes, the value of this annotation - // should be a sequence number representing the ManifestWork specific generation. + // This annotation is used for tracing a resource specific changes, the value of this annotation + // should be a sequence number representing the resource specific generation. CloudEventsResourceVersionAnnotationKey = "cloudevents.open-cluster-management.io/resourceversion" // CloudEventsSequenceIDAnnotationKey is the key of the status update event sequence ID. @@ -24,14 +26,22 @@ const ( // CloudEventsOriginalSourceLabelKey is the key of the cloudevents original source label. const CloudEventsOriginalSourceLabelKey = "cloudevents.open-cluster-management.io/originalsource" -// ManifestsDeleted represents the manifests are deleted. -const ManifestsDeleted = "Deleted" - const ( CreateRequestAction = "create_request" UpdateRequestAction = "update_request" DeleteRequestAction = "delete_request" ) +// ResourceDeleted represents a resource is deleted. +const ResourceDeleted = "Deleted" + +const ResourceFinalizer = "cloudevents.open-cluster-management.io/resource-cleanup" + +var ManagedClusterGK = schema.GroupKind{Group: clusterv1.GroupName, Kind: "ManagedCluster"} +var ManagedClusterGR = schema.GroupResource{Group: clusterv1.GroupName, Resource: "managedclusters"} + var ManifestWorkGK = schema.GroupKind{Group: workv1.GroupName, Kind: "ManifestWork"} var ManifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "manifestworks"} + +var CSRGK = schema.GroupKind{Group: certificatev1.GroupName, Kind: "CertificateSigningRequest"} +var CSRGR = schema.GroupResource{Group: certificatev1.GroupName, Resource: "certificatesigningrequests"} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors/errors.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors/errors.go similarity index 90% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors/errors.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors/errors.go index 0f594a851..818681320 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors/errors.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors/errors.go @@ -11,7 +11,7 @@ import ( const StatusReasonPublishError metav1.StatusReason = "PublishError" -// NewPublishError returns an error indicating the work could not be published, and the client can try again. +// NewPublishError returns an error indicating a resource could not be published, and the client can try again. func NewPublishError(qualifiedResource schema.GroupResource, name string, err error) *errors.StatusError { return &errors.StatusError{ ErrStatus: metav1.Status{ diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options/generic.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options/generic.go new file mode 100644 index 000000000..a60225b28 --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options/generic.go @@ -0,0 +1,226 @@ +package options + +import ( + "context" + "fmt" + + "k8s.io/klog/v2" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" +) + +type GenericClientOptions[T generic.ResourceObject] struct { + config any + codec generic.Codec[T] + watcherStore store.ClientWatcherStore[T] + clientID string + sourceID string + clusterName string + resync bool +} + +// NewGenericClientOptions create a GenericClientOptions +// +// - config, available configurations: +// +// MQTTOptions (*mqtt.MQTTOptions): builds a generic cloudevents client with MQTT +// +// GRPCOptions (*grpc.GRPCOptions): builds a generic cloudevents client with GRPC +// +// KafkaOptions (*kafka.KafkaOptions): builds a generic cloudevents client with Kafka +// +// - codec, the codec for resource +// +// - clientID, the client ID for generic cloudevents client. +// +// TODO using a specified config instead of any +func NewGenericClientOptions[T generic.ResourceObject](config any, + codec generic.Codec[T], + clientID string) *GenericClientOptions[T] { + return &GenericClientOptions[T]{ + config: config, + codec: codec, + clientID: clientID, + resync: true, + } +} + +// WithClientWatcherStore set the ClientWatcherStore. The client uses this store to caches the resources and +// watch the resource events. For agent, the AgentInformerWatcherStore is used by default +// +// TODO provide a default ClientWatcherStore for source. +func (o *GenericClientOptions[T]) WithClientWatcherStore(store store.ClientWatcherStore[T]) *GenericClientOptions[T] { + o.watcherStore = store + return o +} + +// WithSourceID set the source ID when building a client for a source. +func (o *GenericClientOptions[T]) WithSourceID(sourceID string) *GenericClientOptions[T] { + o.sourceID = sourceID + return o +} + +// WithClusterName set the managed cluster name when building a client for an agent. +func (o *GenericClientOptions[T]) WithClusterName(clusterName string) *GenericClientOptions[T] { + o.clusterName = clusterName + return o +} + +// WithResyncEnabled control the client resync (Default is true), if it's true, the resync happens when +// 1. after the client's store is initiated +// 2. the client reconnected +func (o *GenericClientOptions[T]) WithResyncEnabled(resync bool) *GenericClientOptions[T] { + o.resync = resync + return o +} + +func (o *GenericClientOptions[T]) ClusterName() string { + return o.clusterName +} + +func (o *GenericClientOptions[T]) SourceID() string { + return o.sourceID +} + +func (o *GenericClientOptions[T]) WatcherStore() store.ClientWatcherStore[T] { + return o.watcherStore +} + +func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (*generic.CloudEventAgentClient[T], error) { + if len(o.clientID) == 0 { + return nil, fmt.Errorf("client id is required") + } + + if len(o.clusterName) == 0 { + return nil, fmt.Errorf("cluster name is required") + } + + if o.watcherStore == nil { + o.watcherStore = store.NewAgentInformerWatcherStore[T]() + } + + options, err := generic.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID) + if err != nil { + return nil, err + } + + cloudEventsClient, err := generic.NewCloudEventAgentClient( + ctx, + options, + store.NewAgentWatcherStoreLister(o.watcherStore), + statushash.StatusHash, + o.codec, + ) + if err != nil { + return nil, err + } + + // start to subscribe + cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource) + + // start a go routine to receive client reconnect signal + go func() { + for { + select { + case <-ctx.Done(): + return + case <-cloudEventsClient.ReconnectedChan(): + if !o.resync { + klog.V(4).Infof("resync is disabled, do nothing") + continue + } + + // when receiving a client reconnected signal, we resync all sources for this agent + // TODO after supporting multiple sources, we should only resync agent known sources + if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { + klog.Errorf("failed to send resync request, %v", err) + } + } + } + }() + + if !o.resync { + return cloudEventsClient, nil + } + + // start a go routine to resync the works after this client's store is initiated + go func() { + if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) { + if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { + klog.Errorf("failed to send resync request, %v", err) + } + } + }() + + return cloudEventsClient, nil +} + +func (o *GenericClientOptions[T]) SourceClient(ctx context.Context) (*generic.CloudEventSourceClient[T], error) { + if len(o.clientID) == 0 { + return nil, fmt.Errorf("client id is required") + } + + if len(o.sourceID) == 0 { + return nil, fmt.Errorf("source id is required") + } + + if o.watcherStore == nil { + return nil, fmt.Errorf("a watcher store is required") + } + + options, err := generic.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID) + if err != nil { + return nil, err + } + + cloudEventsClient, err := generic.NewCloudEventSourceClient( + ctx, + options, + store.NewSourceWatcherStoreLister(o.watcherStore), + statushash.StatusHash, + o.codec, + ) + if err != nil { + return nil, err + } + + // start to subscribe + cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource) + // start a go routine to receive client reconnect signal + go func() { + for { + select { + case <-ctx.Done(): + return + case <-cloudEventsClient.ReconnectedChan(): + if !o.resync { + klog.V(4).Infof("resync is disabled, do nothing") + continue + } + + // when receiving a client reconnected signal, we resync all clusters for this source + if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil { + klog.Errorf("failed to send resync request, %v", err) + } + } + } + }() + + if !o.resync { + return cloudEventsClient, nil + } + + // start a go routine to resync the works after this client's store is initiated + go func() { + if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) { + if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil { + klog.Errorf("failed to send resync request, %v", err) + } + } + }() + + return cloudEventsClient, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash/statushash.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash/statushash.go new file mode 100644 index 000000000..6d02651e3 --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash/statushash.go @@ -0,0 +1,33 @@ +package statushash + +import ( + "crypto/sha256" + "encoding/json" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" +) + +// StatusHash returns the SHA256 checksum of a resource status. +func StatusHash[T generic.ResourceObject](resource T) (string, error) { + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource) + if err != nil { + return "", err + } + + status, found, err := unstructured.NestedMap(u, "status") + if err != nil { + return "", err + } + if !found { + return "", fmt.Errorf("no status for the resource %s", resource.GetUID()) + } + + statusBytes, err := json.Marshal(status) + if err != nil { + return "", fmt.Errorf("failed to marshal resource status, %v", err) + } + return fmt.Sprintf("%x", sha256.Sum256(statusBytes)), nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/base.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/base.go new file mode 100644 index 000000000..577130c3b --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/base.go @@ -0,0 +1,75 @@ +package store + +import ( + "fmt" + "sync" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" +) + +// BaseClientWatchStore implements the ClientWatcherStore ListAll/List/Get methods +type BaseClientWatchStore[T generic.ResourceObject] struct { + sync.RWMutex + + Store cache.Store + Initiated bool +} + +// List the resources from the store with the list options +func (s *BaseClientWatchStore[T]) List(namespace string, opts metav1.ListOptions) ([]T, error) { + s.RLock() + defer s.RUnlock() + + resources, err := utils.ListResourcesWithOptions[T](s.Store, namespace, opts) + if err != nil { + return nil, err + } + + return resources, nil +} + +// Get a resource from the store +func (s *BaseClientWatchStore[T]) Get(namespace, name string) (resource T, exists bool, err error) { + s.RLock() + defer s.RUnlock() + + key := name + if len(namespace) != 0 { + key = fmt.Sprintf("%s/%s", namespace, name) + } + + obj, exists, err := s.Store.GetByKey(key) + if err != nil { + return resource, false, err + } + + if !exists { + return resource, false, nil + } + + res, ok := obj.(T) + if !ok { + return resource, false, fmt.Errorf("unknown type %T", obj) + } + + return res, true, nil +} + +// List all of resources from the store +func (s *BaseClientWatchStore[T]) ListAll() ([]T, error) { + s.RLock() + defer s.RUnlock() + + resources := []T{} + for _, obj := range s.Store.List() { + if res, ok := obj.(T); ok { + resources = append(resources, res) + } + } + + return resources, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/informer.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/informer.go new file mode 100644 index 000000000..d993dd8e0 --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/informer.go @@ -0,0 +1,133 @@ +package store + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" +) + +// AgentInformerWatcherStore extends the BaseClientWatchStore. + +// It gets/lists the resources from the given informer store and send +// the resource add/update/delete event to the watch channel directly. +// +// It is used for building resource agent client. +type AgentInformerWatcherStore[T generic.ResourceObject] struct { + BaseClientWatchStore[T] + Watcher *Watcher + + informer cache.SharedIndexInformer +} + +func NewAgentInformerWatcherStore[T generic.ResourceObject]() *AgentInformerWatcherStore[T] { + return &AgentInformerWatcherStore[T]{ + BaseClientWatchStore: BaseClientWatchStore[T]{}, + Watcher: NewWatcher(), + } +} + +func (s *AgentInformerWatcherStore[T]) Add(resource runtime.Object) error { + s.Watcher.Receive(watch.Event{Type: watch.Added, Object: resource}) + return nil +} + +func (s *AgentInformerWatcherStore[T]) Update(resource runtime.Object) error { + s.Watcher.Receive(watch.Event{Type: watch.Modified, Object: resource}) + return nil +} + +func (s *AgentInformerWatcherStore[T]) Delete(resource runtime.Object) error { + s.Watcher.Receive(watch.Event{Type: watch.Deleted, Object: resource}) + return nil +} + +func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(action types.ResourceAction, resource T) error { + switch action { + case types.Added: + newObj, err := utils.ToRuntimeObject(resource) + if err != nil { + return err + } + + return s.Add(newObj) + case types.Modified: + newObj, err := meta.Accessor(resource) + if err != nil { + return err + } + + lastObj, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName()) + if err != nil { + return err + } + if !exists { + return fmt.Errorf("the resource %s/%s does not exist", newObj.GetNamespace(), newObj.GetName()) + } + + // prevent the resource from being updated if it is deleting + if !lastObj.GetDeletionTimestamp().IsZero() { + klog.Warningf("the resource %s/%s is deleting, ignore the update", newObj.GetNamespace(), newObj.GetName()) + return nil + } + + updated, err := utils.ToRuntimeObject(resource) + if err != nil { + return err + } + + return s.Update(updated) + case types.Deleted: + newObj, err := meta.Accessor(resource) + if err != nil { + return err + } + + if newObj.GetDeletionTimestamp().IsZero() { + return nil + } + + if len(newObj.GetFinalizers()) != 0 { + return nil + } + + last, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName()) + if err != nil { + return err + } + if !exists { + return nil + } + + deletingObj, err := utils.ToRuntimeObject(last) + if err != nil { + return err + } + + return s.Delete(deletingObj) + default: + return fmt.Errorf("unsupported resource action %s", action) + } +} + +func (s *AgentInformerWatcherStore[T]) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { + return s.Watcher, nil +} + +func (s *AgentInformerWatcherStore[T]) HasInitiated() bool { + return s.Initiated && s.informer.HasSynced() +} + +func (s *AgentInformerWatcherStore[T]) SetInformer(informer cache.SharedIndexInformer) { + s.informer = informer + s.Store = informer.GetStore() + s.Initiated = true +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/interface.go similarity index 52% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/interface.go index 0b7f0c227..eb705a396 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/interface.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/interface.go @@ -5,11 +5,12 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/klog/v2" - workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) @@ -18,37 +19,37 @@ const syncedPollPeriod = 100 * time.Millisecond // StoreInitiated is a function that can be used to determine if a store has initiated. type StoreInitiated func() bool -// WorkClientWatcherStore provides a watcher with a work store. -type WorkClientWatcherStore interface { - // GetWatcher returns a watcher to receive work changes. +// ClientWatcherStore provides a watcher with a resource store. +type ClientWatcherStore[T generic.ResourceObject] interface { + // GetWatcher returns a watcher to receive resource changes. GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) - // HandleReceivedWork handles the client received work events. - HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error + // HandleReceivedResource handles the client received resource events. + HandleReceivedResource(action types.ResourceAction, resource T) error - // Add will be called by work client when adding works. The implementation is based on the specific + // Add will be called by resource client when adding resources. The implementation is based on the specific // watcher store, in some case, it does not need to update a store, but just send a watch event. - Add(work *workv1.ManifestWork) error + Add(resource runtime.Object) error - // Update will be called by work client when updating works. The implementation is based on the specific + // Update will be called by resource client when updating works. The implementation is based on the specific // watcher store, in some case, it does not need to update a store, but just send a watch event. - Update(work *workv1.ManifestWork) error + Update(resource runtime.Object) error - // Delete will be called by work client when deleting works. The implementation is based on the specific + // Delete will be called by resource client when deleting works. The implementation is based on the specific // watcher store, in some case, it does not need to update a store, but just send a watch event. - Delete(work *workv1.ManifestWork) error + Delete(resource runtime.Object) error - // List returns the works from store for a given namespace with list options - List(namespace string, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) + // List returns the resources from store for a given namespace with list options + List(namespace string, opts metav1.ListOptions) ([]T, error) - // ListAll list all of the works from store - ListAll() ([]*workv1.ManifestWork, error) + // ListAll list all of the resources from store + ListAll() ([]T, error) - // Get returns a work from store with work namespace and name - Get(namespace, name string) (*workv1.ManifestWork, bool, error) + // Get returns a resource from store with resource namespace and name + Get(namespace, name string) (T, bool, error) // HasInitiated marks the store has been initiated, A resync may be required after the store is initiated - // when building a work client. + // when building a resource client. HasInitiated() bool } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/lister.go new file mode 100644 index 000000000..b0b2b2f49 --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/lister.go @@ -0,0 +1,59 @@ +package store + +import ( + "fmt" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" +) + +// AgentWatcherStoreLister list the resources from ClientWatcherStore on an agent +type AgentWatcherStoreLister[T generic.ResourceObject] struct { + store ClientWatcherStore[T] +} + +func NewAgentWatcherStoreLister[T generic.ResourceObject](store ClientWatcherStore[T]) *AgentWatcherStoreLister[T] { + return &AgentWatcherStoreLister[T]{ + store: store, + } +} + +// List returns the resources from a ClientWatcherStore with list options +func (l *AgentWatcherStoreLister[T]) List(options types.ListOptions) ([]T, error) { + opts := metav1.ListOptions{} + + if options.Source != types.SourceAll { + opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source) + } + + list, err := l.store.List("", opts) + if err != nil { + return nil, err + } + + return list, nil +} + +// SourceWatcherStoreLister list the resources from a ClientWatcherStore on a source. +type SourceWatcherStoreLister[T generic.ResourceObject] struct { + store ClientWatcherStore[T] +} + +func NewSourceWatcherStoreLister[T generic.ResourceObject](store ClientWatcherStore[T]) *SourceWatcherStoreLister[T] { + return &SourceWatcherStoreLister[T]{ + store: store, + } +} + +// List returns the resources from a ClientWatcherStore with list options. +func (l *SourceWatcherStoreLister[T]) List(options types.ListOptions) ([]T, error) { + list, err := l.store.List(options.ClusterName, metav1.ListOptions{}) + if err != nil { + return nil, err + } + + return list, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go new file mode 100644 index 000000000..93cf44d6e --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store/watcher.go @@ -0,0 +1,76 @@ +package store + +import ( + "sync" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/watch" + "k8s.io/klog/v2" +) + +// Watcher implements the watch.Interface. +type Watcher struct { + sync.RWMutex + + result chan watch.Event + done chan struct{} + stopped bool +} + +var _ watch.Interface = &Watcher{} + +func NewWatcher() *Watcher { + return &Watcher{ + // It's easy for a consumer to add buffering via an extra + // goroutine/channel, but impossible for them to remove it, + // so nonbuffered is better. + result: make(chan watch.Event), + // If the watcher is externally stopped there is no receiver anymore + // and the send operations on the result channel, especially the + // error reporting might block forever. + // Therefore a dedicated stop channel is used to resolve this blocking. + done: make(chan struct{}), + } +} + +// ResultChan implements Interface. +func (w *Watcher) ResultChan() <-chan watch.Event { + return w.result +} + +// Stop implements Interface. +func (w *Watcher) Stop() { + // Call Close() exactly once by locking and setting a flag. + w.Lock() + defer w.Unlock() + // closing a closed channel always panics, therefore check before closing + select { + case <-w.done: + close(w.result) + default: + w.stopped = true + close(w.done) + } +} + +// Receive a event from the work client and sends down the result channel. +func (w *Watcher) Receive(evt watch.Event) { + if w.isStopped() { + // this watcher is stopped, do nothing. + return + } + + if klog.V(4).Enabled() { + obj, _ := meta.Accessor(evt.Object) + klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName()) + } + + w.result <- evt +} + +func (w *Watcher) isStopped() bool { + w.RLock() + defer w.RUnlock() + + return w.stopped +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/utils.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/utils.go new file mode 100644 index 000000000..90f99829e --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/utils.go @@ -0,0 +1,222 @@ +package utils + +import ( + "encoding/json" + "fmt" + + "github.com/bwmarrin/snowflake" + jsonpatch "github.com/evanphx/json-patch" + "github.com/google/uuid" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/apimachinery/pkg/api/validation" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/validation/field" + "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" +) + +// Patch applies the patch to a resource with the patch type. +func Patch[T generic.ResourceObject](patchType types.PatchType, work T, patchData []byte) (resource T, err error) { + workData, err := json.Marshal(work) + if err != nil { + return resource, err + } + + var patchedData []byte + switch patchType { + case types.JSONPatchType: + var patchObj jsonpatch.Patch + patchObj, err = jsonpatch.DecodePatch(patchData) + if err != nil { + return resource, err + } + patchedData, err = patchObj.Apply(workData) + if err != nil { + return resource, err + } + + case types.MergePatchType: + patchedData, err = jsonpatch.MergePatch(workData, patchData) + if err != nil { + return resource, err + } + default: + return resource, fmt.Errorf("unsupported patch type: %s", patchType) + } + + patchedWork := new(T) + if err := json.Unmarshal(patchedData, patchedWork); err != nil { + return resource, err + } + + return *patchedWork, nil +} + +// ListResourcesWithOptions retrieves the resources from store which matches the options. +func ListResourcesWithOptions[T generic.ResourceObject](store cache.Store, namespace string, opts metav1.ListOptions) ([]T, error) { + var err error + + labelSelector := labels.Everything() + fieldSelector := fields.Everything() + + if len(opts.LabelSelector) != 0 { + labelSelector, err = labels.Parse(opts.LabelSelector) + if err != nil { + return nil, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err) + } + } + + if len(opts.FieldSelector) != 0 { + fieldSelector, err = fields.ParseSelector(opts.FieldSelector) + if err != nil { + return nil, fmt.Errorf("invalid fields selector %q: %v", opts.FieldSelector, err) + } + } + + resources := []T{} + // list with labels + if err := cache.ListAll(store, labelSelector, func(obj any) { + resourceMeta, ok := obj.(metav1.Object) + if !ok { + klog.Warningf("the object in store %T is not a meta object", obj) + return + } + + if namespace != metav1.NamespaceAll && resourceMeta.GetNamespace() != namespace { + return + } + + workFieldSet := fields.Set{ + "metadata.name": resourceMeta.GetName(), + "metadata.namespace": resourceMeta.GetNamespace(), + } + + if !fieldSelector.Matches(workFieldSet) { + return + } + + resource, ok := obj.(T) + if !ok { + return + } + + resources = append(resources, resource) + }); err != nil { + return nil, err + } + + return resources, nil +} + +// ValidateResourceMetadata validates the metadata of the given resource +func ValidateResourceMetadata[T generic.ResourceObject](resource T) field.ErrorList { + errs := field.ErrorList{} + fldPath := field.NewPath("metadata") + + obj, err := meta.Accessor(resource) + if err != nil { + errs = append(errs, field.TypeInvalid(fldPath, resource, err.Error())) + return errs + } + + if obj.GetUID() == "" { + errs = append(errs, field.Required(fldPath.Child("uid"), "field not set")) + return errs + } + + if obj.GetResourceVersion() == "" { + errs = append(errs, field.Required(fldPath.Child("resourceVersion"), "field not set")) + return errs + } + + if obj.GetName() == "" { + errs = append(errs, field.Required(fldPath.Child("name"), "field not set")) + return errs + } + + for _, msg := range validation.ValidateNamespaceName(obj.GetName(), false) { + errs = append(errs, field.Invalid(fldPath.Child("name"), obj.GetName(), msg)) + } + + if obj.GetNamespace() != "" { + for _, msg := range validation.ValidateNamespaceName(obj.GetNamespace(), false) { + errs = append(errs, field.Invalid(fldPath.Child("namespace"), obj.GetNamespace(), msg)) + } + } + + errs = append(errs, metav1validation.ValidateLabels(obj.GetLabels(), fldPath.Child("labels"))...) + errs = append(errs, validation.ValidateAnnotations(obj.GetAnnotations(), fldPath.Child("annotations"))...) + errs = append(errs, validation.ValidateFinalizers(obj.GetFinalizers(), fldPath.Child("finalizers"))...) + return errs +} + +// ToRuntimeObject convert a resource to a runtime Object +func ToRuntimeObject[T generic.ResourceObject](resource T) (runtime.Object, error) { + obj, ok := any(resource).(runtime.Object) + if !ok { + return nil, fmt.Errorf("object %T does not implement the runtime Object interfaces", resource) + } + + return obj.DeepCopyObject(), nil +} + +// CompareSnowflakeSequenceIDs compares two snowflake sequence IDs. +// Returns true if the current ID is greater than the last. +// If the last sequence ID is empty, then the current is greater. +func CompareSnowflakeSequenceIDs(last, current string) (bool, error) { + if current != "" && last == "" { + return true, nil + } + + lastSID, err := snowflake.ParseString(last) + if err != nil { + return false, fmt.Errorf("unable to parse last sequence ID: %s, %v", last, err) + } + + currentSID, err := snowflake.ParseString(current) + if err != nil { + return false, fmt.Errorf("unable to parse current sequence ID: %s %v", current, err) + } + + if currentSID.Node() != lastSID.Node() { + return false, fmt.Errorf("sequence IDs (%s,%s) are not from the same node", last, current) + } + + if currentSID.Time() != lastSID.Time() { + return currentSID.Time() > lastSID.Time(), nil + } + + return currentSID.Step() > lastSID.Step(), nil +} + +// UID returns a v5 UUID based on sourceID, groupResource, namespace and name to make sure it is consistent +func UID(sourceID, groupResource, namespace, name string) string { + id := fmt.Sprintf("%s-%s-%s-%s", sourceID, groupResource, namespace, name) + return uuid.NewSHA1(uuid.NameSpaceOID, []byte(id)).String() +} + +// EnsureResourceFinalizer ensures the resource finalizer in the given finalizers +func EnsureResourceFinalizer(finalizers []string) []string { + has := false + for _, f := range finalizers { + if f == common.ResourceFinalizer { + has = true + break + } + } + + if !has { + finalizers = append(finalizers, common.ResourceFinalizer) + } + + return finalizers +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/work.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/work.go new file mode 100644 index 000000000..25c65b06e --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils/work.go @@ -0,0 +1,50 @@ +package utils + +import ( + "bytes" + "fmt" + + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/util/validation/field" + + workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator" +) + +func ValidateWork(work *workv1.ManifestWork) field.ErrorList { + errs := field.ErrorList{} + + if work.Namespace == "" { + errs = append(errs, field.Required(field.NewPath("metadata").Child("namespace"), "field not set")) + } + + if metaErrs := ValidateResourceMetadata(work); len(metaErrs) != 0 { + errs = append(errs, metaErrs...) + } + + if err := validator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil { + errs = append(errs, field.Invalid(field.NewPath("spec"), "spec", err.Error())) + } + return errs +} + +// encode ensures the given work's manifests are encoded +func EncodeManifests(work *workv1.ManifestWork) error { + for index, manifest := range work.Spec.Workload.Manifests { + if manifest.Raw == nil { + if manifest.Object == nil { + return fmt.Errorf("the Object and Raw of the manifest[%d] for the work (%s/%s) are both `nil`", + index, work.Namespace, work.Name) + } + + var buf bytes.Buffer + if err := unstructured.UnstructuredJSONScheme.Encode(manifest.Object, &buf); err != nil { + return err + } + + work.Spec.Workload.Manifests[index].Raw = buf.Bytes() + } + } + + return nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go similarity index 92% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go index c250e45d0..8bee84087 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client/manifestwork.go @@ -17,12 +17,12 @@ import ( workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - workerrors "open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils" ) // ManifestWorkAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by @@ -31,7 +31,7 @@ type ManifestWorkAgentClient struct { sync.RWMutex cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork] - watcherStore store.WorkClientWatcherStore + watcherStore store.ClientWatcherStore[*workv1.ManifestWork] // this namespace should be same with the cluster name to which this client subscribes namespace string @@ -40,9 +40,9 @@ type ManifestWorkAgentClient struct { var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{} func NewManifestWorkAgentClient( - cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork], - watcherStore store.WorkClientWatcherStore, clusterName string, + watcherStore store.ClientWatcherStore[*workv1.ManifestWork], + cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork], ) *ManifestWorkAgentClient { return &ManifestWorkAgentClient{ cloudEventsClient: cloudEventsClient, @@ -102,7 +102,12 @@ func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOpti } generic.IncreaseWorkProcessedCounter("list", metav1.StatusSuccess) - return works, nil + items := []workv1.ManifestWork{} + for _, work := range works { + items = append(items, *work) + } + + return &workv1.ManifestWorkList{Items: items}, nil } func (c *ManifestWorkAgentClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { @@ -169,7 +174,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub // publish the status update event to source, source will check the resource version // and reject the update if it's status update is outdated. if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err) generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) return nil, returnErr } @@ -223,7 +228,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub // it back to source if !newWork.DeletionTimestamp.IsZero() && len(newWork.Finalizers) == 0 { meta.SetStatusCondition(&newWork.Status.Conditions, metav1.Condition{ - Type: common.ManifestsDeleted, + Type: common.ResourceDeleted, Status: metav1.ConditionTrue, Reason: "ManifestsDeleted", Message: fmt.Sprintf("The manifests are deleted from the cluster %s", newWork.Namespace), @@ -231,7 +236,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub eventType.Action = common.DeleteRequestAction if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err) generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) return nil, returnErr } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go similarity index 95% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go index 6ca8998d4..432b12704 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go @@ -13,10 +13,10 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash" ) var sequenceGenerator *snowflake.Node @@ -67,7 +67,7 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT WithOriginalSource(originalSource). NewEvent() - statusHash, err := statushash.ManifestWorkStatusHash(work) + statusHash, err := statushash.StatusHash(work) if err != nil { return nil, err } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go new file mode 100644 index 000000000..51a36e8cb --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/clientholder.go @@ -0,0 +1,59 @@ +package work + +import ( + "context" + + workclientset "open-cluster-management.io/api/client/work/clientset/versioned" + workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" + workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options" + agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal" + sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client" +) + +// ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configuration +// +// ClientHolder also implements the ManifestWorksGetter interface. +type ClientHolder struct { + workClientSet workclientset.Interface +} + +var _ workv1client.ManifestWorksGetter = &ClientHolder{} + +// WorkInterface returns a workclientset Interface +func (h *ClientHolder) WorkInterface() workclientset.Interface { + return h.workClientSet +} + +// ManifestWorks returns a ManifestWorkInterface +func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWorkInterface { + return h.workClientSet.WorkV1().ManifestWorks(namespace) +} + +// NewSourceClientHolder returns a ClientHolder for a source +func NewSourceClientHolder(ctx context.Context, opt *options.GenericClientOptions[*workv1.ManifestWork]) (*ClientHolder, error) { + sourceClient, err := opt.SourceClient(ctx) + if err != nil { + return nil, err + } + + manifestWorkClient := sourceclient.NewManifestWorkSourceClient(opt.SourceID(), opt.WatcherStore(), sourceClient) + workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} + workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} + return &ClientHolder{workClientSet: workClientSet}, nil +} + +// NewAgentClientHolder returns a ClientHolder for an agent +func NewAgentClientHolder(ctx context.Context, opt *options.GenericClientOptions[*workv1.ManifestWork]) (*ClientHolder, error) { + agentClient, err := opt.AgentClient(ctx) + if err != nil { + return nil, err + } + + manifestWorkClient := agentclient.NewManifestWorkAgentClient(opt.ClusterName(), opt.WatcherStore(), agentClient) + workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} + workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} + return &ClientHolder{workClientSet: workClientSet}, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal/clientset.go similarity index 97% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal/clientset.go index 6a00dfcc5..be3faba66 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal/clientset.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal/clientset.go @@ -8,8 +8,8 @@ import ( workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" workv1alpha1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1alpha1" - agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client" - sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" + agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client" + sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client" ) // WorkClientSetWrapper wraps a work client that has a manifestwork client to a work clientset interface, this wrapper diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload/manifestbundle.go similarity index 99% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload/manifestbundle.go index 2b744b425..829f4db96 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload/manifestbundle.go @@ -4,6 +4,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client/manifestwork.go similarity index 90% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client/manifestwork.go index 3da87c864..52c022cc8 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client/manifestwork.go @@ -15,19 +15,19 @@ import ( workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - workerrors "open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils" ) // ManifestWorkSourceClient implements the ManifestWorkInterface. type ManifestWorkSourceClient struct { cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork] - watcherStore store.WorkClientWatcherStore + watcherStore store.ClientWatcherStore[*workv1.ManifestWork] namespace string sourceID string } @@ -36,8 +36,8 @@ var _ workv1client.ManifestWorkInterface = &ManifestWorkSourceClient{} func NewManifestWorkSourceClient( sourceID string, + watcherStore store.ClientWatcherStore[*workv1.ManifestWork], cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork], - watcherStore store.WorkClientWatcherStore, ) *ManifestWorkSourceClient { return &ManifestWorkSourceClient{ cloudEventsClient: cloudEventsClient, @@ -84,24 +84,24 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor } newWork := manifestWork.DeepCopy() - newWork.UID = kubetypes.UID(utils.UID(c.sourceID, c.namespace, newWork.Name)) + newWork.UID = kubetypes.UID(utils.UID(c.sourceID, common.ManifestWorkGR.String(), c.namespace, newWork.Name)) newWork.Namespace = c.namespace newWork.ResourceVersion = getWorkResourceVersion(manifestWork) - if err := utils.Encode(newWork); err != nil { + if err := utils.EncodeManifests(newWork); err != nil { returnErr := errors.NewInternalError(err) generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) return nil, returnErr } - if errs := utils.Validate(newWork); len(errs) != 0 { + if errs := utils.ValidateWork(newWork); len(errs) != 0 { returnErr := errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs) generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) return nil, returnErr } if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - returnErr := workerrors.NewPublishError(common.ManifestWorkGR, manifestWork.Name, err) + returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, manifestWork.Name, err) generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason)) return nil, returnErr } @@ -149,7 +149,7 @@ func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts deletingWork.DeletionTimestamp = &now if err := c.cloudEventsClient.Publish(ctx, eventType, deletingWork); err != nil { - returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err) generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason)) return returnErr } @@ -211,7 +211,12 @@ func (c *ManifestWorkSourceClient) List(ctx context.Context, opts metav1.ListOpt } generic.IncreaseWorkProcessedCounter("list", metav1.StatusSuccess) - return works, nil + items := []workv1.ManifestWork{} + for _, work := range works { + items = append(items, *work) + } + + return &workv1.ManifestWorkList{Items: items}, nil } func (c *ManifestWorkSourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) { @@ -266,14 +271,14 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku newWork := patchedWork.DeepCopy() newWork.ResourceVersion = getWorkResourceVersion(patchedWork) - if errs := utils.Validate(newWork); len(errs) != 0 { + if errs := utils.ValidateWork(newWork); len(errs) != 0 { returnErr := errors.NewInvalid(common.ManifestWorkGK, name, errs) generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) return nil, returnErr } if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil { - returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err) + returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err) generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason)) return nil, returnErr } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec/manifestbundle.go similarity index 97% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec/manifestbundle.go index d0a036f44..ead101147 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec/manifestbundle.go @@ -12,9 +12,10 @@ import ( kubetypes "k8s.io/apimachinery/pkg/types" workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" ) // ExtensionWorkMeta is an extension attribute for work meta data. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/base.go similarity index 56% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/base.go index ff96cd493..3f9cb892d 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/base.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/base.go @@ -3,98 +3,31 @@ package store import ( "fmt" "strconv" - "sync" "time" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kubetypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" workv1 "open-cluster-management.io/api/work/v1" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils" ) -const ManifestWorkFinalizer = "cloudevents.open-cluster-management.io/manifest-work-cleanup" - -type baseStore struct { - sync.RWMutex - - store cache.Store - initiated bool -} - -// List the works from the store with the list options -func (b *baseStore) List(namespace string, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) { - b.RLock() - defer b.RUnlock() - - works, err := utils.ListWorksWithOptions(b.store, namespace, opts) - if err != nil { - return nil, err - } - - items := []workv1.ManifestWork{} - for _, work := range works { - items = append(items, *work) - } - - return &workv1.ManifestWorkList{Items: items}, nil -} - -// Get a works from the store -func (b *baseStore) Get(namespace, name string) (*workv1.ManifestWork, bool, error) { - b.RLock() - defer b.RUnlock() - - obj, exists, err := b.store.GetByKey(fmt.Sprintf("%s/%s", namespace, name)) - if err != nil { - return nil, false, err - } - - if !exists { - return nil, false, nil - } - - work, ok := obj.(*workv1.ManifestWork) - if !ok { - return nil, false, fmt.Errorf("unknown type %T", obj) - } - - return work, true, nil -} - -// List all of works from the store -func (b *baseStore) ListAll() ([]*workv1.ManifestWork, error) { - b.RLock() - defer b.RUnlock() - - works := []*workv1.ManifestWork{} - for _, obj := range b.store.List() { - if work, ok := obj.(*workv1.ManifestWork); ok { - works = append(works, work) - } - } - - return works, nil -} - type baseSourceStore struct { - baseStore + store.BaseClientWatchStore[*workv1.ManifestWork] // a queue to save the received work events receivedWorks workqueue.RateLimitingInterface } -func (bs *baseSourceStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { +func (bs *baseSourceStore) HandleReceivedResource(action types.ResourceAction, work *workv1.ManifestWork) error { switch action { case types.StatusModified: bs.receivedWorks.Add(work) @@ -107,10 +40,10 @@ func (bs *baseSourceStore) HandleReceivedWork(action types.ResourceAction, work // workProcessor process the received works from given work queue with a specific store type workProcessor struct { works workqueue.RateLimitingInterface - store WorkClientWatcherStore + store store.ClientWatcherStore[*workv1.ManifestWork] } -func newWorkProcessor(works workqueue.RateLimitingInterface, store WorkClientWatcherStore) *workProcessor { +func newWorkProcessor(works workqueue.RateLimitingInterface, store store.ClientWatcherStore[*workv1.ManifestWork]) *workProcessor { return &workProcessor{ works: works, store: store, @@ -163,7 +96,7 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error { if lastWork == nil { // the work is not found from the local cache and it has been deleted by the agent, // ignore this work. - if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) { + if meta.IsStatusConditionTrue(work.Status.Conditions, common.ResourceDeleted) { return nil } @@ -175,7 +108,7 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error { } updatedWork := lastWork.DeepCopy() - if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) { + if meta.IsStatusConditionTrue(work.Status.Conditions, common.ResourceDeleted) { updatedWork.Finalizers = []string{} // delete the work from the local cache. return b.store.Delete(updatedWork) @@ -223,7 +156,7 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error { } // the work has been handled by agent, we ensure a finalizer on the work - updatedWork.Finalizers = ensureFinalizers(updatedWork.Finalizers) + updatedWork.Finalizers = utils.EnsureResourceFinalizer(updatedWork.Finalizers) updatedWork.Annotations[common.CloudEventsSequenceIDAnnotationKey] = sequenceID updatedWork.Status = work.Status // update the work with status in the local cache. @@ -245,86 +178,3 @@ func (b *workProcessor) getWork(uid kubetypes.UID) *workv1.ManifestWork { return nil } - -// workWatcher implements the watch.Interface. -type workWatcher struct { - sync.RWMutex - - result chan watch.Event - done chan struct{} - stopped bool -} - -var _ watch.Interface = &workWatcher{} - -func newWorkWatcher() *workWatcher { - return &workWatcher{ - // It's easy for a consumer to add buffering via an extra - // goroutine/channel, but impossible for them to remove it, - // so nonbuffered is better. - result: make(chan watch.Event), - // If the watcher is externally stopped there is no receiver anymore - // and the send operations on the result channel, especially the - // error reporting might block forever. - // Therefore a dedicated stop channel is used to resolve this blocking. - done: make(chan struct{}), - } -} - -// ResultChan implements Interface. -func (w *workWatcher) ResultChan() <-chan watch.Event { - return w.result -} - -// Stop implements Interface. -func (w *workWatcher) Stop() { - // Call Close() exactly once by locking and setting a flag. - w.Lock() - defer w.Unlock() - // closing a closed channel always panics, therefore check before closing - select { - case <-w.done: - close(w.result) - default: - w.stopped = true - close(w.done) - } -} - -// Receive a event from the work client and sends down the result channel. -func (w *workWatcher) Receive(evt watch.Event) { - if w.isStopped() { - // this watcher is stopped, do nothing. - return - } - - if klog.V(4).Enabled() { - obj, _ := meta.Accessor(evt.Object) - klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName()) - } - - w.result <- evt -} - -func (w *workWatcher) isStopped() bool { - w.RLock() - defer w.RUnlock() - - return w.stopped -} - -func ensureFinalizers(workFinalizers []string) []string { - has := false - for _, f := range workFinalizers { - if f == ManifestWorkFinalizer { - has = true - break - } - } - - if !has { - workFinalizers = append(workFinalizers, ManifestWorkFinalizer) - } - - return workFinalizers -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/informer.go similarity index 65% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/informer.go index 4375c91a3..e0965dadd 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/informer.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/informer.go @@ -5,12 +5,15 @@ import ( "fmt" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/klog/v2" workv1 "open-cluster-management.io/api/work/v1" + + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) @@ -21,19 +24,19 @@ import ( // It is used for building ManifestWork source client. type SourceInformerWatcherStore struct { baseSourceStore - watcher *workWatcher + watcher *store.Watcher informer cache.SharedIndexInformer } -var _ WorkClientWatcherStore = &SourceInformerWatcherStore{} +var _ store.ClientWatcherStore[*workv1.ManifestWork] = &SourceInformerWatcherStore{} func NewSourceInformerWatcherStore(ctx context.Context) *SourceInformerWatcherStore { s := &SourceInformerWatcherStore{ baseSourceStore: baseSourceStore{ - baseStore: baseStore{}, - receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "informer-watcher-store"), + BaseClientWatchStore: store.BaseClientWatchStore[*workv1.ManifestWork]{}, + receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "informer-watcher-store"), }, - watcher: newWorkWatcher(), + watcher: store.NewWatcher(), } // start a goroutine to process the received work events from the work queue with current store. @@ -42,23 +45,23 @@ func NewSourceInformerWatcherStore(ctx context.Context) *SourceInformerWatcherSt return s } -func (s *SourceInformerWatcherStore) Add(work *workv1.ManifestWork) error { +func (s *SourceInformerWatcherStore) Add(work runtime.Object) error { s.watcher.Receive(watch.Event{Type: watch.Added, Object: work}) return nil } -func (s *SourceInformerWatcherStore) Update(work *workv1.ManifestWork) error { +func (s *SourceInformerWatcherStore) Update(work runtime.Object) error { s.watcher.Receive(watch.Event{Type: watch.Modified, Object: work}) return nil } -func (s *SourceInformerWatcherStore) Delete(work *workv1.ManifestWork) error { +func (s *SourceInformerWatcherStore) Delete(work runtime.Object) error { s.watcher.Receive(watch.Event{Type: watch.Deleted, Object: work}) return nil } func (s *SourceInformerWatcherStore) HasInitiated() bool { - return s.initiated && s.informer.HasSynced() + return s.Initiated && s.informer.HasSynced() } func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { @@ -71,8 +74,8 @@ func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.Li func (s *SourceInformerWatcherStore) SetInformer(informer cache.SharedIndexInformer) { s.informer = informer - s.store = informer.GetStore() - s.initiated = true + s.Store = informer.GetStore() + s.Initiated = true } // AgentInformerWatcherStore extends the baseStore. @@ -81,36 +84,21 @@ func (s *SourceInformerWatcherStore) SetInformer(informer cache.SharedIndexInfor // // It is used for building ManifestWork agent client. type AgentInformerWatcherStore struct { - baseStore - informer cache.SharedIndexInformer - watcher *workWatcher + store.AgentInformerWatcherStore[*workv1.ManifestWork] } -var _ WorkClientWatcherStore = &AgentInformerWatcherStore{} +var _ store.ClientWatcherStore[*workv1.ManifestWork] = &AgentInformerWatcherStore{} func NewAgentInformerWatcherStore() *AgentInformerWatcherStore { return &AgentInformerWatcherStore{ - baseStore: baseStore{}, - watcher: newWorkWatcher(), + AgentInformerWatcherStore: store.AgentInformerWatcherStore[*workv1.ManifestWork]{ + BaseClientWatchStore: store.BaseClientWatchStore[*workv1.ManifestWork]{}, + Watcher: store.NewWatcher(), + }, } } -func (s *AgentInformerWatcherStore) Add(work *workv1.ManifestWork) error { - s.watcher.Receive(watch.Event{Type: watch.Added, Object: work}) - return nil -} - -func (s *AgentInformerWatcherStore) Update(work *workv1.ManifestWork) error { - s.watcher.Receive(watch.Event{Type: watch.Modified, Object: work}) - return nil -} - -func (s *AgentInformerWatcherStore) Delete(work *workv1.ManifestWork) error { - s.watcher.Receive(watch.Event{Type: watch.Deleted, Object: work}) - return nil -} - -func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error { +func (s *AgentInformerWatcherStore) HandleReceivedResource(action types.ResourceAction, work *workv1.ManifestWork) error { switch action { case types.Added: return s.Add(work.DeepCopy()) @@ -154,17 +142,3 @@ func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceActi return fmt.Errorf("unsupported resource action %s", action) } } - -func (s *AgentInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { - return s.watcher, nil -} - -func (s *AgentInformerWatcherStore) HasInitiated() bool { - return s.initiated && s.informer.HasSynced() -} - -func (s *AgentInformerWatcherStore) SetInformer(informer cache.SharedIndexInformer) { - s.informer = informer - s.store = informer.GetStore() - s.initiated = true -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/local.go similarity index 74% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/local.go index 8390cb6ae..c03ba7893 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/store/local.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store/local.go @@ -6,6 +6,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/tools/cache" @@ -14,7 +15,8 @@ import ( workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store" + "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils" ) // ListLocalWorksFunc loads the works from the local environment. @@ -25,14 +27,14 @@ type watchEvent struct { Type watch.EventType } -var _ WorkClientWatcherStore = &SourceLocalWatcherStore{} +var _ store.ClientWatcherStore[*workv1.ManifestWork] = &SourceLocalWatcherStore{} // SourceLocalWatcherStore caches the works in this local store and provide the watch ability by watch event channel. // // It is used for building ManifestWork source client. type SourceLocalWatcherStore struct { baseSourceStore - watcher *workWatcher + watcher *store.Watcher eventQueue cache.Queue } @@ -43,23 +45,23 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc return nil, err } - // A local store to cache the works - store := cache.NewStore(cache.MetaNamespaceKeyFunc) + // A local localStore to cache the works + localStore := cache.NewStore(cache.MetaNamespaceKeyFunc) for _, work := range works { - if errs := utils.Validate(work); len(errs) != 0 { + if errs := utils.ValidateWork(work); len(errs) != 0 { return nil, fmt.Errorf("%s", errs.ToAggregate().Error()) } - if err := store.Add(work.DeepCopy()); err != nil { + if err := localStore.Add(work.DeepCopy()); err != nil { return nil, err } } s := &SourceLocalWatcherStore{ baseSourceStore: baseSourceStore{ - baseStore: baseStore{ - store: store, - initiated: true, + BaseClientWatchStore: store.BaseClientWatchStore[*workv1.ManifestWork]{ + Store: localStore, + Initiated: true, }, // A queue to save the received work events, it helps us retry events @@ -67,7 +69,7 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "local-watcher-store"), }, - watcher: newWorkWatcher(), + watcher: store.NewWatcher(), // A queue to save the work client send events, if run a client without a watcher, // it will block the client, this queue helps to resolve this blocking. @@ -92,43 +94,58 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc } // Add a work to the cache and send an event to the event queue -func (s *SourceLocalWatcherStore) Add(work *workv1.ManifestWork) error { +func (s *SourceLocalWatcherStore) Add(work runtime.Object) error { s.Lock() defer s.Unlock() - if err := s.store.Add(work); err != nil { + if err := s.Store.Add(work); err != nil { return err } - return s.eventQueue.Add(&watchEvent{Key: key(work), Type: watch.Added}) + key, err := key(work) + if err != nil { + return err + } + + return s.eventQueue.Add(&watchEvent{Key: key, Type: watch.Added}) } // Update a work in the cache and send an event to the event queue -func (s *SourceLocalWatcherStore) Update(work *workv1.ManifestWork) error { +func (s *SourceLocalWatcherStore) Update(work runtime.Object) error { s.Lock() defer s.Unlock() - if err := s.store.Update(work); err != nil { + if err := s.Store.Update(work); err != nil { return err } - return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Modified}) + key, err := key(work) + if err != nil { + return err + } + + return s.eventQueue.Update(&watchEvent{Key: key, Type: watch.Modified}) } // Delete a work from the cache and send an event to the event queue -func (s *SourceLocalWatcherStore) Delete(work *workv1.ManifestWork) error { +func (s *SourceLocalWatcherStore) Delete(work runtime.Object) error { s.Lock() defer s.Unlock() - if err := s.store.Delete(work); err != nil { + if err := s.Store.Delete(work); err != nil { return err } - return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Deleted}) + key, err := key(work) + if err != nil { + return err + } + + return s.eventQueue.Update(&watchEvent{Key: key, Type: watch.Deleted}) } func (s *SourceLocalWatcherStore) HasInitiated() bool { - return s.initiated + return s.Initiated } func (s *SourceLocalWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) { @@ -167,7 +184,7 @@ func (s *SourceLocalWatcherStore) processLoop() { return } - obj, exists, err := s.store.GetByKey(evt.Key) + obj, exists, err := s.Store.GetByKey(evt.Key) if err != nil { klog.Errorf("failed to get the work %s, %v", evt.Key, err) return @@ -210,6 +227,10 @@ func (s *SourceLocalWatcherStore) processLoop() { } } -func key(work *workv1.ManifestWork) string { - return work.Namespace + "/" + work.Name +func key(obj runtime.Object) (string, error) { + work, ok := obj.(*workv1.ManifestWork) + if !ok { + return "", fmt.Errorf("obj %T is not a work", obj) + } + return work.Namespace + "/" + work.Name, nil } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go index cd6a139a5..f1994f6b5 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go @@ -222,24 +222,22 @@ func CachingCertificateLoader(certFile, keyFile string) func() (*tls.Certificate // And a goroutine will be started to periodically refresh client certificates for this connection. func AutoLoadTLSConfig(caFile, certFile, keyFile string, conn Connection) (*tls.Config, error) { var tlsConfig *tls.Config - if caFile != "" { - certPool, err := rootCAs(caFile) - if err != nil { - return nil, err - } - tlsConfig = &tls.Config{ - RootCAs: certPool, - MinVersion: tls.VersionTLS13, - MaxVersion: tls.VersionTLS13, - } - if certFile != "" && keyFile != "" { - // Set client certificate and key getter for tls config - tlsConfig.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) { - return CachingCertificateLoader(certFile, keyFile)() - } - // Start a goroutine to periodically refresh client certificates for this connection - StartClientCertRotating(tlsConfig.GetClientCertificate, conn) + certPool, err := rootCAs(caFile) + if err != nil { + return nil, err + } + tlsConfig = &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS13, + MaxVersion: tls.VersionTLS13, + } + if certFile != "" && keyFile != "" { + // Set client certificate and key getter for tls config + tlsConfig.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) { + return CachingCertificateLoader(certFile, keyFile)() } + // Start a goroutine to periodically refresh client certificates for this connection + StartClientCertRotating(tlsConfig.GetClientCertificate, conn) } return tlsConfig, nil diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go index 051a39641..b14eb3d21 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go @@ -5,6 +5,7 @@ import ( "crypto/tls" "fmt" "os" + "sync" "time" "golang.org/x/oauth2" @@ -30,7 +31,8 @@ type GRPCDialer struct { KeepAliveOptions KeepAliveOptions TLSConfig *tls.Config TokenFile string - conn *grpc.ClientConn + mu sync.Mutex // Mutex to protect the connection. + conn *grpc.ClientConn // Cached gRPC client connection. } // KeepAliveOptions holds the keepalive options for the gRPC client. @@ -43,6 +45,15 @@ type KeepAliveOptions struct { // Dial connects to the gRPC server and returns a gRPC client connection. func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) { + // Return the cached connection if it exists and is ready. + // Should not return a nil or unready connection, otherwise the caller (cloudevents client) + // will not use the connection for sending and receiving events in reconnect scenarios. + // lock the connection to ensure the connnection is not created by multiple goroutines concurrently. + d.mu.Lock() + defer d.mu.Unlock() + if d.conn != nil && (d.conn.GetState() == connectivity.Connecting || d.conn.GetState() == connectivity.Ready) { + return d.conn, nil + } // Prepare gRPC dial options. dialOpts := []grpc.DialOption{} if d.KeepAliveOptions.Enable { @@ -94,6 +105,8 @@ func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) { // Close closes the gRPC client connection. func (d *GRPCDialer) Close() error { + d.mu.Lock() + defer d.mu.Unlock() if d.conn != nil { return d.conn.Close() } @@ -192,10 +205,12 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) { // Set the keepalive options options.Dialer.KeepAliveOptions = keepAliveOptions - // Set up TLS configuration for the gRPC connection, the certificates will be reloaded periodically. - options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer) - if err != nil { - return nil, err + if config.CAFile != "" { + // Set up TLS configuration for the gRPC connection, the certificates will be reloaded periodically. + options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer) + if err != nil { + return nil, err + } } return options, nil diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go deleted file mode 100644 index 0a965a4cf..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister/lister.go +++ /dev/null @@ -1,50 +0,0 @@ -package lister - -import ( - "fmt" - - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - workv1 "open-cluster-management.io/api/work/v1" - - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" -) - -// WatcherStoreLister list the ManifestWorks from WorkClientWatcherStore -type WatcherStoreLister struct { - store store.WorkClientWatcherStore -} - -func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreLister { - return &WatcherStoreLister{ - store: store, - } -} - -// List returns the ManifestWorks from a WorkClientWatcherStore with list options -func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) { - opts := metav1.ListOptions{} - - if options.Source != types.SourceAll { - opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source) - } - - list, err := l.store.List(options.ClusterName, opts) - if err != nil { - return nil, err - } - - works := []*workv1.ManifestWork{} - for _, work := range list.Items { - cloudEventsDataType := work.Annotations[common.CloudEventsDataTypeAnnotationKey] - if cloudEventsDataType != options.CloudEventsDataType.String() { - continue - } - - works = append(works, &work) - } - - return works, nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go deleted file mode 100644 index 72b5e4a61..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go +++ /dev/null @@ -1,253 +0,0 @@ -package work - -import ( - "context" - "fmt" - - "k8s.io/klog/v2" - - workclientset "open-cluster-management.io/api/client/work/clientset/versioned" - workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1" - workv1 "open-cluster-management.io/api/work/v1" - - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client" - agentlister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal" - sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" - sourcelister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" -) - -// ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configuration -// -// ClientHolder also implements the ManifestWorksGetter interface. -type ClientHolder struct { - workClientSet workclientset.Interface -} - -var _ workv1client.ManifestWorksGetter = &ClientHolder{} - -// WorkInterface returns a workclientset Interface -func (h *ClientHolder) WorkInterface() workclientset.Interface { - return h.workClientSet -} - -// ManifestWorks returns a ManifestWorkInterface -func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWorkInterface { - return h.workClientSet.WorkV1().ManifestWorks(namespace) -} - -// ClientHolderBuilder builds the ClientHolder with different configuration. -type ClientHolderBuilder struct { - config any - watcherStore store.WorkClientWatcherStore - codec generic.Codec[*workv1.ManifestWork] - sourceID string - clusterName string - clientID string - resync bool -} - -// NewClientHolderBuilder returns a ClientHolderBuilder with a given configuration. -// -// Available configurations: -// - MQTTOptions (*mqtt.MQTTOptions): builds a manifestwork client based on cloudevents with MQTT -// - GRPCOptions (*grpc.GRPCOptions): builds a manifestwork client based on cloudevents with GRPC -// - KafkaOptions (*kafka.KafkaOptions): builds a manifestwork client based on cloudevents with Kafka -// -// TODO using a specified config instead of any -func NewClientHolderBuilder(config any) *ClientHolderBuilder { - return &ClientHolderBuilder{ - config: config, - resync: true, - } -} - -// WithClientID set the client ID for source/agent cloudevents client. -func (b *ClientHolderBuilder) WithClientID(clientID string) *ClientHolderBuilder { - b.clientID = clientID - return b -} - -// WithSourceID set the source ID when building a manifestwork client for a source. -func (b *ClientHolderBuilder) WithSourceID(sourceID string) *ClientHolderBuilder { - b.sourceID = sourceID - return b -} - -// WithClusterName set the managed cluster name when building a manifestwork client for an agent. -func (b *ClientHolderBuilder) WithClusterName(clusterName string) *ClientHolderBuilder { - b.clusterName = clusterName - return b -} - -// WithCodec add codec when building a manifestwork client based on cloudevents. -func (b *ClientHolderBuilder) WithCodec(codec generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder { - b.codec = codec - return b -} - -// WithWorkClientWatcherStore set the WorkClientWatcherStore. The client will use this store to caches the works and -// watch the work events. -func (b *ClientHolderBuilder) WithWorkClientWatcherStore(store store.WorkClientWatcherStore) *ClientHolderBuilder { - b.watcherStore = store - return b -} - -// WithResyncEnabled control the client resync (Default is true), if it's true, the resync happens when -// 1. after the client's store is initiated -// 2. the client reconnected -func (b *ClientHolderBuilder) WithResyncEnabled(resync bool) *ClientHolderBuilder { - b.resync = resync - return b -} - -// NewSourceClientHolder returns a ClientHolder for a source -func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*ClientHolder, error) { - if len(b.clientID) == 0 { - return nil, fmt.Errorf("client id is required") - } - - if len(b.sourceID) == 0 { - return nil, fmt.Errorf("source id is required") - } - - if b.watcherStore == nil { - return nil, fmt.Errorf("a watcher store is required") - } - - options, err := generic.BuildCloudEventsSourceOptions(b.config, b.clientID, b.sourceID) - if err != nil { - return nil, err - } - - cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork]( - ctx, - options, - sourcelister.NewWatcherStoreLister(b.watcherStore), - statushash.ManifestWorkStatusHash, - b.codec, - ) - if err != nil { - return nil, err - } - - // start to subscribe - cloudEventsClient.Subscribe(ctx, b.watcherStore.HandleReceivedWork) - - manifestWorkClient := sourceclient.NewManifestWorkSourceClient(b.sourceID, cloudEventsClient, b.watcherStore) - workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} - workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} - - // start a go routine to receive client reconnect signal - go func() { - for { - select { - case <-ctx.Done(): - return - case <-cloudEventsClient.ReconnectedChan(): - if !b.resync { - klog.V(4).Infof("resync is disabled, do nothing") - continue - } - - // when receiving a client reconnected signal, we resync all clusters for this source - if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil { - klog.Errorf("failed to send resync request, %v", err) - } - } - } - }() - - if !b.resync { - return &ClientHolder{workClientSet: workClientSet}, nil - } - - // start a go routine to resync the works after this client's store is initiated - go func() { - if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) { - if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil { - klog.Errorf("failed to send resync request, %v", err) - } - } - }() - - return &ClientHolder{workClientSet: workClientSet}, nil -} - -// NewAgentClientHolder returns a ClientHolder for an agent -func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*ClientHolder, error) { - if len(b.clientID) == 0 { - return nil, fmt.Errorf("client id is required") - } - - if len(b.clusterName) == 0 { - return nil, fmt.Errorf("cluster name is required") - } - - if b.watcherStore == nil { - return nil, fmt.Errorf("watcher store is required") - } - - options, err := generic.BuildCloudEventsAgentOptions(b.config, b.clusterName, b.clientID) - if err != nil { - return nil, err - } - - cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork]( - ctx, - options, - agentlister.NewWatcherStoreLister(b.watcherStore), - statushash.ManifestWorkStatusHash, - b.codec, - ) - if err != nil { - return nil, err - } - - // start to subscribe - cloudEventsClient.Subscribe(ctx, b.watcherStore.HandleReceivedWork) - - manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, b.watcherStore, b.clusterName) - workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient} - workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient} - - // start a go routine to receive client reconnect signal - go func() { - for { - select { - case <-ctx.Done(): - return - case <-cloudEventsClient.ReconnectedChan(): - if !b.resync { - klog.V(4).Infof("resync is disabled, do nothing") - continue - } - - // when receiving a client reconnected signal, we resync all sources for this agent - // TODO after supporting multiple sources, we should only resync agent known sources - if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { - klog.Errorf("failed to send resync request, %v", err) - } - } - } - }() - - if !b.resync { - return &ClientHolder{workClientSet: workClientSet}, nil - } - - // start a go routine to resync the works after this client's store is initiated - go func() { - if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) { - if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { - klog.Errorf("failed to send resync request, %v", err) - } - } - }() - - return &ClientHolder{workClientSet: workClientSet}, nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go deleted file mode 100644 index 80e7f219c..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister/lister.go +++ /dev/null @@ -1,42 +0,0 @@ -package lister - -import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - - workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" -) - -// WatcherStoreLister list the ManifestWorks from the WorkClientWatcherStore. -type WatcherStoreLister struct { - store store.WorkClientWatcherStore -} - -func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreLister { - return &WatcherStoreLister{ - store: store, - } -} - -// List returns the ManifestWorks from the WorkClientWatcherCache with list options. -func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) { - list, err := l.store.List(options.ClusterName, metav1.ListOptions{}) - if err != nil { - return nil, err - } - - works := []*workv1.ManifestWork{} - for _, work := range list.Items { - // Currently, the source client only support the ManifestBundle - // TODO: when supporting multiple cloud events data types, need a way - // to known the work event data type - if options.CloudEventsDataType != payload.ManifestBundleEventDataType { - continue - } - works = append(works, &work) - } - - return works, nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go deleted file mode 100644 index 008245d20..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go +++ /dev/null @@ -1,18 +0,0 @@ -package statushash - -import ( - "crypto/sha256" - "encoding/json" - "fmt" - - workv1 "open-cluster-management.io/api/work/v1" -) - -// ManifestWorkStatusHash returns the SHA256 checksum of a ManifestWork status. -func ManifestWorkStatusHash(work *workv1.ManifestWork) (string, error) { - statusBytes, err := json.Marshal(work.Status) - if err != nil { - return "", fmt.Errorf("failed to marshal work status, %v", err) - } - return fmt.Sprintf("%x", sha256.Sum256(statusBytes)), nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go deleted file mode 100644 index 6630020c4..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils/utils.go +++ /dev/null @@ -1,208 +0,0 @@ -package utils - -import ( - "bytes" - "encoding/json" - "fmt" - - "github.com/bwmarrin/snowflake" - jsonpatch "github.com/evanphx/json-patch" - "github.com/google/uuid" - - "k8s.io/apimachinery/pkg/api/validation" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/validation/field" - "k8s.io/client-go/tools/cache" - - workv1 "open-cluster-management.io/api/work/v1" - - "open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" -) - -// Patch applies the patch to a work with the patch type. -func Patch(patchType types.PatchType, work *workv1.ManifestWork, patchData []byte) (*workv1.ManifestWork, error) { - workData, err := json.Marshal(work) - if err != nil { - return nil, err - } - - var patchedData []byte - switch patchType { - case types.JSONPatchType: - var patchObj jsonpatch.Patch - patchObj, err = jsonpatch.DecodePatch(patchData) - if err != nil { - return nil, err - } - patchedData, err = patchObj.Apply(workData) - if err != nil { - return nil, err - } - - case types.MergePatchType: - patchedData, err = jsonpatch.MergePatch(workData, patchData) - if err != nil { - return nil, err - } - default: - return nil, fmt.Errorf("unsupported patch type: %s", patchType) - } - - patchedWork := &workv1.ManifestWork{} - if err := json.Unmarshal(patchedData, patchedWork); err != nil { - return nil, err - } - - return patchedWork, nil -} - -// UID returns a v5 UUID based on sourceID, work name and namespace to make sure it is consistent -func UID(sourceID, namespace, name string) string { - id := fmt.Sprintf("%s-%s-%s-%s", sourceID, common.ManifestWorkGR.String(), namespace, name) - return uuid.NewSHA1(uuid.NameSpaceOID, []byte(id)).String() -} - -// ListWorksWithOptions retrieves the manifestworks from store which matches the options. -func ListWorksWithOptions(store cache.Store, namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) { - var err error - - labelSelector := labels.Everything() - fieldSelector := fields.Everything() - - if len(opts.LabelSelector) != 0 { - labelSelector, err = labels.Parse(opts.LabelSelector) - if err != nil { - return nil, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err) - } - } - - if len(opts.FieldSelector) != 0 { - fieldSelector, err = fields.ParseSelector(opts.FieldSelector) - if err != nil { - return nil, fmt.Errorf("invalid fields selector %q: %v", opts.FieldSelector, err) - } - } - - works := []*workv1.ManifestWork{} - // list with labels - if err := cache.ListAll(store, labelSelector, func(obj interface{}) { - work, ok := obj.(*workv1.ManifestWork) - if !ok { - return - } - - if namespace != metav1.NamespaceAll && work.Namespace != namespace { - return - } - - workFieldSet := fields.Set{ - "metadata.name": work.Name, - "metadata.namespace": work.Namespace, - } - - if !fieldSelector.Matches(workFieldSet) { - return - } - - works = append(works, work) - }); err != nil { - return nil, err - } - - return works, nil -} - -func Validate(work *workv1.ManifestWork) field.ErrorList { - fldPath := field.NewPath("metadata") - errs := field.ErrorList{} - - if work.UID == "" { - errs = append(errs, field.Required(fldPath.Child("uid"), "field not set")) - } - - if work.ResourceVersion == "" { - errs = append(errs, field.Required(fldPath.Child("resourceVersion"), "field not set")) - } - - if work.Name == "" { - errs = append(errs, field.Required(fldPath.Child("name"), "field not set")) - } - - for _, msg := range validation.ValidateNamespaceName(work.Name, false) { - errs = append(errs, field.Invalid(fldPath.Child("name"), work.Name, msg)) - } - - if work.Namespace == "" { - errs = append(errs, field.Required(fldPath.Child("namespace"), "field not set")) - } - - for _, msg := range validation.ValidateNamespaceName(work.Namespace, false) { - errs = append(errs, field.Invalid(fldPath.Child("namespace"), work.Namespace, msg)) - } - - errs = append(errs, validation.ValidateAnnotations(work.Annotations, fldPath.Child("annotations"))...) - errs = append(errs, validation.ValidateFinalizers(work.Finalizers, fldPath.Child("finalizers"))...) - errs = append(errs, metav1validation.ValidateLabels(work.Labels, fldPath.Child("labels"))...) - - if err := validator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil { - errs = append(errs, field.Invalid(field.NewPath("spec"), "spec", err.Error())) - } - - return errs -} - -// Encode ensures the given work's manifests are encoded -func Encode(work *workv1.ManifestWork) error { - for index, manifest := range work.Spec.Workload.Manifests { - if manifest.Raw == nil { - if manifest.Object == nil { - return fmt.Errorf("the Object and Raw of the manifest[%d] for the work (%s/%s) are both `nil`", - index, work.Namespace, work.Name) - } - - var buf bytes.Buffer - if err := unstructured.UnstructuredJSONScheme.Encode(manifest.Object, &buf); err != nil { - return err - } - - work.Spec.Workload.Manifests[index].Raw = buf.Bytes() - } - } - - return nil -} - -// CompareSnowflakeSequenceIDs compares two snowflake sequence IDs. -// Returns true if the current ID is greater than the last. -// If the last sequence ID is empty, then the current is greater. -func CompareSnowflakeSequenceIDs(last, current string) (bool, error) { - if current != "" && last == "" { - return true, nil - } - - lastSID, err := snowflake.ParseString(last) - if err != nil { - return false, fmt.Errorf("unable to parse last sequence ID: %s, %v", last, err) - } - - currentSID, err := snowflake.ParseString(current) - if err != nil { - return false, fmt.Errorf("unable to parse current sequence ID: %s %v", current, err) - } - - if currentSID.Node() != lastSID.Node() { - return false, fmt.Errorf("sequence IDs (%s,%s) are not from the same node", last, current) - } - - if currentSID.Time() != lastSID.Time() { - return currentSID.Time() > lastSID.Time(), nil - } - - return currentSID.Step() > lastSID.Step(), nil -}