From 6d5d82c1a29c1be47c26a8c8ae92a73e45681e7e Mon Sep 17 00:00:00 2001 From: Morven Cao Date: Tue, 9 Dec 2025 12:04:14 +0800 Subject: [PATCH] upgrade sdk-go. (#1286) Signed-off-by: Morven Cao --- go.mod | 2 +- go.sum | 4 +-- vendor/modules.txt | 2 +- .../work/agent/codec/manifestbundle.go | 5 ++-- .../generic/clients/agentclient.go | 14 ++++++++-- .../cloudevents/generic/clients/baseclient.go | 14 +++++++--- .../generic/clients/sourceclient.go | 7 ++++- .../generic/options/cert/rotation.go | 26 ++++++++++++++++++- .../pkg/cloudevents/server/grpc/broker.go | 23 +++++++++------- 9 files changed, 72 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index dc77bd418..5dca086c4 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 open-cluster-management.io/addon-framework v1.1.1-0.20251126020917-1a0a9be61322 open-cluster-management.io/api v1.1.1-0.20251124092621-2337d27c3b7f - open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5 + open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848 sigs.k8s.io/controller-runtime v0.22.4 diff --git a/go.sum b/go.sum index f7b9ddb47..256ced170 100644 --- a/go.sum +++ b/go.sum @@ -592,8 +592,8 @@ open-cluster-management.io/addon-framework v1.1.1-0.20251126020917-1a0a9be61322 open-cluster-management.io/addon-framework v1.1.1-0.20251126020917-1a0a9be61322/go.mod h1:YqG/M9aLM/jhUXZDb2lEi2gGFU8NHAPTsQEFGk/tiS8= open-cluster-management.io/api v1.1.1-0.20251124092621-2337d27c3b7f h1:aOEXqgXvWSykMvzw9drHsLGUNP3xhsk1PcdRzfOVXoM= open-cluster-management.io/api v1.1.1-0.20251124092621-2337d27c3b7f/go.mod h1:Hk/3c114t6Ba5qhpqw+RoA93yEbE2CosG+JzzBZ6aCo= -open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5 h1:eaaIq3YPE0PPsjGJQqJGvZETkTEjMiVx18O96KTl1a4= -open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU= +open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac h1:Wt7rzenZqrtyYI58+lpe9tmf9e5Ft8Wwd0MyDwuJ4ck= +open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM= diff --git a/vendor/modules.txt b/vendor/modules.txt index 586fd94ec..b9a5731c8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1956,7 +1956,7 @@ open-cluster-management.io/api/operator/v1 open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5 +# open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac ## explicit; go 1.24.0 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go index d4df2fa3d..9c0cca170 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/agent/codec/manifestbundle.go @@ -3,6 +3,7 @@ package codec import ( "encoding/json" "fmt" + "github.com/bwmarrin/snowflake" cloudevents "github.com/cloudevents/sdk-go/v2" cloudeventstypes "github.com/cloudevents/sdk-go/v2/types" @@ -142,9 +143,7 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo metaObj.ResourceVersion = "" // The resourceVersion in cloudevent actually sematically equals to generation, since it increments when // spec changes - if metaObj.Generation == 0 { - metaObj.Generation = int64(resourceVersion) - } + metaObj.Generation = int64(resourceVersion) if metaObj.Annotations == nil { metaObj.Annotations = map[string]string{} } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/agentclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/agentclient.go index 5ad9c947a..7550de162 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/agentclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/agentclient.go @@ -200,7 +200,12 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents. action, err := c.specAction(evt.Source(), eventType.CloudEventsDataType, obj) if err != nil { - logger.Error(err, "failed to generate spec action", "event", evt) + if logger.V(4).Enabled() { + evtData, _ := evt.MarshalJSON() + logger.Error(err, "failed to generate spec action", "event", string(evtData)) + } else { + logger.Error(err, "failed to generate spec action") + } return } @@ -211,7 +216,12 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents. for _, handler := range handlers { if err := handler(ctx, action, obj); err != nil { - logger.Error(err, "failed to handle spec event", "event", evt) + if logger.V(4).Enabled() { + evtData, _ := evt.MarshalJSON() + logger.Error(err, "failed to handle spec event", "event", string(evtData)) + } else { + logger.Error(err, "failed to handle spec event") + } } } } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go index 618ec2461..c9067aa08 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/baseclient.go @@ -140,9 +140,12 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error { return fmt.Errorf("the cloudevents client is not ready") } - logger.V(2).Info("Sending event", "event", evt.Context) if logger.V(5).Enabled() { - logger.V(5).Info("Sending event", "event", evt.String()) + evtData, _ := evt.MarshalJSON() + logger.V(5).Info("Sending event", "event", string(evtData)) + } else { + logger.V(2).Info("Sending event", + "eventType", evt.Type(), "extensions", evt.Extensions()) } if err := c.transport.Send(ctx, evt); err != nil { return err @@ -204,9 +207,12 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) { if err := c.transport.Receive(receiverCtx, func(ctx context.Context, evt cloudevents.Event) { receiveLogger := logging.SetLogTracingByCloudEvent(klog.FromContext(ctx), &evt) ctx = klog.NewContext(ctx, receiveLogger) - receiveLogger.V(2).Info("Received event", "event", evt.Context) if receiveLogger.V(5).Enabled() { - receiveLogger.V(5).Info("Received event", "event", evt.String()) + evtData, _ := evt.MarshalJSON() + receiveLogger.V(5).Info("Received event", "event", string(evtData)) + } else { + receiveLogger.V(2).Info("Received event", + "eventType", evt.Type(), "extensions", evt.Extensions()) } receive(ctx, evt) }); err != nil { diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/sourceclient.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/sourceclient.go index a14c3c11f..b8d2abd93 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/sourceclient.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/clients/sourceclient.go @@ -204,7 +204,12 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents for _, handler := range handlers { if err := handler(ctx, types.StatusModified, obj); err != nil { - logger.Error(err, "failed to handle status event", "event", evt) + if logger.V(4).Enabled() { + evtData, _ := evt.MarshalJSON() + logger.Error(err, "failed to handle status event", "event", string(evtData)) + } else { + logger.Error(err, "failed to handle status event") + } } } } diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/rotation.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/rotation.go index 60bde170a..33ad040e4 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/rotation.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/cert/rotation.go @@ -6,6 +6,7 @@ import ( "crypto/tls" "crypto/x509" "fmt" + "os" "reflect" "sync" "time" @@ -26,9 +27,32 @@ type Connection interface { type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error) -// CertCallbackRefreshDuration is exposed so that integration tests can crank up the reload speed. +// CertCallbackRefreshDuration controls how frequently certificate callbacks reload. +// Default is 5 minutes. +// +// NOTE: This can be overridden via the environment variable +// +// CERT_CALLBACK_REFRESH_DURATION **for testing only**. +// Do NOT rely on this environment variable in production. var CertCallbackRefreshDuration = 5 * time.Minute +func init() { + // TEST-ONLY OVERRIDE: + // Allow integration tests to reduce reload intervals by setting + // CERT_CALLBACK_REFRESH_DURATION to a valid Go duration string (e.g., "10s"). + // + // If the variable is not set or is invalid, the default (5m) is preserved. + if v := os.Getenv("CERT_CALLBACK_REFRESH_DURATION"); v != "" { + d, err := time.ParseDuration(v) + if err != nil { + // Optional: log or print a warning + klog.Warningf("invalid CERT_CALLBACK_REFRESH_DURATION (%q): %v, using default\n", v, err) + return + } + CertCallbackRefreshDuration = d + } +} + type clientCertRotating struct { sync.RWMutex diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go index 5aede6e33..d48d335d9 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go @@ -92,7 +92,7 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest) return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to parse cloud event type %s, %v", evt.Type(), err)) } - logger.V(4).Info("receive the event with grpc broker", "eventContext", evt.Context) + logger.V(4).Info("receive the event with grpc broker", "eventType", evt.Type(), "extensions", evt.Extensions()) // handler resync request if eventType.Action == types.ResyncRequestAction { @@ -234,7 +234,8 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv } // send the cloudevent to the subscriber - logger.V(4).Info("sending the event to spec subscribers", "subID", subID, "eventContext", evt.Context) + logger.V(4).Info("sending the event to spec subscribers", + "subID", subID, "eventType", evt.Type(), "extensions", evt.Extensions()) select { case eventCh <- pbEvt: case <-subCtx.Done(): @@ -280,7 +281,8 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv // - If the requested resource version is older than the source's current maintained resource version, the source // sends the resource. func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataType types.CloudEventsDataType, evt *cloudevents.Event) error { - log := klog.FromContext(ctx) + log := klog.FromContext(ctx).WithValues( + "eventDataType", eventDataType, "eventType", evt.Type(), "extensions", evt.Extensions()) resourceVersions, err := payload.DecodeSpecResyncRequest(*evt) if err != nil { @@ -304,17 +306,18 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy } if len(objs) == 0 { - log.V(4).Info("no objs from the lister, do nothing", "eventContext", evt.Context) + log.V(4).Info("no objs from the lister, do nothing") return nil } for _, obj := range objs { // respond with the deleting resource regardless of the resource version + objLogger := log.WithValues("eventType", obj.Type(), "extensions", obj.Extensions()) if _, ok := obj.Extensions()[types.ExtensionDeletionTimestamp]; ok { - log.V(4).Info("respond spec resync request", "eventContext", evt.Context) + objLogger.V(4).Info("respond spec resync request") err = bkr.handleRes(ctx, obj, eventDataType, "delete_request") if err != nil { - log.Error(err, "failed to handle resync spec request") + objLogger.Error(err, "failed to handle resync spec request") } continue } @@ -322,17 +325,17 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions) currentResourceVersion, err := cloudeventstypes.ToInteger(obj.Extensions()[types.ExtensionResourceVersion]) if err != nil { - log.V(4).Info("ignore the event since it has a invalid resourceVersion", "eventContext", obj.Context, "error", err) + objLogger.V(4).Info("ignore the event since it has a invalid resourceVersion", "error", err) continue } // the version of the work is not maintained on source or the source's work is newer than agent, send // the newer work to agent if currentResourceVersion == 0 || int64(currentResourceVersion) > lastResourceVersion { - log.V(4).Info("respond spec resync request", "eventContext", evt.Context) + objLogger.V(4).Info("respond spec resync request") err := bkr.handleRes(ctx, obj, eventDataType, "update_request") if err != nil { - log.Error(err, "failed to handle resync spec request") + objLogger.Error(err, "failed to handle resync spec request") } } } @@ -356,7 +359,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy NewEvent() // send a delete event for the current resource - log.V(4).Info("respond spec resync request", "eventContext", evt.Context) + log.V(4).Info("respond spec resync request") err := bkr.handleRes(ctx, &obj, eventDataType, "delete_request") if err != nil { log.Error(err, "failed to handle delete request")