increase the test timeout to avoid flaky test (#1378)

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2026-02-10 14:31:23 +08:00
committed by GitHub
parent fd6a0aaa1e
commit 109bde1f93
18 changed files with 320 additions and 146 deletions

2
go.mod
View File

@@ -41,7 +41,7 @@ require (
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
open-cluster-management.io/addon-framework v1.2.0
open-cluster-management.io/api v1.2.0
open-cluster-management.io/sdk-go v1.2.0
open-cluster-management.io/sdk-go v1.2.1-0.20260210021626-797891ef0f84
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03
sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a
sigs.k8s.io/controller-runtime v0.22.4

4
go.sum
View File

@@ -585,8 +585,8 @@ open-cluster-management.io/addon-framework v1.2.0 h1:/HsZmavrIrRzvH/htZxEhasrIxN
open-cluster-management.io/addon-framework v1.2.0/go.mod h1:dIcbnfLXMbXvO4T32ZY8dmok0OSA0UsrDGR27LLv69Q=
open-cluster-management.io/api v1.2.0 h1:+yeQgJiErrur5S4s205UM37EcZ2XbC9pFSm0xgV5/hU=
open-cluster-management.io/api v1.2.0/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4=
open-cluster-management.io/sdk-go v1.2.0 h1:O9LCOoy5JfgK3k4e2OCBY2ZoCqBEwLbEWClqP01FkQI=
open-cluster-management.io/sdk-go v1.2.0/go.mod h1:OHM74Kw1gh9RHxg7QjJlGXCDlPm7x2CtCkejHSdczs4=
open-cluster-management.io/sdk-go v1.2.1-0.20260210021626-797891ef0f84 h1:twhI/2KsCRsNSxfh9HFpdj8CenrMM6G1rwLcvEIsTno=
open-cluster-management.io/sdk-go v1.2.1-0.20260210021626-797891ef0f84/go.mod h1:OHM74Kw1gh9RHxg7QjJlGXCDlPm7x2CtCkejHSdczs4=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=

View File

@@ -115,9 +115,9 @@ var _ = BeforeSuite(func() {
SingletonImage: singletonImage,
}
// In most OCM cases, we expect user should see the result in 90 seconds.
// For cases that need more than 90 seconds, please set the timeout in the test case EXPLICITLY.
SetDefaultEventuallyTimeout(90 * time.Second)
// In most OCM cases, we expect user should see the result in 150 seconds.
// For cases that need more than 150 seconds, please set the timeout in the test case EXPLICITLY.
SetDefaultEventuallyTimeout(150 * time.Second)
SetDefaultEventuallyPollingInterval(5 * time.Second)
By("Setup hub")

2
vendor/modules.txt vendored
View File

@@ -1958,7 +1958,7 @@ open-cluster-management.io/api/operator/v1
open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1
# open-cluster-management.io/sdk-go v1.2.0
# open-cluster-management.io/sdk-go v1.2.1-0.20260210021626-797891ef0f84
## explicit; go 1.25.0
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1

View File

@@ -7,7 +7,10 @@ import (
addonapiv1alpha1 "open-cluster-management.io/api/addon/v1alpha1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
genericutils "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
)
var ManagedClusterAddOnEventDataType = types.CloudEventsDataType{
@@ -35,12 +38,15 @@ func (c *ManagedClusterAddOnCodec) Encode(source string, eventType types.CloudEv
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(addon.Name).
WithResourceID(string(addon.UID)).
WithClusterName(addon.Namespace).
NewEvent()
if addon.ResourceVersion != "" {
evt.SetExtension(types.ExtensionResourceVersion, addon.ResourceVersion)
genericutils.SetResourceVersion(eventType, &evt, addon)
if !addon.DeletionTimestamp.IsZero() {
evt.SetExtension(types.ExtensionDeletionTimestamp, addon.DeletionTimestamp.Time)
return &evt, nil
}
newAddon := addon.DeepCopy()
@@ -58,10 +64,7 @@ func (c *ManagedClusterAddOnCodec) Encode(source string, eventType types.CloudEv
// Decode a cloudevent to a ManagedClusterAddOn
func (c *ManagedClusterAddOnCodec) Decode(evt *cloudevents.Event) (*addonapiv1alpha1.ManagedClusterAddOn, error) {
addon := &addonapiv1alpha1.ManagedClusterAddOn{}
if err := evt.DataAs(addon); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}
return addon, nil
return utils.DecodeWithDeletionHandling(evt, func() *addonapiv1alpha1.ManagedClusterAddOn {
return &addonapiv1alpha1.ManagedClusterAddOn{}
})
}

View File

@@ -2,11 +2,15 @@ package v1beta1
import (
"fmt"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
addonapiv1beta1 "open-cluster-management.io/api/addon/v1beta1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
genericutils "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
)
var ManagedClusterAddOnEventDataType = types.CloudEventsDataType{
@@ -34,12 +38,15 @@ func (c *ManagedClusterAddOnCodec) Encode(source string, eventType types.CloudEv
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(addon.Name).
WithResourceID(string(addon.UID)).
WithClusterName(addon.Namespace).
NewEvent()
if addon.ResourceVersion != "" {
evt.SetExtension(types.ExtensionResourceVersion, addon.ResourceVersion)
genericutils.SetResourceVersion(eventType, &evt, addon)
if !addon.DeletionTimestamp.IsZero() {
evt.SetExtension(types.ExtensionDeletionTimestamp, addon.DeletionTimestamp.Time)
return &evt, nil
}
newAddon := addon.DeepCopy()
@@ -57,10 +64,7 @@ func (c *ManagedClusterAddOnCodec) Encode(source string, eventType types.CloudEv
// Decode a cloudevent to a ManagedClusterAddOn
func (c *ManagedClusterAddOnCodec) Decode(evt *cloudevents.Event) (*addonapiv1beta1.ManagedClusterAddOn, error) {
addon := &addonapiv1beta1.ManagedClusterAddOn{}
if err := evt.DataAs(addon); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}
return addon, nil
return utils.DecodeWithDeletionHandling(evt, func() *addonapiv1beta1.ManagedClusterAddOn {
return &addonapiv1beta1.ManagedClusterAddOn{}
})
}

View File

@@ -8,7 +8,10 @@ import (
clusterv1 "open-cluster-management.io/api/cluster/v1"
cloudevents "github.com/cloudevents/sdk-go/v2"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
genericutils "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
)
var ManagedClusterEventDataType = types.CloudEventsDataType{
@@ -36,12 +39,15 @@ func (c *ManagedClusterCodec) Encode(source string, eventType types.CloudEventsT
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(cluster.Name).
WithResourceID(string(cluster.UID)).
WithClusterName(cluster.Name).
NewEvent()
if cluster.ResourceVersion != "" {
evt.SetExtension(types.ExtensionResourceVersion, cluster.ResourceVersion)
genericutils.SetResourceVersion(eventType, &evt, cluster)
if !cluster.DeletionTimestamp.IsZero() {
evt.SetExtension(types.ExtensionDeletionTimestamp, cluster.DeletionTimestamp.Time)
return &evt, nil
}
newCluster := cluster.DeepCopy()
@@ -59,10 +65,7 @@ func (c *ManagedClusterCodec) Encode(source string, eventType types.CloudEventsT
// Decode a cloudevent to a ManagedCluster
func (c *ManagedClusterCodec) Decode(evt *cloudevents.Event) (*clusterv1.ManagedCluster, error) {
cluster := &clusterv1.ManagedCluster{}
if err := evt.DataAs(cluster); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}
return cluster, nil
return utils.DecodeWithDeletionHandling(evt, func() *clusterv1.ManagedCluster {
return &clusterv1.ManagedCluster{}
})
}

View File

@@ -10,7 +10,9 @@ import (
v1 "open-cluster-management.io/api/cluster/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
genericutils "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
)
var CSREventDataType = types.CloudEventsDataType{
@@ -47,12 +49,15 @@ func (c *CSRCodec) Encode(source string, eventType types.CloudEventsType, csr *c
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(csr.Name).
WithResourceID(string(csr.UID)).
WithClusterName(cluster).
NewEvent()
if csr.ResourceVersion != "" {
evt.SetExtension(types.ExtensionResourceVersion, csr.ResourceVersion)
genericutils.SetResourceVersion(eventType, &evt, csr)
if !csr.DeletionTimestamp.IsZero() {
evt.SetExtension(types.ExtensionDeletionTimestamp, csr.DeletionTimestamp.Time)
return &evt, nil
}
newCSR := csr.DeepCopy()
@@ -70,10 +75,7 @@ func (c *CSRCodec) Encode(source string, eventType types.CloudEventsType, csr *c
// Decode a cloudevent to a CSR
func (c *CSRCodec) Decode(evt *cloudevents.Event) (*certificatev1.CertificateSigningRequest, error) {
csr := &certificatev1.CertificateSigningRequest{}
if err := evt.DataAs(csr); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}
return csr, nil
return utils.DecodeWithDeletionHandling(evt, func() *certificatev1.CertificateSigningRequest {
return &certificatev1.CertificateSigningRequest{}
})
}

View File

@@ -35,7 +35,7 @@ func (c *EventCodec) Encode(source string, eventType types.CloudEventsType, even
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(event.Name).
WithResourceID(string(event.UID)).
WithClusterName(event.Namespace).
NewEvent()

View File

@@ -8,7 +8,9 @@ import (
coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
genericutils "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
)
var LeaseEventDataType = types.CloudEventsDataType{
@@ -36,12 +38,15 @@ func (c *LeaseCodec) Encode(source string, eventType types.CloudEventsType, leas
}
evt := types.NewEventBuilder(source, eventType).
WithResourceID(lease.Name).
WithResourceID(string(lease.UID)).
WithClusterName(lease.Namespace).
NewEvent()
if lease.ResourceVersion != "" {
evt.SetExtension(types.ExtensionResourceVersion, lease.ResourceVersion)
genericutils.SetResourceVersion(eventType, &evt, lease)
if !lease.DeletionTimestamp.IsZero() {
evt.SetExtension(types.ExtensionDeletionTimestamp, lease.DeletionTimestamp.Time)
return &evt, nil
}
newLease := lease.DeepCopy()
@@ -59,10 +64,7 @@ func (c *LeaseCodec) Encode(source string, eventType types.CloudEventsType, leas
// Decode a cloudevent to a lease object
func (c *LeaseCodec) Decode(evt *cloudevents.Event) (*coordinationv1.Lease, error) {
lease := &coordinationv1.Lease{}
if err := evt.DataAs(lease); err != nil {
return nil, fmt.Errorf("failed to unmarshal event data %s, %v", string(evt.Data()), err)
}
return lease, nil
return utils.DecodeWithDeletionHandling(evt, func() *coordinationv1.Lease {
return &coordinationv1.Lease{}
})
}

View File

@@ -6,6 +6,7 @@ import (
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/utils"
@@ -74,3 +75,20 @@ func (s *BaseClientWatchStore[T]) ListAll(ctx context.Context) ([]T, error) {
return resources, nil
}
func (s *BaseClientWatchStore[T]) findObjByUID(ctx context.Context, uid types.UID) (T, bool, error) {
var zero T
objs, err := s.ListAll(ctx)
if err != nil {
return zero, false, err
}
for _, o := range objs {
if o.GetUID() == uid {
return o, true, nil
}
}
return zero, false, nil
}

View File

@@ -48,47 +48,59 @@ func (s *AgentInformerWatcherStore[T]) Delete(resource runtime.Object) error {
}
func (s *AgentInformerWatcherStore[T]) HandleReceivedResource(ctx context.Context, resource T) error {
runtimeObj, err := utils.ToRuntimeObject(resource)
newRuntimeObj, err := utils.ToRuntimeObject(resource)
if err != nil {
return err
}
metaObj, err := meta.Accessor(runtimeObj)
newMetaObj, err := meta.Accessor(newRuntimeObj)
if err != nil {
return err
}
lastResource, exists, err := s.Get(ctx, metaObj.GetNamespace(), metaObj.GetName())
if !newMetaObj.GetDeletionTimestamp().IsZero() {
cachedResource, exists, err := s.findObjByUID(ctx, newMetaObj.GetUID())
if err != nil {
return err
}
if !exists {
return nil
}
cachedMetaObj, err := meta.Accessor(cachedResource)
if err != nil {
return err
}
// trigger an update event if the object is deleting.
// Only need to update generation/finalizer/deletionTimeStamp of the object.
if len(newMetaObj.GetFinalizers()) != 0 {
cachedMetaObj.SetDeletionTimestamp(newMetaObj.GetDeletionTimestamp())
cachedMetaObj.SetFinalizers(newMetaObj.GetFinalizers())
cachedMetaObj.SetGeneration(newMetaObj.GetGeneration())
cachedRuntimeObj, err := utils.ToRuntimeObject(cachedMetaObj)
if err != nil {
return err
}
return s.Update(cachedRuntimeObj)
}
cachedRuntimeObj, err := utils.ToRuntimeObject(cachedMetaObj)
if err != nil {
return err
}
return s.Delete(cachedRuntimeObj)
}
_, exists, err := s.Get(ctx, newMetaObj.GetNamespace(), newMetaObj.GetName())
if err != nil {
return err
}
if !exists {
return s.Add(runtimeObj)
return s.Add(newRuntimeObj)
}
if !metaObj.GetDeletionTimestamp().IsZero() {
// trigger an update event if the object is deleting.
// Only need to update generation/finalizer/deletionTimeStamp of the object.
if len(metaObj.GetFinalizers()) != 0 {
deletingObj, err := meta.Accessor(lastResource)
if err != nil {
return err
}
deletingObj.SetDeletionTimestamp(metaObj.GetDeletionTimestamp())
deletingObj.SetFinalizers(metaObj.GetFinalizers())
deletingObj.SetGeneration(metaObj.GetGeneration())
runtimeObj, err := utils.ToRuntimeObject(deletingObj)
if err != nil {
return err
}
return s.Update(runtimeObj)
}
return s.Delete(runtimeObj)
}
return s.Update(runtimeObj)
return s.Update(newRuntimeObj)
}
func (s *AgentInformerWatcherStore[T]) GetWatcher(ctx context.Context, namespace string, opts metav1.ListOptions) (watch.Interface, error) {

View File

@@ -49,30 +49,48 @@ func (s *SimpleStore[T]) HasInitiated() bool {
}
func (s *SimpleStore[T]) HandleReceivedResource(ctx context.Context, resource T) error {
runtimeObj, err := utils.ToRuntimeObject(resource)
newRuntimeObj, err := utils.ToRuntimeObject(resource)
if err != nil {
return err
}
metaObj, err := meta.Accessor(runtimeObj)
newMetaObj, err := meta.Accessor(newRuntimeObj)
if err != nil {
return err
}
_, exists, err := s.Get(ctx, metaObj.GetNamespace(), metaObj.GetName())
if !newMetaObj.GetDeletionTimestamp().IsZero() {
cachedResource, exists, err := s.findObjByUID(ctx, newMetaObj.GetUID())
if err != nil {
return err
}
if !exists {
return nil
}
if len(newMetaObj.GetFinalizers()) != 0 {
return nil
}
cachedMetaObj, err := meta.Accessor(cachedResource)
if err != nil {
return err
}
cachedRuntimeObj, err := utils.ToRuntimeObject(cachedMetaObj)
if err != nil {
return err
}
return s.Delete(cachedRuntimeObj)
}
_, exists, err := s.Get(ctx, newMetaObj.GetNamespace(), newMetaObj.GetName())
if err != nil {
return err
}
if !exists {
return s.Add(runtimeObj)
return s.Add(newRuntimeObj)
}
if !metaObj.GetDeletionTimestamp().IsZero() {
if len(metaObj.GetFinalizers()) != 0 {
return nil
}
return s.Delete(runtimeObj)
}
return s.Update(runtimeObj)
return s.Update(newRuntimeObj)
}

View File

@@ -0,0 +1,52 @@
package utils
import (
"fmt"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubetypes "k8s.io/apimachinery/pkg/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
// DecodeWithDeletionHandling decodes a cloudevent into a resource object, handling deletion events.
// For deletion events without data, it extracts UID and DeletionTimestamp from event extensions.
// For other events, it unmarshals the event data into the resource object.
//
// The factory function creates a new instance of type T.
func DecodeWithDeletionHandling[T metav1.Object](evt *cloudevents.Event, factory func() T) (T, error) {
var zero T
obj := factory()
if _, ok := evt.Extensions()[types.ExtensionDeletionTimestamp]; ok {
if len(evt.Data()) == 0 {
resourceID, err := cloudeventstypes.ToString(evt.Extensions()[types.ExtensionResourceID])
if err != nil {
return zero, err
}
deletionTimestamp, err := cloudeventstypes.ToTime(evt.Extensions()[types.ExtensionDeletionTimestamp])
if err != nil {
return zero, err
}
obj.SetUID(kubetypes.UID(resourceID))
obj.SetDeletionTimestamp(&metav1.Time{Time: deletionTimestamp})
return obj, nil
}
if err := evt.DataAs(obj); err != nil {
return zero, fmt.Errorf("failed to unmarshal event, %v", err)
}
return obj, nil
}
if err := evt.DataAs(obj); err != nil {
return zero, fmt.Errorf("failed to unmarshal event, %v", err)
}
return obj, nil
}

View File

@@ -15,6 +15,7 @@ import (
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
)
// CloudEventAgentClient is a client for an agent to resync/send/receive its resources with cloud events.
@@ -74,21 +75,25 @@ func (c *CloudEventAgentClient[T]) Resync(ctx context.Context, source string) er
return err
}
resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, len(objs))}
for i, obj := range objs {
resources.Versions[i] = payload.ResourceVersion{
ResourceID: string(obj.GetUID()),
// this should be set as generation, since the resource version of the object is local version.
ResourceVersion: obj.GetGeneration(),
}
}
eventType := types.CloudEventsType{
CloudEventsDataType: c.codec.EventDataType(),
SubResource: types.SubResourceSpec,
Action: types.ResyncRequestAction,
}
resources := &payload.ResourceVersionList{Versions: make([]payload.ResourceVersion, len(objs))}
for i, obj := range objs {
rv, err := utils.GetResourceVersionFromObject(eventType, obj)
if err != nil {
return err
}
resources.Versions[i] = payload.ResourceVersion{
ResourceID: string(obj.GetUID()),
ResourceVersion: rv,
}
}
evt := types.NewEventBuilder(c.agentID, eventType).
WithOriginalSource(source).
WithClusterName(c.clusterName).

View File

@@ -22,10 +22,9 @@ type ResourceObject interface {
// The source should ensure its uniqueness and consistency.
GetUID() kubetypes.UID
// GetResourceVersion returns the resource version of this object. The resource version is a required int64 sequence
// number property that must be incremented by the source whenever this resource changes.
// The source should guarantee its incremental nature.
// Deprecated: use GetGeneration() instead.
// GetResourceVersion returns the resourceVersion of this object.
// It is updated by the system whenever any part of the object changes,
// and is used for optimistic concurrency control.
GetResourceVersion() string
// GetGeneration returns the generation number of this object to reflect the spec change of the resource.

View File

@@ -0,0 +1,60 @@
package utils
import (
"strconv"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
workpayload "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)
// SetResourceVersion sets the resourceversion extension on a CloudEvent based on the event type.
// For ManifestBundleEvent, it uses the object's generation number, which represents spec changes.
// For other event types, it uses the object's resource version, which represents all changes to the object.
// If the resource version is empty for non-ManifestBundle types, no extension is set.
func SetResourceVersion(eventType types.CloudEventsType, evt *cloudevents.Event, obj generic.ResourceObject) {
if eventType.CloudEventsDataType == workpayload.ManifestBundleEventDataType {
evt.SetExtension(types.ExtensionResourceVersion, obj.GetGeneration())
return
}
rv := obj.GetResourceVersion()
if rv == "" {
return
}
evt.SetExtension(types.ExtensionResourceVersion, rv)
}
// GetResourceVersionFromObject extracts the resource version from a resource object as an int64.
// For ManifestBundleEvent, it returns the object's generation number directly.
// For other event types, it parses the resource version string to an int64.
// Returns an error if the resource version string cannot be parsed.
func GetResourceVersionFromObject(eventType types.CloudEventsType, obj generic.ResourceObject) (int64, error) {
if eventType.CloudEventsDataType == workpayload.ManifestBundleEventDataType {
return obj.GetGeneration(), nil
}
return strconv.ParseInt(obj.GetResourceVersion(), 10, 64)
}
// GetResourceVersionFromEvent extracts the resource version from a CloudEvent extension as an int64.
// For ManifestBundleEvent, it converts the extension value to an integer (generation).
// For other event types, it converts the extension value to a string and parses it to int64.
// Returns an error if the extension is missing, has an invalid type, or cannot be parsed.
func GetResourceVersionFromEvent(eventType types.CloudEventsType, evt cloudevents.Event) (int64, error) {
if eventType.CloudEventsDataType == workpayload.ManifestBundleEventDataType {
// TODO only use string as the resourceversion extension
gen, err := cloudeventstypes.ToInteger(evt.Extensions()[types.ExtensionResourceVersion])
return int64(gen), err
}
rv, err := cloudeventstypes.ToString(evt.Extensions()[types.ExtensionResourceVersion])
if err != nil {
return 0, err
}
return strconv.ParseInt(rv, 10, 64)
}

View File

@@ -25,6 +25,7 @@ import (
grpcprotocol "open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/grpc/protocol"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/utils"
"open-cluster-management.io/sdk-go/pkg/cloudevents/server"
)
@@ -287,7 +288,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}
}
// Upon receiving the spec resync event, the source responds by sending resource status events to the broker as follows:
// Upon receiving the spec resync event, the source responds by sending resource spec events to the broker as follows:
// - If the request event message is empty, the source returns all resources associated with the work agent.
// - If the request event message contains resource IDs and versions, the source retrieves the resource with the
// specified ID and compares the versions.
@@ -295,6 +296,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// resend the resource.
// - If the requested resource version is older than the source's current maintained resource version, the source
// sends the resource.
// - If the requested resource does not exist in the source, the source send a delete request to the agent.
func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataType types.CloudEventsDataType, evt *cloudevents.Event) error {
log := klog.FromContext(ctx).WithValues(
"eventDataType", eventDataType, "eventType", evt.Type(), "extensions", evt.Extensions())
@@ -320,77 +322,71 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
return err
}
if len(evts) == 0 {
log.V(4).Info("no objs from the lister, do nothing")
return nil
respEventType := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.ResyncResponseAction,
}
for _, evt := range evts {
evt.SetType(respEventType.String())
evtLogger := log.WithValues("eventType", evt.Type(), "extensions", evt.Extensions())
// respond with the deleting resource regardless of the resource version
objLogger := log.WithValues("eventType", evt.Type(), "extensions", evt.Extensions())
if _, ok := evt.Extensions()[types.ExtensionDeletionTimestamp]; ok {
objLogger.V(4).Info("respond spec resync request")
deleteEventTypes := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.DeleteRequestAction,
}
evt.SetType(deleteEventTypes.String())
evtLogger.V(4).Info("respond spec resync request")
err = bkr.HandleEvent(ctx, evt)
if err != nil {
objLogger.Error(err, "failed to handle resync spec request")
evtLogger.Error(err, "failed to handle resync spec request")
}
continue
}
lastResourceVersion := findResourceVersion(evt.ID(), resourceVersions.Versions)
currentResourceVersion, err := cloudeventstypes.ToInteger(evt.Extensions()[types.ExtensionResourceVersion])
resourceID, err := cloudeventstypes.ToString(evt.Extensions()[types.ExtensionResourceID])
if err != nil {
objLogger.V(4).Info("ignore the event since it has a invalid resourceVersion", "error", err)
evtLogger.Error(err, "failed to get resourceid extension")
continue
}
lastResourceVersion := findResourceVersion(resourceID, resourceVersions.Versions)
currentResourceVersion, err := utils.GetResourceVersionFromEvent(respEventType, *evt)
if err != nil {
evtLogger.V(4).Info("ignore the event since it has an invalid resourceVersion", "error", err)
continue
}
// the version of the work is not maintained on source or the source's work is newer than agent, send
// the newer work to agent
if currentResourceVersion == 0 || int64(currentResourceVersion) > lastResourceVersion {
objLogger.V(4).Info("respond spec resync request")
updateEventTypes := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.UpdateRequestAction,
}
evt.SetType(updateEventTypes.String())
if currentResourceVersion == 0 || currentResourceVersion > lastResourceVersion {
evtLogger.V(4).Info("respond spec resync request")
err := bkr.HandleEvent(ctx, evt)
if err != nil {
objLogger.Error(err, "failed to handle resync spec request")
evtLogger.Error(err, "failed to handle resync spec request")
}
}
}
// the resources do not exist on the source, but exist on the agent, delete them
for _, rv := range resourceVersions.Versions {
_, exists := getObj(rv.ResourceID, evts)
_, exists := getEvent(rv.ResourceID, evts)
if exists {
continue
}
deleteEventTypes := types.CloudEventsType{
CloudEventsDataType: eventDataType,
SubResource: types.SubResourceSpec,
Action: types.DeleteRequestAction,
}
obj := types.NewEventBuilder("source", deleteEventTypes).
// for deletion, we don't care about the resourceVersion.
// TODO support to set the source from broker options
evt := types.NewEventBuilder("source", respEventType).
WithResourceID(rv.ResourceID).
WithResourceVersion(rv.ResourceVersion).
WithClusterName(clusterName).
WithDeletionTimestamp(time.Now()).
NewEvent()
evtLogger := log.WithValues("eventType", evt.Type(), "extensions", evt.Extensions())
// send a delete event for the current resource
log.V(4).Info("respond spec resync request")
err := bkr.HandleEvent(ctx, &obj)
evtLogger.V(4).Info("respond spec resync request")
err := bkr.HandleEvent(ctx, &evt)
if err != nil {
log.Error(err, "failed to handle delete request")
evtLogger.Error(err, "failed to handle resync spec request")
}
}
@@ -455,13 +451,13 @@ func findResourceVersion(id string, versions []payload.ResourceVersion) int64 {
return 0
}
// getObj returns the object with the given ID from the list of resources.
func getObj(id string, objs []*cloudevents.Event) (*cloudevents.Event, bool) {
for _, obj := range objs {
resID := obj.Extensions()[types.ExtensionResourceID]
// getEvent returns the event with the given ID from the list of events.
func getEvent(id string, evts []*cloudevents.Event) (*cloudevents.Event, bool) {
for _, evt := range evts {
resID := evt.Extensions()[types.ExtensionResourceID]
resIDStr, ok := resID.(string)
if ok && id == resIDStr {
return obj, true
return evt, true
}
}