upgrade go-sdk (#914)

Signed-off-by: Wei Liu <liuweixa@redhat.com>
Wei Liu
2025-03-27 15:06:09 +08:00
committed by GitHub
parent 4a4d51ea80
commit 0c5377c34b
35 changed files with 1173 additions and 929 deletions

go.mod

@@ -37,7 +37,7 @@ require (
k8s.io/utils v0.0.0-20240921022957-49e7df575cb6
open-cluster-management.io/addon-framework v0.12.0
open-cluster-management.io/api v0.16.1
-open-cluster-management.io/sdk-go v0.16.0
+open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848
sigs.k8s.io/controller-runtime v0.19.3
sigs.k8s.io/kube-storage-version-migrator v0.0.6-0.20230721195810-5c8923c5ff96
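For reference, a pre-release bump like this is typically produced with the Go toolchain (the pseudo-version above encodes the commit hash), roughly:

go get open-cluster-management.io/sdk-go@e8c524c1a85b
go mod tidy
go mod vendor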

go.sum

@@ -491,8 +491,8 @@ open-cluster-management.io/addon-framework v0.12.0 h1:5j7mpyk2ij0SLUZkwWk0KkNTWt
open-cluster-management.io/addon-framework v0.12.0/go.mod h1:eReMWXrEHqtilwz5wzEpUrWw9Vfz0HJCH9pi3gOTZns=
open-cluster-management.io/api v0.16.1 h1:mS+4UGxHLPQd7CRM0gdFQdVaz139Lo2bkLfqSE0CDNU=
open-cluster-management.io/api v0.16.1/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
-open-cluster-management.io/sdk-go v0.16.0 h1:Ui1jerkeLaNaJPu47xjOJ3lh+rJQgeJHD25ViQMzAMs=
-open-cluster-management.io/sdk-go v0.16.0/go.mod h1:TyOjZC5YxyM5BRNgwTmLuTbHXX6xXqzYBXllrfoVp9w=
+open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b h1:MjK6LuPi6kPMG40kMfzV2c7lffmwscUmspXmNGyezV4=
+open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b/go.mod h1:TyOjZC5YxyM5BRNgwTmLuTbHXX6xXqzYBXllrfoVp9w=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 h1:WYPi2PdQyZwZkHG648v2jQl6deyCgyjJ0fkLYgUJ618=


@@ -14,10 +14,11 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/ocm/pkg/work/hub/controllers/manifestworkreplicasetcontroller"
)
@@ -96,12 +97,11 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
return err
}
-clientHolder, err := work.NewClientHolderBuilder(config).
-WithClientID(c.workOptions.CloudEventsClientID).
+clientOptions := options.NewGenericClientOptions(
+config, codec.NewManifestBundleCodec(), c.workOptions.CloudEventsClientID).
WithSourceID(sourceID).
-WithCodec(codec.NewManifestBundleCodec()).
-WithWorkClientWatcherStore(watcherStore).
-NewSourceClientHolder(ctx)
+WithClientWatcherStore(watcherStore)
+clientHolder, err := work.NewSourceClientHolder(ctx, clientOptions)
if err != nil {
return err
}
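In short, the fluent builder is replaced by a GenericClientOptions value handed to the holder constructor. A minimal sketch of the source-side pattern, using only packages visible in this diff (the MQTT config, IDs, and store are placeholders):

package example

import (
	"context"

	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
	clientstore "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
)

// newSourceHolder mirrors the change above: config, codec and client ID become
// constructor arguments, while source ID and watcher store stay fluent options.
func newSourceHolder(ctx context.Context, cfg *mqtt.MQTTOptions, sourceID, clientID string,
	watcherStore clientstore.ClientWatcherStore[*workv1.ManifestWork]) (*work.ClientHolder, error) {
	opts := options.NewGenericClientOptions(cfg, codec.NewManifestBundleCodec(), clientID).
		WithSourceID(sourceID).
		WithClientWatcherStore(watcherStore)
	return work.NewSourceClientHolder(ctx, opts)
}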


@@ -18,10 +18,11 @@ import (
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
ocmfeature "open-cluster-management.io/api/feature"
+cloudeventsoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
+cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
+"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec"
+"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
-cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/work"
-"open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec"
-"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
@@ -226,12 +227,11 @@ func (o *WorkAgentConfig) newWorkClientAndInformer(
return "", nil, nil, err
}
-clientHolder, err := cloudeventswork.NewClientHolderBuilder(config).
-WithClientID(o.workOptions.CloudEventsClientID).
+clientOptions := cloudeventsoptions.NewGenericClientOptions(
+config, codec.NewManifestBundleCodec(), o.workOptions.CloudEventsClientID).
WithClusterName(o.agentOptions.SpokeClusterName).
-WithCodec(codec.NewManifestBundleCodec()).
-WithWorkClientWatcherStore(watcherStore).
-NewAgentClientHolder(ctx)
+WithClientWatcherStore(watcherStore)
+clientHolder, err := cloudeventswork.NewAgentClientHolder(ctx, clientOptions)
if err != nil {
return "", nil, nil, err
}
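The agent side mirrors this, swapping WithSourceID for WithClusterName and using the agent codec; a sketch under the same assumptions (per the options code later in this diff, an agent may omit the watcher store, in which case a default AgentInformerWatcherStore is used):

package example

import (
	"context"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
	cloudeventswork "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt"
)

func newAgentHolder(ctx context.Context, cfg *mqtt.MQTTOptions, clientID, clusterName string) (*cloudeventswork.ClientHolder, error) {
	// clusterName is mandatory on the agent side; AgentClient rejects options without it.
	opts := options.NewGenericClientOptions(cfg, codec.NewManifestBundleCodec(), clientID).
		WithClusterName(clusterName)
	return cloudeventswork.NewAgentClientHolder(ctx, opts)
}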


@@ -24,9 +24,10 @@ import (
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
ocmfeature "open-cluster-management.io/api/feature"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work"
sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec"
workstore "open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
sourcecodec "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec"
workstore "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/work/helper"
@@ -143,12 +144,12 @@ var _ = ginkgo.BeforeSuite(func() {
})
gomega.Expect(err).ToNot(gomega.HaveOccurred())
-sourceClient, err := work.NewClientHolderBuilder(util.NewMQTTSourceOptions(sourceID)).
-WithClientID(fmt.Sprintf("%s-%s", sourceID, rand.String(5))).
-WithSourceID(sourceID).
-WithCodec(sourcecodec.NewManifestBundleCodec()).
-WithWorkClientWatcherStore(watcherStore).
-NewSourceClientHolder(envCtx)
+clientOptions := options.NewGenericClientOptions(
+util.NewMQTTSourceOptions(sourceID),
+sourcecodec.NewManifestBundleCodec(),
+fmt.Sprintf("%s-%s", sourceID, rand.String(5)),
+).WithSourceID(sourceID).WithClientWatcherStore(watcherStore)
+sourceClient, err := work.NewSourceClientHolder(envCtx, clientOptions)
gomega.Expect(err).ToNot(gomega.HaveOccurred())
hubWorkClient = sourceClient.WorkInterface()


@@ -19,7 +19,7 @@ import (
"k8s.io/client-go/util/retry"
workapiv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/work/spoke"
@@ -59,7 +59,7 @@ var _ = ginkgo.Describe("ManifestWork", func() {
o.WorkloadSourceDriver = sourceDriver
o.WorkloadSourceConfig = sourceConfigFileName
if sourceDriver != util.KubeDriver {
-expectedFinalizer = store.ManifestWorkFinalizer
+expectedFinalizer = common.ResourceFinalizer
o.CloudEventsClientID = fmt.Sprintf("%s-work-agent", clusterName)
o.CloudEventsClientCodecs = []string{"manifestbundle"}
}

vendor/modules.txt

@@ -1777,7 +1777,7 @@ open-cluster-management.io/api/operator/v1
open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1
-# open-cluster-management.io/sdk-go v0.16.0
+# open-cluster-management.io/sdk-go v0.16.1-0.20250327030116-e8c524c1a85b
## explicit; go 1.22.0
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1
@@ -1788,6 +1788,20 @@ open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator
open-cluster-management.io/sdk-go/pkg/basecontroller/events
open-cluster-management.io/sdk-go/pkg/basecontroller/factory
open-cluster-management.io/sdk-go/pkg/certrotation
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/codec
+open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/store
open-cluster-management.io/sdk-go/pkg/cloudevents/constants
open-cluster-management.io/sdk-go/pkg/cloudevents/generic
open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options
@@ -1799,20 +1813,6 @@ open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/kafka
open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/mqtt
open-cluster-management.io/sdk-go/pkg/cloudevents/generic/payload
open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types
-open-cluster-management.io/sdk-go/pkg/cloudevents/work
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/codec
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/common
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/codec
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/store
-open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils
open-cluster-management.io/sdk-go/pkg/helpers
open-cluster-management.io/sdk-go/pkg/patcher
# sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3


@@ -1,8 +1,10 @@
package common
import (
+certificatev1 "k8s.io/api/certificates/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
+clusterv1 "open-cluster-management.io/api/cluster/v1"
workv1 "open-cluster-management.io/api/work/v1"
)
@@ -10,10 +12,10 @@ const (
// CloudEventsDataTypeAnnotationKey is the key of the cloudevents data type annotation.
CloudEventsDataTypeAnnotationKey = "cloudevents.open-cluster-management.io/datatype"
-// CloudEventsResourceVersionAnnotationKey is the key of the manifestwork resourceversion annotation.
+// CloudEventsResourceVersionAnnotationKey is the key of the resource resourceversion annotation.
//
-// This annotation is used for tracing the ManifestWork specific changes, the value of this annotation
-// should be a sequence number representing the ManifestWork specific generation.
+// This annotation is used for tracing resource-specific changes; the value of this annotation
+// should be a sequence number representing the resource-specific generation.
CloudEventsResourceVersionAnnotationKey = "cloudevents.open-cluster-management.io/resourceversion"
// CloudEventsSequenceIDAnnotationKey is the key of the status update event sequence ID.
@@ -24,14 +26,22 @@ const (
// CloudEventsOriginalSourceLabelKey is the key of the cloudevents original source label.
const CloudEventsOriginalSourceLabelKey = "cloudevents.open-cluster-management.io/originalsource"
-// ManifestsDeleted represents the manifests are deleted.
-const ManifestsDeleted = "Deleted"
const (
CreateRequestAction = "create_request"
UpdateRequestAction = "update_request"
DeleteRequestAction = "delete_request"
)
+// ResourceDeleted represents a resource is deleted.
+const ResourceDeleted = "Deleted"
+const ResourceFinalizer = "cloudevents.open-cluster-management.io/resource-cleanup"
+var ManagedClusterGK = schema.GroupKind{Group: clusterv1.GroupName, Kind: "ManagedCluster"}
+var ManagedClusterGR = schema.GroupResource{Group: clusterv1.GroupName, Resource: "managedclusters"}
var ManifestWorkGK = schema.GroupKind{Group: workv1.GroupName, Kind: "ManifestWork"}
var ManifestWorkGR = schema.GroupResource{Group: workv1.GroupName, Resource: "manifestworks"}
+var CSRGK = schema.GroupKind{Group: certificatev1.GroupName, Kind: "CertificateSigningRequest"}
+var CSRGR = schema.GroupResource{Group: certificatev1.GroupName, Resource: "certificatesigningrequests"}


@@ -11,7 +11,7 @@ import (
const StatusReasonPublishError metav1.StatusReason = "PublishError"
-// NewPublishError returns an error indicating the work could not be published, and the client can try again.
+// NewPublishError returns an error indicating a resource could not be published, and the client can try again.
func NewPublishError(qualifiedResource schema.GroupResource, name string, err error) *errors.StatusError {
return &errors.StatusError{
ErrStatus: metav1.Status{
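A small sketch of how the relocated error helper composes with the group resources from clients/common (the work name and wrapped error are illustrative):

package main

import (
	"fmt"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
	cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors"
)

func main() {
	// Wrap a transport failure the same way the reworked work clients in this diff do.
	err := cloudeventserrors.NewPublishError(common.ManifestWorkGR, "example-work", fmt.Errorf("broker unreachable"))
	fmt.Println(err.ErrStatus.Reason) // expected: PublishError
}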


@@ -0,0 +1,226 @@
package options
import (
"context"
"fmt"
"k8s.io/klog/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
type GenericClientOptions[T generic.ResourceObject] struct {
config any
codec generic.Codec[T]
watcherStore store.ClientWatcherStore[T]
clientID string
sourceID string
clusterName string
resync bool
}
// NewGenericClientOptions creates a GenericClientOptions
//
// - config, available configurations:
//
// MQTTOptions (*mqtt.MQTTOptions): builds a generic cloudevents client with MQTT
//
// GRPCOptions (*grpc.GRPCOptions): builds a generic cloudevents client with GRPC
//
// KafkaOptions (*kafka.KafkaOptions): builds a generic cloudevents client with Kafka
//
// - codec, the codec for the resource
//
// - clientID, the client ID for the generic cloudevents client.
//
// TODO using a specified config instead of any
func NewGenericClientOptions[T generic.ResourceObject](config any,
codec generic.Codec[T],
clientID string) *GenericClientOptions[T] {
return &GenericClientOptions[T]{
config: config,
codec: codec,
clientID: clientID,
resync: true,
}
}
// WithClientWatcherStore sets the ClientWatcherStore. The client uses this store to cache the resources and
// watch the resource events. For an agent, the AgentInformerWatcherStore is used by default
//
// TODO provide a default ClientWatcherStore for source.
func (o *GenericClientOptions[T]) WithClientWatcherStore(store store.ClientWatcherStore[T]) *GenericClientOptions[T] {
o.watcherStore = store
return o
}
// WithSourceID sets the source ID when building a client for a source.
func (o *GenericClientOptions[T]) WithSourceID(sourceID string) *GenericClientOptions[T] {
o.sourceID = sourceID
return o
}
// WithClusterName sets the managed cluster name when building a client for an agent.
func (o *GenericClientOptions[T]) WithClusterName(clusterName string) *GenericClientOptions[T] {
o.clusterName = clusterName
return o
}
// WithResyncEnabled controls the client resync (default is true). If true, a resync happens when
// 1. the client's store is initiated
// 2. the client reconnects
func (o *GenericClientOptions[T]) WithResyncEnabled(resync bool) *GenericClientOptions[T] {
o.resync = resync
return o
}
func (o *GenericClientOptions[T]) ClusterName() string {
return o.clusterName
}
func (o *GenericClientOptions[T]) SourceID() string {
return o.sourceID
}
func (o *GenericClientOptions[T]) WatcherStore() store.ClientWatcherStore[T] {
return o.watcherStore
}
func (o *GenericClientOptions[T]) AgentClient(ctx context.Context) (*generic.CloudEventAgentClient[T], error) {
if len(o.clientID) == 0 {
return nil, fmt.Errorf("client id is required")
}
if len(o.clusterName) == 0 {
return nil, fmt.Errorf("cluster name is required")
}
if o.watcherStore == nil {
o.watcherStore = store.NewAgentInformerWatcherStore[T]()
}
options, err := generic.BuildCloudEventsAgentOptions(o.config, o.clusterName, o.clientID)
if err != nil {
return nil, err
}
cloudEventsClient, err := generic.NewCloudEventAgentClient(
ctx,
options,
store.NewAgentWatcherStoreLister(o.watcherStore),
statushash.StatusHash,
o.codec,
)
if err != nil {
return nil, err
}
// start to subscribe
cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)
// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
if !o.resync {
klog.V(4).Infof("resync is disabled, do nothing")
continue
}
// when receiving a client reconnected signal, we resync all sources for this agent
// TODO after supporting multiple sources, we should only resync agent known sources
if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}
}()
if !o.resync {
return cloudEventsClient, nil
}
// start a go routine to resync the resources after this client's store is initiated
go func() {
if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}()
return cloudEventsClient, nil
}
func (o *GenericClientOptions[T]) SourceClient(ctx context.Context) (*generic.CloudEventSourceClient[T], error) {
if len(o.clientID) == 0 {
return nil, fmt.Errorf("client id is required")
}
if len(o.sourceID) == 0 {
return nil, fmt.Errorf("source id is required")
}
if o.watcherStore == nil {
return nil, fmt.Errorf("a watcher store is required")
}
options, err := generic.BuildCloudEventsSourceOptions(o.config, o.clientID, o.sourceID)
if err != nil {
return nil, err
}
cloudEventsClient, err := generic.NewCloudEventSourceClient(
ctx,
options,
store.NewSourceWatcherStoreLister(o.watcherStore),
statushash.StatusHash,
o.codec,
)
if err != nil {
return nil, err
}
// start to subscribe
cloudEventsClient.Subscribe(ctx, o.watcherStore.HandleReceivedResource)
// start a go routine to receive client reconnect signal
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
if !o.resync {
klog.V(4).Infof("resync is disabled, do nothing")
continue
}
// when receiving a client reconnected signal, we resync all clusters for this source
if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}
}()
if !o.resync {
return cloudEventsClient, nil
}
// start a go routine to resync the resources after this client's store is initiated
go func() {
if store.WaitForStoreInit(ctx, o.watcherStore.HasInitiated) {
if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}()
return cloudEventsClient, nil
}


@@ -0,0 +1,33 @@
package statushash
import (
"crypto/sha256"
"encoding/json"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
)
// StatusHash returns the SHA256 checksum of a resource status.
func StatusHash[T generic.ResourceObject](resource T) (string, error) {
u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(resource)
if err != nil {
return "", err
}
status, found, err := unstructured.NestedMap(u, "status")
if err != nil {
return "", err
}
if !found {
return "", fmt.Errorf("no status for the resource %s", resource.GetUID())
}
statusBytes, err := json.Marshal(status)
if err != nil {
return "", fmt.Errorf("failed to marshal resource status, %v", err)
}
return fmt.Sprintf("%x", sha256.Sum256(statusBytes)), nil
}
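A quick usage sketch; a *workv1.ManifestWork satisfies generic.ResourceObject here, and the condition value is made up:

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash"
)

func main() {
	work := &workv1.ManifestWork{
		ObjectMeta: metav1.ObjectMeta{Name: "example", UID: "1234"},
		Status: workv1.ManifestWorkStatus{
			Conditions: []metav1.Condition{{Type: "Applied", Status: metav1.ConditionTrue}},
		},
	}
	hash, err := statushash.StatusHash(work)
	if err != nil {
		panic(err)
	}
	fmt.Println(hash) // hex-encoded SHA256 of the serialized status
}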


@@ -0,0 +1,75 @@
package store
import (
"fmt"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
)
// BaseClientWatchStore implements the ClientWatcherStore ListAll/List/Get methods
type BaseClientWatchStore[T generic.ResourceObject] struct {
sync.RWMutex
Store cache.Store
Initiated bool
}
// List the resources from the store with the list options
func (s *BaseClientWatchStore[T]) List(namespace string, opts metav1.ListOptions) ([]T, error) {
s.RLock()
defer s.RUnlock()
resources, err := utils.ListResourcesWithOptions[T](s.Store, namespace, opts)
if err != nil {
return nil, err
}
return resources, nil
}
// Get a resource from the store
func (s *BaseClientWatchStore[T]) Get(namespace, name string) (resource T, exists bool, err error) {
s.RLock()
defer s.RUnlock()
key := name
if len(namespace) != 0 {
key = fmt.Sprintf("%s/%s", namespace, name)
}
obj, exists, err := s.Store.GetByKey(key)
if err != nil {
return resource, false, err
}
if !exists {
return resource, false, nil
}
res, ok := obj.(T)
if !ok {
return resource, false, fmt.Errorf("unknown type %T", obj)
}
return res, true, nil
}
// ListAll lists all resources from the store
func (s *BaseClientWatchStore[T]) ListAll() ([]T, error) {
s.RLock()
defer s.RUnlock()
resources := []T{}
for _, obj := range s.Store.List() {
if res, ok := obj.(T); ok {
resources = append(resources, res)
}
}
return resources, nil
}


@@ -0,0 +1,133 @@
package store
import (
"fmt"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
// AgentInformerWatcherStore extends the BaseClientWatchStore.
// It gets/lists the resources from the given informer store and sends
// the resource add/update/delete events to the watch channel directly.
//
// It is used for building resource agent client.
type AgentInformerWatcherStore[T generic.ResourceObject] struct {
BaseClientWatchStore[T]
Watcher *Watcher
informer cache.SharedIndexInformer
}
func NewAgentInformerWatcherStore[T generic.ResourceObject]() *AgentInformerWatcherStore[T] {
return &AgentInformerWatcherStore[T]{
BaseClientWatchStore: BaseClientWatchStore[T]{},
Watcher: NewWatcher(),
}
}
func (s *AgentInformerWatcherStore[T]) Add(resource runtime.Object) error {
s.Watcher.Receive(watch.Event{Type: watch.Added, Object: resource})
return nil
}
func (s *AgentInformerWatcherStore[T]) Update(resource runtime.Object) error {
s.Watcher.Receive(watch.Event{Type: watch.Modified, Object: resource})
return nil
}
func (s *AgentInformerWatcherStore[T]) Delete(resource runtime.Object) error {
s.Watcher.Receive(watch.Event{Type: watch.Deleted, Object: resource})
return nil
}
func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(action types.ResourceAction, resource T) error {
switch action {
case types.Added:
newObj, err := utils.ToRuntimeObject(resource)
if err != nil {
return err
}
return s.Add(newObj)
case types.Modified:
newObj, err := meta.Accessor(resource)
if err != nil {
return err
}
lastObj, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName())
if err != nil {
return err
}
if !exists {
return fmt.Errorf("the resource %s/%s does not exist", newObj.GetNamespace(), newObj.GetName())
}
// prevent the resource from being updated while it is being deleted
if !lastObj.GetDeletionTimestamp().IsZero() {
klog.Warningf("the resource %s/%s is deleting, ignore the update", newObj.GetNamespace(), newObj.GetName())
return nil
}
updated, err := utils.ToRuntimeObject(resource)
if err != nil {
return err
}
return s.Update(updated)
case types.Deleted:
newObj, err := meta.Accessor(resource)
if err != nil {
return err
}
if newObj.GetDeletionTimestamp().IsZero() {
return nil
}
if len(newObj.GetFinalizers()) != 0 {
return nil
}
last, exists, err := s.Get(newObj.GetNamespace(), newObj.GetName())
if err != nil {
return err
}
if !exists {
return nil
}
deletingObj, err := utils.ToRuntimeObject(last)
if err != nil {
return err
}
return s.Delete(deletingObj)
default:
return fmt.Errorf("unsupported resource action %s", action)
}
}
func (s *AgentInformerWatcherStore[T]) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
return s.Watcher, nil
}
func (s *AgentInformerWatcherStore[T]) HasInitiated() bool {
return s.Initiated && s.informer.HasSynced()
}
func (s *AgentInformerWatcherStore[T]) SetInformer(informer cache.SharedIndexInformer) {
s.informer = informer
s.Store = informer.GetStore()
s.Initiated = true
}
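A sketch of the expected wiring on the agent side, assuming the generated work informer factory seen in the import hunks earlier in this diff:

package example

import (
	"time"

	workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
	workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
)

// wireStore backs the watcher store with an informer cache: Get/List are then
// served from the informer, and HasInitiated reports true only after it syncs.
func wireStore(workClient workclientset.Interface, clusterName string) *store.AgentInformerWatcherStore[*workv1.ManifestWork] {
	watcherStore := store.NewAgentInformerWatcherStore[*workv1.ManifestWork]()
	factory := workinformers.NewSharedInformerFactoryWithOptions(
		workClient, 5*time.Minute, workinformers.WithNamespace(clusterName))
	watcherStore.SetInformer(factory.Work().V1().ManifestWorks().Informer())
	return watcherStore
}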


@@ -5,11 +5,12 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
@@ -18,37 +19,37 @@ const syncedPollPeriod = 100 * time.Millisecond
// StoreInitiated is a function that can be used to determine if a store has been initiated.
type StoreInitiated func() bool
-// WorkClientWatcherStore provides a watcher with a work store.
-type WorkClientWatcherStore interface {
-// GetWatcher returns a watcher to receive work changes.
+// ClientWatcherStore provides a watcher with a resource store.
+type ClientWatcherStore[T generic.ResourceObject] interface {
+// GetWatcher returns a watcher to receive resource changes.
GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error)
-// HandleReceivedWork handles the client received work events.
-HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error
+// HandleReceivedResource handles the client received resource events.
+HandleReceivedResource(action types.ResourceAction, resource T) error
-// Add will be called by work client when adding works. The implementation is based on the specific
+// Add will be called by the resource client when adding resources. The implementation is based on the specific
// watcher store, in some case, it does not need to update a store, but just send a watch event.
-Add(work *workv1.ManifestWork) error
+Add(resource runtime.Object) error
-// Update will be called by work client when updating works. The implementation is based on the specific
+// Update will be called by the resource client when updating resources. The implementation is based on the specific
// watcher store, in some case, it does not need to update a store, but just send a watch event.
-Update(work *workv1.ManifestWork) error
+Update(resource runtime.Object) error
-// Delete will be called by work client when deleting works. The implementation is based on the specific
+// Delete will be called by the resource client when deleting resources. The implementation is based on the specific
// watcher store, in some case, it does not need to update a store, but just send a watch event.
-Delete(work *workv1.ManifestWork) error
+Delete(resource runtime.Object) error
-// List returns the works from store for a given namespace with list options
-List(namespace string, opts metav1.ListOptions) (*workv1.ManifestWorkList, error)
+// List returns the resources from store for a given namespace with list options
+List(namespace string, opts metav1.ListOptions) ([]T, error)
-// ListAll list all of the works from store
-ListAll() ([]*workv1.ManifestWork, error)
+// ListAll lists all of the resources from store
+ListAll() ([]T, error)
-// Get returns a work from store with work namespace and name
-Get(namespace, name string) (*workv1.ManifestWork, bool, error)
+// Get returns a resource from store with resource namespace and name
+Get(namespace, name string) (T, bool, error)
// HasInitiated marks the store has been initiated, A resync may be required after the store is initiated
-// when building a work client.
+// when building a resource client.
HasInitiated() bool
}
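Any implementation of this interface can back a standard Kubernetes watch loop; a minimal consumer sketch (the store value is assumed to come from one of the constructors in this diff):

package example

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
)

// consume drains resource events delivered through the store's watcher.
func consume(s store.ClientWatcherStore[*workv1.ManifestWork]) error {
	w, err := s.GetWatcher(metav1.NamespaceAll, metav1.ListOptions{})
	if err != nil {
		return err
	}
	defer w.Stop()
	for evt := range w.ResultChan() {
		switch evt.Type {
		case watch.Added, watch.Modified:
			fmt.Printf("upsert: %T\n", evt.Object)
		case watch.Deleted:
			fmt.Printf("delete: %T\n", evt.Object)
		}
	}
	return nil
}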


@@ -0,0 +1,59 @@
package store
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
// AgentWatcherStoreLister lists the resources from a ClientWatcherStore on an agent
type AgentWatcherStoreLister[T generic.ResourceObject] struct {
store ClientWatcherStore[T]
}
func NewAgentWatcherStoreLister[T generic.ResourceObject](store ClientWatcherStore[T]) *AgentWatcherStoreLister[T] {
return &AgentWatcherStoreLister[T]{
store: store,
}
}
// List returns the resources from a ClientWatcherStore with list options
func (l *AgentWatcherStoreLister[T]) List(options types.ListOptions) ([]T, error) {
opts := metav1.ListOptions{}
if options.Source != types.SourceAll {
opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source)
}
list, err := l.store.List("", opts)
if err != nil {
return nil, err
}
return list, nil
}
// SourceWatcherStoreLister lists the resources from a ClientWatcherStore on a source.
type SourceWatcherStoreLister[T generic.ResourceObject] struct {
store ClientWatcherStore[T]
}
func NewSourceWatcherStoreLister[T generic.ResourceObject](store ClientWatcherStore[T]) *SourceWatcherStoreLister[T] {
return &SourceWatcherStoreLister[T]{
store: store,
}
}
// List returns the resources from a ClientWatcherStore with list options.
func (l *SourceWatcherStoreLister[T]) List(options types.ListOptions) ([]T, error) {
list, err := l.store.List(options.ClusterName, metav1.ListOptions{})
if err != nil {
return nil, err
}
return list, nil
}


@@ -0,0 +1,76 @@
package store
import (
"sync"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/klog/v2"
)
// Watcher implements the watch.Interface.
type Watcher struct {
sync.RWMutex
result chan watch.Event
done chan struct{}
stopped bool
}
var _ watch.Interface = &Watcher{}
func NewWatcher() *Watcher {
return &Watcher{
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan watch.Event),
// If the watcher is externally stopped there is no receiver anymore
// and the send operations on the result channel, especially the
// error reporting might block forever.
// Therefore a dedicated stop channel is used to resolve this blocking.
done: make(chan struct{}),
}
}
// ResultChan implements Interface.
func (w *Watcher) ResultChan() <-chan watch.Event {
return w.result
}
// Stop implements Interface.
func (w *Watcher) Stop() {
// Call Close() exactly once by locking and setting a flag.
w.Lock()
defer w.Unlock()
// closing a closed channel always panics, therefore check before closing
select {
case <-w.done:
close(w.result)
default:
w.stopped = true
close(w.done)
}
}
// Receive an event from the client and send it down the result channel.
func (w *Watcher) Receive(evt watch.Event) {
if w.isStopped() {
// this watcher is stopped, do nothing.
return
}
if klog.V(4).Enabled() {
obj, _ := meta.Accessor(evt.Object)
klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName())
}
w.result <- evt
}
func (w *Watcher) isStopped() bool {
w.RLock()
defer w.RUnlock()
return w.stopped
}


@@ -0,0 +1,222 @@
package utils
import (
"encoding/json"
"fmt"
"github.com/bwmarrin/snowflake"
jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
)
// Patch applies the patch to a resource with the patch type.
func Patch[T generic.ResourceObject](patchType types.PatchType, work T, patchData []byte) (resource T, err error) {
workData, err := json.Marshal(work)
if err != nil {
return resource, err
}
var patchedData []byte
switch patchType {
case types.JSONPatchType:
var patchObj jsonpatch.Patch
patchObj, err = jsonpatch.DecodePatch(patchData)
if err != nil {
return resource, err
}
patchedData, err = patchObj.Apply(workData)
if err != nil {
return resource, err
}
case types.MergePatchType:
patchedData, err = jsonpatch.MergePatch(workData, patchData)
if err != nil {
return resource, err
}
default:
return resource, fmt.Errorf("unsupported patch type: %s", patchType)
}
patchedWork := new(T)
if err := json.Unmarshal(patchedData, patchedWork); err != nil {
return resource, err
}
return *patchedWork, nil
}
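A usage sketch for the generic Patch helper with a merge patch (the patch body is illustrative):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubetypes "k8s.io/apimachinery/pkg/types"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
)

func main() {
	work := &workv1.ManifestWork{ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "cluster1"}}
	patch := []byte(`{"metadata":{"labels":{"app":"demo"}}}`)

	patched, err := utils.Patch(kubetypes.MergePatchType, work, patch)
	if err != nil {
		panic(err)
	}
	fmt.Println(patched.Labels["app"]) // demo
}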
// ListResourcesWithOptions retrieves the resources from the store that match the options.
func ListResourcesWithOptions[T generic.ResourceObject](store cache.Store, namespace string, opts metav1.ListOptions) ([]T, error) {
var err error
labelSelector := labels.Everything()
fieldSelector := fields.Everything()
if len(opts.LabelSelector) != 0 {
labelSelector, err = labels.Parse(opts.LabelSelector)
if err != nil {
return nil, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err)
}
}
if len(opts.FieldSelector) != 0 {
fieldSelector, err = fields.ParseSelector(opts.FieldSelector)
if err != nil {
return nil, fmt.Errorf("invalid fields selector %q: %v", opts.FieldSelector, err)
}
}
resources := []T{}
// list with labels
if err := cache.ListAll(store, labelSelector, func(obj any) {
resourceMeta, ok := obj.(metav1.Object)
if !ok {
klog.Warningf("the object in store %T is not a meta object", obj)
return
}
if namespace != metav1.NamespaceAll && resourceMeta.GetNamespace() != namespace {
return
}
workFieldSet := fields.Set{
"metadata.name": resourceMeta.GetName(),
"metadata.namespace": resourceMeta.GetNamespace(),
}
if !fieldSelector.Matches(workFieldSet) {
return
}
resource, ok := obj.(T)
if !ok {
return
}
resources = append(resources, resource)
}); err != nil {
return nil, err
}
return resources, nil
}
// ValidateResourceMetadata validates the metadata of the given resource
func ValidateResourceMetadata[T generic.ResourceObject](resource T) field.ErrorList {
errs := field.ErrorList{}
fldPath := field.NewPath("metadata")
obj, err := meta.Accessor(resource)
if err != nil {
errs = append(errs, field.TypeInvalid(fldPath, resource, err.Error()))
return errs
}
if obj.GetUID() == "" {
errs = append(errs, field.Required(fldPath.Child("uid"), "field not set"))
return errs
}
if obj.GetResourceVersion() == "" {
errs = append(errs, field.Required(fldPath.Child("resourceVersion"), "field not set"))
return errs
}
if obj.GetName() == "" {
errs = append(errs, field.Required(fldPath.Child("name"), "field not set"))
return errs
}
for _, msg := range validation.ValidateNamespaceName(obj.GetName(), false) {
errs = append(errs, field.Invalid(fldPath.Child("name"), obj.GetName(), msg))
}
if obj.GetNamespace() != "" {
for _, msg := range validation.ValidateNamespaceName(obj.GetNamespace(), false) {
errs = append(errs, field.Invalid(fldPath.Child("namespace"), obj.GetNamespace(), msg))
}
}
errs = append(errs, metav1validation.ValidateLabels(obj.GetLabels(), fldPath.Child("labels"))...)
errs = append(errs, validation.ValidateAnnotations(obj.GetAnnotations(), fldPath.Child("annotations"))...)
errs = append(errs, validation.ValidateFinalizers(obj.GetFinalizers(), fldPath.Child("finalizers"))...)
return errs
}
// ToRuntimeObject converts a resource to a runtime Object
func ToRuntimeObject[T generic.ResourceObject](resource T) (runtime.Object, error) {
obj, ok := any(resource).(runtime.Object)
if !ok {
return nil, fmt.Errorf("object %T does not implement the runtime Object interfaces", resource)
}
return obj.DeepCopyObject(), nil
}
// CompareSnowflakeSequenceIDs compares two snowflake sequence IDs.
// Returns true if the current ID is greater than the last.
// If the last sequence ID is empty, then the current is greater.
func CompareSnowflakeSequenceIDs(last, current string) (bool, error) {
if current != "" && last == "" {
return true, nil
}
lastSID, err := snowflake.ParseString(last)
if err != nil {
return false, fmt.Errorf("unable to parse last sequence ID: %s, %v", last, err)
}
currentSID, err := snowflake.ParseString(current)
if err != nil {
return false, fmt.Errorf("unable to parse current sequence ID: %s %v", current, err)
}
if currentSID.Node() != lastSID.Node() {
return false, fmt.Errorf("sequence IDs (%s,%s) are not from the same node", last, current)
}
if currentSID.Time() != lastSID.Time() {
return currentSID.Time() > lastSID.Time(), nil
}
return currentSID.Step() > lastSID.Step(), nil
}
// UID returns a v5 UUID based on sourceID, groupResource, namespace and name to make sure it is consistent
func UID(sourceID, groupResource, namespace, name string) string {
id := fmt.Sprintf("%s-%s-%s-%s", sourceID, groupResource, namespace, name)
return uuid.NewSHA1(uuid.NameSpaceOID, []byte(id)).String()
}
// EnsureResourceFinalizer ensures the resource finalizer is present in the given finalizers
func EnsureResourceFinalizer(finalizers []string) []string {
has := false
for _, f := range finalizers {
if f == common.ResourceFinalizer {
has = true
break
}
}
if !has {
finalizers = append(finalizers, common.ResourceFinalizer)
}
return finalizers
}
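Two quick usage sketches for the identity and finalizer helpers above (all values are placeholders):

package main

import (
	"fmt"

	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
)

func main() {
	// Deterministic identity: the same coordinates always yield the same UUID,
	// mirroring the source client's Create path later in this diff.
	uid := utils.UID("source1", common.ManifestWorkGR.String(), "cluster1", "example-work")

	// Idempotent finalizer handling: a second call adds nothing.
	finalizers := utils.EnsureResourceFinalizer(nil)
	finalizers = utils.EnsureResourceFinalizer(finalizers)

	fmt.Println(uid, finalizers)
}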


@@ -0,0 +1,50 @@
package utils
import (
"bytes"
"fmt"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/validation/field"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator"
)
func ValidateWork(work *workv1.ManifestWork) field.ErrorList {
errs := field.ErrorList{}
if work.Namespace == "" {
errs = append(errs, field.Required(field.NewPath("metadata").Child("namespace"), "field not set"))
}
if metaErrs := ValidateResourceMetadata(work); len(metaErrs) != 0 {
errs = append(errs, metaErrs...)
}
if err := validator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil {
errs = append(errs, field.Invalid(field.NewPath("spec"), "spec", err.Error()))
}
return errs
}
// EncodeManifests ensures the given work's manifests are encoded
func EncodeManifests(work *workv1.ManifestWork) error {
for index, manifest := range work.Spec.Workload.Manifests {
if manifest.Raw == nil {
if manifest.Object == nil {
return fmt.Errorf("the Object and Raw of the manifest[%d] for the work (%s/%s) are both `nil`",
index, work.Namespace, work.Name)
}
var buf bytes.Buffer
if err := unstructured.UnstructuredJSONScheme.Encode(manifest.Object, &buf); err != nil {
return err
}
work.Spec.Workload.Manifests[index].Raw = buf.Bytes()
}
}
return nil
}
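A sketch of the encode path, where an in-memory object becomes raw JSON (the embedded ConfigMap is illustrative):

package main

import (
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
)

func main() {
	cm := &unstructured.Unstructured{Object: map[string]interface{}{
		"apiVersion": "v1",
		"kind":       "ConfigMap",
		"metadata":   map[string]interface{}{"name": "cm", "namespace": "default"},
	}}
	work := &workv1.ManifestWork{
		ObjectMeta: metav1.ObjectMeta{Name: "example", Namespace: "cluster1"},
		Spec: workv1.ManifestWorkSpec{Workload: workv1.ManifestsTemplate{
			Manifests: []workv1.Manifest{{RawExtension: runtime.RawExtension{Object: cm}}},
		}},
	}
	if err := utils.EncodeManifests(work); err != nil {
		panic(err)
	}
	// Raw now carries the JSON encoding of the in-memory object.
	fmt.Println(string(work.Spec.Workload.Manifests[0].Raw))
}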


@@ -17,12 +17,12 @@ import (
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
workerrors "open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
)
// ManifestWorkAgentClient implements the ManifestWorkInterface. It sends the manifestworks status back to source by
@@ -31,7 +31,7 @@ type ManifestWorkAgentClient struct {
sync.RWMutex
cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork]
-watcherStore store.WorkClientWatcherStore
+watcherStore store.ClientWatcherStore[*workv1.ManifestWork]
// this namespace should be same with the cluster name to which this client subscribes
namespace string
@@ -40,9 +40,9 @@ type ManifestWorkAgentClient struct {
var _ workv1client.ManifestWorkInterface = &ManifestWorkAgentClient{}
func NewManifestWorkAgentClient(
-cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork],
-watcherStore store.WorkClientWatcherStore,
clusterName string,
+watcherStore store.ClientWatcherStore[*workv1.ManifestWork],
+cloudEventsClient *generic.CloudEventAgentClient[*workv1.ManifestWork],
) *ManifestWorkAgentClient {
return &ManifestWorkAgentClient{
cloudEventsClient: cloudEventsClient,
@@ -102,7 +102,12 @@ func (c *ManifestWorkAgentClient) List(ctx context.Context, opts metav1.ListOpti
}
generic.IncreaseWorkProcessedCounter("list", metav1.StatusSuccess)
-return works, nil
+items := []workv1.ManifestWork{}
+for _, work := range works {
+items = append(items, *work)
+}
+return &workv1.ManifestWorkList{Items: items}, nil
}
func (c *ManifestWorkAgentClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
@@ -169,7 +174,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub
// publish the status update event to source, source will check the resource version
// and reject the update if it's status update is outdated.
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
-returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err)
+returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err)
generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}
@@ -223,7 +228,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub
// it back to source
if !newWork.DeletionTimestamp.IsZero() && len(newWork.Finalizers) == 0 {
meta.SetStatusCondition(&newWork.Status.Conditions, metav1.Condition{
-Type: common.ManifestsDeleted,
+Type: common.ResourceDeleted,
Status: metav1.ConditionTrue,
Reason: "ManifestsDeleted",
Message: fmt.Sprintf("The manifests are deleted from the cluster %s", newWork.Namespace),
@@ -231,7 +236,7 @@ func (c *ManifestWorkAgentClient) Patch(ctx context.Context, name string, pt kub
eventType.Action = common.DeleteRequestAction
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
-returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err)
+returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err)
generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}


@@ -13,10 +13,10 @@ import (
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/statushash"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash"
)
var sequenceGenerator *snowflake.Node
@@ -67,7 +67,7 @@ func (c *ManifestBundleCodec) Encode(source string, eventType types.CloudEventsT
WithOriginalSource(originalSource).
NewEvent()
-statusHash, err := statushash.ManifestWorkStatusHash(work)
+statusHash, err := statushash.StatusHash(work)
if err != nil {
return nil, err
}


@@ -0,0 +1,59 @@
package work
import (
"context"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/internal"
sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client"
)
// ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configuration
//
// ClientHolder also implements the ManifestWorksGetter interface.
type ClientHolder struct {
workClientSet workclientset.Interface
}
var _ workv1client.ManifestWorksGetter = &ClientHolder{}
// WorkInterface returns a workclientset Interface
func (h *ClientHolder) WorkInterface() workclientset.Interface {
return h.workClientSet
}
// ManifestWorks returns a ManifestWorkInterface
func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWorkInterface {
return h.workClientSet.WorkV1().ManifestWorks(namespace)
}
// NewSourceClientHolder returns a ClientHolder for a source
func NewSourceClientHolder(ctx context.Context, opt *options.GenericClientOptions[*workv1.ManifestWork]) (*ClientHolder, error) {
sourceClient, err := opt.SourceClient(ctx)
if err != nil {
return nil, err
}
manifestWorkClient := sourceclient.NewManifestWorkSourceClient(opt.SourceID(), opt.WatcherStore(), sourceClient)
workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
return &ClientHolder{workClientSet: workClientSet}, nil
}
// NewAgentClientHolder returns a ClientHolder for an agent
func NewAgentClientHolder(ctx context.Context, opt *options.GenericClientOptions[*workv1.ManifestWork]) (*ClientHolder, error) {
agentClient, err := opt.AgentClient(ctx)
if err != nil {
return nil, err
}
manifestWorkClient := agentclient.NewManifestWorkAgentClient(opt.ClusterName(), opt.WatcherStore(), agentClient)
workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
return &ClientHolder{workClientSet: workClientSet}, nil
}
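Once built, either holder plugs in wherever a typed work client is expected; a usage sketch (the namespace is a placeholder):

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	workv1 "open-cluster-management.io/api/work/v1"
	"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
)

// createWork shows the holder acting as a workv1client.ManifestWorksGetter:
// callers use it like a kube clientset, while publishes go over cloudevents.
func createWork(ctx context.Context, holder *work.ClientHolder, w *workv1.ManifestWork) (*workv1.ManifestWork, error) {
	return holder.ManifestWorks("cluster1").Create(ctx, w, metav1.CreateOptions{})
}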


@@ -8,8 +8,8 @@ import (
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1alpha1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1alpha1"
-agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client"
-sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client"
+agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/client"
+sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/source/client"
)
// WorkClientSetWrapper wraps a work client that has a manifestwork client to a work clientset interface, this wrapper


@@ -4,6 +4,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)


@@ -15,19 +15,19 @@ import (
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
cloudeventserrors "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/errors"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
workerrors "open-cluster-management.io/sdk-go/pkg/cloudevents/work/errors"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
)
// ManifestWorkSourceClient implements the ManifestWorkInterface.
type ManifestWorkSourceClient struct {
cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork]
-watcherStore store.WorkClientWatcherStore
+watcherStore store.ClientWatcherStore[*workv1.ManifestWork]
namespace string
sourceID string
}
@@ -36,8 +36,8 @@ var _ workv1client.ManifestWorkInterface = &ManifestWorkSourceClient{}
func NewManifestWorkSourceClient(
sourceID string,
+watcherStore store.ClientWatcherStore[*workv1.ManifestWork],
cloudEventsClient *generic.CloudEventSourceClient[*workv1.ManifestWork],
-watcherStore store.WorkClientWatcherStore,
) *ManifestWorkSourceClient {
return &ManifestWorkSourceClient{
cloudEventsClient: cloudEventsClient,
@@ -84,24 +84,24 @@ func (c *ManifestWorkSourceClient) Create(ctx context.Context, manifestWork *wor
}
newWork := manifestWork.DeepCopy()
-newWork.UID = kubetypes.UID(utils.UID(c.sourceID, c.namespace, newWork.Name))
+newWork.UID = kubetypes.UID(utils.UID(c.sourceID, common.ManifestWorkGR.String(), c.namespace, newWork.Name))
newWork.Namespace = c.namespace
newWork.ResourceVersion = getWorkResourceVersion(manifestWork)
-if err := utils.Encode(newWork); err != nil {
+if err := utils.EncodeManifests(newWork); err != nil {
returnErr := errors.NewInternalError(err)
generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}
-if errs := utils.Validate(newWork); len(errs) != 0 {
+if errs := utils.ValidateWork(newWork); len(errs) != 0 {
returnErr := errors.NewInvalid(common.ManifestWorkGK, manifestWork.Name, errs)
generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
-returnErr := workerrors.NewPublishError(common.ManifestWorkGR, manifestWork.Name, err)
+returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, manifestWork.Name, err)
generic.IncreaseWorkProcessedCounter("create", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}
@@ -149,7 +149,7 @@ func (c *ManifestWorkSourceClient) Delete(ctx context.Context, name string, opts
deletingWork.DeletionTimestamp = &now
if err := c.cloudEventsClient.Publish(ctx, eventType, deletingWork); err != nil {
-returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err)
+returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err)
generic.IncreaseWorkProcessedCounter("delete", string(returnErr.ErrStatus.Reason))
return returnErr
}
@@ -211,7 +211,12 @@ func (c *ManifestWorkSourceClient) List(ctx context.Context, opts metav1.ListOpt
}
generic.IncreaseWorkProcessedCounter("list", metav1.StatusSuccess)
-return works, nil
+items := []workv1.ManifestWork{}
+for _, work := range works {
+items = append(items, *work)
+}
+return &workv1.ManifestWorkList{Items: items}, nil
}
func (c *ManifestWorkSourceClient) Watch(ctx context.Context, opts metav1.ListOptions) (watch.Interface, error) {
@@ -266,14 +271,14 @@ func (c *ManifestWorkSourceClient) Patch(ctx context.Context, name string, pt ku
newWork := patchedWork.DeepCopy()
newWork.ResourceVersion = getWorkResourceVersion(patchedWork)
-if errs := utils.Validate(newWork); len(errs) != 0 {
+if errs := utils.ValidateWork(newWork); len(errs) != 0 {
returnErr := errors.NewInvalid(common.ManifestWorkGK, name, errs)
generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}
if err := c.cloudEventsClient.Publish(ctx, eventType, newWork); err != nil {
-returnErr := workerrors.NewPublishError(common.ManifestWorkGR, name, err)
+returnErr := cloudeventserrors.NewPublishError(common.ManifestWorkGR, name, err)
generic.IncreaseWorkProcessedCounter("patch", string(returnErr.ErrStatus.Reason))
return nil, returnErr
}


@@ -12,9 +12,10 @@ import (
kubetypes "k8s.io/apimachinery/pkg/types"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
)
// ExtensionWorkMeta is an extension attribute for work meta data.


@@ -3,98 +3,31 @@ package store
import (
"fmt"
"strconv"
"sync"
"time"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
)
-const ManifestWorkFinalizer = "cloudevents.open-cluster-management.io/manifest-work-cleanup"
-type baseStore struct {
-sync.RWMutex
-store cache.Store
-initiated bool
-}
-// List the works from the store with the list options
-func (b *baseStore) List(namespace string, opts metav1.ListOptions) (*workv1.ManifestWorkList, error) {
-b.RLock()
-defer b.RUnlock()
-works, err := utils.ListWorksWithOptions(b.store, namespace, opts)
-if err != nil {
-return nil, err
-}
-items := []workv1.ManifestWork{}
-for _, work := range works {
-items = append(items, *work)
-}
-return &workv1.ManifestWorkList{Items: items}, nil
-}
-// Get a works from the store
-func (b *baseStore) Get(namespace, name string) (*workv1.ManifestWork, bool, error) {
-b.RLock()
-defer b.RUnlock()
-obj, exists, err := b.store.GetByKey(fmt.Sprintf("%s/%s", namespace, name))
-if err != nil {
-return nil, false, err
-}
-if !exists {
-return nil, false, nil
-}
-work, ok := obj.(*workv1.ManifestWork)
-if !ok {
-return nil, false, fmt.Errorf("unknown type %T", obj)
-}
-return work, true, nil
-}
-// List all of works from the store
-func (b *baseStore) ListAll() ([]*workv1.ManifestWork, error) {
-b.RLock()
-defer b.RUnlock()
-works := []*workv1.ManifestWork{}
-for _, obj := range b.store.List() {
-if work, ok := obj.(*workv1.ManifestWork); ok {
-works = append(works, work)
-}
-}
-return works, nil
-}
type baseSourceStore struct {
-baseStore
+store.BaseClientWatchStore[*workv1.ManifestWork]
// a queue to save the received work events
receivedWorks workqueue.RateLimitingInterface
}
-func (bs *baseSourceStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error {
+func (bs *baseSourceStore) HandleReceivedResource(action types.ResourceAction, work *workv1.ManifestWork) error {
switch action {
case types.StatusModified:
bs.receivedWorks.Add(work)
@@ -107,10 +40,10 @@ func (bs *baseSourceStore) HandleReceivedWork(action types.ResourceAction, work
// workProcessor processes the received works from the given work queue with a specific store
type workProcessor struct {
works workqueue.RateLimitingInterface
-store WorkClientWatcherStore
+store store.ClientWatcherStore[*workv1.ManifestWork]
}
-func newWorkProcessor(works workqueue.RateLimitingInterface, store WorkClientWatcherStore) *workProcessor {
+func newWorkProcessor(works workqueue.RateLimitingInterface, store store.ClientWatcherStore[*workv1.ManifestWork]) *workProcessor {
return &workProcessor{
works: works,
store: store,
@@ -163,7 +96,7 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error {
if lastWork == nil {
// the work is not found from the local cache and it has been deleted by the agent,
// ignore this work.
-if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
+if meta.IsStatusConditionTrue(work.Status.Conditions, common.ResourceDeleted) {
return nil
}
@@ -175,7 +108,7 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error {
}
updatedWork := lastWork.DeepCopy()
-if meta.IsStatusConditionTrue(work.Status.Conditions, common.ManifestsDeleted) {
+if meta.IsStatusConditionTrue(work.Status.Conditions, common.ResourceDeleted) {
updatedWork.Finalizers = []string{}
// delete the work from the local cache.
return b.store.Delete(updatedWork)
@@ -223,7 +156,7 @@ func (b *workProcessor) handleWork(work *workv1.ManifestWork) error {
}
// the work has been handled by the agent; ensure a finalizer on the work
updatedWork.Finalizers = ensureFinalizers(updatedWork.Finalizers)
updatedWork.Finalizers = utils.EnsureResourceFinalizer(updatedWork.Finalizers)
updatedWork.Annotations[common.CloudEventsSequenceIDAnnotationKey] = sequenceID
updatedWork.Status = work.Status
// update the work in the local cache with the new status.
@@ -245,86 +178,3 @@ func (b *workProcessor) getWork(uid kubetypes.UID) *workv1.ManifestWork {
return nil
}
// workWatcher implements the watch.Interface.
type workWatcher struct {
sync.RWMutex
result chan watch.Event
done chan struct{}
stopped bool
}
var _ watch.Interface = &workWatcher{}
func newWorkWatcher() *workWatcher {
return &workWatcher{
// It's easy for a consumer to add buffering via an extra
// goroutine/channel, but impossible for them to remove it,
// so nonbuffered is better.
result: make(chan watch.Event),
// If the watcher is externally stopped there is no receiver anymore,
// and the send operations on the result channel, especially the
// error reporting, might block forever.
// Therefore a dedicated stop channel is used to resolve this blocking.
done: make(chan struct{}),
}
}
// ResultChan implements Interface.
func (w *workWatcher) ResultChan() <-chan watch.Event {
return w.result
}
// Stop implements Interface.
func (w *workWatcher) Stop() {
// Ensure Stop is effective exactly once by locking and setting a flag.
w.Lock()
defer w.Unlock()
// closing a closed channel always panics, therefore check before closing
select {
case <-w.done:
close(w.result)
default:
w.stopped = true
close(w.done)
}
}
// Receive receives an event from the work client and sends it down the result channel.
func (w *workWatcher) Receive(evt watch.Event) {
if w.isStopped() {
// this watcher is stopped, do nothing.
return
}
if klog.V(4).Enabled() {
obj, _ := meta.Accessor(evt.Object)
klog.V(4).Infof("Receive the event %v for %v", evt.Type, obj.GetName())
}
w.result <- evt
}
func (w *workWatcher) isStopped() bool {
w.RLock()
defer w.RUnlock()
return w.stopped
}
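
Because the result channel is unbuffered, a consumer must keep draining ResultChan or the sender blocks. A hedged sketch of a consumer loop; the context-driven shutdown is an assumption of this sketch, not part of the watcher's contract:

    // Illustrative sketch: draining a workWatcher as a plain watch.Interface.
    func consumeEvents(ctx context.Context, w *workWatcher) {
        defer w.Stop()
        for {
            select {
            case <-ctx.Done():
                return
            case evt, ok := <-w.ResultChan():
                if !ok {
                    return
                }
                if work, isWork := evt.Object.(*workv1.ManifestWork); isWork {
                    fmt.Printf("event %s for %s/%s\n", evt.Type, work.Namespace, work.Name)
                }
            }
        }
    }
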
func ensureFinalizers(workFinalizers []string) []string {
has := false
for _, f := range workFinalizers {
if f == ManifestWorkFinalizer {
has = true
break
}
}
if !has {
workFinalizers = append(workFinalizers, ManifestWorkFinalizer)
}
return workFinalizers
}

View File

@@ -5,12 +5,15 @@ import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
@@ -21,19 +24,19 @@ import (
// It is used for building the ManifestWork source client.
type SourceInformerWatcherStore struct {
baseSourceStore
watcher *workWatcher
watcher *store.Watcher
informer cache.SharedIndexInformer
}
var _ WorkClientWatcherStore = &SourceInformerWatcherStore{}
var _ store.ClientWatcherStore[*workv1.ManifestWork] = &SourceInformerWatcherStore{}
func NewSourceInformerWatcherStore(ctx context.Context) *SourceInformerWatcherStore {
s := &SourceInformerWatcherStore{
baseSourceStore: baseSourceStore{
baseStore: baseStore{},
receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "informer-watcher-store"),
BaseClientWatchStore: store.BaseClientWatchStore[*workv1.ManifestWork]{},
receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "informer-watcher-store"),
},
watcher: newWorkWatcher(),
watcher: store.NewWatcher(),
}
// start a goroutine to process the received work events from the work queue with the current store.
@@ -42,23 +45,23 @@ func NewSourceInformerWatcherStore(ctx context.Context) *SourceInformerWatcherSt
return s
}
func (s *SourceInformerWatcherStore) Add(work *workv1.ManifestWork) error {
func (s *SourceInformerWatcherStore) Add(work runtime.Object) error {
s.watcher.Receive(watch.Event{Type: watch.Added, Object: work})
return nil
}
func (s *SourceInformerWatcherStore) Update(work *workv1.ManifestWork) error {
func (s *SourceInformerWatcherStore) Update(work runtime.Object) error {
s.watcher.Receive(watch.Event{Type: watch.Modified, Object: work})
return nil
}
func (s *SourceInformerWatcherStore) Delete(work *workv1.ManifestWork) error {
func (s *SourceInformerWatcherStore) Delete(work runtime.Object) error {
s.watcher.Receive(watch.Event{Type: watch.Deleted, Object: work})
return nil
}
func (s *SourceInformerWatcherStore) HasInitiated() bool {
return s.initiated && s.informer.HasSynced()
return s.Initiated && s.informer.HasSynced()
}
func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
@@ -71,8 +74,8 @@ func (s *SourceInformerWatcherStore) GetWatcher(namespace string, opts metav1.Li
func (s *SourceInformerWatcherStore) SetInformer(informer cache.SharedIndexInformer) {
s.informer = informer
s.store = informer.GetStore()
s.initiated = true
s.Store = informer.GetStore()
s.Initiated = true
}
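
A hedged sketch of the intended wiring: an informer built from the generated work informer factory (the client, import path, and resync period here are assumptions) is handed to the store via SetInformer, so List/Get are served from the informer cache while Add/Update/Delete feed the watcher:

    // Illustrative sketch: wiring a ManifestWork informer into the store.
    func newInformerStore(ctx context.Context, workClient workclientset.Interface) *SourceInformerWatcherStore {
        watcherStore := NewSourceInformerWatcherStore(ctx)
        factory := workinformers.NewSharedInformerFactory(workClient, 30*time.Minute)
        informer := factory.Work().V1().ManifestWorks().Informer()
        watcherStore.SetInformer(informer)
        go informer.Run(ctx.Done())
        return watcherStore
    }
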
// AgentInformerWatcherStore extends the baseStore.
@@ -81,36 +84,21 @@ func (s *SourceInformerWatcherStore) SetInformer(informer cache.SharedIndexInfor
//
// It is used for building the ManifestWork agent client.
type AgentInformerWatcherStore struct {
baseStore
informer cache.SharedIndexInformer
watcher *workWatcher
store.AgentInformerWatcherStore[*workv1.ManifestWork]
}
var _ WorkClientWatcherStore = &AgentInformerWatcherStore{}
var _ store.ClientWatcherStore[*workv1.ManifestWork] = &AgentInformerWatcherStore{}
func NewAgentInformerWatcherStore() *AgentInformerWatcherStore {
return &AgentInformerWatcherStore{
baseStore: baseStore{},
watcher: newWorkWatcher(),
AgentInformerWatcherStore: store.AgentInformerWatcherStore[*workv1.ManifestWork]{
BaseClientWatchStore: store.BaseClientWatchStore[*workv1.ManifestWork]{},
Watcher: store.NewWatcher(),
},
}
}
func (s *AgentInformerWatcherStore) Add(work *workv1.ManifestWork) error {
s.watcher.Receive(watch.Event{Type: watch.Added, Object: work})
return nil
}
func (s *AgentInformerWatcherStore) Update(work *workv1.ManifestWork) error {
s.watcher.Receive(watch.Event{Type: watch.Modified, Object: work})
return nil
}
func (s *AgentInformerWatcherStore) Delete(work *workv1.ManifestWork) error {
s.watcher.Receive(watch.Event{Type: watch.Deleted, Object: work})
return nil
}
func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceAction, work *workv1.ManifestWork) error {
func (s *AgentInformerWatcherStore) HandleReceivedResource(action types.ResourceAction, work *workv1.ManifestWork) error {
switch action {
case types.Added:
return s.Add(work.DeepCopy())
@@ -154,17 +142,3 @@ func (s *AgentInformerWatcherStore) HandleReceivedWork(action types.ResourceActi
return fmt.Errorf("unsupported resource action %s", action)
}
}
func (s *AgentInformerWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
return s.watcher, nil
}
func (s *AgentInformerWatcherStore) HasInitiated() bool {
return s.initiated && s.informer.HasSynced()
}
func (s *AgentInformerWatcherStore) SetInformer(informer cache.SharedIndexInformer) {
s.informer = informer
s.store = informer.GetStore()
s.initiated = true
}

View File

@@ -6,6 +6,7 @@ import (
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/cache"
@@ -14,7 +15,8 @@ import (
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/store"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
)
// ListLocalWorksFunc loads the works from the local environment.
@@ -25,14 +27,14 @@ type watchEvent struct {
Type watch.EventType
}
var _ WorkClientWatcherStore = &SourceLocalWatcherStore{}
var _ store.ClientWatcherStore[*workv1.ManifestWork] = &SourceLocalWatcherStore{}
// SourceLocalWatcherStore caches the works in this local store and provides watch capability via a watch event channel.
//
// It is used for building the ManifestWork source client.
type SourceLocalWatcherStore struct {
baseSourceStore
watcher *workWatcher
watcher *store.Watcher
eventQueue cache.Queue
}
@@ -43,23 +45,23 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc
return nil, err
}
// A local store to cache the works
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
// A local store (localStore) to cache the works
localStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
for _, work := range works {
if errs := utils.Validate(work); len(errs) != 0 {
if errs := utils.ValidateWork(work); len(errs) != 0 {
return nil, fmt.Errorf("%s", errs.ToAggregate().Error())
}
if err := store.Add(work.DeepCopy()); err != nil {
if err := localStore.Add(work.DeepCopy()); err != nil {
return nil, err
}
}
s := &SourceLocalWatcherStore{
baseSourceStore: baseSourceStore{
baseStore: baseStore{
store: store,
initiated: true,
BaseClientWatchStore: store.BaseClientWatchStore[*workv1.ManifestWork]{
Store: localStore,
Initiated: true,
},
// A queue to save the received work events, it helps us retry events
@@ -67,7 +69,7 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc
receivedWorks: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "local-watcher-store"),
},
watcher: newWorkWatcher(),
watcher: store.NewWatcher(),
// A queue to save the events the work client sends; running a client without
// a watcher would block the client, and this queue resolves that blocking.
@@ -92,43 +94,58 @@ func NewSourceLocalWatcherStore(ctx context.Context, listFunc ListLocalWorksFunc
}
// Add a work to the cache and send an event to the event queue
func (s *SourceLocalWatcherStore) Add(work *workv1.ManifestWork) error {
func (s *SourceLocalWatcherStore) Add(work runtime.Object) error {
s.Lock()
defer s.Unlock()
if err := s.store.Add(work); err != nil {
if err := s.Store.Add(work); err != nil {
return err
}
return s.eventQueue.Add(&watchEvent{Key: key(work), Type: watch.Added})
key, err := key(work)
if err != nil {
return err
}
return s.eventQueue.Add(&watchEvent{Key: key, Type: watch.Added})
}
// Update a work in the cache and send an event to the event queue
func (s *SourceLocalWatcherStore) Update(work *workv1.ManifestWork) error {
func (s *SourceLocalWatcherStore) Update(work runtime.Object) error {
s.Lock()
defer s.Unlock()
if err := s.store.Update(work); err != nil {
if err := s.Store.Update(work); err != nil {
return err
}
return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Modified})
key, err := key(work)
if err != nil {
return err
}
return s.eventQueue.Update(&watchEvent{Key: key, Type: watch.Modified})
}
// Delete a work from the cache and send an event to the event queue
func (s *SourceLocalWatcherStore) Delete(work *workv1.ManifestWork) error {
func (s *SourceLocalWatcherStore) Delete(work runtime.Object) error {
s.Lock()
defer s.Unlock()
if err := s.store.Delete(work); err != nil {
if err := s.Store.Delete(work); err != nil {
return err
}
return s.eventQueue.Update(&watchEvent{Key: key(work), Type: watch.Deleted})
key, err := key(work)
if err != nil {
return err
}
return s.eventQueue.Update(&watchEvent{Key: key, Type: watch.Deleted})
}
func (s *SourceLocalWatcherStore) HasInitiated() bool {
return s.initiated
return s.Initiated
}
func (s *SourceLocalWatcherStore) GetWatcher(namespace string, opts metav1.ListOptions) (watch.Interface, error) {
@@ -167,7 +184,7 @@ func (s *SourceLocalWatcherStore) processLoop() {
return
}
obj, exists, err := s.store.GetByKey(evt.Key)
obj, exists, err := s.Store.GetByKey(evt.Key)
if err != nil {
klog.Errorf("failed to get the work %s, %v", evt.Key, err)
return
@@ -210,6 +227,10 @@ func (s *SourceLocalWatcherStore) processLoop() {
}
}
func key(work *workv1.ManifestWork) string {
return work.Namespace + "/" + work.Name
func key(obj runtime.Object) (string, error) {
work, ok := obj.(*workv1.ManifestWork)
if !ok {
return "", fmt.Errorf("obj %T is not a work", obj)
}
return work.Namespace + "/" + work.Name, nil
}
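
A hedged sketch of constructing the store defined above on startup. The exact ListLocalWorksFunc signature is not shown in this diff; the form below is an assumption based on how it is used:

    // Illustrative sketch: seeding a local watcher store from persisted works.
    func newLocalStore(ctx context.Context) (*SourceLocalWatcherStore, error) {
        listFunc := func(ctx context.Context) ([]*workv1.ManifestWork, error) {
            // Load previously persisted works here; nil is fine on first start.
            return nil, nil
        }
        return NewSourceLocalWatcherStore(ctx, listFunc)
    }
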

View File

@@ -222,24 +222,22 @@ func CachingCertificateLoader(certFile, keyFile string) func() (*tls.Certificate
// And a goroutine will be started to periodically refresh client certificates for this connection.
func AutoLoadTLSConfig(caFile, certFile, keyFile string, conn Connection) (*tls.Config, error) {
var tlsConfig *tls.Config
if caFile != "" {
certPool, err := rootCAs(caFile)
if err != nil {
return nil, err
}
tlsConfig = &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}
if certFile != "" && keyFile != "" {
// Set client certificate and key getter for tls config
tlsConfig.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
return CachingCertificateLoader(certFile, keyFile)()
}
// Start a goroutine to periodically refresh client certificates for this connection
StartClientCertRotating(tlsConfig.GetClientCertificate, conn)
certPool, err := rootCAs(caFile)
if err != nil {
return nil, err
}
tlsConfig = &tls.Config{
RootCAs: certPool,
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}
if certFile != "" && keyFile != "" {
// Set client certificate and key getter for tls config
tlsConfig.GetClientCertificate = func(cri *tls.CertificateRequestInfo) (*tls.Certificate, error) {
return CachingCertificateLoader(certFile, keyFile)()
}
// Start a goroutine to periodically refresh client certificates for this connection
StartClientCertRotating(tlsConfig.GetClientCertificate, conn)
}
return tlsConfig, nil
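
A hedged usage sketch for the function above; the file paths are placeholders, and conn is any value satisfying this package's Connection interface:

    // Illustrative sketch: building a rotating client TLS config.
    func newTLSConfig(conn Connection) (*tls.Config, error) {
        return AutoLoadTLSConfig(
            "/run/secrets/ca.crt",  // CA bundle (placeholder path)
            "/run/secrets/tls.crt", // client certificate (placeholder path)
            "/run/secrets/tls.key", // client key (placeholder path)
            conn,
        )
    }
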

View File

@@ -5,6 +5,7 @@ import (
"crypto/tls"
"fmt"
"os"
"sync"
"time"
"golang.org/x/oauth2"
@@ -30,7 +31,8 @@ type GRPCDialer struct {
KeepAliveOptions KeepAliveOptions
TLSConfig *tls.Config
TokenFile string
conn *grpc.ClientConn
mu sync.Mutex // Mutex to protect the connection.
conn *grpc.ClientConn // Cached gRPC client connection.
}
// KeepAliveOptions holds the keepalive options for the gRPC client.
@@ -43,6 +45,15 @@ type KeepAliveOptions struct {
// Dial connects to the gRPC server and returns a gRPC client connection.
func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) {
// Return the cached connection if it exists and is ready.
// Should not return a nil or unready connection, otherwise the caller (cloudevents client)
// will not use the connection for sending and receiving events in reconnect scenarios.
// Lock to ensure the connection is not created by multiple goroutines concurrently.
d.mu.Lock()
defer d.mu.Unlock()
if d.conn != nil && (d.conn.GetState() == connectivity.Connecting || d.conn.GetState() == connectivity.Ready) {
return d.conn, nil
}
// Prepare gRPC dial options.
dialOpts := []grpc.DialOption{}
if d.KeepAliveOptions.Enable {
@@ -94,6 +105,8 @@ func (d *GRPCDialer) Dial() (*grpc.ClientConn, error) {
// Close closes the gRPC client connection.
func (d *GRPCDialer) Close() error {
d.mu.Lock()
defer d.mu.Unlock()
if d.conn != nil {
return d.conn.Close()
}
@@ -192,10 +205,12 @@ func BuildGRPCOptionsFromFlags(configPath string) (*GRPCOptions, error) {
// Set the keepalive options
options.Dialer.KeepAliveOptions = keepAliveOptions
// Set up TLS configuration for the gRPC connection; the certificates will be reloaded periodically.
options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer)
if err != nil {
return nil, err
if config.CAFile != "" {
// Set up TLS configuration for the gRPC connection; the certificates will be reloaded periodically.
options.Dialer.TLSConfig, err = cert.AutoLoadTLSConfig(config.CAFile, config.ClientCertFile, config.ClientKeyFile, options.Dialer)
if err != nil {
return nil, err
}
}
return options, nil
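
A hedged end-to-end sketch: load the options from a config file (the path is a placeholder), then dial; repeated Dial calls reuse the cached connection while it is Connecting or Ready:

    // Illustrative sketch: options from flags, then a shared gRPC connection.
    func connect() (*grpc.ClientConn, error) {
        options, err := BuildGRPCOptionsFromFlags("/etc/ocm/grpc-config.yaml")
        if err != nil {
            return nil, err
        }
        // Safe to call from multiple goroutines; the dialer caches the connection.
        return options.Dialer.Dial()
    }
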

View File

@@ -1,50 +0,0 @@
package lister
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
)
// WatcherStoreLister lists the ManifestWorks from a WorkClientWatcherStore
type WatcherStoreLister struct {
store store.WorkClientWatcherStore
}
func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreLister {
return &WatcherStoreLister{
store: store,
}
}
// List returns the ManifestWorks from a WorkClientWatcherStore with list options
func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) {
opts := metav1.ListOptions{}
if options.Source != types.SourceAll {
opts.LabelSelector = fmt.Sprintf("%s=%s", common.CloudEventsOriginalSourceLabelKey, options.Source)
}
list, err := l.store.List(options.ClusterName, opts)
if err != nil {
return nil, err
}
works := []*workv1.ManifestWork{}
for _, work := range list.Items {
cloudEventsDataType := work.Annotations[common.CloudEventsDataTypeAnnotationKey]
if cloudEventsDataType != options.CloudEventsDataType.String() {
continue
}
works = append(works, &work)
}
return works, nil
}

View File

@@ -1,253 +0,0 @@
package work
import (
"context"
"fmt"
"k8s.io/klog/v2"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned/typed/work/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
agentclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/client"
agentlister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/agent/lister"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/internal"
sourceclient "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/client"
sourcelister "open-cluster-management.io/sdk-go/pkg/cloudevents/work/source/lister"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/statushash"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
)
// ClientHolder holds a manifestwork client that implements the ManifestWorkInterface based on different configurations
//
// ClientHolder also implements the ManifestWorksGetter interface.
type ClientHolder struct {
workClientSet workclientset.Interface
}
var _ workv1client.ManifestWorksGetter = &ClientHolder{}
// WorkInterface returns a workclientset Interface
func (h *ClientHolder) WorkInterface() workclientset.Interface {
return h.workClientSet
}
// ManifestWorks returns a ManifestWorkInterface
func (h *ClientHolder) ManifestWorks(namespace string) workv1client.ManifestWorkInterface {
return h.workClientSet.WorkV1().ManifestWorks(namespace)
}
// ClientHolderBuilder builds the ClientHolder with different configurations.
type ClientHolderBuilder struct {
config any
watcherStore store.WorkClientWatcherStore
codec generic.Codec[*workv1.ManifestWork]
sourceID string
clusterName string
clientID string
resync bool
}
// NewClientHolderBuilder returns a ClientHolderBuilder with a given configuration.
//
// Available configurations:
// - MQTTOptions (*mqtt.MQTTOptions): builds a manifestwork client based on cloudevents with MQTT
// - GRPCOptions (*grpc.GRPCOptions): builds a manifestwork client based on cloudevents with GRPC
// - KafkaOptions (*kafka.KafkaOptions): builds a manifestwork client based on cloudevents with Kafka
//
// TODO: use a specific config type instead of any
func NewClientHolderBuilder(config any) *ClientHolderBuilder {
return &ClientHolderBuilder{
config: config,
resync: true,
}
}
// WithClientID sets the client ID for the source/agent cloudevents client.
func (b *ClientHolderBuilder) WithClientID(clientID string) *ClientHolderBuilder {
b.clientID = clientID
return b
}
// WithSourceID sets the source ID when building a manifestwork client for a source.
func (b *ClientHolderBuilder) WithSourceID(sourceID string) *ClientHolderBuilder {
b.sourceID = sourceID
return b
}
// WithClusterName sets the managed cluster name when building a manifestwork client for an agent.
func (b *ClientHolderBuilder) WithClusterName(clusterName string) *ClientHolderBuilder {
b.clusterName = clusterName
return b
}
// WithCodec sets the codec used when building a manifestwork client based on cloudevents.
func (b *ClientHolderBuilder) WithCodec(codec generic.Codec[*workv1.ManifestWork]) *ClientHolderBuilder {
b.codec = codec
return b
}
// WithWorkClientWatcherStore sets the WorkClientWatcherStore. The client will use this store to cache the works and
// watch the work events.
func (b *ClientHolderBuilder) WithWorkClientWatcherStore(store store.WorkClientWatcherStore) *ClientHolderBuilder {
b.watcherStore = store
return b
}
// WithResyncEnabled controls the client resync (default is true); if true, the resync happens when
// 1. after the client's store is initiated
// 2. the client reconnected
func (b *ClientHolderBuilder) WithResyncEnabled(resync bool) *ClientHolderBuilder {
b.resync = resync
return b
}
// NewSourceClientHolder returns a ClientHolder for a source
func (b *ClientHolderBuilder) NewSourceClientHolder(ctx context.Context) (*ClientHolder, error) {
if len(b.clientID) == 0 {
return nil, fmt.Errorf("client id is required")
}
if len(b.sourceID) == 0 {
return nil, fmt.Errorf("source id is required")
}
if b.watcherStore == nil {
return nil, fmt.Errorf("a watcher store is required")
}
options, err := generic.BuildCloudEventsSourceOptions(b.config, b.clientID, b.sourceID)
if err != nil {
return nil, err
}
cloudEventsClient, err := generic.NewCloudEventSourceClient[*workv1.ManifestWork](
ctx,
options,
sourcelister.NewWatcherStoreLister(b.watcherStore),
statushash.ManifestWorkStatusHash,
b.codec,
)
if err != nil {
return nil, err
}
// start to subscribe
cloudEventsClient.Subscribe(ctx, b.watcherStore.HandleReceivedWork)
manifestWorkClient := sourceclient.NewManifestWorkSourceClient(b.sourceID, cloudEventsClient, b.watcherStore)
workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
// start a goroutine to receive client reconnect signals
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
if !b.resync {
klog.V(4).Infof("resync is disabled, do nothing")
continue
}
// when receiving a client reconnected signal, we resync all clusters for this source
if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}
}()
if !b.resync {
return &ClientHolder{workClientSet: workClientSet}, nil
}
// start a goroutine to resync the works after this client's store is initiated
go func() {
if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) {
if err := cloudEventsClient.Resync(ctx, types.ClusterAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}()
return &ClientHolder{workClientSet: workClientSet}, nil
}
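
For reference, the builder being removed in this commit was driven roughly like this (an in-package sketch; the MQTT options, codec, and watcher store construction are elided, and the client/source IDs are hypothetical):

    // Illustrative sketch of the removed source-side builder flow.
    func newSourceWorkClient(ctx context.Context, mqttOptions any,
        workCodec generic.Codec[*workv1.ManifestWork], watcherStore store.WorkClientWatcherStore) (*ClientHolder, error) {
        holder, err := NewClientHolderBuilder(mqttOptions).
            WithClientID("source-client-1"). // hypothetical client ID
            WithSourceID("source-1").        // hypothetical source ID
            WithCodec(workCodec).
            WithWorkClientWatcherStore(watcherStore).
            NewSourceClientHolder(ctx)
        if err != nil {
            return nil, err
        }
        // The holder exposes the typed work API, e.g. holder.ManifestWorks("cluster1").
        return holder, nil
    }
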
// NewAgentClientHolder returns a ClientHolder for an agent
func (b *ClientHolderBuilder) NewAgentClientHolder(ctx context.Context) (*ClientHolder, error) {
if len(b.clientID) == 0 {
return nil, fmt.Errorf("client id is required")
}
if len(b.clusterName) == 0 {
return nil, fmt.Errorf("cluster name is required")
}
if b.watcherStore == nil {
return nil, fmt.Errorf("watcher store is required")
}
options, err := generic.BuildCloudEventsAgentOptions(b.config, b.clusterName, b.clientID)
if err != nil {
return nil, err
}
cloudEventsClient, err := generic.NewCloudEventAgentClient[*workv1.ManifestWork](
ctx,
options,
agentlister.NewWatcherStoreLister(b.watcherStore),
statushash.ManifestWorkStatusHash,
b.codec,
)
if err != nil {
return nil, err
}
// start to subscribe
cloudEventsClient.Subscribe(ctx, b.watcherStore.HandleReceivedWork)
manifestWorkClient := agentclient.NewManifestWorkAgentClient(cloudEventsClient, b.watcherStore, b.clusterName)
workClient := &internal.WorkV1ClientWrapper{ManifestWorkClient: manifestWorkClient}
workClientSet := &internal.WorkClientSetWrapper{WorkV1ClientWrapper: workClient}
// start a goroutine to receive client reconnect signals
go func() {
for {
select {
case <-ctx.Done():
return
case <-cloudEventsClient.ReconnectedChan():
if !b.resync {
klog.V(4).Infof("resync is disabled, do nothing")
continue
}
// when receiving a client reconnected signal, we resync all sources for this agent
// TODO after supporting multiple sources, we should only resync agent known sources
if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}
}()
if !b.resync {
return &ClientHolder{workClientSet: workClientSet}, nil
}
// start a goroutine to resync the works after this client's store is initiated
go func() {
if store.WaitForStoreInit(ctx, b.watcherStore.HasInitiated) {
if err := cloudEventsClient.Resync(ctx, types.SourceAll); err != nil {
klog.Errorf("failed to send resync request, %v", err)
}
}
}()
return &ClientHolder{workClientSet: workClientSet}, nil
}

View File

@@ -1,42 +0,0 @@
package lister
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/store"
)
// WatcherStoreLister lists the ManifestWorks from the WorkClientWatcherStore.
type WatcherStoreLister struct {
store store.WorkClientWatcherStore
}
func NewWatcherStoreLister(store store.WorkClientWatcherStore) *WatcherStoreLister {
return &WatcherStoreLister{
store: store,
}
}
// List returns the ManifestWorks from the WorkClientWatcherStore with list options.
func (l *WatcherStoreLister) List(options types.ListOptions) ([]*workv1.ManifestWork, error) {
list, err := l.store.List(options.ClusterName, metav1.ListOptions{})
if err != nil {
return nil, err
}
works := []*workv1.ManifestWork{}
for _, work := range list.Items {
// Currently, the source client only supports the ManifestBundle
// TODO: when supporting multiple cloud events data types, we need a way
// to know the work event data type
if options.CloudEventsDataType != payload.ManifestBundleEventDataType {
continue
}
works = append(works, &work)
}
return works, nil
}

View File

@@ -1,18 +0,0 @@
package statushash
import (
"crypto/sha256"
"encoding/json"
"fmt"
workv1 "open-cluster-management.io/api/work/v1"
)
// ManifestWorkStatusHash returns the SHA256 checksum of a ManifestWork status.
func ManifestWorkStatusHash(work *workv1.ManifestWork) (string, error) {
statusBytes, err := json.Marshal(work.Status)
if err != nil {
return "", fmt.Errorf("failed to marshal work status, %v", err)
}
return fmt.Sprintf("%x", sha256.Sum256(statusBytes)), nil
}
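
A quick sketch of the intended use: hash the status before and after an update and compare the digests, rather than deep-comparing the structs:

    // Illustrative sketch: detecting status changes via the hash.
    func statusChanged(oldWork, newWork *workv1.ManifestWork) (bool, error) {
        oldHash, err := ManifestWorkStatusHash(oldWork)
        if err != nil {
            return false, err
        }
        newHash, err := ManifestWorkStatusHash(newWork)
        if err != nil {
            return false, err
        }
        return oldHash != newHash, nil
    }
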

View File

@@ -1,208 +0,0 @@
package utils
import (
"bytes"
"encoding/json"
"fmt"
"github.com/bwmarrin/snowflake"
jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"
"k8s.io/apimachinery/pkg/api/validation"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
metav1validation "k8s.io/apimachinery/pkg/apis/meta/v1/validation"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/client-go/tools/cache"
workv1 "open-cluster-management.io/api/work/v1"
"open-cluster-management.io/sdk-go/pkg/apis/work/v1/validator"
"open-cluster-management.io/sdk-go/pkg/cloudevents/work/common"
)
// Patch applies the patch to a work with the patch type.
func Patch(patchType types.PatchType, work *workv1.ManifestWork, patchData []byte) (*workv1.ManifestWork, error) {
workData, err := json.Marshal(work)
if err != nil {
return nil, err
}
var patchedData []byte
switch patchType {
case types.JSONPatchType:
var patchObj jsonpatch.Patch
patchObj, err = jsonpatch.DecodePatch(patchData)
if err != nil {
return nil, err
}
patchedData, err = patchObj.Apply(workData)
if err != nil {
return nil, err
}
case types.MergePatchType:
patchedData, err = jsonpatch.MergePatch(workData, patchData)
if err != nil {
return nil, err
}
default:
return nil, fmt.Errorf("unsupported patch type: %s", patchType)
}
patchedWork := &workv1.ManifestWork{}
if err := json.Unmarshal(patchedData, patchedWork); err != nil {
return nil, err
}
return patchedWork, nil
}
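
A hedged sketch of a merge patch against a work; the label key and value are illustrative:

    // Illustrative sketch: merge-patching a label onto a ManifestWork.
    func addDemoLabel(work *workv1.ManifestWork) (*workv1.ManifestWork, error) {
        patch := []byte(`{"metadata":{"labels":{"app":"demo"}}}`)
        return Patch(types.MergePatchType, work, patch)
    }
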
// UID returns a v5 UUID based on sourceID, work name and namespace to make sure it is consistent
func UID(sourceID, namespace, name string) string {
id := fmt.Sprintf("%s-%s-%s-%s", sourceID, common.ManifestWorkGR.String(), namespace, name)
return uuid.NewSHA1(uuid.NameSpaceOID, []byte(id)).String()
}
// ListWorksWithOptions retrieves the manifestworks from the store that match the options.
func ListWorksWithOptions(store cache.Store, namespace string, opts metav1.ListOptions) ([]*workv1.ManifestWork, error) {
var err error
labelSelector := labels.Everything()
fieldSelector := fields.Everything()
if len(opts.LabelSelector) != 0 {
labelSelector, err = labels.Parse(opts.LabelSelector)
if err != nil {
return nil, fmt.Errorf("invalid labels selector %q: %v", opts.LabelSelector, err)
}
}
if len(opts.FieldSelector) != 0 {
fieldSelector, err = fields.ParseSelector(opts.FieldSelector)
if err != nil {
return nil, fmt.Errorf("invalid fields selector %q: %v", opts.FieldSelector, err)
}
}
works := []*workv1.ManifestWork{}
// list with labels
if err := cache.ListAll(store, labelSelector, func(obj interface{}) {
work, ok := obj.(*workv1.ManifestWork)
if !ok {
return
}
if namespace != metav1.NamespaceAll && work.Namespace != namespace {
return
}
workFieldSet := fields.Set{
"metadata.name": work.Name,
"metadata.namespace": work.Namespace,
}
if !fieldSelector.Matches(workFieldSet) {
return
}
works = append(works, work)
}); err != nil {
return nil, err
}
return works, nil
}
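
A sketch of calling the helper with both selector kinds; the namespace and selector values are illustrative:

    // Illustrative sketch: filtering cached works by label and field selectors.
    func findDemoWork(workStore cache.Store) ([]*workv1.ManifestWork, error) {
        return ListWorksWithOptions(workStore, "cluster1", metav1.ListOptions{
            LabelSelector: "app=demo",
            FieldSelector: "metadata.name=work-1",
        })
    }
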
func Validate(work *workv1.ManifestWork) field.ErrorList {
fldPath := field.NewPath("metadata")
errs := field.ErrorList{}
if work.UID == "" {
errs = append(errs, field.Required(fldPath.Child("uid"), "field not set"))
}
if work.ResourceVersion == "" {
errs = append(errs, field.Required(fldPath.Child("resourceVersion"), "field not set"))
}
if work.Name == "" {
errs = append(errs, field.Required(fldPath.Child("name"), "field not set"))
}
for _, msg := range validation.ValidateNamespaceName(work.Name, false) {
errs = append(errs, field.Invalid(fldPath.Child("name"), work.Name, msg))
}
if work.Namespace == "" {
errs = append(errs, field.Required(fldPath.Child("namespace"), "field not set"))
}
for _, msg := range validation.ValidateNamespaceName(work.Namespace, false) {
errs = append(errs, field.Invalid(fldPath.Child("namespace"), work.Namespace, msg))
}
errs = append(errs, validation.ValidateAnnotations(work.Annotations, fldPath.Child("annotations"))...)
errs = append(errs, validation.ValidateFinalizers(work.Finalizers, fldPath.Child("finalizers"))...)
errs = append(errs, metav1validation.ValidateLabels(work.Labels, fldPath.Child("labels"))...)
if err := validator.ManifestValidator.ValidateManifests(work.Spec.Workload.Manifests); err != nil {
errs = append(errs, field.Invalid(field.NewPath("spec"), "spec", err.Error()))
}
return errs
}
// Encode ensures the given work's manifests are encoded
func Encode(work *workv1.ManifestWork) error {
for index, manifest := range work.Spec.Workload.Manifests {
if manifest.Raw == nil {
if manifest.Object == nil {
return fmt.Errorf("the Object and Raw of the manifest[%d] for the work (%s/%s) are both `nil`",
index, work.Namespace, work.Name)
}
var buf bytes.Buffer
if err := unstructured.UnstructuredJSONScheme.Encode(manifest.Object, &buf); err != nil {
return err
}
work.Spec.Workload.Manifests[index].Raw = buf.Bytes()
}
}
return nil
}
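
A sketch showing the effect: a manifest supplied as a typed Object gains Raw bytes after Encode. The unstructured object is a placeholder, and the k8s.io/apimachinery/pkg/runtime import is assumed:

    // Illustrative sketch: Encode fills in manifest Raw bytes from Object.
    func encodeExample(obj *unstructured.Unstructured) (*workv1.ManifestWork, error) {
        work := &workv1.ManifestWork{}
        work.Spec.Workload.Manifests = []workv1.Manifest{
            {RawExtension: runtime.RawExtension{Object: obj}},
        }
        if err := Encode(work); err != nil {
            return nil, err
        }
        // work.Spec.Workload.Manifests[0].Raw now holds the JSON encoding of obj.
        return work, nil
    }
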
// CompareSnowflakeSequenceIDs compares two snowflake sequence IDs.
// Returns true if the current ID is greater than the last.
// If the last sequence ID is empty, then the current is greater.
func CompareSnowflakeSequenceIDs(last, current string) (bool, error) {
if current != "" && last == "" {
return true, nil
}
lastSID, err := snowflake.ParseString(last)
if err != nil {
return false, fmt.Errorf("unable to parse last sequence ID: %s, %v", last, err)
}
currentSID, err := snowflake.ParseString(current)
if err != nil {
return false, fmt.Errorf("unable to parse current sequence ID: %s %v", current, err)
}
if currentSID.Node() != lastSID.Node() {
return false, fmt.Errorf("sequence IDs (%s,%s) are not from the same node", last, current)
}
if currentSID.Time() != lastSID.Time() {
return currentSID.Time() > lastSID.Time(), nil
}
return currentSID.Step() > lastSID.Step(), nil
}
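
A sketch using the bwmarrin/snowflake library imported above: two IDs generated in sequence from the same node compare as expected (the node number is arbitrary):

    // Illustrative sketch: comparing two sequence IDs from the same node.
    func compareExample() (bool, error) {
        node, err := snowflake.NewNode(1)
        if err != nil {
            return false, err
        }
        last := node.Generate().String()
        current := node.Generate().String()
        return CompareSnowflakeSequenceIDs(last, current) // expected: true, nil
    }
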