From 4ab47cf73dd4e1ce8a9f77055360852150f0b28e Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 29 Jan 2024 16:00:29 +0800 Subject: [PATCH] update sdk-go to update work clients mqtt topics (#356) Signed-off-by: Wei Liu --- .github/workflows/cloudevents-integration.yml | 1 + go.mod | 2 +- go.sum | 4 +- test/integration/cloudevents/source/source.go | 5 + vendor/modules.txt | 2 +- .../pkg/cloudevents/generic/agentclient.go | 13 +- .../pkg/cloudevents/generic/interface.go | 7 +- .../generic/options/mqtt/agentoptions.go | 59 ++++++--- .../generic/options/mqtt/options.go | 117 ++++++++++-------- .../generic/options/mqtt/sourceoptions.go | 55 +++++--- .../pkg/cloudevents/generic/sourceclient.go | 10 +- .../pkg/cloudevents/generic/types/types.go | 73 ++++++----- .../work/agent/client/manifestwork.go | 2 +- 13 files changed, 212 insertions(+), 138 deletions(-) diff --git a/.github/workflows/cloudevents-integration.yml b/.github/workflows/cloudevents-integration.yml index 51829bb14..d280ccc9d 100644 --- a/.github/workflows/cloudevents-integration.yml +++ b/.github/workflows/cloudevents-integration.yml @@ -5,6 +5,7 @@ on: pull_request: paths: - 'pkg/work/spoke/*.go' + - 'test/integration/cloudevents/**' branches: - main - release-* diff --git a/go.mod b/go.mod index f6dfca837..1781d5da3 100644 --- a/go.mod +++ b/go.mod @@ -35,7 +35,7 @@ require ( k8s.io/utils v0.0.0-20240102154912-e7106e64919e open-cluster-management.io/addon-framework v0.8.1-0.20240123051722-71f1b13cbb63 open-cluster-management.io/api v0.12.1-0.20240122084346-e7bd1bd9ea6a - open-cluster-management.io/sdk-go v0.0.0-20240122034348-9793ade2466b + open-cluster-management.io/sdk-go v0.0.0-20240129065956-3ff755820797 sigs.k8s.io/controller-runtime v0.16.2 sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96 ) diff --git a/go.sum b/go.sum index dc0202017..40045fda3 100644 --- a/go.sum +++ b/go.sum @@ -454,8 +454,8 @@ open-cluster-management.io/addon-framework v0.8.1-0.20240123051722-71f1b13cbb63 open-cluster-management.io/addon-framework v0.8.1-0.20240123051722-71f1b13cbb63/go.mod h1:SBs6wF0Umzr5/miJb9p8uMaTDbcjphHHQLa76nXnbU8= open-cluster-management.io/api v0.12.1-0.20240122084346-e7bd1bd9ea6a h1:NjIU3aN4JSJjTotHiOkOCqYaPGG2tNtm7BY/o9uPb8M= open-cluster-management.io/api v0.12.1-0.20240122084346-e7bd1bd9ea6a/go.mod h1:vOz9InrJq1BDFEI51+OwAyq2M3tjYPY+1cnoQhMhIlE= -open-cluster-management.io/sdk-go v0.0.0-20240122034348-9793ade2466b h1:UH3uy5vv3/VdtHQoWzHWhVFqsbcG9zUk1coY1YgD/uo= -open-cluster-management.io/sdk-go v0.0.0-20240122034348-9793ade2466b/go.mod h1:p3oaf+iu9ghMl4cBJXWXlDnUHVn+QxL90YLTve9bn/k= +open-cluster-management.io/sdk-go v0.0.0-20240129065956-3ff755820797 h1:UgM7kMPxiGAtik/382oOYk2AaTCAWPmnILs39W9ojSk= +open-cluster-management.io/sdk-go v0.0.0-20240129065956-3ff755820797/go.mod h1:p3oaf+iu9ghMl4cBJXWXlDnUHVn+QxL90YLTve9bn/k= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 h1:TgtAeesdhpm2SGwkQasmbeqDo8th5wOBA5h/AjTKA4I= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0/go.mod h1:VHVDI/KrK4fjnV61bE2g3sA7tiETLn8sooImelsCx3Y= sigs.k8s.io/controller-runtime v0.16.2 h1:mwXAVuEk3EQf478PQwQ48zGOXvW27UJc8NHktQVuIPU= diff --git a/test/integration/cloudevents/source/source.go b/test/integration/cloudevents/source/source.go index 39981f4a8..30783cb4d 100644 --- a/test/integration/cloudevents/source/source.go +++ b/test/integration/cloudevents/source/source.go @@ -17,6 +17,7 @@ import ( workv1 "open-cluster-management.io/api/work/v1" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt" + "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" "open-cluster-management.io/sdk-go/pkg/cloudevents/work" "open-cluster-management.io/sdk-go/pkg/cloudevents/work/watcher" ) @@ -72,6 +73,10 @@ func (m *MQTTSource) Start(ctx context.Context) error { // write the mqtt broker config to a file config := mqtt.MQTTConfig{ BrokerHost: mqttBrokerHost, + Topics: &types.Topics{ + SourceEvents: fmt.Sprintf("sources/%s/clusters/+/sourceevents", sourceID), + AgentEvents: fmt.Sprintf("sources/%s/clusters/+/agentevents", sourceID), + }, } configData, err := yaml.Marshal(config) diff --git a/vendor/modules.txt b/vendor/modules.txt index 0407f5f2b..dbd021369 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1578,7 +1578,7 @@ open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/utils/work/v1/workvalidator open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v0.0.0-20240122034348-9793ade2466b +# open-cluster-management.io/sdk-go v0.0.0-20240129065956-3ff755820797 ## explicit; go 1.21 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go index 50337e7a5..c341f1f2b 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/agentclient.go @@ -65,10 +65,10 @@ func NewCloudEventAgentClient[T ResourceObject]( }, nil } -// Resync the resources spec by sending a spec resync request from an agent to sources with list options. -func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, listOptions types.ListOptions) error { - // list the resource objects that are maintained by the current agent with list options - objs, err := c.lister.List(listOptions) +// Resync the resources spec by sending a spec resync request from the current to the given source. +func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) error { + // list the resource objects that are maintained by the current agent with the given source + objs, err := c.lister.List(types.ListOptions{Source: source, ClusterName: c.clusterName}) if err != nil { return err } @@ -94,7 +94,10 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, listOptions types Action: types.ResyncRequestAction, } - evt := types.NewEventBuilder(c.agentID, eventType).WithClusterName(c.clusterName).NewEvent() + evt := types.NewEventBuilder(c.agentID, eventType). + WithOriginalSource(source). + WithClusterName(c.clusterName). + NewEvent() if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil { return fmt.Errorf("failed to set data to cloud event: %v", err) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/interface.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/interface.go index c7033296a..ee8b77b37 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/interface.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/interface.go @@ -54,7 +54,12 @@ type Codec[T ResourceObject] interface { type CloudEventsClient[T ResourceObject] interface { // Resync the resources of one source/agent by sending resync request. - Resync(ctx context.Context, listOptions types.ListOptions) error + // The second parameter is used to specify cluster name/source ID for a source/agent. + // - A source sends the resource status resync request to a cluster with the given cluster name. + // If setting this parameter to `types.ClusterAll`, the source will broadcast the resync request to all clusters. + // - An agent sends the resources spec resync request to a source with the given source ID. + // If setting this parameter to `types.SourceAll`, the agent will broadcast the resync request to all sources. + Resync(context.Context, string) error // Publish the resources spec/status event to the broker. Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go index a854ca3a4..0f9e3ffbf 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/agentoptions.go @@ -9,6 +9,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventscontext "github.com/cloudevents/sdk-go/v2/context" "github.com/eclipse/paho.golang/paho" + "k8s.io/klog/v2" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" @@ -42,24 +43,51 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err) } - if eventType.Action == types.ResyncRequestAction { - // agent publishes event to spec resync topic to request to get resources spec from all sources - topic := strings.Replace(o.Topics.SpecResync, "+", o.clusterName, -1) - return cloudeventscontext.WithTopic(ctx, topic), nil - } - - // agent publishes event to status topic to send the resource status from a specified cluster originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource) if err != nil { return nil, err } - statusTopic := strings.Replace(o.Topics.Status, "+", fmt.Sprintf("%s", originalSource), 1) - statusTopic = strings.Replace(statusTopic, "+", o.clusterName, -1) - return cloudeventscontext.WithTopic(ctx, statusTopic), nil + // agent request to sync resource spec from all sources + if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll { + if len(o.Topics.AgentBroadcast) == 0 { + klog.Warningf("the source wild card resync topic not set, fall back to the agent events topic") + + // TODO after supporting multiple sources, we should list each source + eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName) + return cloudeventscontext.WithTopic(ctx, eventsTopic), nil + } + + resyncTopic := strings.Replace(o.Topics.AgentBroadcast, "+", o.clusterName, 1) + return cloudeventscontext.WithTopic(ctx, resyncTopic), nil + } + + topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) + if err != nil { + return nil, err + } + + // agent publishes status events or spec resync events + eventsTopic := replaceLast(o.Topics.AgentEvents, "+", o.clusterName) + eventsTopic = replaceLast(eventsTopic, "+", topicSource) + return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, error) { + subscribe := &paho.Subscribe{ + Subscriptions: map[string]paho.SubscribeOptions{ + // TODO support multiple sources, currently the client require the source events topic has a sourceID, in + // the future, client may need a source list, it will subscribe to each source + // receiving the sources events + replaceLast(o.Topics.SourceEvents, "+", o.clusterName): {QoS: byte(o.SubQoS)}, + }, + } + + if len(o.Topics.SourceBroadcast) != 0 { + // receiving status resync events from all sources + subscribe.Subscriptions[o.Topics.SourceBroadcast] = paho.SubscribeOptions{QoS: byte(o.SubQoS)} + } + receiver, err := o.GetCloudEventsClient( ctx, fmt.Sprintf("%s-client", o.agentID), @@ -67,16 +95,7 @@ func (o *mqttAgentOptions) Client(ctx context.Context) (cloudevents.Client, erro o.errorChan <- err }, cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), - cloudeventsmqtt.WithSubscribe( - &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - // receiving the resources spec from sources with spec topic - replaceNth(o.Topics.Spec, "+", o.clusterName, 2): {QoS: byte(o.SubQoS)}, - // receiving the resources status resync request from sources with status resync topic - o.Topics.StatusResync: {QoS: byte(o.SubQoS)}, - }, - }, - ), + cloudeventsmqtt.WithSubscribe(subscribe), ) if err != nil { return nil, err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go index 2c342a13d..593c44bb0 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/options.go @@ -7,30 +7,19 @@ import ( "fmt" "net" "os" + "regexp" "strings" + "time" cloudeventsmqtt "github.com/cloudevents/sdk-go/protocol/mqtt_paho/v2" cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/eclipse/paho.golang/packets" "github.com/eclipse/paho.golang/paho" "gopkg.in/yaml.v2" + "k8s.io/apimachinery/pkg/util/errors" "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types" ) -const ( - // defaultSpecTopic is a default MQTT topic for resource spec. - defaultSpecTopic = "sources/+/clusters/+/spec" - - // defaultStatusTopic is a default MQTT topic for resource status. - defaultStatusTopic = "sources/+/clusters/+/status" - - // defaultSpecResyncTopic is a default MQTT topic for resource spec resync. - defaultSpecResyncTopic = "sources/clusters/+/specresync" - - // defaultStatusResyncTopic is a default MQTT topic for resource status resync. - defaultStatusResyncTopic = "sources/+/clusters/statusresync" -) - // MQTTOptions holds the options that are used to build MQTT client. type MQTTOptions struct { Topics types.Topics @@ -41,6 +30,7 @@ type MQTTOptions struct { ClientCertFile string ClientKeyFile string KeepAlive uint16 + Timeout time.Duration PubQoS int SubQoS int } @@ -74,20 +64,6 @@ type MQTTConfig struct { Topics *types.Topics `json:"topics,omitempty" yaml:"topics,omitempty"` } -func NewMQTTOptions() *MQTTOptions { - return &MQTTOptions{ - Topics: types.Topics{ - Spec: defaultSpecTopic, - Status: defaultStatusTopic, - SpecResync: defaultSpecResyncTopic, - StatusResync: defaultStatusResyncTopic, - }, - KeepAlive: 60, - PubQoS: 1, - SubQoS: 1, - } -} - // BuildMQTTOptionsFromFlags builds configs from a config filepath. func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { configData, err := os.ReadFile(configPath) @@ -112,9 +88,8 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { return nil, fmt.Errorf("setting clientCertFile and clientKeyFile requires caFile") } - if config.Topics != nil && (config.Topics.Spec == "" || config.Topics.Status == "" || - config.Topics.SpecResync == "" || config.Topics.StatusResync == "") { - return nil, fmt.Errorf("topics must be set") + if err := validateTopics(config.Topics); err != nil { + return nil, err } options := &MQTTOptions{ @@ -127,16 +102,14 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { KeepAlive: 60, PubQoS: 1, SubQoS: 1, - Topics: types.Topics{ - Spec: defaultSpecTopic, - Status: defaultStatusTopic, - SpecResync: defaultSpecResyncTopic, - StatusResync: defaultStatusResyncTopic, - }, + Timeout: 180 * time.Second, + Topics: *config.Topics, } if config.KeepAlive != nil { options.KeepAlive = *config.KeepAlive + // Setting the mqtt tcp connection read and write timeouts to three times the mqtt keepalive + options.Timeout = 3 * time.Duration(*config.KeepAlive) * time.Second } if config.PubQoS != nil { @@ -147,10 +120,6 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) { options.SubQoS = *config.SubQoS } - if config.Topics != nil { - options.Topics = *config.Topics - } - return options, nil } @@ -226,6 +195,10 @@ func (o *MQTTOptions) GetCloudEventsClient( if err != nil { return nil, err } + err = netConn.SetDeadline(time.Now().Add(o.Timeout)) + if err != nil { + return nil, err + } config := &paho.ClientConfig{ ClientID: clientID, @@ -243,19 +216,53 @@ func (o *MQTTOptions) GetCloudEventsClient( return cloudevents.NewClient(protocol) } -// Replace the nth occurrence of old in str by new. -func replaceNth(str, old, new string, n int) string { - i := 0 - for m := 1; m <= n; m++ { - x := strings.Index(str[i:], old) - if x < 0 { - break - } - i += x - if m == n { - return str[:i] + new + str[i+len(old):] - } - i += len(old) +func validateTopics(topics *types.Topics) error { + if topics == nil { + return fmt.Errorf("the topics must be set") } - return str + + var errs []error + if !regexp.MustCompile(types.SourceEventsTopicPattern).MatchString(topics.SourceEvents) { + errs = append(errs, fmt.Errorf("invalid source events topic %q, it should match `%s`", + topics.SourceEvents, types.SourceEventsTopicPattern)) + } + + if !regexp.MustCompile(types.AgentEventsTopicPattern).MatchString(topics.AgentEvents) { + errs = append(errs, fmt.Errorf("invalid agent events topic %q, it should match `%s`", + topics.AgentEvents, types.AgentEventsTopicPattern)) + } + + if len(topics.SourceBroadcast) != 0 { + if !regexp.MustCompile(types.SourceBroadcastTopicPattern).MatchString(topics.SourceBroadcast) { + errs = append(errs, fmt.Errorf("invalid source broadcast topic %q, it should match `%s`", + topics.SourceBroadcast, types.SourceBroadcastTopicPattern)) + } + } + + if len(topics.AgentBroadcast) != 0 { + if !regexp.MustCompile(types.AgentBroadcastTopicPattern).MatchString(topics.AgentBroadcast) { + errs = append(errs, fmt.Errorf("invalid agent broadcast topic %q, it should match `%s`", + topics.AgentBroadcast, types.AgentBroadcastTopicPattern)) + } + } + + return errors.NewAggregate(errs) +} + +func getSourceFromEventsTopic(topic string) (string, error) { + subTopics := strings.Split(topic, "/") + + if len(subTopics) != 5 { + return "", fmt.Errorf("bad format for topic %q", topic) + } + + return subTopics[1], nil +} + +func replaceLast(str, old, new string) string { + last := strings.LastIndex(str, old) + if last == -1 { + return str + } + return str[:last] + new + str[last+len(old):] } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go index 15af10f76..0a9f9ac6b 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt/sourceoptions.go @@ -41,23 +41,49 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents. return nil, fmt.Errorf("unsupported event type %s, %v", eventType, err) } - if eventType.Action == types.ResyncRequestAction { - // source publishes event to status resync topic to request to get resources status from all clusters - return cloudeventscontext.WithTopic(ctx, strings.Replace(o.Topics.StatusResync, "+", o.sourceID, -1)), nil - } - clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName) if err != nil { return nil, err } - // source publishes event to spec topic to send the resource spec to a specified cluster - specTopic := strings.Replace(o.Topics.Spec, "+", o.sourceID, 1) - specTopic = strings.Replace(specTopic, "+", fmt.Sprintf("%s", clusterName), -1) - return cloudeventscontext.WithTopic(ctx, specTopic), nil + if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll { + // source request to get resources status from all sources + if len(o.Topics.SourceBroadcast) == 0 { + return nil, fmt.Errorf("the source wild card resync topic not set") + } + + resyncTopic := strings.Replace(o.Topics.SourceBroadcast, "+", o.sourceID, 1) + return cloudeventscontext.WithTopic(ctx, resyncTopic), nil + } + + // source publishes spec events or status resync events + eventsTopic := strings.Replace(o.Topics.SourceEvents, "+", fmt.Sprintf("%s", clusterName), 1) + return cloudeventscontext.WithTopic(ctx, eventsTopic), nil } func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, error) { + topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents) + if err != nil { + return nil, err + } + + if topicSource != o.sourceID { + return nil, fmt.Errorf("the topic source %q does not match with the client sourceID %q", + o.Topics.AgentEvents, o.sourceID) + } + + subscribe := &paho.Subscribe{ + Subscriptions: map[string]paho.SubscribeOptions{ + // receiving the agent events + o.Topics.AgentEvents: {QoS: byte(o.SubQoS)}, + }, + } + + if len(o.Topics.AgentBroadcast) != 0 { + // receiving spec resync events from all agents + subscribe.Subscriptions[o.Topics.AgentBroadcast] = paho.SubscribeOptions{QoS: byte(o.SubQoS)} + } + receiver, err := o.GetCloudEventsClient( ctx, o.clientID, @@ -65,16 +91,7 @@ func (o *mqttSourceOptions) Client(ctx context.Context) (cloudevents.Client, err o.errorChan <- err }, cloudeventsmqtt.WithPublish(&paho.Publish{QoS: byte(o.PubQoS)}), - cloudeventsmqtt.WithSubscribe( - &paho.Subscribe{ - Subscriptions: map[string]paho.SubscribeOptions{ - // receiving the resources status from agents with status topic - strings.Replace(o.Topics.Status, "+", o.sourceID, 1): {QoS: byte(o.SubQoS)}, - // receiving the resources spec resync request from agents with spec resync topic - o.Topics.SpecResync: {QoS: byte(o.SubQoS)}, - }, - }, - ), + cloudeventsmqtt.WithSubscribe(subscribe), ) if err != nil { return nil, err diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go index 822267aae..bb783901d 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/sourceclient.go @@ -64,10 +64,10 @@ func NewCloudEventSourceClient[T ResourceObject]( }, nil } -// Resync the resources status by sending a status resync request from a source to clusters with list options. -func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, listOptions types.ListOptions) error { - // list the resource objects that are maintained by the current source with list options - objs, err := c.lister.List(listOptions) +// Resync the resources status by sending a status resync request from the current source to a specified cluster. +func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName string) error { + // list the resource objects that are maintained by the current source with a specified cluster + objs, err := c.lister.List(types.ListOptions{Source: c.sourceID, ClusterName: clusterName}) if err != nil { return err } @@ -93,7 +93,7 @@ func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, listOptions type Action: types.ResyncRequestAction, } - evt := types.NewEventBuilder(c.sourceID, eventType).NewEvent() + evt := types.NewEventBuilder(c.sourceID, eventType).WithClusterName(clusterName).NewEvent() if err := evt.SetData(cloudevents.ApplicationJSON, hashes); err != nil { return fmt.Errorf("failed to set data to cloud event: %v", err) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go index 29c83d3bb..6cd946634 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types/types.go @@ -60,9 +60,6 @@ const ( // ExtensionOriginalSource is the cloud event extension key of the original source. ExtensionOriginalSource = "originalsource" - - // ExtensionResourceMeta is the cloud event extension key of the original resource meta. - ExtensionResourceMeta = "resourcemeta" ) // ResourceAction represents an action on a resource object on the source or agent. @@ -82,16 +79,50 @@ const ( Deleted ResourceAction = "DELETED" ) +const ( + SourceEventsTopicPattern = `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/([a-z]+)/([a-z0-9-]+|\+)/sourceevents$` + AgentEventsTopicPattern = `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/([a-z]+)/([a-z0-9-]+|\+)/agentevents$` + SourceBroadcastTopicPattern = `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/sourcebroadcast$` + AgentBroadcastTopicPattern = `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/agentbroadcast$` +) + // Topics represents required messaging system topics for a source or agent. type Topics struct { - // Spec is a topic for resource spec - Spec string `json:"spec" yaml:"spec"` - // Status is a topic for resource status - Status string `json:"status" yaml:"status"` - // SpecResync is a topic for resource spec resync - SpecResync string `json:"specResync" yaml:"specResync"` - // StatusResync is a MQTT topic for resource status resync - StatusResync string `json:"statusResync" yaml:"statusResync"` + // SourceEvents topic is a topic for sources to publish their resource create/update/delete events or status resync events + // - A source uses this topic to publish its resource create/update/delete request or status resync request with + // its sourceID to a specified agent + // - An agent subscribes to this topic with its cluster name to response sources resource create/update/delete + // request or status resync request + // The topic format is `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/([a-z]+)/([a-z0-9-]+|\+)/sourceevents$`, e.g. + // sources/+/clusters/+/sourceevents, sources/source1/clusters/+/sourceevents, sources/source1/clusters/cluster1/sourceevents + // or $share/source-group/sources/+/clusters/+/sourceevents + SourceEvents string `json:"sourceEvents" yaml:"sourceEvents"` + + // AgentEvents topic is a topic for agents to publish their resource status update events or spec resync events + // - An agent using this topic to publish the resource status update request or spec resync request with its + // cluster name to a specified source. + // - A source subscribe to this topic with its sourceID to response agents resource status update request or spec + // resync request + // The topic format is `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/([a-z]+)/([a-z0-9-]+|\+)/agentevents$`, e.g. + // sources/+/clusters/+/agentevents, sources/source1/clusters/+/agentevents, sources/source1/clusters/cluster1/agentevents + // or $share/agent-group/+/clusters/+/agentevents + AgentEvents string `json:"agentEvents" yaml:"agentEvents"` + + // SourceBroadcast is an optional topic, it is for a source to publish its events to all agents, currently, we use + // this topic to resync resource status from all agents for a source that does not known the exact agents, e.g. + // - A source uses this topic to publish its resource status resync request with its sourceID to all the agents + // - Each agent subscribes to this topic to response sources resource status resync request + // The topic format is `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/sourcebroadcast$`, e.g. + // sources/+/sourcebroadcast, sources/source1/sourcebroadcast or $share/source-group/sources/+/sourcebroadcast + SourceBroadcast string `json:"sourceBroadcast,omitempty" yaml:"sourceBroadcast,omitempty"` + + // AgentBroadcast is an optional topic, it is for a agent to publish its events to all sources, currently, we use + // this topic to resync resources from all sources for an agent that does not known the exact sources, e.g. + // - An agent using this topic to publish the spec resync request with its cluster name to all the sources. + // - Each source subscribe to this topic to response agents spec resync request + // The topic format is `^(\$share/[a-z0-9-]+/)?([a-z]+)/([a-z0-9-]+|\+)/agentbroadcast$`, e.g. + // clusters/+/agentbroadcast, clusters/cluster1/agentbroadcast or $share/agent-group/clusters/+/agentbroadcast + AgentBroadcast string `json:"agentBroadcast,omitempty" yaml:"agentBroadcast,omitempty"` } // ListOptions is the query options for listing the resource objects from the source/agent. @@ -105,15 +136,6 @@ type ListOptions struct { Source string } -// ResourceMeta represents a resource original meta data on the source -type ResourceMeta struct { - Group string `json:"group"` - Version string `json:"version"` - Resource string `json:"resource"` - Namespace string `json:"namespace"` - Name string `json:"name"` -} - // CloudEventsDataType uniquely identifies the type of cloud event data. type CloudEventsDataType struct { Group string @@ -237,6 +259,9 @@ func (b *EventBuilder) NewEvent() cloudevents.Event { evt.SetTime(time.Now()) evt.SetSource(b.source) + evt.SetExtension(ExtensionClusterName, b.clusterName) + evt.SetExtension(ExtensionOriginalSource, b.originalSource) + if len(b.resourceID) != 0 { evt.SetExtension(ExtensionResourceID, b.resourceID) } @@ -249,14 +274,6 @@ func (b *EventBuilder) NewEvent() cloudevents.Event { evt.SetExtension(ExtensionStatusUpdateSequenceID, b.sequenceID) } - if len(b.clusterName) != 0 { - evt.SetExtension(ExtensionClusterName, b.clusterName) - } - - if len(b.originalSource) != 0 { - evt.SetExtension(ExtensionOriginalSource, b.originalSource) - } - if !b.deletionTimestamp.IsZero() { evt.SetExtension(ExtensionDeletionTimestamp, b.deletionTimestamp) } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go index 4384efe5c..d66bac83c 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client/manifestwork.go @@ -80,7 +80,7 @@ func (c *ManifestWorkAgentClient) Get(ctx context.Context, name string, opts met func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) { klog.V(4).Infof("sync manifestworks") // send resync request to fetch manifestworks from source when the ManifestWorkInformer starts - if err := c.cloudEventsClient.Resync(ctx, types.ListOptions{}); err != nil { + if err := c.cloudEventsClient.Resync(ctx, types.SourceAll); err != nil { return nil, err }