diff --git a/go.mod b/go.mod index 45ee470f6..3ae062bdc 100644 --- a/go.mod +++ b/go.mod @@ -35,9 +35,9 @@ require ( k8s.io/klog/v2 v2.130.1 k8s.io/kube-aggregator v0.31.4 k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 - open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b - open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e - open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f + open-cluster-management.io/addon-framework v0.12.0 + open-cluster-management.io/api v0.16.1 + open-cluster-management.io/sdk-go v0.16.0 sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 sigs.k8s.io/controller-runtime v0.19.3 sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 diff --git a/go.sum b/go.sum index a199ea7ae..0ed7e3865 100644 --- a/go.sum +++ b/go.sum @@ -487,12 +487,12 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98= k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 h1:MDF6h2H/h4tbzmtIKTuctcwZmY0tY9mD9fNT47QO6HI= k8s.io/utils v0.0.0-20240921022957-49e7df575cb6/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b h1:vEemE32F9iiVvKfFsFEdiyGdDnSb9Cp9Dch2Jkc4Nfg= -open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b/go.mod h1:3+UAkReHIEyqsDuq0Iv5w+ZRgZr254iehYV/JR2j038= -open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e h1:4iQneGfxartfFSR+IHZRrjEuwtRpiHyKQ15Kd33YCVk= -open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM= -open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f h1:zeC7QrFNarfK2zY6jGtd+mX+yDrQQmnH/J8A7n5Nh38= -open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA= +open-cluster-management.io/addon-framework v0.12.0 h1:5j7mpyk2ij0SLUZkwWk0KkNTWtsid2w7BIHmhm0Ecok= +open-cluster-management.io/addon-framework v0.12.0/go.mod h1:eReMWXrEHqtilwz5wzEpUrWw9Vfz0HJCH9pi3gOTZns= +open-cluster-management.io/api v0.16.1 h1:mS+4UGxHLPQd7CRM0gdFQdVaz139Lo2bkLfqSE0CDNU= +open-cluster-management.io/api v0.16.1/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM= +open-cluster-management.io/sdk-go v0.16.0 h1:Ui1jerkeLaNaJPu47xjOJ3lh+rJQgeJHD25ViQMzAMs= +open-cluster-management.io/sdk-go v0.16.0/go.mod h1:TyOjZC5YxyM5BRNgwTmLuTbHXX6xXqzYBXllrfoVp9w= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 h1:WYPi2PdQyZwZkHG648v2jQl6deyCgyjJ0fkLYgUJ618= diff --git a/pkg/work/hub/manager.go b/pkg/work/hub/manager.go index 3b451d1b1..195cac660 100644 --- a/pkg/work/hub/manager.go +++ b/pkg/work/hub/manager.go @@ -99,7 +99,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller clientHolder, err := work.NewClientHolderBuilder(config). WithClientID(c.workOptions.CloudEventsClientID). WithSourceID(sourceID). - WithCodecs(codec.NewManifestBundleCodec()). + WithCodec(codec.NewManifestBundleCodec()). WithWorkClientWatcherStore(watcherStore). NewSourceClientHolder(ctx) if err != nil { diff --git a/pkg/work/spoke/options.go b/pkg/work/spoke/options.go index d6c6ec8ef..64d3d4afa 100644 --- a/pkg/work/spoke/options.go +++ b/pkg/work/spoke/options.go @@ -7,9 +7,6 @@ import ( ) const ( - manifestBundleCodecName = "manifestbundle" - manifestCodecName = "manifest" - defaultUserAgent = "work-agent" ) diff --git a/pkg/work/spoke/spokeagent.go b/pkg/work/spoke/spokeagent.go index 117b63491..2eae363dc 100644 --- a/pkg/work/spoke/spokeagent.go +++ b/pkg/work/spoke/spokeagent.go @@ -18,7 +18,6 @@ import ( workinformers "open-cluster-management.io/api/client/work/informers/externalversions" workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1" ocmfeature "open-cluster-management.io/api/feature" - workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec" @@ -194,20 +193,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex return nil } -func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Codec[*workv1.ManifestWork] { - var codecs []generic.Codec[*workv1.ManifestWork] - for _, name := range codecNames { - if name == manifestBundleCodecName { - codecs = append(codecs, codec.NewManifestBundleCodec()) - } - - if name == manifestCodecName { - codecs = append(codecs, codec.NewManifestCodec(restMapper)) - } - } - return codecs -} - func (o *WorkAgentConfig) newWorkClientAndInformer( ctx context.Context, restMapper meta.RESTMapper, @@ -244,7 +229,7 @@ func (o *WorkAgentConfig) newWorkClientAndInformer( clientHolder, err := cloudeventswork.NewClientHolderBuilder(config). WithClientID(o.workOptions.CloudEventsClientID). WithClusterName(o.agentOptions.SpokeClusterName). - WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...). + WithCodec(codec.NewManifestBundleCodec()). WithWorkClientWatcherStore(watcherStore). NewAgentClientHolder(ctx) if err != nil { diff --git a/test/integration/work/suite_test.go b/test/integration/work/suite_test.go index 69d4868d4..3d5a06d1f 100644 --- a/test/integration/work/suite_test.go +++ b/test/integration/work/suite_test.go @@ -146,7 +146,7 @@ var _ = ginkgo.BeforeSuite(func() { sourceClient, err := work.NewClientHolderBuilder(util.NewMQTTSourceOptions(sourceID)). WithClientID(fmt.Sprintf("%s-%s", sourceID, rand.String(5))). WithSourceID(sourceID). - WithCodecs(sourcecodec.NewManifestBundleCodec()). + WithCodec(sourcecodec.NewManifestBundleCodec()). WithWorkClientWatcherStore(watcherStore). NewSourceClientHolder(envCtx) gomega.Expect(err).ToNot(gomega.HaveOccurred()) diff --git a/vendor/modules.txt b/vendor/modules.txt index 556341cfa..611745f1b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1692,7 +1692,7 @@ k8s.io/utils/pointer k8s.io/utils/ptr k8s.io/utils/strings/slices k8s.io/utils/trace -# open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b +# open-cluster-management.io/addon-framework v0.12.0 ## explicit; go 1.22.0 open-cluster-management.io/addon-framework/pkg/addonfactory open-cluster-management.io/addon-framework/pkg/addonmanager @@ -1708,7 +1708,7 @@ open-cluster-management.io/addon-framework/pkg/agent open-cluster-management.io/addon-framework/pkg/assets open-cluster-management.io/addon-framework/pkg/index open-cluster-management.io/addon-framework/pkg/utils -# open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e +# open-cluster-management.io/api v0.16.1 ## explicit; go 1.22.0 open-cluster-management.io/api/addon/v1alpha1 open-cluster-management.io/api/client/addon/clientset/versioned @@ -1774,12 +1774,10 @@ open-cluster-management.io/api/cluster/v1beta2 open-cluster-management.io/api/crdsv1beta1 open-cluster-management.io/api/feature open-cluster-management.io/api/operator/v1 -open-cluster-management.io/api/utils/work/v1/utils open-cluster-management.io/api/utils/work/v1/workapplier -open-cluster-management.io/api/utils/work/v1/workvalidator open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f +# open-cluster-management.io/sdk-go v0.16.0 ## explicit; go 1.22.0 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 @@ -1812,6 +1810,7 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister +open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash open-cluster-management.io/sdk-go/pkg/cloudevents/work/store open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils open-cluster-management.io/sdk-go/pkg/helpers diff --git a/vendor/open-cluster-management.io/api/utils/work/v1/utils/utils.go b/vendor/open-cluster-management.io/api/utils/work/v1/utils/utils.go deleted file mode 100644 index 32b4557c1..000000000 --- a/vendor/open-cluster-management.io/api/utils/work/v1/utils/utils.go +++ /dev/null @@ -1,76 +0,0 @@ -package utils - -import ( - "fmt" - "reflect" - - "k8s.io/apimachinery/pkg/api/meta" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/kubernetes/scheme" - - workv1 "open-cluster-management.io/api/work/v1" -) - -var genericScheme = runtime.NewScheme() - -// BuildResourceMeta builds manifest resource meta for the object -func BuildResourceMeta( - index int, - object runtime.Object, - restMapper meta.RESTMapper) (workv1.ManifestResourceMeta, schema.GroupVersionResource, error) { - resourceMeta := workv1.ManifestResourceMeta{ - Ordinal: int32(index), - } - - if object == nil || reflect.ValueOf(object).IsNil() { - return resourceMeta, schema.GroupVersionResource{}, nil - } - - // set gvk - gvk, err := GuessObjectGroupVersionKind(object) - if err != nil { - return resourceMeta, schema.GroupVersionResource{}, err - } - resourceMeta.Group = gvk.Group - resourceMeta.Version = gvk.Version - resourceMeta.Kind = gvk.Kind - - // set namespace/name - if accessor, e := meta.Accessor(object); e != nil { - err = fmt.Errorf("cannot access metadata of %v: %w", object, e) - } else { - resourceMeta.Namespace = accessor.GetNamespace() - resourceMeta.Name = accessor.GetName() - } - - // set resource - if restMapper == nil { - return resourceMeta, schema.GroupVersionResource{}, err - } - mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) - if err != nil { - return resourceMeta, schema.GroupVersionResource{}, fmt.Errorf("the server doesn't have a resource type %q", gvk.Kind) - } - - resourceMeta.Resource = mapping.Resource.Resource - return resourceMeta, mapping.Resource, err -} - -// GuessObjectGroupVersionKind returns GVK for the passed runtime object. -func GuessObjectGroupVersionKind(object runtime.Object) (*schema.GroupVersionKind, error) { - if gvk := object.GetObjectKind().GroupVersionKind(); len(gvk.Kind) > 0 { - return &gvk, nil - } - - if kinds, _, _ := scheme.Scheme.ObjectKinds(object); len(kinds) > 0 { - return &kinds[0], nil - } - - // otherwise fall back to genericScheme - if kinds, _, _ := genericScheme.ObjectKinds(object); len(kinds) > 0 { - return &kinds[0], nil - } - - return nil, fmt.Errorf("cannot get gvk of %v", object) -} diff --git a/vendor/open-cluster-management.io/api/utils/work/v1/workvalidator/validator.go b/vendor/open-cluster-management.io/api/utils/work/v1/workvalidator/validator.go deleted file mode 100644 index 04559069c..000000000 --- a/vendor/open-cluster-management.io/api/utils/work/v1/workvalidator/validator.go +++ /dev/null @@ -1,64 +0,0 @@ -package workvalidator - -import ( - "fmt" - - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - workv1 "open-cluster-management.io/api/work/v1" -) - -type Validator struct { - limit int -} - -var ManifestValidator = &Validator{limit: 500 * 1024} // the default manifest limit is 500k. - -func (m *Validator) WithLimit(limit int) { - m.limit = limit -} - -func (m *Validator) ValidateManifests(manifests []workv1.Manifest) error { - if len(manifests) == 0 { - return errors.NewBadRequest("Workload manifests should not be empty") - } - - totalSize := 0 - for _, manifest := range manifests { - totalSize = totalSize + manifest.Size() - } - - if totalSize > m.limit { - return fmt.Errorf("the size of manifests is %v bytes which exceeds the %v limit", totalSize, m.limit) - } - - for _, manifest := range manifests { - err := validateManifest(manifest.Raw) - if err != nil { - return err - } - } - - return nil -} - -func validateManifest(manifest []byte) error { - // If the manifest cannot be decoded, return err - unstructuredObj := &unstructured.Unstructured{} - err := unstructuredObj.UnmarshalJSON(manifest) - if err != nil { - return err - } - - // The object must have name specified, generateName is not allowed in manifestwork - if unstructuredObj.GetName() == "" { - return fmt.Errorf("name must be set in manifest") - } - - if unstructuredObj.GetGenerateName() != "" { - return fmt.Errorf("generateName must not be set in manifest") - } - - return nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go index 47e78f6a5..53e9c5b75 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go @@ -7,6 +7,7 @@ import ( "time" cloudevents "github.com/cloudevents/sdk-go/v2" + cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" "k8s.io/klog/v2" @@ -22,7 +23,7 @@ import ( type CloudEventAgentClient[T ResourceObject] struct { *baseClient lister Lister[T] - codecs map[types.CloudEventsDataType]Codec[T] + codec Codec[T] statusHashGetter StatusHashGetter[T] agentID string clusterName string @@ -34,34 +35,30 @@ type CloudEventAgentClient[T ResourceObject] struct { // protocols for sending/receiving the cloudevents. // - lister gets the resources from a cache/store of an agent. // - statusHashGetter calculates the resource status hash. -// - codecs is list of codecs for encoding/decoding a resource objet/cloudevent to/from a cloudevent/resource objet. +// - codec is used to encode/decode a resource objet/cloudevent to/from a cloudevent/resource objet. func NewCloudEventAgentClient[T ResourceObject]( ctx context.Context, agentOptions *options.CloudEventsAgentOptions, lister Lister[T], statusHashGetter StatusHashGetter[T], - codecs ...Codec[T], + codec Codec[T], ) (*CloudEventAgentClient[T], error) { baseClient := &baseClient{ clientID: agentOptions.AgentID, cloudEventsOptions: agentOptions.CloudEventsOptions, cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit), reconnectedChan: make(chan struct{}), + dataType: codec.EventDataType(), } if err := baseClient.connect(ctx); err != nil { return nil, err } - evtCodes := make(map[types.CloudEventsDataType]Codec[T]) - for _, codec := range codecs { - evtCodes[codec.EventDataType()] = codec - } - return &CloudEventAgentClient[T]{ baseClient: baseClient, lister: lister, - codecs: evtCodes, + codec: codec, statusHashGetter: statusHashGetter, agentID: agentOptions.AgentID, clusterName: agentOptions.ClusterName, @@ -76,64 +73,60 @@ func (c *CloudEventAgentClient[T]) ReconnectedChan() <-chan struct{} { // Resync the resources spec by sending a spec resync request from the current to the given source. func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) error { - // only resync the resources whose event data type is registered - for eventDataType := range c.codecs { - // list the resource objects that are maintained by the current agent with the given source - options := types.ListOptions{Source: source, ClusterName: c.clusterName, CloudEventsDataType: eventDataType} - objs, err := c.lister.List(options) + // list the resource objects that are maintained by the current agent with the given source + options := types.ListOptions{Source: source, ClusterName: c.clusterName, CloudEventsDataType: c.codec.EventDataType()} + objs, err := c.lister.List(options) + if err != nil { + return err + } + + resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, len(objs))} + for i, obj := range objs { + resourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64) if err != nil { return err } - resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, len(objs))} - for i, obj := range objs { - resourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64) - if err != nil { - return err - } - - resources.Versions[i] = payload.ResourceVersion{ - ResourceID: string(obj.GetUID()), - ResourceVersion: resourceVersion, - } + resources.Versions[i] = payload.ResourceVersion{ + ResourceID: string(obj.GetUID()), + ResourceVersion: resourceVersion, } - - eventType := types.CloudEventsType{ - CloudEventsDataType: eventDataType, - SubResource: types.SubResourceSpec, - Action: types.ResyncRequestAction, - } - - evt := types.NewEventBuilder(c.agentID, eventType). - WithOriginalSource(source). - WithClusterName(c.clusterName). - NewEvent() - if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil { - return fmt.Errorf("failed to set data to cloud event: %v", err) - } - - if err := c.publish(ctx, evt); err != nil { - return err - } - - increaseCloudEventsSentCounter(evt.Source(), c.clusterName, eventDataType.String()) } + eventType := types.CloudEventsType{ + CloudEventsDataType: c.codec.EventDataType(), + SubResource: types.SubResourceSpec, + Action: types.ResyncRequestAction, + } + + evt := types.NewEventBuilder(c.agentID, eventType). + WithOriginalSource(source). + WithClusterName(c.clusterName). + NewEvent() + if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil { + return fmt.Errorf("failed to set data to cloud event: %v", err) + } + + if err := c.publish(ctx, evt); err != nil { + return err + } + + increaseCloudEventsSentCounter(evt.Source(), source, c.clusterName, c.codec.EventDataType().String(), string(eventType.SubResource), string(eventType.Action)) + return nil } // Publish a resource status from an agent to a source. func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error { - codec, ok := c.codecs[eventType.CloudEventsDataType] - if !ok { - return fmt.Errorf("failed to find a codec for event %s", eventType.CloudEventsDataType) + if eventType.CloudEventsDataType != c.codec.EventDataType() { + return fmt.Errorf("unsupported cloudevent data type %s", eventType.CloudEventsDataType) } if eventType.SubResource != types.SubResourceStatus { return fmt.Errorf("unsupported event eventType %s", eventType) } - evt, err := codec.Encode(c.agentID, eventType, obj) + evt, err := c.codec.Encode(c.agentID, eventType, obj) if err != nil { return err } @@ -142,7 +135,8 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types. return err } - increaseCloudEventsSentCounter(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String()) + originalSource, _ := cloudeventstypes.ToString(evt.Context.GetExtensions()[types.ExtensionOriginalSource]) + increaseCloudEventsSentCounter(evt.Source(), originalSource, c.clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action)) return nil } @@ -163,7 +157,7 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents. return } - increaseCloudEventsReceivedCounter(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String()) + increaseCloudEventsReceivedCounter(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action)) if eventType.Action == types.ResyncRequestAction { if eventType.SubResource != types.SubResourceStatus { @@ -185,13 +179,23 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents. return } - codec, ok := c.codecs[eventType.CloudEventsDataType] - if !ok { - klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType) + evtExtensions := evt.Context.GetExtensions() + clusterName, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionClusterName]) + if err != nil { + klog.Errorf("failed to get clustername extension: %v", err) + return + } + if clusterName != c.clusterName { + klog.V(4).Infof("event clustername %s and agent clustername %s do not match, ignore", clusterName, c.clusterName) return } - obj, err := codec.Decode(&evt) + if eventType.CloudEventsDataType != c.codec.EventDataType() { + klog.Warningf("unsupported event data type %s, ignore", eventType.CloudEventsDataType) + return + } + + obj, err := c.codec.Decode(&evt) if err != nil { klog.Errorf("failed to decode spec, %v", err) return diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go index b27bfeb17..67ed73aa7 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/baseclient.go @@ -15,6 +15,7 @@ import ( "k8s.io/utils/clock" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) const ( @@ -43,6 +44,7 @@ type baseClient struct { receiverChan chan int reconnectedChan chan struct{} clientReady bool + dataType types.CloudEventsDataType } func (c *baseClient) connect(ctx context.Context) error { @@ -114,8 +116,8 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { latency := time.Since(now) if latency > longThrottleLatency { - klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s", - latency, evt)) + klog.Warningf("Waited for %v due to client-side throttling, not priority and fairness, request: %s", + latency, evt.Context) } sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context) @@ -127,7 +129,8 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { return fmt.Errorf("the cloudevents client is not ready") } - klog.V(4).Infof("Sending event: %v\n%s", sendingCtx, evt) + klog.V(4).Infof("Sending event: %v\n%s", sendingCtx, evt.Context) + klog.V(5).Infof("Sending event: evt=%s", evt) if result := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) { return fmt.Errorf("failed to send event %s, %v", evt.Context, result) } @@ -156,7 +159,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { if startReceiving { go func() { if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) { - klog.V(4).Infof("Received event: %s", evt) + klog.V(4).Infof("Received event: %s", evt.Context) + klog.V(5).Infof("Received event: evt=%s", evt) + receive(receiverCtx, evt) }); err != nil { runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err)) @@ -222,7 +227,7 @@ func (c *baseClient) setClientReady(ready bool) { func (c *baseClient) newCloudEventsClient(ctx context.Context) (cloudevents.Client, error) { var err error - c.cloudEventsProtocol, err = c.cloudEventsOptions.Protocol(ctx) + c.cloudEventsProtocol, err = c.cloudEventsOptions.Protocol(ctx, c.dataType) if err != nil { return nil, err } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/metrics_collector.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/metrics_collector.go index 75790a890..f972621af 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/metrics_collector.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/metrics_collector.go @@ -15,16 +15,40 @@ const ( // Names of the labels added to metrics: const ( - metricsSourceLabel = "source" - metricsClusterLabel = "cluster" - metricsDataTypeLabel = "type" - metricsClientIDLabel = "client_id" - metricsWorkActionLabel = "action" - metricsWorkCodeLabel = "code" + metricsSourceLabel = "source" + metricsOriginalSourceLabel = "original_source" + metricsClusterLabel = "cluster" + metricsDataTypeLabel = "type" + metricsSubResourceLabel = "subresource" + metricsActionLabel = "action" + metricsClientIDLabel = "client_id" + metricsWorkActionLabel = "action" + metricsWorkCodeLabel = "code" ) -// cloudeventsMetricsLabels - Array of labels added to cloudevents metrics: -var cloudeventsMetricsLabels = []string{ +const noneOriginalSource = "none" + +// cloudeventsReceivedMetricsLabels - Array of labels added to cloudevents received metrics: +var cloudeventsReceivedMetricsLabels = []string{ + metricsSourceLabel, // source + metricsClusterLabel, // cluster + metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles + metricsSubResourceLabel, // subresource, eg, spec or status + metricsActionLabel, // action, eg, create, update, delete, resync_request, resync_response +} + +// cloudeventsSentMetricsLabels - Array of labels added to cloudevents sent metrics: +var cloudeventsSentMetricsLabels = []string{ + metricsSourceLabel, // source + metricsOriginalSourceLabel, // original source, if no, set to "none" + metricsClusterLabel, // cluster + metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles + metricsSubResourceLabel, // subresource, eg, spec or status + metricsActionLabel, // action, eg, create, update, delete, resync_request, resync_response +} + +// cloudeventsResyncMetricsLabels - Array of labels added to cloudevents resync metrics: +var cloudeventsResyncMetricsLabels = []string{ metricsSourceLabel, // source metricsClusterLabel, // cluster metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles @@ -53,28 +77,32 @@ const ( // The cloudevents received counter metric is a counter with a base metric name of 'received_total' // and a help string of 'The total number of received CloudEvents.' -// For example, 2 CloudEvents received from source1 to agent on cluster1 with data type manifests would result in the following metrics: -// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2 +// For example, 2 CloudEvents received from source1 to agent on cluster1 with data type manifests, one for resource create, +// another for resource updatewould result in the following metrics: +// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",subresource="spec",action="create"} 1 +// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",subresource="spec",action="update"} 1 var cloudeventsReceivedCounterMetric = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: cloudeventsMetricsSubsystem, Name: receivedCounterMetric, Help: "The total number of received CloudEvents.", }, - cloudeventsMetricsLabels, + cloudeventsReceivedMetricsLabels, ) // The cloudevents sent counter metric is a counter with a base metric name of 'sent_total' // and a help string of 'The total number of sent CloudEvents.' -// For example, 2 CloudEvents sent from agent on cluster1 to source1 with data type manifestbundles would result in the following metrics: -// cloudevents_sent_total{source="cluster1-work-agent",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2 +// For example, 1 cloudevent sent from source1 with data type manifestbundles for resource spec create (original source is empty), +// and 2 CloudEvents sent from agent on cluster1 back to source1 for resource status update would result in the following metrics: +// cloudevents_sent_total{source="source1",original_source="none",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",subresource="spec",action="create"} 1 +// cloudevents_sent_total{source="cluster1-work-agent",original_source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",subresource="status",action="update"} 2 var cloudeventsSentCounterMetric = prometheus.NewCounterVec( prometheus.CounterOpts{ Subsystem: cloudeventsMetricsSubsystem, Name: sentCounterMetric, Help: "The total number of sent CloudEvents.", }, - cloudeventsMetricsLabels, + cloudeventsSentMetricsLabels, ) // The resource spec resync duration metric is a histogram with a base metric name of 'resource_spec_resync_duration_second' @@ -108,7 +136,7 @@ var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec( 30.0, }, }, - cloudeventsMetricsLabels, + cloudeventsResyncMetricsLabels, ) // The resource status resync duration metric is a histogram with a base metric name of 'resource_status_resync_duration_second' @@ -142,7 +170,7 @@ var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec( 30.0, }, }, - cloudeventsMetricsLabels, + cloudeventsResyncMetricsLabels, ) // The cloudevents client reconnected counter metric is a counter with a base metric name of 'client_reconnected_total' @@ -198,21 +226,29 @@ func ResetCloudEventsMetrics() { } // increaseCloudEventsReceivedCounter increases the cloudevents sent counter metric: -func increaseCloudEventsReceivedCounter(source, cluster, dataType string) { +func increaseCloudEventsReceivedCounter(source, cluster, dataType, subresource, action string) { labels := prometheus.Labels{ - metricsSourceLabel: source, - metricsClusterLabel: cluster, - metricsDataTypeLabel: dataType, + metricsSourceLabel: source, + metricsClusterLabel: cluster, + metricsDataTypeLabel: dataType, + metricsSubResourceLabel: subresource, + metricsActionLabel: action, } cloudeventsReceivedCounterMetric.With(labels).Inc() } // increaseCloudEventsSentCounter increases the cloudevents sent counter metric: -func increaseCloudEventsSentCounter(source, cluster, dataType string) { +func increaseCloudEventsSentCounter(source, originalSource, cluster, dataType, subresource, action string) { + if originalSource == "" { + originalSource = noneOriginalSource + } labels := prometheus.Labels{ - metricsSourceLabel: source, - metricsClusterLabel: cluster, - metricsDataTypeLabel: dataType, + metricsSourceLabel: source, + metricsOriginalSourceLabel: originalSource, + metricsClusterLabel: cluster, + metricsDataTypeLabel: dataType, + metricsSubResourceLabel: subresource, + metricsActionLabel: action, } cloudeventsSentCounterMetric.With(labels).Inc() } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go index 10a98b6bf..cd6a139a5 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/cert_rotation.go @@ -4,7 +4,9 @@ import ( "bytes" "context" "crypto/tls" + "crypto/x509" "fmt" + "os" "reflect" "sync" "time" @@ -212,3 +214,59 @@ func CachingCertificateLoader(certFile, keyFile string) func() (*tls.Certificate return current.cert, current.err } } + +// AutoLoadTLSConfig returns a TLS configuration for the given CA, client certificate, key files +// that can be used to establish a TLS connection. +// If CA is not provided, the system cert pool will be used. +// If client certificate and key are provided, they will be used for client authentication. +// And a goroutine will be started to periodically refresh client certificates for this connection. +func AutoLoadTLSConfig(caFile, certFile, keyFile string, conn Connection) (*tls.Config, error) { + var tlsConfig *tls.Config + if caFile != "" { + certPool, err := rootCAs(caFile) + if err != nil { + return nil, err + } + tlsConfig = &tls.Config{ + RootCAs: certPool, + MinVersion: tls.VersionTLS13, + MaxVersion: tls.VersionTLS13, + } + if certFile != "" && keyFile != "" { + // Set client certificate and key getter for tls config + tlsConfig.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) { + return CachingCertificateLoader(certFile, keyFile)() + } + // Start a goroutine to periodically refresh client certificates for this connection + StartClientCertRotating(tlsConfig.GetClientCertificate, conn) + } + } + + return tlsConfig, nil +} + +// rootCAs returns a cert pool to verify the TLS connection. +// If the caFile is not provided, the default system certificate pool will be returned +// If the caFile is provided, the provided CA will be appended to the system certificate pool +func rootCAs(caFile string) (*x509.CertPool, error) { + certPool, err := x509.SystemCertPool() + if err != nil { + return nil, err + } + + if len(caFile) == 0 { + klog.Warningf("CA file is not provided, TLS connection will be verified with the system cert pool") + return certPool, nil + } + + caPEM, err := os.ReadFile(caFile) + if err != nil { + return nil, err + } + + if ok := certPool.AppendCertsFromPEM(caPEM); !ok { + return nil, fmt.Errorf("invalid CA %s", caFile) + } + + return certPool, nil +} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go index fad712f8a..43510c4fa 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/agentoptions.go @@ -33,7 +33,7 @@ func (o *grpcAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return ctx, nil } -func (o *grpcAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { +func (o *grpcAgentOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) { receiver, err := o.GetCloudEventsProtocol( ctx, func(err error) { @@ -45,6 +45,7 @@ func (o *grpcAgentOptions) Protocol(ctx context.Context) (options.CloudEventsPro // as a placeholder with all the sources. Source: types.SourceAll, ClusterName: o.clusterName, + DataType: dataType.String(), }), ) if err != nil { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go index 564ccf333..051a39641 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/options.go @@ -3,7 +3,6 @@ package grpc import ( "context" "crypto/tls" - "crypto/x509" "fmt" "os" "time" @@ -14,19 +13,96 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/oauth" + "google.golang.org/grpc/keepalive" "gopkg.in/yaml.v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" ) +var _ cert.Connection = &GRPCDialer{} + +// GRPCDialer is a gRPC dialer that connects to a gRPC server +// with the given URL, TLS configuration and keepalive options. +type GRPCDialer struct { + URL string + KeepAliveOptions KeepAliveOptions + TLSConfig *tls.Config + TokenFile string + conn *grpc.ClientConn +} + +// KeepAliveOptions holds the keepalive options for the gRPC client. +type KeepAliveOptions struct { + Enable bool + Time time.Duration + Timeout time.Duration + PermitWithoutStream bool +} + +// Dial connects to the gRPC server and returns a gRPC client connection. +func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) { + // Prepare gRPC dial options. + dialOpts := []grpc.DialOption{} + if d.KeepAliveOptions.Enable { + dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{ + Time: d.KeepAliveOptions.Time, + Timeout: d.KeepAliveOptions.Timeout, + PermitWithoutStream: d.KeepAliveOptions.PermitWithoutStream, + })) + } + if d.TLSConfig != nil { + // Enable TLS + dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(d.TLSConfig))) + if len(d.TokenFile) != 0 { + // Use token-based authentication if token file is provided. + token, err := os.ReadFile(d.TokenFile) + if err != nil { + return nil, fmt.Errorf("failed to read token file %s, %v", d.TokenFile, err) + } + perRPCCred := oauth.TokenSource{ + TokenSource: oauth2.StaticTokenSource(&oauth2.Token{ + AccessToken: string(token), + })} + // Add per-RPC credentials to the dial options. + dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(perRPCCred)) + } + + // Establish a TLS connection to the gRPC server. + conn, err := grpc.Dial(d.URL, dialOpts...) + if err != nil { + return nil, fmt.Errorf("failed to connect to grpc server %s, %v", d.URL, err) + } + + // Cache the connection for future use. + d.conn = conn + return d.conn, nil + } + + // Insecure connection option; should not be used in production. + dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials())) + conn, err := grpc.Dial(d.URL, dialOpts...) + if err != nil { + return nil, fmt.Errorf("failed to connect to grpc server %s, %v", d.URL, err) + } + + // Cache the connection for future use. + d.conn = conn + return d.conn, nil +} + +// Close closes the gRPC client connection. +func (d *GRPCDialer) Close() error { + if d.conn != nil { + return d.conn.Close() + } + return nil +} + // GRPCOptions holds the options that are used to build gRPC client. type GRPCOptions struct { - URL string - CAFile string - ClientCertFile string - ClientKeyFile string - TokenFile string + Dialer *GRPCDialer } // GRPCConfig holds the information needed to build connect to gRPC server as a given user. @@ -41,6 +117,26 @@ type GRPCConfig struct { ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"` // TokenFile is the file path to a token file for authentication. TokenFile string `json:"tokenFile,omitempty" yaml:"tokenFile,omitempty"` + // keepalive options + KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"` +} + +// KeepAliveConfig holds the keepalive options for the gRPC client. +type KeepAliveConfig struct { + // Enable specifies whether the keepalive option is enabled. + // When disabled, other keepalive configurations are ignored. Default is false. + Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"` + // Time sets the duration after which the client pings the server if no activity is seen. + // A minimum value of 10s is enforced if set below that. Default is 30s. + Time *time.Duration `json:"time,omitempty" yaml:"time,omitempty"` + + // Timeout sets the duration the client waits for a response after a keepalive ping. + // If no response is received, the connection is closed. Default is 10s. + Timeout *time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"` + + // PermitWithoutStream determines if keepalive pings are sent when there are no active RPCs. + // If false, pings are not sent and Time and Timeout are ignored. Default is false. + PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"` } // BuildGRPCOptionsFromFlags builds configs from a config filepath. @@ -70,92 +166,47 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) { return nil, fmt.Errorf("setting tokenFile requires caFile") } - return &GRPCOptions{ - URL: config.URL, - CAFile: config.CAFile, - ClientCertFile: config.ClientCertFile, - ClientKeyFile: config.ClientKeyFile, - TokenFile: config.TokenFile, - }, nil + options := &GRPCOptions{ + Dialer: &GRPCDialer{ + URL: config.URL, + TokenFile: config.TokenFile, + }, + } + + // Default keepalive options + keepAliveOptions := KeepAliveOptions{ + Enable: false, + Time: 30 * time.Second, + Timeout: 10 * time.Second, + PermitWithoutStream: false, + } + keepAliveOptions.Enable = config.KeepAliveConfig.Enable + if config.KeepAliveConfig.Time != nil { + keepAliveOptions.Time = *config.KeepAliveConfig.Time + } + if config.KeepAliveConfig.Timeout != nil { + keepAliveOptions.Timeout = *config.KeepAliveConfig.Timeout + } + keepAliveOptions.PermitWithoutStream = config.KeepAliveConfig.PermitWithoutStream + + // Set the keepalive options + options.Dialer.KeepAliveOptions = keepAliveOptions + + // Set up TLS configuration for the gRPC connection, the certificates will be reloaded periodically. + options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer) + if err != nil { + return nil, err + } + + return options, nil } func NewGRPCOptions() *GRPCOptions { return &GRPCOptions{} } -func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) { - if len(o.CAFile) != 0 { - certPool, err := x509.SystemCertPool() - if err != nil { - return nil, err - } - - caPEM, err := os.ReadFile(o.CAFile) - if err != nil { - return nil, err - } - - if ok := certPool.AppendCertsFromPEM(caPEM); !ok { - return nil, fmt.Errorf("invalid CA %s", o.CAFile) - } - - // Prepare gRPC dial options. - diaOpts := []grpc.DialOption{} - // Create a TLS configuration with CA pool and TLS 1.3. - tlsConfig := &tls.Config{ - RootCAs: certPool, - MinVersion: tls.VersionTLS13, - MaxVersion: tls.VersionTLS13, - } - - // Check if client certificate and key files are provided for mutual TLS. - if len(o.ClientCertFile) != 0 && len(o.ClientKeyFile) != 0 { - // Load client certificate and key pair. - clientCerts, err := tls.LoadX509KeyPair(o.ClientCertFile, o.ClientKeyFile) - if err != nil { - return nil, err - } - // Add client certificates to the TLS configuration. - tlsConfig.Certificates = []tls.Certificate{clientCerts} - diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - } else { - // token based authentication requires the configuration of transport credentials. - diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig))) - if len(o.TokenFile) != 0 { - // Use token-based authentication if token file is provided. - token, err := os.ReadFile(o.TokenFile) - if err != nil { - return nil, err - } - perRPCCred := oauth.TokenSource{ - TokenSource: oauth2.StaticTokenSource(&oauth2.Token{ - AccessToken: string(token), - })} - // Add per-RPC credentials to the dial options. - diaOpts = append(diaOpts, grpc.WithPerRPCCredentials(perRPCCred)) - } - } - - // Establish a connection to the gRPC server. - conn, err := grpc.Dial(o.URL, diaOpts...) - if err != nil { - return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err) - } - - return conn, nil - } - - // Insecure connection option; should not be used in production. - conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err) - } - - return conn, nil -} - func (o *GRPCOptions) GetCloudEventsProtocol(ctx context.Context, errorHandler func(error), clientOpts ...protocol.Option) (options.CloudEventsProtocol, error) { - conn, err := o.GetGRPCClientConn() + conn, err := o.Dialer.Dial() if err != nil { return nil, err } @@ -175,10 +226,15 @@ func (o *GRPCOptions) GetCloudEventsProtocol(ctx context.Context, errorHandler f // TransientFailure. // For a connected grpc client, if the connections is down, the grpc client connection state will be // changed from Ready to Idle. - if connState == connectivity.TransientFailure || connState == connectivity.Idle { + // When client certificate is expired, client will proactively close the connection, which will result + // in connection state changed from Ready to Shutdown. + if connState == connectivity.TransientFailure || connState == connectivity.Idle || connState == connectivity.Shutdown { errorHandler(fmt.Errorf("grpc connection is disconnected (state=%s)", connState)) ticker.Stop() - conn.Close() + if connState != connectivity.Shutdown { + // don't close the connection if it's already shutdown + conn.Close() + } return // exit the goroutine as the error handler function will handle the reconnection. } } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.pb.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.pb.go index f895d8697..b4008d700 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.pb.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.pb.go @@ -398,6 +398,9 @@ type SubscriptionRequest struct { Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"` // Optional. The cluster name of the respond CloudEvent(s). ClusterName string `protobuf:"bytes,2,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"` + // Optional. The data type for the respond CloudEvent(s). + // eg. io.open-cluster-management.works.v1alpha1.manifests + DataType string `protobuf:"bytes,3,opt,name=data_type,json=dataType,proto3" json:"data_type,omitempty"` } func (x *SubscriptionRequest) Reset() { @@ -446,6 +449,13 @@ func (x *SubscriptionRequest) GetClusterName() string { return "" } +func (x *SubscriptionRequest) GetDataType() string { + if x != nil { + return x.DataType + } + return "" +} + var File_cloudevent_proto protoreflect.FileDescriptor var file_cloudevent_proto_rawDesc = []byte{ @@ -505,29 +515,31 @@ var file_cloudevent_proto_rawDesc = []byte{ 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x33, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, - 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x50, 0x0a, 0x13, + 0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x6d, 0x0a, 0x13, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, - 0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x32, 0xb3, - 0x01, 0x0a, 0x11, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, - 0x76, 0x69, 0x63, 0x65, 0x12, 0x46, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, - 0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, - 0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, - 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x09, - 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x26, 0x2e, 0x69, 0x6f, 0x2e, 0x63, - 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, - 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x1a, 0x1d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, - 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, - 0x22, 0x00, 0x30, 0x01, 0x42, 0x50, 0x5a, 0x4e, 0x6f, 0x70, 0x65, 0x6e, 0x2d, 0x63, 0x6c, 0x75, - 0x73, 0x74, 0x65, 0x72, 0x2d, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, - 0x69, 0x6f, 0x2f, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x6f, 0x70, - 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, - 0x62, 0x75, 0x66, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1b, + 0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x54, 0x79, 0x70, 0x65, 0x32, 0xb3, 0x01, 0x0a, 0x11, + 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63, + 0x65, 0x12, 0x46, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x21, 0x2e, 0x69, + 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, + 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, + 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x09, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x26, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, + 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, + 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, + 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, + 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x30, + 0x01, 0x42, 0x50, 0x5a, 0x4e, 0x6f, 0x70, 0x65, 0x6e, 0x2d, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, + 0x72, 0x2d, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x69, 0x6f, 0x2f, + 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, + 0x74, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f, + 0x6e, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, + 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.proto b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.proto index c6a5ae77d..ed5d4bb32 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.proto +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protobuf/v1/cloudevent.proto @@ -75,6 +75,9 @@ message SubscriptionRequest { string source = 1; // Optional. The cluster name of the respond CloudEvent(s). string cluster_name = 2; + // Optional. The data type for the respond CloudEvent(s). + // eg. io.open-cluster-management.works.v1alpha1.manifests + string data_type = 3; } service CloudEventService { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/option.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/option.go index f227e4825..1ccdb02c8 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/option.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/option.go @@ -11,6 +11,7 @@ type Option func(*Protocol) error type SubscribeOption struct { Source string ClusterName string + DataType string // data type for the client, eg. "io.open-cluster-management.works.v1alpha1.manifestbundles" } // WithSubscribeOption sets the Subscribe configuration for the client. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/protocol.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/protocol.go index ff9afadae..44c1352f1 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/protocol.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol/protocol.go @@ -105,15 +105,16 @@ func (p *Protocol) OpenInbound(ctx context.Context) error { subClient, err := p.client.Subscribe(ctx, &pbv1.SubscriptionRequest{ Source: p.subscribeOption.Source, ClusterName: p.subscribeOption.ClusterName, + DataType: p.subscribeOption.DataType, }) if err != nil { return err } if p.subscribeOption.Source != "" { - logger.Infof("subscribing events for: %v", p.subscribeOption.Source) + logger.Infof("subscribing events for: %v with data types: %v", p.subscribeOption.Source, p.subscribeOption.DataType) } else { - logger.Infof("subscribing events for cluster: %v", p.subscribeOption.ClusterName) + logger.Infof("subscribing events for cluster: %v with data types: %v", p.subscribeOption.ClusterName, p.subscribeOption.DataType) } go func() { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go index 1d71cfe7d..2d3b0ab2d 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/sourceoptions.go @@ -7,6 +7,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) type gRPCSourceOptions struct { @@ -31,14 +32,15 @@ func (o *gRPCSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return ctx, nil } -func (o *gRPCSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { +func (o *gRPCSourceOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) { receiver, err := o.GetCloudEventsProtocol( ctx, func(err error) { o.errorChan <- err }, protocol.WithSubscribeOption(&protocol.SubscribeOption{ - Source: o.sourceID, + Source: o.sourceID, + DataType: dataType.String(), }), ) if err != nil { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/agentoptions.go index 1b79b4b4c..bb50bb698 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/agentoptions.go @@ -4,11 +4,8 @@ package kafka import ( "context" - "fmt" - "strings" cloudevents "github.com/cloudevents/sdk-go/v2" - cloudeventscontext "github.com/cloudevents/sdk-go/v2/context" "github.com/confluentinc/confluent-kafka-go/v2/kafka" confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" @@ -45,36 +42,13 @@ func NewAgentOptions(kafkaOptions *KafkaOptions, clusterName, agentID string) *o // encode the source and agent to the message key func (o *kafkaAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) { - eventType, err := types.ParseCloudEventsType(evtCtx.GetType()) - if err != nil { - return nil, err - } - - // agent publishes event to status topic to send the resource status from a specified cluster - originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource) - if err != nil { - return nil, err - } - - if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll { - // TODO support multiple sources, agent may need a source list instead of the broadcast - topic := strings.Replace(agentBroadcastTopic, "*", o.clusterName, 1) - return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), o.clusterName), nil - } - - topic := strings.Replace(agentEventsTopic, "*", fmt.Sprintf("%s", originalSource), 1) - topic = strings.Replace(topic, "*", o.clusterName, 1) - messageKey := fmt.Sprintf("%s@%s", originalSource, o.clusterName) - return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), messageKey), nil + return ctx, nil } -func (o *kafkaAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { +func (o *kafkaAgentOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) { protocol, err := confluent.New(confluent.WithConfigMap(&o.KafkaOptions.ConfigMap), - confluent.WithReceiverTopics([]string{ - fmt.Sprintf("^%s", replaceLast(sourceEventsTopic, "*", o.clusterName)), - fmt.Sprintf("^%s", sourceBroadcastTopic), - }), - confluent.WithSenderTopic("agentevents"), + confluent.WithReceiverTopics([]string{sourceEventsTopic}), + confluent.WithSenderTopic(agentEventsTopic), confluent.WithErrorHandler(func(ctx context.Context, err kafka.Error) { o.errorChan <- err })) @@ -89,11 +63,3 @@ func (o *kafkaAgentOptions) Protocol(ctx context.Context) (options.CloudEventsPr func (o *kafkaAgentOptions) ErrorChan() <-chan error { return o.errorChan } - -func replaceLast(str, old, new string) string { - last := strings.LastIndex(str, old) - if last == -1 { - return str - } - return str[:last] + new + str[last+len(old):] -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/options.go index b8102439f..6acf3c8b4 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/options.go @@ -13,16 +13,10 @@ import ( ) const ( - // sourceEventsTopic is a topic for sources to publish their resource create/update/delete events, the first - // asterisk is a wildcard for source, the second asterisk is a wildcard for cluster. - sourceEventsTopic = "sourceevents.*.*" - // agentEventsTopic is a topic for agents to publish their resource status update events, the first - // asterisk is a wildcard for source, the second asterisk is a wildcard for cluster. - agentEventsTopic = "agentevents.*.*" - // sourceBroadcastTopic is for a source to publish its events to all agents, the asterisk is a wildcard for source. - sourceBroadcastTopic = "sourcebroadcast.*" - // agentBroadcastTopic is for a agent to publish its events to all sources, the asterisk is a wildcard for cluster. - agentBroadcastTopic = "agentbroadcast.*" + // sourceEventsTopic is a topic for sources to publish their events. + sourceEventsTopic = "sourceevents" + // agentEventsTopic is a topic for agents to publish their events. + agentEventsTopic = "agentevents" ) type KafkaOptions struct { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/sourceoptions.go index 0d9bfc59b..12f6dfc4d 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka/sourceoptions.go @@ -4,11 +4,8 @@ package kafka import ( "context" - "fmt" - "strings" cloudevents "github.com/cloudevents/sdk-go/v2" - cloudeventscontext "github.com/cloudevents/sdk-go/v2/context" "github.com/confluentinc/confluent-kafka-go/v2/kafka" confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2" @@ -43,36 +40,13 @@ func NewSourceOptions(kafkaOptions *KafkaOptions, sourceID string) *options.Clou func (o *kafkaSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext, ) (context.Context, error) { - eventType, err := types.ParseCloudEventsType(evtCtx.GetType()) - if err != nil { - return nil, err - } - - clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName) - if err != nil { - return nil, err - } - - if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll { - // source request to get resources status from all agents - topic := strings.Replace(sourceBroadcastTopic, "*", o.sourceID, 1) - return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), o.sourceID), nil - } - - // source publishes event to source topic to send the resource spec to a specified cluster - messageKey := fmt.Sprintf("%s@%s", o.sourceID, clusterName) - topic := strings.Replace(sourceEventsTopic, "*", o.sourceID, 1) - topic = strings.Replace(topic, "*", fmt.Sprintf("%s", clusterName), 1) - return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), messageKey), nil + return ctx, nil } -func (o *kafkaSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { +func (o *kafkaSourceOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) { protocol, err := confluent.New(confluent.WithConfigMap(&o.KafkaOptions.ConfigMap), - confluent.WithReceiverTopics([]string{ - fmt.Sprintf("^%s", strings.Replace(agentEventsTopic, "*", o.sourceID, 1)), - fmt.Sprintf("^%s", agentBroadcastTopic), - }), - confluent.WithSenderTopic("sourceevents"), + confluent.WithReceiverTopics([]string{agentEventsTopic}), + confluent.WithSenderTopic(sourceEventsTopic), confluent.WithErrorHandler(func(ctx context.Context, err kafka.Error) { o.errorChan <- err })) diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go index bf226dc36..a96880973 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go @@ -82,7 +82,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } -func (o *mqttAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { +func (o *mqttAgentOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) { subscribe := &paho.Subscribe{ Subscriptions: []paho.SubscribeOptions{ { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go index d0ed8ff26..03265794a 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go @@ -3,7 +3,6 @@ package mqtt import ( "context" "crypto/tls" - "crypto/x509" "fmt" "net" "os" @@ -16,7 +15,6 @@ import ( "github.com/eclipse/paho.golang/paho" "gopkg.in/yaml.v2" "k8s.io/apimachinery/pkg/util/errors" - "k8s.io/klog/v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -162,34 +160,20 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { dialTimeout = *config.DialTimeout } - if config.ClientCertFile != "" && config.ClientKeyFile != "" { - certPool, err := rootCAs(config.CAFile) - if err != nil { - return nil, err - } - - tlsConfig := &tls.Config{ - RootCAs: certPool, - GetClientCertificate: func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) { - return cert.CachingCertificateLoader(config.ClientCertFile, config.ClientKeyFile)() - }, - } - - options.Dialer = &MQTTDialer{ - BrokerHost: config.BrokerHost, - TLSConfig: tlsConfig, - Timeout: dialTimeout, - } - - // start a goroutine to periodically refresh client certificates for this connection - cert.StartClientCertRotating(tlsConfig.GetClientCertificate, options.Dialer) - return options, nil - } - options.Dialer = &MQTTDialer{ BrokerHost: config.BrokerHost, Timeout: dialTimeout, } + + if config.ClientCertFile != "" && config.ClientKeyFile != "" { + // Set up TLS configuration for the MQTT connection if the client certificate and key are provided. + // the certificates will be reloaded periodically. + options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer) + if err != nil { + return nil, err + } + } + return options, nil } @@ -334,29 +318,3 @@ func getAgentPubTopic(ctx context.Context) (*PubTopic, error) { return nil, fmt.Errorf("invalid agent pub topic") } - -// rootCAs returns a cert pool to verify the TLS connection. -// If the caFile is not provided, the default system certificate pool will be returned -// If the caFile is provided, the provided CA will be appended to the system certificate pool -func rootCAs(caFile string) (*x509.CertPool, error) { - certPool, err := x509.SystemCertPool() - if err != nil { - return nil, err - } - - if len(caFile) == 0 { - klog.Warningf("CA file is not provided, TLS connection will be verified with the system cert pool") - return certPool, nil - } - - caPEM, err := os.ReadFile(caFile) - if err != nil { - return nil, err - } - - if ok := certPool.AppendCertsFromPEM(caPEM); !ok { - return nil, fmt.Errorf("invalid CA %s", caFile) - } - - return certPool, nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go index 4cc97ec39..1d5ecd426 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go @@ -70,7 +70,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } -func (o *mqttSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) { +func (o *mqttSourceOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) { topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) if err != nil { return nil, err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go index 388b5506c..acf40f951 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/options.go @@ -5,6 +5,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/cloudevents/sdk-go/v2/protocol" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) // CloudEventsOptions provides cloudevents clients to send/receive cloudevents based on different event protocol. @@ -19,8 +20,8 @@ type CloudEventsOptions interface { // the MQTT topic, for Kafka, the context should contain the message key, etc. WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error) - // Protocol returns a specific protocol to initialize the cloudevents client - Protocol(ctx context.Context) (CloudEventsProtocol, error) + // Protocol returns a specific protocol to initialize the cloudevents client. + Protocol(ctx context.Context, dataType types.CloudEventsDataType) (CloudEventsProtocol, error) // ErrorChan returns a chan which will receive the cloudevents connection error. The source/agent client will try to // reconnect the when this error occurs. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go index b839c86af..74d75edaa 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/optionsbuilder.go @@ -45,7 +45,7 @@ func (l *ConfigLoader) LoadConfig() (string, any, error) { return "", nil, err } - return grpcOptions.URL, grpcOptions, nil + return grpcOptions.Dialer.URL, grpcOptions, nil case constants.ConfigTypeKafka: kafkaOptions, err := kafka.BuildKafkaOptionsFromFlags(l.configPath) diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go index 6f1f99e90..da0869bb5 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go @@ -24,7 +24,7 @@ import ( type CloudEventSourceClient[T ResourceObject] struct { *baseClient lister Lister[T] - codecs map[types.CloudEventsDataType]Codec[T] + codec Codec[T] statusHashGetter StatusHashGetter[T] sourceID string } @@ -35,34 +35,30 @@ type CloudEventSourceClient[T ResourceObject] struct { // sending/receiving the cloudevents. // - lister gets the resources from a cache/store of a source. // - statusHashGetter calculates the resource status hash. -// - codecs is list of codecs for encoding/decoding a resource objet/cloudevent to/from a cloudevent/resource objet. +// - codec is used to encode/decode a resource objet/cloudevent to/from a cloudevent/resource objet. func NewCloudEventSourceClient[T ResourceObject]( ctx context.Context, sourceOptions *options.CloudEventsSourceOptions, lister Lister[T], statusHashGetter StatusHashGetter[T], - codecs ...Codec[T], + codec Codec[T], ) (*CloudEventSourceClient[T], error) { baseClient := &baseClient{ clientID: sourceOptions.SourceID, cloudEventsOptions: sourceOptions.CloudEventsOptions, cloudEventsRateLimiter: NewRateLimiter(sourceOptions.EventRateLimit), reconnectedChan: make(chan struct{}), + dataType: codec.EventDataType(), } if err := baseClient.connect(ctx); err != nil { return nil, err } - evtCodes := make(map[types.CloudEventsDataType]Codec[T]) - for _, codec := range codecs { - evtCodes[codec.EventDataType()] = codec - } - return &CloudEventSourceClient[T]{ baseClient: baseClient, lister: lister, - codecs: evtCodes, + codec: codec, statusHashGetter: statusHashGetter, sourceID: sourceOptions.SourceID, }, nil @@ -74,61 +70,57 @@ func (c *CloudEventSourceClient[T]) ReconnectedChan() <-chan struct{} { // Resync the resources status by sending a status resync request from the current source to a specified cluster. func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName string) error { - // only resync the resources whose event data type is registered - for eventDataType := range c.codecs { - // list the resource objects that are maintained by the current source with a specified cluster - options := types.ListOptions{Source: c.sourceID, ClusterName: clusterName, CloudEventsDataType: eventDataType} - objs, err := c.lister.List(options) + // list the resource objects that are maintained by the current source with a specified cluster + options := types.ListOptions{Source: c.sourceID, ClusterName: clusterName, CloudEventsDataType: c.codec.EventDataType()} + objs, err := c.lister.List(options) + if err != nil { + return err + } + + hashes := &payload.ResourceStatusHashList{Hashes: make([]payload.ResourceStatusHash, len(objs))} + for i, obj := range objs { + statusHash, err := c.statusHashGetter(obj) if err != nil { return err } - hashes := &payload.ResourceStatusHashList{Hashes: make([]payload.ResourceStatusHash, len(objs))} - for i, obj := range objs { - statusHash, err := c.statusHashGetter(obj) - if err != nil { - return err - } - - hashes.Hashes[i] = payload.ResourceStatusHash{ - ResourceID: string(obj.GetUID()), - StatusHash: statusHash, - } + hashes.Hashes[i] = payload.ResourceStatusHash{ + ResourceID: string(obj.GetUID()), + StatusHash: statusHash, } - - eventType := types.CloudEventsType{ - CloudEventsDataType: eventDataType, - SubResource: types.SubResourceStatus, - Action: types.ResyncRequestAction, - } - - evt := types.NewEventBuilder(c.sourceID, eventType).WithClusterName(clusterName).NewEvent() - if err := evt.SetData(cloudevents.ApplicationJSON, hashes); err != nil { - return fmt.Errorf("failed to set data to cloud event: %v", err) - } - - if err := c.publish(ctx, evt); err != nil { - return err - } - - increaseCloudEventsSentCounter(evt.Source(), clusterName, eventDataType.String()) } + eventType := types.CloudEventsType{ + CloudEventsDataType: c.codec.EventDataType(), + SubResource: types.SubResourceStatus, + Action: types.ResyncRequestAction, + } + + evt := types.NewEventBuilder(c.sourceID, eventType).WithClusterName(clusterName).NewEvent() + if err := evt.SetData(cloudevents.ApplicationJSON, hashes); err != nil { + return fmt.Errorf("failed to set data to cloud event: %v", err) + } + + if err := c.publish(ctx, evt); err != nil { + return err + } + + increaseCloudEventsSentCounter(evt.Source(), "", clusterName, c.codec.EventDataType().String(), string(eventType.SubResource), string(eventType.Action)) + return nil } // Publish a resource spec from a source to an agent. func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error { + if eventType.CloudEventsDataType != c.codec.EventDataType() { + return fmt.Errorf("unsupported event data type %s", eventType.CloudEventsDataType) + } + if eventType.SubResource != types.SubResourceSpec { return fmt.Errorf("unsupported event eventType %s", eventType) } - codec, ok := c.codecs[eventType.CloudEventsDataType] - if !ok { - return fmt.Errorf("failed to find the codec for event %s", eventType.CloudEventsDataType) - } - - evt, err := codec.Encode(c.sourceID, eventType, obj) + evt, err := c.codec.Encode(c.sourceID, eventType, obj) if err != nil { return err } @@ -138,7 +130,7 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types } clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string) - increaseCloudEventsSentCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String()) + increaseCloudEventsSentCounter(evt.Source(), "", clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action)) return nil } @@ -166,7 +158,7 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents cn = "" } - increaseCloudEventsReceivedCounter(evt.Source(), cn, eventType.CloudEventsDataType.String()) + increaseCloudEventsReceivedCounter(evt.Source(), cn, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action)) if eventType.Action == types.ResyncRequestAction { if eventType.SubResource != types.SubResourceSpec { @@ -189,9 +181,8 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents return } - codec, ok := c.codecs[eventType.CloudEventsDataType] - if !ok { - klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType) + if eventType.CloudEventsDataType != c.codec.EventDataType() { + klog.Warningf("unsupported event data type %s, ignore", eventType.CloudEventsDataType) return } @@ -200,7 +191,7 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents return } - obj, err := codec.Decode(&evt) + obj, err := c.codec.Decode(&evt) if err != nil { klog.Errorf("failed to decode status, %v", err) return @@ -299,8 +290,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest( if err := c.publish(ctx, evt); err != nil { return err } - - increaseCloudEventsSentCounter(evt.Source(), fmt.Sprintf("%s", clusterName), evtDataType.String()) + increaseCloudEventsSentCounter(evt.Source(), "", fmt.Sprintf("%s", clusterName), evtDataType.String(), string(eventType.SubResource), string(eventType.Action)) } return nil diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go index 7a25c61dc..f55cb3d7c 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go @@ -60,6 +60,9 @@ const ( // ExtensionOriginalSource is the cloud event extension key of the original source. ExtensionOriginalSource = "originalsource" + + // ExtensionStatusHash is the cloud event extension key of the status hash. + ExtensionStatusHash = "statushash" ) // ResourceAction represents an action on a resource object on the source or agent. diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go deleted file mode 100644 index 5ffc5726a..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifest.go +++ /dev/null @@ -1,200 +0,0 @@ -package codec - -import ( - "fmt" - "strconv" - - "github.com/bwmarrin/snowflake" - cloudevents "github.com/cloudevents/sdk-go/v2" - cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" - - "k8s.io/apimachinery/pkg/api/meta" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" - kubetypes "k8s.io/apimachinery/pkg/types" - - "open-cluster-management.io/api/utils/work/v1/utils" - "open-cluster-management.io/api/utils/work/v1/workvalidator" - workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" - "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" -) - -var sequenceGenerator *snowflake.Node - -func init() { - // init the snowflake id generator with node id 1 for each single agent. Each single agent has its own consumer id - // to be identified, and we can ensure the order of status update event from the same agent via sequence id. The - // events from different agents are independent, hence the ordering among them needs not to be guaranteed. - // - // The snowflake `NewNode` returns error only when the snowflake node id is less than 1 or great than 1024, so the - // error can be ignored here. - sequenceGenerator, _ = snowflake.NewNode(1) -} - -// ManifestCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent. -type ManifestCodec struct { - restMapper meta.RESTMapper -} - -func NewManifestCodec(restMapper meta.RESTMapper) *ManifestCodec { - return &ManifestCodec{ - restMapper: restMapper, - } -} - -// EventDataType returns the event data type for `io.open-cluster-management.works.v1alpha1.manifests`. -func (c *ManifestCodec) EventDataType() types.CloudEventsDataType { - return payload.ManifestEventDataType -} - -// Encode the status of a ManifestWork to a cloudevent with ManifestStatus. -func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, work *workv1.ManifestWork) (*cloudevents.Event, error) { - if eventType.CloudEventsDataType != payload.ManifestEventDataType { - return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType) - } - - resourceVersion, err := strconv.ParseInt(work.ResourceVersion, 10, 64) - if err != nil { - return nil, fmt.Errorf("failed to parse the resourceversion of the work %s, %v", work.UID, err) - } - - originalSource, ok := work.Labels[common.CloudEventsOriginalSourceLabelKey] - if !ok { - return nil, fmt.Errorf("failed to find originalsource from the work %s", work.UID) - } - - // for the manifest deletion case: no manifest in the spec will be rebuilt in the cache upon agent restart. - // for status update cases other than manifest deletion case, there should be only one manifest in the work. - if !meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) { - if len(work.Spec.Workload.Manifests) != 1 { - return nil, fmt.Errorf("too many manifests in the work %s", work.UID) - } - } - - evt := types.NewEventBuilder(source, eventType). - WithResourceID(string(work.UID)). - WithStatusUpdateSequenceID(sequenceGenerator.Generate().String()). - WithResourceVersion(resourceVersion). - WithClusterName(work.Namespace). - WithOriginalSource(originalSource). - NewEvent() - - statusPayload := &payload.ManifestStatus{ - Conditions: work.Status.Conditions, - } - - if len(work.Status.ResourceStatus.Manifests) != 0 { - statusPayload.Status = &work.Status.ResourceStatus.Manifests[0] - } - - if err := evt.SetData(cloudevents.ApplicationJSON, statusPayload); err != nil { - return nil, fmt.Errorf("failed to encode manifestwork status to a cloudevent: %v", err) - } - - return &evt, nil -} - -// Decode a cloudevent whose data is Manifest to a ManifestWork. -func (c *ManifestCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWork, error) { - eventType, err := types.ParseCloudEventsType(evt.Type()) - if err != nil { - return nil, fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err) - } - - if eventType.CloudEventsDataType != payload.ManifestEventDataType { - return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType) - } - - evtExtensions := evt.Context.GetExtensions() - - resourceID, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionResourceID]) - if err != nil { - return nil, fmt.Errorf("failed to get resourceid extension: %v", err) - } - - resourceVersion, err := cloudeventstypes.ToInteger(evtExtensions[types.ExtensionResourceVersion]) - if err != nil { - return nil, fmt.Errorf("failed to get resourceversion extension: %v", err) - } - - clusterName, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionClusterName]) - if err != nil { - return nil, fmt.Errorf("failed to get clustername extension: %v", err) - } - - work := &workv1.ManifestWork{ - TypeMeta: metav1.TypeMeta{}, - ObjectMeta: metav1.ObjectMeta{ - UID: kubetypes.UID(resourceID), - ResourceVersion: fmt.Sprintf("%d", resourceVersion), - Name: resourceID, - Namespace: clusterName, - Labels: map[string]string{ - common.CloudEventsOriginalSourceLabelKey: evt.Source(), - }, - Annotations: map[string]string{ - common.CloudEventsDataTypeAnnotationKey: eventType.CloudEventsDataType.String(), - }, - }, - } - - if _, ok := evtExtensions[types.ExtensionDeletionTimestamp]; ok { - deletionTimestamp, err := cloudeventstypes.ToTime(evtExtensions[types.ExtensionDeletionTimestamp]) - if err != nil { - return nil, fmt.Errorf("failed to get deletiontimestamp, %v", err) - } - - // In the case of an agent restart, the manifestwork finalizer is cleared. - // Explicitly re-add the finalizer to ensure proper cleanup of the manifestwork. - work.Finalizers = []string{workv1.ManifestWorkFinalizer} - work.DeletionTimestamp = &metav1.Time{Time: deletionTimestamp} - return work, nil - } - - manifestPayload := &payload.Manifest{} - if err := evt.DataAs(manifestPayload); err != nil { - return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err) - } - - unstructuredObj := manifestPayload.Manifest - rawJson, err := unstructuredObj.MarshalJSON() - if err != nil { - return nil, fmt.Errorf("failed to get manifest GVR from event %s, %v", string(evt.Data()), err) - } - - work.Spec = workv1.ManifestWorkSpec{ - Workload: workv1.ManifestsTemplate{ - Manifests: []workv1.Manifest{{RawExtension: runtime.RawExtension{Raw: rawJson}}}, - }, - DeleteOption: manifestPayload.DeleteOption, - } - - if manifestPayload.ConfigOption != nil { - _, gvr, err := utils.BuildResourceMeta(0, &unstructuredObj, c.restMapper) - if err != nil { - return nil, fmt.Errorf("failed to get manifest GVR from event %s, %v", string(evt.Data()), err) - } - - work.Spec.ManifestConfigs = []workv1.ManifestConfigOption{ - { - ResourceIdentifier: workv1.ResourceIdentifier{ - Group: gvr.Group, - Resource: gvr.Resource, - Name: unstructuredObj.GetName(), - Namespace: unstructuredObj.GetNamespace(), - }, - FeedbackRules: manifestPayload.ConfigOption.FeedbackRules, - UpdateStrategy: manifestPayload.ConfigOption.UpdateStrategy, - }, - } - } - - // validate the manifest - if err := workvalidator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil { - return nil, fmt.Errorf("manifest is invalid, %v", err) - } - - return work, nil -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go index ec2a0cd1f..6ca8998d4 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec/manifestbundle.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" + "github.com/bwmarrin/snowflake" cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" @@ -15,8 +16,21 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/common" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash" ) +var sequenceGenerator *snowflake.Node + +func init() { + // init the snowflake id generator with node id 1 for each single agent. Each single agent has its own consumer id + // to be identified, and we can ensure the order of status update event from the same agent via sequence id. The + // events from different agents are independent, hence the ordering among them needs not to be guaranteed. + // + // The snowflake `NewNode` returns error only when the snowflake node id is less than 1 or great than 1024, so the + // error can be ignored here. + sequenceGenerator, _ = snowflake.NewNode(1) +} + // ManifestBundleCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent. type ManifestBundleCodec struct{} @@ -53,6 +67,13 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT WithOriginalSource(originalSource). NewEvent() + statusHash, err := statushash.ManifestWorkStatusHash(work) + if err != nil { + return nil, err + } + + evt.SetExtension(types.ExtensionStatusHash, statusHash) + manifestBundleStatus := &payload.ManifestBundleStatus{ Conditions: work.Status.Conditions, ResourceStatus: work.Status.ResourceStatus.Manifests, diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go index ad156e0b9..72b5e4a61 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/clientbuilder.go @@ -17,6 +17,7 @@ import ( "open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal" sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client" sourcelister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister" + "open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store" ) @@ -43,7 +44,7 @@ func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWork type ClientHolderBuilder struct { config any watcherStore store.WorkClientWatcherStore - codecs []generic.Codec[*workv1.ManifestWork] + codec generic.Codec[*workv1.ManifestWork] sourceID string clusterName string clientID string @@ -83,9 +84,9 @@ func (b *ClientHolderBuilder) WithClusterName(clusterName string) *ClientHolderB return b } -// WithCodecs add codecs when building a manifestwork client based on cloudevents. -func (b *ClientHolderBuilder) WithCodecs(codecs ...generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder { - b.codecs = codecs +// WithCodec add codec when building a manifestwork client based on cloudevents. +func (b *ClientHolderBuilder) WithCodec(codec generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder { + b.codec = codec return b } @@ -127,8 +128,8 @@ func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*Clien ctx, options, sourcelister.NewWatcherStoreLister(b.watcherStore), - ManifestWorkStatusHash, - b.codecs..., + statushash.ManifestWorkStatusHash, + b.codec, ) if err != nil { return nil, err @@ -200,8 +201,8 @@ func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*Client ctx, options, agentlister.NewWatcherStoreLister(b.watcherStore), - ManifestWorkStatusHash, - b.codecs..., + statushash.ManifestWorkStatusHash, + b.codec, ) if err != nil { return nil, err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifest.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifest.go deleted file mode 100644 index 519729a05..000000000 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload/manifest.go +++ /dev/null @@ -1,54 +0,0 @@ -package payload - -import ( - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - - workv1 "open-cluster-management.io/api/work/v1" - "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" -) - -var ManifestEventDataType = types.CloudEventsDataType{ - Group: "io.open-cluster-management.works", - Version: "v1alpha1", - Resource: "manifests", -} - -// Manifest represents the data in a cloudevent, it contains a single manifest. -type Manifest struct { - // Manifest represents a resource to be deployed on managed cluster. - Manifest unstructured.Unstructured `json:"manifest"` - - // DeleteOption represents deletion strategy when this manifest is deleted. - DeleteOption *workv1.DeleteOption `json:"deleteOption,omitempty"` - - // ConfigOption represents the configuration of this manifest. - ConfigOption *ManifestConfigOption `json:"configOption,omitempty"` -} - -// ManifestStatus represents the data in a cloudevent, it contains the status of a SingleManifest on a managed -// cluster. -type ManifestStatus struct { - // Conditions contains the different condition statuses for a SingleManifest on a managed cluster. - // Valid condition types are: - // 1. Applied represents the manifest of a SingleManifest is applied successfully on a managed cluster. - // 2. Progressing represents the manifest of a SingleManifest is being applied on a managed cluster. - // 3. Available represents the manifest of a SingleManifest exists on the managed cluster. - // 4. Degraded represents the current state of manifest of a SingleManifest does not match the desired state for a - // certain period. - // 5. Deleted represents the manifests of a SingleManifest is deleted from a managed cluster. - Conditions []metav1.Condition `json:"conditions"` - - // Status represents the conditions of this manifest on a managed cluster. - Status *workv1.ManifestCondition `json:"status,omitempty"` -} - -type ManifestConfigOption struct { - // FeedbackRules defines what resource status field should be returned. - // If it is not set or empty, no feedback rules will be honored. - FeedbackRules []workv1.FeedbackRule `json:"feedbackRules,omitempty"` - - // UpdateStrategy defines the strategy to update this manifest. - // UpdateStrategy is Update if it is not set. - UpdateStrategy *workv1.UpdateStrategy `json:"updateStrategy,omitempty"` -} diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go similarity index 95% rename from vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash.go rename to vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go index 642f78609..008245d20 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash/statushash.go @@ -1,4 +1,4 @@ -package work +package statushash import ( "crypto/sha256"