Bump api/sdk-go/addon-framework to v0.16 (#879)

Signed-off-by: Jian Qiu <jqiu@redhat.com>
author Jian Qiu
date 2025-03-10 21:52:52 +08:00
committed by GitHub
parent edec4fd3f9
commit 453b775170
35 changed files with 498 additions and 824 deletions

go.mod

@@ -35,9 +35,9 @@ require (
k8s.io/klog/v2 v2.130.1
k8s.io/kube-aggregator v0.31.4
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6
open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b
open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f
open-cluster-management.io/addon-framework v0.12.0
open-cluster-management.io/api v0.16.1
open-cluster-management.io/sdk-go v0.16.0
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848
sigs.k8s.io/controller-runtime v0.19.3
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96

go.sum

@@ -487,12 +487,12 @@ k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340 h1:BZqlfIlq5YbRMFko6/PM7F
k8s.io/kube-openapi v0.0.0-20240228011516-70dd3763d340/go.mod h1:yD4MZYeKMBwQKVht279WycxKyM84kkAx2DPrTXaeb98=
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6 h1:MDF6h2H/h4tbzmtIKTuctcwZmY0tY9mD9fNT47QO6HI=
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0=
open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b h1:vEemE32F9iiVvKfFsFEdiyGdDnSb9Cp9Dch2Jkc4Nfg=
open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b/go.mod h1:3+UAkReHIEyqsDuq0Iv5w+ZRgZr254iehYV/JR2j038=
open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e h1:4iQneGfxartfFSR+IHZRrjEuwtRpiHyKQ15Kd33YCVk=
open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f h1:zeC7QrFNarfK2zY6jGtd+mX+yDrQQmnH/J8A7n5Nh38=
open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
open-cluster-management.io/addon-framework v0.12.0 h1:5j7mpyk2ij0SLUZkwWk0KkNTWtsid2w7BIHmhm0Ecok=
open-cluster-management.io/addon-framework v0.12.0/go.mod h1:eReMWXrEHqtilwz5wzEpUrWw9Vfz0HJCH9pi3gOTZns=
open-cluster-management.io/api v0.16.1 h1:mS+4UGxHLPQd7CRM0gdFQdVaz139Lo2bkLfqSE0CDNU=
open-cluster-management.io/api v0.16.1/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/sdk-go v0.16.0 h1:Ui1jerkeLaNaJPu47xjOJ3lh+rJQgeJHD25ViQMzAMs=
open-cluster-management.io/sdk-go v0.16.0/go.mod h1:TyOjZC5YxyM5BRNgwTmLuTbHXX6xXqzYBXllrfoVp9w=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 h1:WYPi2PdQyZwZkHG648v2jQl6deyCgyjJ0fkLYgUJ618=


@@ -99,7 +99,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
clientHolder, err := work.NewClientHolderBuilder(config).
WithClientID(c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
WithCodecs(codec.NewManifestBundleCodec()).
WithCodec(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(ctx)
if err != nil {


@@ -7,9 +7,6 @@ import (
)
const (
manifestBundleCodecName = "manifestbundle"
manifestCodecName = "manifest"
defaultUserAgent = "work-agent"
)


@@ -18,7 +18,6 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
ocmfeature "open-cluster-management.io/api/feature"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec"
@@ -194,20 +193,6 @@ func (o *WorkAgentConfig) RunWorkloadAgent(ctx context.Context, controllerContex
return nil
}
func buildCodecs(codecNames []string, restMapper meta.RESTMapper) []generic.Codec[*workv1.ManifestWork] {
var codecs []generic.Codec[*workv1.ManifestWork]
for _, name := range codecNames {
if name == manifestBundleCodecName {
codecs = append(codecs, codec.NewManifestBundleCodec())
}
if name == manifestCodecName {
codecs = append(codecs, codec.NewManifestCodec(restMapper))
}
}
return codecs
}
func (o *WorkAgentConfig) newWorkClientAndInformer(
ctx context.Context,
restMapper meta.RESTMapper,
@@ -244,7 +229,7 @@ func (o *WorkAgentConfig) newWorkClientAndInformer(
clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
WithClientID(o.workOptions.CloudEventsClientID).
WithClusterName(o.agentOptions.SpokeClusterName).
WithCodecs(buildCodecs(o.workOptions.CloudEventsClientCodecs, restMapper)...).
WithCodec(codec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewAgentClientHolder(ctx)
if err != nil {


@@ -146,7 +146,7 @@ var _ = ginkgo.BeforeSuite(func() {
sourceClient, err := work.NewClientHolderBuilder(util.NewMQTTSourceOptions(sourceID)).
WithClientID(fmt.Sprintf("%s-%s", sourceID, rand.String(5))).
WithSourceID(sourceID).
WithCodecs(sourcecodec.NewManifestBundleCodec()).
WithCodec(sourcecodec.NewManifestBundleCodec()).
WithWorkClientWatcherStore(watcherStore).
NewSourceClientHolder(envCtx)
gomega.Expect(err).ToNot(gomega.HaveOccurred())

vendor/modules.txt

@@ -1692,7 +1692,7 @@ k8s.io/utils/pointer
k8s.io/utils/ptr
k8s.io/utils/strings/slices
k8s.io/utils/trace
# open-cluster-management.io/addon-framework v0.11.1-0.20250303151103-b2865de5c39b
# open-cluster-management.io/addon-framework v0.12.0
## explicit; go 1.22.0
open-cluster-management.io/addon-framework/pkg/addonfactory
open-cluster-management.io/addon-framework/pkg/addonmanager
@@ -1708,7 +1708,7 @@ open-cluster-management.io/addon-framework/pkg/agent
open-cluster-management.io/addon-framework/pkg/assets
open-cluster-management.io/addon-framework/pkg/index
open-cluster-management.io/addon-framework/pkg/utils
# open-cluster-management.io/api v0.15.1-0.20250226073118-8c9793267c9e
# open-cluster-management.io/api v0.16.1
## explicit; go 1.22.0
open-cluster-management.io/api/addon/v1alpha1
open-cluster-management.io/api/client/addon/clientset/versioned
@@ -1774,12 +1774,10 @@ open-cluster-management.io/api/cluster/v1beta2
open-cluster-management.io/api/crdsv1beta1
open-cluster-management.io/api/feature
open-cluster-management.io/api/operator/v1
open-cluster-management.io/api/utils/work/v1/utils
open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/utils/work/v1/workvalidator
open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1
# open-cluster-management.io/sdk-go v0.15.1-0.20241125015855-1536c3970f8f
# open-cluster-management.io/sdk-go v0.16.0
## explicit; go 1.22.0
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1
@@ -1812,6 +1810,7 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec
open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister
open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash
open-cluster-management.io/sdk-go/pkg/cloudevents/work/store
open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils
open-cluster-management.io/sdk-go/pkg/helpers


@@ -1,76 +0,0 @@
package utils
import (
"fmt"
"reflect"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/kubernetes/scheme"
workv1 "open-cluster-management.io/api/work/v1"
)
var genericScheme = runtime.NewScheme()
// BuildResourceMeta builds manifest resource meta for the object
func BuildResourceMeta(
index int,
object runtime.Object,
restMapper meta.RESTMapper) (workv1.ManifestResourceMeta, schema.GroupVersionResource, error) {
resourceMeta := workv1.ManifestResourceMeta{
Ordinal: int32(index),
}
if object == nil || reflect.ValueOf(object).IsNil() {
return resourceMeta, schema.GroupVersionResource{}, nil
}
// set gvk
gvk, err := GuessObjectGroupVersionKind(object)
if err != nil {
return resourceMeta, schema.GroupVersionResource{}, err
}
resourceMeta.Group = gvk.Group
resourceMeta.Version = gvk.Version
resourceMeta.Kind = gvk.Kind
// set namespace/name
if accessor, e := meta.Accessor(object); e != nil {
err = fmt.Errorf("cannot access metadata of %v: %w", object, e)
} else {
resourceMeta.Namespace = accessor.GetNamespace()
resourceMeta.Name = accessor.GetName()
}
// set resource
if restMapper == nil {
return resourceMeta, schema.GroupVersionResource{}, err
}
mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return resourceMeta, schema.GroupVersionResource{}, fmt.Errorf("the server doesn't have a resource type %q", gvk.Kind)
}
resourceMeta.Resource = mapping.Resource.Resource
return resourceMeta, mapping.Resource, err
}
// GuessObjectGroupVersionKind returns GVK for the passed runtime object.
func GuessObjectGroupVersionKind(object runtime.Object) (*schema.GroupVersionKind, error) {
if gvk := object.GetObjectKind().GroupVersionKind(); len(gvk.Kind) > 0 {
return &gvk, nil
}
if kinds, _, _ := scheme.Scheme.ObjectKinds(object); len(kinds) > 0 {
return &kinds[0], nil
}
// otherwise fall back to genericScheme
if kinds, _, _ := genericScheme.ObjectKinds(object); len(kinds) > 0 {
return &kinds[0], nil
}
return nil, fmt.Errorf("cannot get gvk of %v", object)
}


@@ -1,64 +0,0 @@
package workvalidator
import (
"fmt"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
workv1 "open-cluster-management.io/api/work/v1"
)
type Validator struct {
limit int
}
var ManifestValidator = &Validator{limit: 500 * 1024} // the default manifest limit is 500k.
func (m *Validator) WithLimit(limit int) {
m.limit = limit
}
func (m *Validator) ValidateManifests(manifests []workv1.Manifest) error {
if len(manifests) == 0 {
return errors.NewBadRequest("Workload manifests should not be empty")
}
totalSize := 0
for _, manifest := range manifests {
totalSize = totalSize + manifest.Size()
}
if totalSize > m.limit {
return fmt.Errorf("the size of manifests is %v bytes which exceeds the %v limit", totalSize, m.limit)
}
for _, manifest := range manifests {
err := validateManifest(manifest.Raw)
if err != nil {
return err
}
}
return nil
}
func validateManifest(manifest []byte) error {
// If the manifest cannot be decoded, return err
unstructuredObj := &unstructured.Unstructured{}
err := unstructuredObj.UnmarshalJSON(manifest)
if err != nil {
return err
}
// The object must have name specified, generateName is not allowed in manifestwork
if unstructuredObj.GetName() == "" {
return fmt.Errorf("name must be set in manifest")
}
if unstructuredObj.GetGenerateName() != "" {
return fmt.Errorf("generateName must not be set in manifest")
}
return nil
}


@@ -7,6 +7,7 @@ import (
"time"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
"k8s.io/klog/v2"
@@ -22,7 +23,7 @@ import (
type CloudEventAgentClient[T ResourceObject] struct {
*baseClient
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
codec Codec[T]
statusHashGetter StatusHashGetter[T]
agentID string
clusterName string
@@ -34,34 +35,30 @@ type CloudEventAgentClient[T ResourceObject] struct {
// protocols for sending/receiving the cloudevents.
// - lister gets the resources from a cache/store of an agent.
// - statusHashGetter calculates the resource status hash.
// - codecs is list of codecs for encoding/decoding a resource objet/cloudevent to/from a cloudevent/resource objet.
// - codec is used to encode/decode a resource objet/cloudevent to/from a cloudevent/resource objet.
func NewCloudEventAgentClient[T ResourceObject](
ctx context.Context,
agentOptions *options.CloudEventsAgentOptions,
lister Lister[T],
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
codec Codec[T],
) (*CloudEventAgentClient[T], error) {
baseClient := &baseClient{
clientID: agentOptions.AgentID,
cloudEventsOptions: agentOptions.CloudEventsOptions,
cloudEventsRateLimiter: NewRateLimiter(agentOptions.EventRateLimit),
reconnectedChan: make(chan struct{}),
dataType: codec.EventDataType(),
}
if err := baseClient.connect(ctx); err != nil {
return nil, err
}
evtCodes := make(map[types.CloudEventsDataType]Codec[T])
for _, codec := range codecs {
evtCodes[codec.EventDataType()] = codec
}
return &CloudEventAgentClient[T]{
baseClient: baseClient,
lister: lister,
codecs: evtCodes,
codec: codec,
statusHashGetter: statusHashGetter,
agentID: agentOptions.AgentID,
clusterName: agentOptions.ClusterName,
@@ -76,64 +73,60 @@ func (c *CloudEventAgentClient[T]) ReconnectedChan() <-chan struct{} {
// Resync the resources spec by sending a spec resync request from the current to the given source.
func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) error {
// only resync the resources whose event data type is registered
for eventDataType := range c.codecs {
// list the resource objects that are maintained by the current agent with the given source
options := types.ListOptions{Source: source, ClusterName: c.clusterName, CloudEventsDataType: eventDataType}
objs, err := c.lister.List(options)
// list the resource objects that are maintained by the current agent with the given source
options := types.ListOptions{Source: source, ClusterName: c.clusterName, CloudEventsDataType: c.codec.EventDataType()}
objs, err := c.lister.List(options)
if err != nil {
return err
}
resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, len(objs))}
for i, obj := range objs {
resourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64)
if err != nil {
return err
}
resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, len(objs))}
for i, obj := range objs {
resourceVersion, err := strconv.ParseInt(obj.GetResourceVersion(), 10, 64)
if err != nil {
return err
}
resources.Versions[i] = payload.ResourceVersion{
ResourceID: string(obj.GetUID()),
ResourceVersion: resourceVersion,
}
resources.Versions[i] = payload.ResourceVersion{
ResourceID: string(obj.GetUID()),
ResourceVersion: resourceVersion,
}
eventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}
evt := types.NewEventBuilder(c.agentID, eventType).
WithOriginalSource(source).
WithClusterName(c.clusterName).
NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}
if err := c.publish(ctx, evt); err != nil {
return err
}
increaseCloudEventsSentCounter(evt.Source(), c.clusterName, eventDataType.String())
}
eventType := types.CloudEventsType{
CloudEventsDataType: c.codec.EventDataType(),
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}
evt := types.NewEventBuilder(c.agentID, eventType).
WithOriginalSource(source).
WithClusterName(c.clusterName).
NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, resources); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}
if err := c.publish(ctx, evt); err != nil {
return err
}
increaseCloudEventsSentCounter(evt.Source(), source, c.clusterName, c.codec.EventDataType().String(), string(eventType.SubResource), string(eventType.Action))
return nil
}
// Publish a resource status from an agent to a source.
func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error {
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
return fmt.Errorf("failed to find a codec for event %s", eventType.CloudEventsDataType)
if eventType.CloudEventsDataType != c.codec.EventDataType() {
return fmt.Errorf("unsupported cloudevent data type %s", eventType.CloudEventsDataType)
}
if eventType.SubResource != types.SubResourceStatus {
return fmt.Errorf("unsupported event eventType %s", eventType)
}
evt, err := codec.Encode(c.agentID, eventType, obj)
evt, err := c.codec.Encode(c.agentID, eventType, obj)
if err != nil {
return err
}
@@ -142,7 +135,8 @@ func (c *CloudEventAgentClient[T]) Publish(ctx context.Context, eventType types.
return err
}
increaseCloudEventsSentCounter(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String())
originalSource, _ := cloudeventstypes.ToString(evt.Context.GetExtensions()[types.ExtensionOriginalSource])
increaseCloudEventsSentCounter(evt.Source(), originalSource, c.clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action))
return nil
}
@@ -163,7 +157,7 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.
return
}
increaseCloudEventsReceivedCounter(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String())
increaseCloudEventsReceivedCounter(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action))
if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceStatus {
@@ -185,13 +179,23 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.
return
}
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
evtExtensions := evt.Context.GetExtensions()
clusterName, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionClusterName])
if err != nil {
klog.Errorf("failed to get clustername extension: %v", err)
return
}
if clusterName != c.clusterName {
klog.V(4).Infof("event clustername %s and agent clustername %s do not match, ignore", clusterName, c.clusterName)
return
}
obj, err := codec.Decode(&evt)
if eventType.CloudEventsDataType != c.codec.EventDataType() {
klog.Warningf("unsupported event data type %s, ignore", eventType.CloudEventsDataType)
return
}
obj, err := c.codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode spec, %v", err)
return
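
For context, a minimal sketch of calling the generic constructor with its new single-codec signature (not part of this commit; the agent options, lister and status-hash getter are assumed to be supplied by the caller):

package example

import (
    "context"

    workv1 "open-cluster-management.io/api/work/v1"
    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
    "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec"
)

// newManifestWorkAgentClient builds an agent client for ManifestWork resources.
// Exactly one codec is accepted now; its EventDataType() also selects the
// data type passed down to the protocol when the connection is created.
func newManifestWorkAgentClient(
    ctx context.Context,
    agentOptions *options.CloudEventsAgentOptions,
    lister generic.Lister[*workv1.ManifestWork],
    statusHashGetter generic.StatusHashGetter[*workv1.ManifestWork],
) (*generic.CloudEventAgentClient[*workv1.ManifestWork], error) {
    return generic.NewCloudEventAgentClient[*workv1.ManifestWork](
        ctx,
        agentOptions,
        lister,
        statusHashGetter,
        codec.NewManifestBundleCodec(), // a single codec instead of a variadic list
    )
}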


@@ -15,6 +15,7 @@ import (
"k8s.io/utils/clock"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
const (
@@ -43,6 +44,7 @@ type baseClient struct {
receiverChan chan int
reconnectedChan chan struct{}
clientReady bool
dataType types.CloudEventsDataType
}
func (c *baseClient) connect(ctx context.Context) error {
@@ -114,8 +116,8 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
latency := time.Since(now)
if latency > longThrottleLatency {
klog.Warningf(fmt.Sprintf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt))
klog.Warningf("Waited for %v due to client-side throttling, not priority and fairness, request: %s",
latency, evt.Context)
}
sendingCtx, err := c.cloudEventsOptions.WithContext(ctx, evt.Context)
@@ -127,7 +129,8 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
return fmt.Errorf("the cloudevents client is not ready")
}
klog.V(4).Infof("Sending event: %v\n%s", sendingCtx, evt)
klog.V(4).Infof("Sending event: %v\n%s", sendingCtx, evt.Context)
klog.V(5).Infof("Sending event: evt=%s", evt)
if result := c.cloudEventsClient.Send(sendingCtx, evt); cloudevents.IsUndelivered(result) {
return fmt.Errorf("failed to send event %s, %v", evt.Context, result)
}
@@ -156,7 +159,9 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
if startReceiving {
go func() {
if err := c.cloudEventsClient.StartReceiver(receiverCtx, func(evt cloudevents.Event) {
klog.V(4).Infof("Received event: %s", evt)
klog.V(4).Infof("Received event: %s", evt.Context)
klog.V(5).Infof("Received event: evt=%s", evt)
receive(receiverCtx, evt)
}); err != nil {
runtime.HandleError(fmt.Errorf("failed to receive cloudevents, %v", err))
@@ -222,7 +227,7 @@ func (c *baseClient) setClientReady(ready bool) {
func (c *baseClient) newCloudEventsClient(ctx context.Context) (cloudevents.Client, error) {
var err error
c.cloudEventsProtocol, err = c.cloudEventsOptions.Protocol(ctx)
c.cloudEventsProtocol, err = c.cloudEventsOptions.Protocol(ctx, c.dataType)
if err != nil {
return nil, err
}


@@ -15,16 +15,40 @@ const (
// Names of the labels added to metrics:
const (
metricsSourceLabel = "source"
metricsClusterLabel = "cluster"
metricsDataTypeLabel = "type"
metricsClientIDLabel = "client_id"
metricsWorkActionLabel = "action"
metricsWorkCodeLabel = "code"
metricsSourceLabel = "source"
metricsOriginalSourceLabel = "original_source"
metricsClusterLabel = "cluster"
metricsDataTypeLabel = "type"
metricsSubResourceLabel = "subresource"
metricsActionLabel = "action"
metricsClientIDLabel = "client_id"
metricsWorkActionLabel = "action"
metricsWorkCodeLabel = "code"
)
// cloudeventsMetricsLabels - Array of labels added to cloudevents metrics:
var cloudeventsMetricsLabels = []string{
const noneOriginalSource = "none"
// cloudeventsReceivedMetricsLabels - Array of labels added to cloudevents received metrics:
var cloudeventsReceivedMetricsLabels = []string{
metricsSourceLabel, // source
metricsClusterLabel, // cluster
metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles
metricsSubResourceLabel, // subresource, eg, spec or status
metricsActionLabel, // action, eg, create, update, delete, resync_request, resync_response
}
// cloudeventsSentMetricsLabels - Array of labels added to cloudevents sent metrics:
var cloudeventsSentMetricsLabels = []string{
metricsSourceLabel, // source
metricsOriginalSourceLabel, // original source, if no, set to "none"
metricsClusterLabel, // cluster
metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles
metricsSubResourceLabel, // subresource, eg, spec or status
metricsActionLabel, // action, eg, create, update, delete, resync_request, resync_response
}
// cloudeventsResyncMetricsLabels - Array of labels added to cloudevents resync metrics:
var cloudeventsResyncMetricsLabels = []string{
metricsSourceLabel, // source
metricsClusterLabel, // cluster
metricsDataTypeLabel, // data type, e.g. manifests, manifestbundles
@@ -53,28 +77,32 @@ const (
// The cloudevents received counter metric is a counter with a base metric name of 'received_total'
// and a help string of 'The total number of received CloudEvents.'
// For example, 2 CloudEvents received from source1 to agent on cluster1 with data type manifests would result in the following metrics:
// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2
// For example, 2 CloudEvents received from source1 to agent on cluster1 with data type manifests, one for resource create,
// another for resource updatewould result in the following metrics:
// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",subresource="spec",action="create"} 1
// cloudevents_received_total{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",subresource="spec",action="update"} 1
var cloudeventsReceivedCounterMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: cloudeventsMetricsSubsystem,
Name: receivedCounterMetric,
Help: "The total number of received CloudEvents.",
},
cloudeventsMetricsLabels,
cloudeventsReceivedMetricsLabels,
)
// The cloudevents sent counter metric is a counter with a base metric name of 'sent_total'
// and a help string of 'The total number of sent CloudEvents.'
// For example, 2 CloudEvents sent from agent on cluster1 to source1 with data type manifestbundles would result in the following metrics:
// cloudevents_sent_total{source="cluster1-work-agent",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2
// For example, 1 cloudevent sent from source1 with data type manifestbundles for resource spec create (original source is empty),
// and 2 CloudEvents sent from agent on cluster1 back to source1 for resource status update would result in the following metrics:
// cloudevents_sent_total{source="source1",original_source="none",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",subresource="spec",action="create"} 1
// cloudevents_sent_total{source="cluster1-work-agent",original_source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",subresource="status",action="update"} 2
var cloudeventsSentCounterMetric = prometheus.NewCounterVec(
prometheus.CounterOpts{
Subsystem: cloudeventsMetricsSubsystem,
Name: sentCounterMetric,
Help: "The total number of sent CloudEvents.",
},
cloudeventsMetricsLabels,
cloudeventsSentMetricsLabels,
)
// The resource spec resync duration metric is a histogram with a base metric name of 'resource_spec_resync_duration_second'
@@ -108,7 +136,7 @@ var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec(
30.0,
},
},
cloudeventsMetricsLabels,
cloudeventsResyncMetricsLabels,
)
// The resource status resync duration metric is a histogram with a base metric name of 'resource_status_resync_duration_second'
@@ -142,7 +170,7 @@ var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec(
30.0,
},
},
cloudeventsMetricsLabels,
cloudeventsResyncMetricsLabels,
)
// The cloudevents client reconnected counter metric is a counter with a base metric name of 'client_reconnected_total'
@@ -198,21 +226,29 @@ func ResetCloudEventsMetrics() {
}
// increaseCloudEventsReceivedCounter increases the cloudevents sent counter metric:
func increaseCloudEventsReceivedCounter(source, cluster, dataType string) {
func increaseCloudEventsReceivedCounter(source, cluster, dataType, subresource, action string) {
labels := prometheus.Labels{
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metricsDataTypeLabel: dataType,
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metricsDataTypeLabel: dataType,
metricsSubResourceLabel: subresource,
metricsActionLabel: action,
}
cloudeventsReceivedCounterMetric.With(labels).Inc()
}
// increaseCloudEventsSentCounter increases the cloudevents sent counter metric:
func increaseCloudEventsSentCounter(source, cluster, dataType string) {
func increaseCloudEventsSentCounter(source, originalSource, cluster, dataType, subresource, action string) {
if originalSource == "" {
originalSource = noneOriginalSource
}
labels := prometheus.Labels{
metricsSourceLabel: source,
metricsClusterLabel: cluster,
metricsDataTypeLabel: dataType,
metricsSourceLabel: source,
metricsOriginalSourceLabel: originalSource,
metricsClusterLabel: cluster,
metricsDataTypeLabel: dataType,
metricsSubResourceLabel: subresource,
metricsActionLabel: action,
}
cloudeventsSentCounterMetric.With(labels).Inc()
}


@@ -4,7 +4,9 @@ import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"reflect"
"sync"
"time"
@@ -212,3 +214,59 @@ func CachingCertificateLoader(certFile, keyFile string) func() (*tls.Certificate
return current.cert, current.err
}
}
// AutoLoadTLSConfig returns a TLS configuration for the given CA, client certificate, key files
// that can be used to establish a TLS connection.
// If CA is not provided, the system cert pool will be used.
// If client certificate and key are provided, they will be used for client authentication.
// And a goroutine will be started to periodically refresh client certificates for this connection.
func AutoLoadTLSConfig(caFile, certFile, keyFile string, conn Connection) (*tls.Config, error) {
var tlsConfig *tls.Config
if caFile != "" {
certPool, err := rootCAs(caFile)
if err != nil {
return nil, err
}
tlsConfig = &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}
if certFile != "" && keyFile != "" {
// Set client certificate and key getter for tls config
tlsConfig.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
return CachingCertificateLoader(certFile, keyFile)()
}
// Start a goroutine to periodically refresh client certificates for this connection
StartClientCertRotating(tlsConfig.GetClientCertificate, conn)
}
}
return tlsConfig, nil
}
// rootCAs returns a cert pool to verify the TLS connection.
// If the caFile is not provided, the default system certificate pool will be returned
// If the caFile is provided, the provided CA will be appended to the system certificate pool
func rootCAs(caFile string) (*x509.CertPool, error) {
certPool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
if len(caFile) == 0 {
klog.Warningf("CA file is not provided, TLS connection will be verified with the system cert pool")
return certPool, nil
}
caPEM, err := os.ReadFile(caFile)
if err != nil {
return nil, err
}
if ok := certPool.AppendCertsFromPEM(caPEM); !ok {
return nil, fmt.Errorf("invalid CA %s", caFile)
}
return certPool, nil
}
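
For context, a minimal sketch of using the new AutoLoadTLSConfig helper with an MQTT dialer, mirroring what BuildMQTTOptionsFromFlags does later in this commit (the mqtt import path, broker host and file paths are assumptions):

package example

import (
    "time"

    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert"
    "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
)

func buildMQTTDialer() (*mqtt.MQTTDialer, error) {
    dialer := &mqtt.MQTTDialer{
        BrokerHost: "mqtt.example.com:8883", // placeholder broker
        Timeout:    60 * time.Second,
    }

    // The dialer itself is passed in as the cert.Connection, so when the client
    // certificate is rotated the helper can close this connection and force a reconnect.
    tlsConfig, err := cert.AutoLoadTLSConfig(
        "/etc/certs/ca.crt",     // CA bundle used to verify the broker
        "/etc/certs/client.crt", // client certificate, reloaded periodically
        "/etc/certs/client.key",
        dialer,
    )
    if err != nil {
        return nil, err
    }
    dialer.TLSConfig = tlsConfig
    return dialer, nil
}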


@@ -33,7 +33,7 @@ func (o *grpcAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
return ctx, nil
}
func (o *grpcAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) {
func (o *grpcAgentOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) {
receiver, err := o.GetCloudEventsProtocol(
ctx,
func(err error) {
@@ -45,6 +45,7 @@ func (o *grpcAgentOptions) Protocol(ctx context.Context) (options.CloudEventsPro
// as a placeholder with all the sources.
Source: types.SourceAll,
ClusterName: o.clusterName,
DataType: dataType.String(),
}),
)
if err != nil {


@@ -3,7 +3,6 @@ package grpc
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"time"
@@ -14,19 +13,96 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/oauth"
"google.golang.org/grpc/keepalive"
"gopkg.in/yaml.v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
)
var _ cert.Connection = &GRPCDialer{}
// GRPCDialer is a gRPC dialer that connects to a gRPC server
// with the given URL, TLS configuration and keepalive options.
type GRPCDialer struct {
URL string
KeepAliveOptions KeepAliveOptions
TLSConfig *tls.Config
TokenFile string
conn *grpc.ClientConn
}
// KeepAliveOptions holds the keepalive options for the gRPC client.
type KeepAliveOptions struct {
Enable bool
Time time.Duration
Timeout time.Duration
PermitWithoutStream bool
}
// Dial connects to the gRPC server and returns a gRPC client connection.
func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) {
// Prepare gRPC dial options.
dialOpts := []grpc.DialOption{}
if d.KeepAliveOptions.Enable {
dialOpts = append(dialOpts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: d.KeepAliveOptions.Time,
Timeout: d.KeepAliveOptions.Timeout,
PermitWithoutStream: d.KeepAliveOptions.PermitWithoutStream,
}))
}
if d.TLSConfig != nil {
// Enable TLS
dialOpts = append(dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(d.TLSConfig)))
if len(d.TokenFile) != 0 {
// Use token-based authentication if token file is provided.
token, err := os.ReadFile(d.TokenFile)
if err != nil {
return nil, fmt.Errorf("failed to read token file %s, %v", d.TokenFile, err)
}
perRPCCred := oauth.TokenSource{
TokenSource: oauth2.StaticTokenSource(&oauth2.Token{
AccessToken: string(token),
})}
// Add per-RPC credentials to the dial options.
dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(perRPCCred))
}
// Establish a TLS connection to the gRPC server.
conn, err := grpc.Dial(d.URL, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", d.URL, err)
}
// Cache the connection for future use.
d.conn = conn
return d.conn, nil
}
// Insecure connection option; should not be used in production.
dialOpts = append(dialOpts, grpc.WithTransportCredentials(insecure.NewCredentials()))
conn, err := grpc.Dial(d.URL, dialOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", d.URL, err)
}
// Cache the connection for future use.
d.conn = conn
return d.conn, nil
}
// Close closes the gRPC client connection.
func (d *GRPCDialer) Close() error {
if d.conn != nil {
return d.conn.Close()
}
return nil
}
// GRPCOptions holds the options that are used to build gRPC client.
type GRPCOptions struct {
URL string
CAFile string
ClientCertFile string
ClientKeyFile string
TokenFile string
Dialer *GRPCDialer
}
// GRPCConfig holds the information needed to build connect to gRPC server as a given user.
@@ -41,6 +117,26 @@ type GRPCConfig struct {
ClientKeyFile string `json:"clientKeyFile,omitempty" yaml:"clientKeyFile,omitempty"`
// TokenFile is the file path to a token file for authentication.
TokenFile string `json:"tokenFile,omitempty" yaml:"tokenFile,omitempty"`
// keepalive options
KeepAliveConfig KeepAliveConfig `json:"keepAliveConfig,omitempty" yaml:"keepAliveConfig,omitempty"`
}
// KeepAliveConfig holds the keepalive options for the gRPC client.
type KeepAliveConfig struct {
// Enable specifies whether the keepalive option is enabled.
// When disabled, other keepalive configurations are ignored. Default is false.
Enable bool `json:"enable,omitempty" yaml:"enable,omitempty"`
// Time sets the duration after which the client pings the server if no activity is seen.
// A minimum value of 10s is enforced if set below that. Default is 30s.
Time *time.Duration `json:"time,omitempty" yaml:"time,omitempty"`
// Timeout sets the duration the client waits for a response after a keepalive ping.
// If no response is received, the connection is closed. Default is 10s.
Timeout *time.Duration `json:"timeout,omitempty" yaml:"timeout,omitempty"`
// PermitWithoutStream determines if keepalive pings are sent when there are no active RPCs.
// If false, pings are not sent and Time and Timeout are ignored. Default is false.
PermitWithoutStream bool `json:"permitWithoutStream,omitempty" yaml:"permitWithoutStream,omitempty"`
}
// BuildGRPCOptionsFromFlags builds configs from a config filepath.
@@ -70,92 +166,47 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) {
return nil, fmt.Errorf("setting tokenFile requires caFile")
}
return &GRPCOptions{
URL: config.URL,
CAFile: config.CAFile,
ClientCertFile: config.ClientCertFile,
ClientKeyFile: config.ClientKeyFile,
TokenFile: config.TokenFile,
}, nil
options := &GRPCOptions{
Dialer: &GRPCDialer{
URL: config.URL,
TokenFile: config.TokenFile,
},
}
// Default keepalive options
keepAliveOptions := KeepAliveOptions{
Enable: false,
Time: 30 * time.Second,
Timeout: 10 * time.Second,
PermitWithoutStream: false,
}
keepAliveOptions.Enable = config.KeepAliveConfig.Enable
if config.KeepAliveConfig.Time != nil {
keepAliveOptions.Time = *config.KeepAliveConfig.Time
}
if config.KeepAliveConfig.Timeout != nil {
keepAliveOptions.Timeout = *config.KeepAliveConfig.Timeout
}
keepAliveOptions.PermitWithoutStream = config.KeepAliveConfig.PermitWithoutStream
// Set the keepalive options
options.Dialer.KeepAliveOptions = keepAliveOptions
// Set up TLS configuration for the gRPC connection, the certificates will be reloaded periodically.
options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer)
if err != nil {
return nil, err
}
return options, nil
}
func NewGRPCOptions() *GRPCOptions {
return &GRPCOptions{}
}
func (o *GRPCOptions) GetGRPCClientConn() (*grpc.ClientConn, error) {
if len(o.CAFile) != 0 {
certPool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
caPEM, err := os.ReadFile(o.CAFile)
if err != nil {
return nil, err
}
if ok := certPool.AppendCertsFromPEM(caPEM); !ok {
return nil, fmt.Errorf("invalid CA %s", o.CAFile)
}
// Prepare gRPC dial options.
diaOpts := []grpc.DialOption{}
// Create a TLS configuration with CA pool and TLS 1.3.
tlsConfig := &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}
// Check if client certificate and key files are provided for mutual TLS.
if len(o.ClientCertFile) != 0 && len(o.ClientKeyFile) != 0 {
// Load client certificate and key pair.
clientCerts, err := tls.LoadX509KeyPair(o.ClientCertFile, o.ClientKeyFile)
if err != nil {
return nil, err
}
// Add client certificates to the TLS configuration.
tlsConfig.Certificates = []tls.Certificate{clientCerts}
diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
} else {
// token based authentication requires the configuration of transport credentials.
diaOpts = append(diaOpts, grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)))
if len(o.TokenFile) != 0 {
// Use token-based authentication if token file is provided.
token, err := os.ReadFile(o.TokenFile)
if err != nil {
return nil, err
}
perRPCCred := oauth.TokenSource{
TokenSource: oauth2.StaticTokenSource(&oauth2.Token{
AccessToken: string(token),
})}
// Add per-RPC credentials to the dial options.
diaOpts = append(diaOpts, grpc.WithPerRPCCredentials(perRPCCred))
}
}
// Establish a connection to the gRPC server.
conn, err := grpc.Dial(o.URL, diaOpts...)
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err)
}
return conn, nil
}
// Insecure connection option; should not be used in production.
conn, err := grpc.Dial(o.URL, grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
return nil, fmt.Errorf("failed to connect to grpc server %s, %v", o.URL, err)
}
return conn, nil
}
func (o *GRPCOptions) GetCloudEventsProtocol(ctx context.Context, errorHandler func(error), clientOpts ...protocol.Option) (options.CloudEventsProtocol, error) {
conn, err := o.GetGRPCClientConn()
conn, err := o.Dialer.Dial()
if err != nil {
return nil, err
}
@@ -175,10 +226,15 @@ func (o *GRPCOptions) GetCloudEventsProtocol(ctx context.Context, errorHandler f
// TransientFailure.
// For a connected grpc client, if the connections is down, the grpc client connection state will be
// changed from Ready to Idle.
if connState == connectivity.TransientFailure || connState == connectivity.Idle {
// When client certificate is expired, client will proactively close the connection, which will result
// in connection state changed from Ready to Shutdown.
if connState == connectivity.TransientFailure || connState == connectivity.Idle || connState == connectivity.Shutdown {
errorHandler(fmt.Errorf("grpc connection is disconnected (state=%s)", connState))
ticker.Stop()
conn.Close()
if connState != connectivity.Shutdown {
// don't close the connection if it's already shutdown
conn.Close()
}
return // exit the goroutine as the error handler function will handle the reconnection.
}
}
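
For context, a minimal sketch of wiring up the new GRPCDialer directly with keepalive and token authentication (the URL, file paths and the grpc options import path are assumptions; BuildGRPCOptionsFromFlags assembles the same pieces from a config file):

package example

import (
    "crypto/tls"
    "time"

    "google.golang.org/grpc"

    grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc"
)

func dialGRPC() (*grpc.ClientConn, error) {
    dialer := &grpcoptions.GRPCDialer{
        URL:       "grpc.example.com:8443",                   // placeholder server
        TokenFile: "/var/run/secrets/token",                  // per-RPC bearer token; only used when TLS is configured
        TLSConfig: &tls.Config{MinVersion: tls.VersionTLS13}, // normally populated via cert.AutoLoadTLSConfig
        KeepAliveOptions: grpcoptions.KeepAliveOptions{
            Enable:              true,
            Time:                30 * time.Second, // ping the server after 30s of inactivity
            Timeout:             10 * time.Second, // close the connection if the ping gets no response
            PermitWithoutStream: false,
        },
    }

    // Dial caches the connection inside the dialer; Close() on the dialer releases it.
    return dialer.Dial()
}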


@@ -398,6 +398,9 @@ type SubscriptionRequest struct {
Source string `protobuf:"bytes,1,opt,name=source,proto3" json:"source,omitempty"`
// Optional. The cluster name of the respond CloudEvent(s).
ClusterName string `protobuf:"bytes,2,opt,name=cluster_name,json=clusterName,proto3" json:"cluster_name,omitempty"`
// Optional. The data type for the respond CloudEvent(s).
// eg. io.open-cluster-management.works.v1alpha1.manifests
DataType string `protobuf:"bytes,3,opt,name=data_type,json=dataType,proto3" json:"data_type,omitempty"`
}
func (x *SubscriptionRequest) Reset() {
@@ -446,6 +449,13 @@ func (x *SubscriptionRequest) GetClusterName() string {
return ""
}
func (x *SubscriptionRequest) GetDataType() string {
if x != nil {
return x.DataType
}
return ""
}
var File_cloudevent_proto protoreflect.FileDescriptor
var file_cloudevent_proto_rawDesc = []byte{
@@ -505,29 +515,31 @@ var file_cloudevent_proto_rawDesc = []byte{
0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x33, 0x0a, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74,
0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75,
0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x50, 0x0a, 0x13,
0x45, 0x76, 0x65, 0x6e, 0x74, 0x52, 0x05, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x6d, 0x0a, 0x13,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75,
0x65, 0x73, 0x74, 0x12, 0x16, 0x0a, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x18, 0x01, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x73, 0x6f, 0x75, 0x72, 0x63, 0x65, 0x12, 0x21, 0x0a, 0x0c, 0x63,
0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28,
0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x32, 0xb3,
0x01, 0x0a, 0x11, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72,
0x76, 0x69, 0x63, 0x65, 0x12, 0x46, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12,
0x21, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73,
0x2e, 0x76, 0x31, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65,
0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x09,
0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x26, 0x2e, 0x69, 0x6f, 0x2e, 0x63,
0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75,
0x62, 0x73, 0x63, 0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x1d, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e,
0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74,
0x22, 0x00, 0x30, 0x01, 0x42, 0x50, 0x5a, 0x4e, 0x6f, 0x70, 0x65, 0x6e, 0x2d, 0x63, 0x6c, 0x75,
0x73, 0x74, 0x65, 0x72, 0x2d, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e,
0x69, 0x6f, 0x2f, 0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65,
0x76, 0x65, 0x6e, 0x74, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x6f, 0x70,
0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x62, 0x75, 0x66, 0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x09, 0x52, 0x0b, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x1b,
0x0a, 0x09, 0x64, 0x61, 0x74, 0x61, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28,
0x09, 0x52, 0x08, 0x64, 0x61, 0x74, 0x61, 0x54, 0x79, 0x70, 0x65, 0x32, 0xb3, 0x01, 0x0a, 0x11,
0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x53, 0x65, 0x72, 0x76, 0x69, 0x63,
0x65, 0x12, 0x46, 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x21, 0x2e, 0x69,
0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31,
0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75,
0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x00, 0x12, 0x56, 0x0a, 0x09, 0x53, 0x75, 0x62,
0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x26, 0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75,
0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63,
0x72, 0x69, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d,
0x2e, 0x69, 0x6f, 0x2e, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x73, 0x2e,
0x76, 0x31, 0x2e, 0x43, 0x6c, 0x6f, 0x75, 0x64, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x22, 0x00, 0x30,
0x01, 0x42, 0x50, 0x5a, 0x4e, 0x6f, 0x70, 0x65, 0x6e, 0x2d, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65,
0x72, 0x2d, 0x6d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x6d, 0x65, 0x6e, 0x74, 0x2e, 0x69, 0x6f, 0x2f,
0x73, 0x64, 0x6b, 0x2d, 0x67, 0x6f, 0x2f, 0x63, 0x6c, 0x6f, 0x75, 0x64, 0x65, 0x76, 0x65, 0x6e,
0x74, 0x73, 0x2f, 0x67, 0x65, 0x6e, 0x65, 0x72, 0x69, 0x63, 0x2f, 0x6f, 0x70, 0x74, 0x69, 0x6f,
0x6e, 0x73, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66,
0x2f, 0x76, 0x31, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (


@@ -75,6 +75,9 @@ message SubscriptionRequest {
string source = 1;
// Optional. The cluster name of the respond CloudEvent(s).
string cluster_name = 2;
// Optional. The data type for the respond CloudEvent(s).
// eg. io.open-cluster-management.works.v1alpha1.manifests
string data_type = 3;
}
service CloudEventService {


@@ -11,6 +11,7 @@ type Option func(*Protocol) error
type SubscribeOption struct {
Source string
ClusterName string
DataType string // data type for the client, eg. "io.open-cluster-management.works.v1alpha1.manifestbundles"
}
// WithSubscribeOption sets the Subscribe configuration for the client.


@@ -105,15 +105,16 @@ func (p *Protocol) OpenInbound(ctx context.Context) error {
subClient, err := p.client.Subscribe(ctx, &pbv1.SubscriptionRequest{
Source: p.subscribeOption.Source,
ClusterName: p.subscribeOption.ClusterName,
DataType: p.subscribeOption.DataType,
})
if err != nil {
return err
}
if p.subscribeOption.Source != "" {
logger.Infof("subscribing events for: %v", p.subscribeOption.Source)
logger.Infof("subscribing events for: %v with data types: %v", p.subscribeOption.Source, p.subscribeOption.DataType)
} else {
logger.Infof("subscribing events for cluster: %v", p.subscribeOption.ClusterName)
logger.Infof("subscribing events for cluster: %v with data types: %v", p.subscribeOption.ClusterName, p.subscribeOption.DataType)
}
go func() {


@@ -7,6 +7,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
type gRPCSourceOptions struct {
@@ -31,14 +32,15 @@ func (o *gRPCSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
return ctx, nil
}
func (o *gRPCSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) {
func (o *gRPCSourceOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) {
receiver, err := o.GetCloudEventsProtocol(
ctx,
func(err error) {
o.errorChan <- err
},
protocol.WithSubscribeOption(&protocol.SubscribeOption{
Source: o.sourceID,
Source: o.sourceID,
DataType: dataType.String(),
}),
)
if err != nil {


@@ -4,11 +4,8 @@ package kafka
import (
"context"
"fmt"
"strings"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
@@ -45,36 +42,13 @@ func NewAgentOptions(kafkaOptions *KafkaOptions, clusterName, agentID string) *o
// encode the source and agent to the message key
func (o *kafkaAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.EventContext) (context.Context, error) {
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, err
}
// agent publishes event to status topic to send the resource status from a specified cluster
originalSource, err := evtCtx.GetExtension(types.ExtensionOriginalSource)
if err != nil {
return nil, err
}
if eventType.Action == types.ResyncRequestAction && originalSource == types.SourceAll {
// TODO support multiple sources, agent may need a source list instead of the broadcast
topic := strings.Replace(agentBroadcastTopic, "*", o.clusterName, 1)
return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), o.clusterName), nil
}
topic := strings.Replace(agentEventsTopic, "*", fmt.Sprintf("%s", originalSource), 1)
topic = strings.Replace(topic, "*", o.clusterName, 1)
messageKey := fmt.Sprintf("%s@%s", originalSource, o.clusterName)
return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), messageKey), nil
return ctx, nil
}
func (o *kafkaAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) {
func (o *kafkaAgentOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) {
protocol, err := confluent.New(confluent.WithConfigMap(&o.KafkaOptions.ConfigMap),
confluent.WithReceiverTopics([]string{
fmt.Sprintf("^%s", replaceLast(sourceEventsTopic, "*", o.clusterName)),
fmt.Sprintf("^%s", sourceBroadcastTopic),
}),
confluent.WithSenderTopic("agentevents"),
confluent.WithReceiverTopics([]string{sourceEventsTopic}),
confluent.WithSenderTopic(agentEventsTopic),
confluent.WithErrorHandler(func(ctx context.Context, err kafka.Error) {
o.errorChan <- err
}))
@@ -89,11 +63,3 @@ func (o *kafkaAgentOptions) Protocol(ctx context.Context) (options.CloudEventsPr
func (o *kafkaAgentOptions) ErrorChan() <-chan error {
return o.errorChan
}
func replaceLast(str, old, new string) string {
last := strings.LastIndex(str, old)
if last == -1 {
return str
}
return str[:last] + new + str[last+len(old):]
}


@@ -13,16 +13,10 @@ import (
)
const (
// sourceEventsTopic is a topic for sources to publish their resource create/update/delete events, the first
// asterisk is a wildcard for source, the second asterisk is a wildcard for cluster.
sourceEventsTopic = "sourceevents.*.*"
// agentEventsTopic is a topic for agents to publish their resource status update events, the first
// asterisk is a wildcard for source, the second asterisk is a wildcard for cluster.
agentEventsTopic = "agentevents.*.*"
// sourceBroadcastTopic is for a source to publish its events to all agents, the asterisk is a wildcard for source.
sourceBroadcastTopic = "sourcebroadcast.*"
// agentBroadcastTopic is for a agent to publish its events to all sources, the asterisk is a wildcard for cluster.
agentBroadcastTopic = "agentbroadcast.*"
// sourceEventsTopic is a topic for sources to publish their events.
sourceEventsTopic = "sourceevents"
// agentEventsTopic is a topic for agents to publish their events.
agentEventsTopic = "agentevents"
)
type KafkaOptions struct {


@@ -4,11 +4,8 @@ package kafka
import (
"context"
"fmt"
"strings"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventscontext "github.com/cloudevents/sdk-go/v2/context"
"github.com/confluentinc/confluent-kafka-go/v2/kafka"
confluent "github.com/cloudevents/sdk-go/protocol/kafka_confluent/v2"
@@ -43,36 +40,13 @@ func NewSourceOptions(kafkaOptions *KafkaOptions, sourceID string) *options.Clou
func (o *kafkaSourceOptions) WithContext(ctx context.Context,
evtCtx cloudevents.EventContext,
) (context.Context, error) {
eventType, err := types.ParseCloudEventsType(evtCtx.GetType())
if err != nil {
return nil, err
}
clusterName, err := evtCtx.GetExtension(types.ExtensionClusterName)
if err != nil {
return nil, err
}
if eventType.Action == types.ResyncRequestAction && clusterName == types.ClusterAll {
// source request to get resources status from all agents
topic := strings.Replace(sourceBroadcastTopic, "*", o.sourceID, 1)
return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), o.sourceID), nil
}
// source publishes event to source topic to send the resource spec to a specified cluster
messageKey := fmt.Sprintf("%s@%s", o.sourceID, clusterName)
topic := strings.Replace(sourceEventsTopic, "*", o.sourceID, 1)
topic = strings.Replace(topic, "*", fmt.Sprintf("%s", clusterName), 1)
return confluent.WithMessageKey(cloudeventscontext.WithTopic(ctx, topic), messageKey), nil
return ctx, nil
}
func (o *kafkaSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) {
func (o *kafkaSourceOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) {
protocol, err := confluent.New(confluent.WithConfigMap(&o.KafkaOptions.ConfigMap),
confluent.WithReceiverTopics([]string{
fmt.Sprintf("^%s", strings.Replace(agentEventsTopic, "*", o.sourceID, 1)),
fmt.Sprintf("^%s", agentBroadcastTopic),
}),
confluent.WithSenderTopic("sourceevents"),
confluent.WithReceiverTopics([]string{agentEventsTopic}),
confluent.WithSenderTopic(sourceEventsTopic),
confluent.WithErrorHandler(func(ctx context.Context, err kafka.Error) {
o.errorChan <- err
}))


@@ -82,7 +82,7 @@ func (o *mqttAgentOptions) WithContext(ctx context.Context, evtCtx cloudevents.E
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
}
func (o *mqttAgentOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) {
func (o *mqttAgentOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) {
subscribe := &paho.Subscribe{
Subscriptions: []paho.SubscribeOptions{
{


@@ -3,7 +3,6 @@ package mqtt
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"os"
@@ -16,7 +15,6 @@ import (
"github.com/eclipse/paho.golang/paho"
"gopkg.in/yaml.v2"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/klog/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
@@ -162,34 +160,20 @@ func BuildMQTTOptionsFromFlags(configPath string) (*MQTTOptions, error) {
dialTimeout = *config.DialTimeout
}
if config.ClientCertFile != "" && config.ClientKeyFile != "" {
certPool, err := rootCAs(config.CAFile)
if err != nil {
return nil, err
}
tlsConfig := &tls.Config{
RootCAs: certPool,
GetClientCertificate: func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
return cert.CachingCertificateLoader(config.ClientCertFile, config.ClientKeyFile)()
},
}
options.Dialer = &MQTTDialer{
BrokerHost: config.BrokerHost,
TLSConfig: tlsConfig,
Timeout: dialTimeout,
}
// start a goroutine to periodically refresh client certificates for this connection
cert.StartClientCertRotating(tlsConfig.GetClientCertificate, options.Dialer)
return options, nil
}
options.Dialer = &MQTTDialer{
BrokerHost: config.BrokerHost,
Timeout: dialTimeout,
}
if config.ClientCertFile != "" && config.ClientKeyFile != "" {
// Set up TLS configuration for the MQTT connection if the client certificate and key are provided.
// the certificates will be reloaded periodically.
options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer)
if err != nil {
return nil, err
}
}
return options, nil
}
@@ -334,29 +318,3 @@ func getAgentPubTopic(ctx context.Context) (*PubTopic, error) {
return nil, fmt.Errorf("invalid agent pub topic")
}
// rootCAs returns a cert pool to verify the TLS connection.
// If the caFile is not provided, the default system certificate pool will be returned
// If the caFile is provided, the provided CA will be appended to the system certificate pool
func rootCAs(caFile string) (*x509.CertPool, error) {
certPool, err := x509.SystemCertPool()
if err != nil {
return nil, err
}
if len(caFile) == 0 {
klog.Warningf("CA file is not provided, TLS connection will be verified with the system cert pool")
return certPool, nil
}
caPEM, err := os.ReadFile(caFile)
if err != nil {
return nil, err
}
if ok := certPool.AppendCertsFromPEM(caPEM); !ok {
return nil, fmt.Errorf("invalid CA %s", caFile)
}
return certPool, nil
}


@@ -70,7 +70,7 @@ func (o *mqttSourceOptions) WithContext(ctx context.Context, evtCtx cloudevents.
return cloudeventscontext.WithTopic(ctx, eventsTopic), nil
}
func (o *mqttSourceOptions) Protocol(ctx context.Context) (options.CloudEventsProtocol, error) {
func (o *mqttSourceOptions) Protocol(ctx context.Context, dataType types.CloudEventsDataType) (options.CloudEventsProtocol, error) {
topicSource, err := getSourceFromEventsTopic(o.Topics.AgentEvents)
if err != nil {
return nil, err


@@ -5,6 +5,7 @@ import (
cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
// CloudEventsOptions provides cloudevents clients to send/receive cloudevents based on different event protocol.
@@ -19,8 +20,8 @@ type CloudEventsOptions interface {
// the MQTT topic, for Kafka, the context should contain the message key, etc.
WithContext(ctx context.Context, evtContext cloudevents.EventContext) (context.Context, error)
// Protocol returns a specific protocol to initialize the cloudevents client
Protocol(ctx context.Context) (CloudEventsProtocol, error)
// Protocol returns a specific protocol to initialize the cloudevents client.
Protocol(ctx context.Context, dataType types.CloudEventsDataType) (CloudEventsProtocol, error)
// ErrorChan returns a chan which will receive the cloudevents connection error. The source/agent client will try to
// reconnect the when this error occurs.

View File

@@ -45,7 +45,7 @@ func (l *ConfigLoader) LoadConfig() (string, any, error) {
return "", nil, err
}
return grpcOptions.URL, grpcOptions, nil
return grpcOptions.Dialer.URL, grpcOptions, nil
case constants.ConfigTypeKafka:
kafkaOptions, err := kafka.BuildKafkaOptionsFromFlags(l.configPath)

View File

@@ -24,7 +24,7 @@ import (
type CloudEventSourceClient[T ResourceObject] struct {
*baseClient
lister Lister[T]
codecs map[types.CloudEventsDataType]Codec[T]
codec Codec[T]
statusHashGetter StatusHashGetter[T]
sourceID string
}
@@ -35,34 +35,30 @@ type CloudEventSourceClient[T ResourceObject] struct {
// sending/receiving the cloudevents.
// - lister gets the resources from a cache/store of a source.
// - statusHashGetter calculates the resource status hash.
// - codecs is list of codecs for encoding/decoding a resource objet/cloudevent to/from a cloudevent/resource objet.
// - codec is used to encode/decode a resource objet/cloudevent to/from a cloudevent/resource objet.
func NewCloudEventSourceClient[T ResourceObject](
ctx context.Context,
sourceOptions *options.CloudEventsSourceOptions,
lister Lister[T],
statusHashGetter StatusHashGetter[T],
codecs ...Codec[T],
codec Codec[T],
) (*CloudEventSourceClient[T], error) {
baseClient := &baseClient{
clientID: sourceOptions.SourceID,
cloudEventsOptions: sourceOptions.CloudEventsOptions,
cloudEventsRateLimiter: NewRateLimiter(sourceOptions.EventRateLimit),
reconnectedChan: make(chan struct{}),
dataType: codec.EventDataType(),
}
if err := baseClient.connect(ctx); err != nil {
return nil, err
}
evtCodes := make(map[types.CloudEventsDataType]Codec[T])
for _, codec := range codecs {
evtCodes[codec.EventDataType()] = codec
}
return &CloudEventSourceClient[T]{
baseClient: baseClient,
lister: lister,
codecs: evtCodes,
codec: codec,
statusHashGetter: statusHashGetter,
sourceID: sourceOptions.SourceID,
}, nil
@@ -74,61 +70,57 @@ func (c *CloudEventSourceClient[T]) ReconnectedChan() <-chan struct{} {
// Resync the resources status by sending a status resync request from the current source to a specified cluster.
func (c *CloudEventSourceClient[T]) Resync(ctx context.Context, clusterName string) error {
// only resync the resources whose event data type is registered
for eventDataType := range c.codecs {
// list the resource objects that are maintained by the current source with a specified cluster
options := types.ListOptions{Source: c.sourceID, ClusterName: clusterName, CloudEventsDataType: eventDataType}
objs, err := c.lister.List(options)
// list the resource objects that are maintained by the current source with a specified cluster
options := types.ListOptions{Source: c.sourceID, ClusterName: clusterName, CloudEventsDataType: c.codec.EventDataType()}
objs, err := c.lister.List(options)
if err != nil {
return err
}
hashes := &payload.ResourceStatusHashList{Hashes: make([]payload.ResourceStatusHash, len(objs))}
for i, obj := range objs {
statusHash, err := c.statusHashGetter(obj)
if err != nil {
return err
}
hashes := &payload.ResourceStatusHashList{Hashes: make([]payload.ResourceStatusHash, len(objs))}
for i, obj := range objs {
statusHash, err := c.statusHashGetter(obj)
if err != nil {
return err
}
hashes.Hashes[i] = payload.ResourceStatusHash{
ResourceID: string(obj.GetUID()),
StatusHash: statusHash,
}
hashes.Hashes[i] = payload.ResourceStatusHash{
ResourceID: string(obj.GetUID()),
StatusHash: statusHash,
}
eventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceStatus,
Action: types.ResyncRequestAction,
}
evt := types.NewEventBuilder(c.sourceID, eventType).WithClusterName(clusterName).NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, hashes); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}
if err := c.publish(ctx, evt); err != nil {
return err
}
increaseCloudEventsSentCounter(evt.Source(), clusterName, eventDataType.String())
}
eventType := types.CloudEventsType{
CloudEventsDataType: c.codec.EventDataType(),
SubResource: types.SubResourceStatus,
Action: types.ResyncRequestAction,
}
evt := types.NewEventBuilder(c.sourceID, eventType).WithClusterName(clusterName).NewEvent()
if err := evt.SetData(cloudevents.ApplicationJSON, hashes); err != nil {
return fmt.Errorf("failed to set data to cloud event: %v", err)
}
if err := c.publish(ctx, evt); err != nil {
return err
}
increaseCloudEventsSentCounter(evt.Source(), "", clusterName, c.codec.EventDataType().String(), string(eventType.SubResource), string(eventType.Action))
return nil
}
// Publish a resource spec from a source to an agent.
func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types.CloudEventsType, obj T) error {
if eventType.CloudEventsDataType != c.codec.EventDataType() {
return fmt.Errorf("unsupported event data type %s", eventType.CloudEventsDataType)
}
if eventType.SubResource != types.SubResourceSpec {
return fmt.Errorf("unsupported event eventType %s", eventType)
}
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
return fmt.Errorf("failed to find the codec for event %s", eventType.CloudEventsDataType)
}
evt, err := codec.Encode(c.sourceID, eventType, obj)
evt, err := c.codec.Encode(c.sourceID, eventType, obj)
if err != nil {
return err
}
@@ -138,7 +130,7 @@ func (c *CloudEventSourceClient[T]) Publish(ctx context.Context, eventType types
}
clusterName := evt.Context.GetExtensions()[types.ExtensionClusterName].(string)
increaseCloudEventsSentCounter(evt.Source(), clusterName, eventType.CloudEventsDataType.String())
increaseCloudEventsSentCounter(evt.Source(), "", clusterName, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action))
return nil
}
@@ -166,7 +158,7 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents
cn = ""
}
increaseCloudEventsReceivedCounter(evt.Source(), cn, eventType.CloudEventsDataType.String())
increaseCloudEventsReceivedCounter(evt.Source(), cn, eventType.CloudEventsDataType.String(), string(eventType.SubResource), string(eventType.Action))
if eventType.Action == types.ResyncRequestAction {
if eventType.SubResource != types.SubResourceSpec {
@@ -189,9 +181,8 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents
return
}
codec, ok := c.codecs[eventType.CloudEventsDataType]
if !ok {
klog.Warningf("failed to find the codec for event %s, ignore", eventType.CloudEventsDataType)
if eventType.CloudEventsDataType != c.codec.EventDataType() {
klog.Warningf("unsupported event data type %s, ignore", eventType.CloudEventsDataType)
return
}
@@ -200,7 +191,7 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents
return
}
obj, err := codec.Decode(&evt)
obj, err := c.codec.Decode(&evt)
if err != nil {
klog.Errorf("failed to decode status, %v", err)
return
@@ -299,8 +290,7 @@ func (c *CloudEventSourceClient[T]) respondResyncSpecRequest(
if err := c.publish(ctx, evt); err != nil {
return err
}
increaseCloudEventsSentCounter(evt.Source(), fmt.Sprintf("%s", clusterName), evtDataType.String())
increaseCloudEventsSentCounter(evt.Source(), "", fmt.Sprintf("%s", clusterName), evtDataType.String(), string(eventType.SubResource), string(eventType.Action))
}
return nil

View File

@@ -60,6 +60,9 @@ const (
// ExtensionOriginalSource is the cloud event extension key of the original source.
ExtensionOriginalSource = "originalsource"
// ExtensionStatusHash is the cloud event extension key of the status hash.
ExtensionStatusHash = "statushash"
)
// ResourceAction represents an action on a resource object on the source or agent.

View File

@@ -1,200 +0,0 @@
package codec
import (
"fmt"
"strconv"
"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
kubetypes "k8s.io/apimachinery/pkg/types"
"open-cluster-management.io/api/utils/work/v1/utils"
"open-cluster-management.io/api/utils/work/v1/workvalidator"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)
var sequenceGenerator *snowflake.Node
func init() {
// init the snowflake id generator with node id 1 for each single agent. Each single agent has its own consumer id
// to be identified, and we can ensure the order of status update event from the same agent via sequence id. The
// events from different agents are independent, hence the ordering among them needs not to be guaranteed.
//
// The snowflake `NewNode` returns error only when the snowflake node id is less than 1 or great than 1024, so the
// error can be ignored here.
sequenceGenerator, _ = snowflake.NewNode(1)
}
// ManifestCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
type ManifestCodec struct {
restMapper meta.RESTMapper
}
func NewManifestCodec(restMapper meta.RESTMapper) *ManifestCodec {
return &ManifestCodec{
restMapper: restMapper,
}
}
// EventDataType returns the event data type for `io.open-cluster-management.works.v1alpha1.manifests`.
func (c *ManifestCodec) EventDataType() types.CloudEventsDataType {
return payload.ManifestEventDataType
}
// Encode the status of a ManifestWork to a cloudevent with ManifestStatus.
func (c *ManifestCodec) Encode(source string, eventType types.CloudEventsType, work *workv1.ManifestWork) (*cloudevents.Event, error) {
if eventType.CloudEventsDataType != payload.ManifestEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}
resourceVersion, err := strconv.ParseInt(work.ResourceVersion, 10, 64)
if err != nil {
return nil, fmt.Errorf("failed to parse the resourceversion of the work %s, %v", work.UID, err)
}
originalSource, ok := work.Labels[common.CloudEventsOriginalSourceLabelKey]
if !ok {
return nil, fmt.Errorf("failed to find originalsource from the work %s", work.UID)
}
// for the manifest deletion case: no manifest in the spec will be rebuilt in the cache upon agent restart.
// for status update cases other than manifest deletion case, there should be only one manifest in the work.
if !meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
if len(work.Spec.Workload.Manifests) != 1 {
return nil, fmt.Errorf("too many manifests in the work %s", work.UID)
}
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(string(work.UID)).
WithStatusUpdateSequenceID(sequenceGenerator.Generate().String()).
WithResourceVersion(resourceVersion).
WithClusterName(work.Namespace).
WithOriginalSource(originalSource).
NewEvent()
statusPayload := &payload.ManifestStatus{
Conditions: work.Status.Conditions,
}
if len(work.Status.ResourceStatus.Manifests) != 0 {
statusPayload.Status = &work.Status.ResourceStatus.Manifests[0]
}
if err := evt.SetData(cloudevents.ApplicationJSON, statusPayload); err != nil {
return nil, fmt.Errorf("failed to encode manifestwork status to a cloudevent: %v", err)
}
return &evt, nil
}
// Decode a cloudevent whose data is Manifest to a ManifestWork.
func (c *ManifestCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWork, error) {
eventType, err := types.ParseCloudEventsType(evt.Type())
if err != nil {
return nil, fmt.Errorf("failed to parse cloud event type %s, %v", evt.Type(), err)
}
if eventType.CloudEventsDataType != payload.ManifestEventDataType {
return nil, fmt.Errorf("unsupported cloudevents data type %s", eventType.CloudEventsDataType)
}
evtExtensions := evt.Context.GetExtensions()
resourceID, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionResourceID])
if err != nil {
return nil, fmt.Errorf("failed to get resourceid extension: %v", err)
}
resourceVersion, err := cloudeventstypes.ToInteger(evtExtensions[types.ExtensionResourceVersion])
if err != nil {
return nil, fmt.Errorf("failed to get resourceversion extension: %v", err)
}
clusterName, err := cloudeventstypes.ToString(evtExtensions[types.ExtensionClusterName])
if err != nil {
return nil, fmt.Errorf("failed to get clustername extension: %v", err)
}
work := &workv1.ManifestWork{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
UID: kubetypes.UID(resourceID),
ResourceVersion: fmt.Sprintf("%d", resourceVersion),
Name: resourceID,
Namespace: clusterName,
Labels: map[string]string{
common.CloudEventsOriginalSourceLabelKey: evt.Source(),
},
Annotations: map[string]string{
common.CloudEventsDataTypeAnnotationKey: eventType.CloudEventsDataType.String(),
},
},
}
if _, ok := evtExtensions[types.ExtensionDeletionTimestamp]; ok {
deletionTimestamp, err := cloudeventstypes.ToTime(evtExtensions[types.ExtensionDeletionTimestamp])
if err != nil {
return nil, fmt.Errorf("failed to get deletiontimestamp, %v", err)
}
// In the case of an agent restart, the manifestwork finalizer is cleared.
// Explicitly re-add the finalizer to ensure proper cleanup of the manifestwork.
work.Finalizers = []string{workv1.ManifestWorkFinalizer}
work.DeletionTimestamp = &metav1.Time{Time: deletionTimestamp}
return work, nil
}
manifestPayload := &payload.Manifest{}
if err := evt.DataAs(manifestPayload); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}
unstructuredObj := manifestPayload.Manifest
rawJson, err := unstructuredObj.MarshalJSON()
if err != nil {
return nil, fmt.Errorf("failed to get manifest GVR from event %s, %v", string(evt.Data()), err)
}
work.Spec = workv1.ManifestWorkSpec{
Workload: workv1.ManifestsTemplate{
Manifests: []workv1.Manifest{{RawExtension: runtime.RawExtension{Raw: rawJson}}},
},
DeleteOption: manifestPayload.DeleteOption,
}
if manifestPayload.ConfigOption != nil {
_, gvr, err := utils.BuildResourceMeta(0, &unstructuredObj, c.restMapper)
if err != nil {
return nil, fmt.Errorf("failed to get manifest GVR from event %s, %v", string(evt.Data()), err)
}
work.Spec.ManifestConfigs = []workv1.ManifestConfigOption{
{
ResourceIdentifier: workv1.ResourceIdentifier{
Group: gvr.Group,
Resource: gvr.Resource,
Name: unstructuredObj.GetName(),
Namespace: unstructuredObj.GetNamespace(),
},
FeedbackRules: manifestPayload.ConfigOption.FeedbackRules,
UpdateStrategy: manifestPayload.ConfigOption.UpdateStrategy,
},
}
}
// validate the manifest
if err := workvalidator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil {
return nil, fmt.Errorf("manifest is invalid, %v", err)
}
return work, nil
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
@@ -15,8 +16,21 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash"
)
var sequenceGenerator *snowflake.Node
func init() {
// init the snowflake id generator with node id 1 for each single agent. Each single agent has its own consumer id
// to be identified, and we can ensure the order of status update event from the same agent via sequence id. The
// events from different agents are independent, hence the ordering among them needs not to be guaranteed.
//
// The snowflake `NewNode` returns error only when the snowflake node id is less than 1 or great than 1024, so the
// error can be ignored here.
sequenceGenerator, _ = snowflake.NewNode(1)
}
// ManifestBundleCodec is a codec to encode/decode a ManifestWork/cloudevent with ManifestBundle for an agent.
type ManifestBundleCodec struct{}
@@ -53,6 +67,13 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT
WithOriginalSource(originalSource).
NewEvent()
statusHash, err := statushash.ManifestWorkStatusHash(work)
if err != nil {
return nil, err
}
evt.SetExtension(types.ExtensionStatusHash, statusHash)
manifestBundleStatus := &payload.ManifestBundleStatus{
Conditions: work.Status.Conditions,
ResourceStatus: work.Status.ResourceStatus.Manifests,

View File

@@ -17,6 +17,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal"
sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client"
sourcelister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
)
@@ -43,7 +44,7 @@ func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWork
type ClientHolderBuilder struct {
config any
watcherStore store.WorkClientWatcherStore
codecs []generic.Codec[*workv1.ManifestWork]
codec generic.Codec[*workv1.ManifestWork]
sourceID string
clusterName string
clientID string
@@ -83,9 +84,9 @@ func (b *ClientHolderBuilder) WithClusterName(clusterName string) *ClientHolderB
return b
}
// WithCodecs add codecs when building a manifestwork client based on cloudevents.
func (b *ClientHolderBuilder) WithCodecs(codecs ...generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder {
b.codecs = codecs
// WithCodec add codec when building a manifestwork client based on cloudevents.
func (b *ClientHolderBuilder) WithCodec(codec generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder {
b.codec = codec
return b
}
@@ -127,8 +128,8 @@ func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*Clien
ctx,
options,
sourcelister.NewWatcherStoreLister(b.watcherStore),
ManifestWorkStatusHash,
b.codecs...,
statushash.ManifestWorkStatusHash,
b.codec,
)
if err != nil {
return nil, err
@@ -200,8 +201,8 @@ func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*Client
ctx,
options,
agentlister.NewWatcherStoreLister(b.watcherStore),
ManifestWorkStatusHash,
b.codecs...,
statushash.ManifestWorkStatusHash,
b.codec,
)
if err != nil {
return nil, err

View File

@@ -1,54 +0,0 @@
package payload
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
var ManifestEventDataType = types.CloudEventsDataType{
Group: "io.open-cluster-management.works",
Version: "v1alpha1",
Resource: "manifests",
}
// Manifest represents the data in a cloudevent, it contains a single manifest.
type Manifest struct {
// Manifest represents a resource to be deployed on managed cluster.
Manifest unstructured.Unstructured `json:"manifest"`
// DeleteOption represents deletion strategy when this manifest is deleted.
DeleteOption *workv1.DeleteOption `json:"deleteOption,omitempty"`
// ConfigOption represents the configuration of this manifest.
ConfigOption *ManifestConfigOption `json:"configOption,omitempty"`
}
// ManifestStatus represents the data in a cloudevent, it contains the status of a SingleManifest on a managed
// cluster.
type ManifestStatus struct {
// Conditions contains the different condition statuses for a SingleManifest on a managed cluster.
// Valid condition types are:
// 1. Applied represents the manifest of a SingleManifest is applied successfully on a managed cluster.
// 2. Progressing represents the manifest of a SingleManifest is being applied on a managed cluster.
// 3. Available represents the manifest of a SingleManifest exists on the managed cluster.
// 4. Degraded represents the current state of manifest of a SingleManifest does not match the desired state for a
// certain period.
// 5. Deleted represents the manifests of a SingleManifest is deleted from a managed cluster.
Conditions []metav1.Condition `json:"conditions"`
// Status represents the conditions of this manifest on a managed cluster.
Status *workv1.ManifestCondition `json:"status,omitempty"`
}
type ManifestConfigOption struct {
// FeedbackRules defines what resource status field should be returned.
// If it is not set or empty, no feedback rules will be honored.
FeedbackRules []workv1.FeedbackRule `json:"feedbackRules,omitempty"`
// UpdateStrategy defines the strategy to update this manifest.
// UpdateStrategy is Update if it is not set.
UpdateStrategy *workv1.UpdateStrategy `json:"updateStrategy,omitempty"`
}