upgrade sdk-go. (#1286)
Some checks failed
Post / coverage (push) Failing after 36m56s
Post / images (amd64, addon-manager) (push) Failing after 7m41s
Post / images (amd64, placement) (push) Failing after 7m12s
Post / images (amd64, registration) (push) Failing after 7m2s
Post / images (amd64, registration-operator) (push) Failing after 7m6s
Post / images (amd64, work) (push) Failing after 7m11s
Post / images (arm64, addon-manager) (push) Failing after 7m22s
Post / images (arm64, placement) (push) Failing after 7m11s
Post / images (arm64, registration) (push) Failing after 7m2s
Post / images (arm64, registration-operator) (push) Failing after 7m8s
Post / images (arm64, work) (push) Failing after 7m5s
Post / image manifest (addon-manager) (push) Has been skipped
Post / image manifest (placement) (push) Has been skipped
Post / image manifest (registration) (push) Has been skipped
Post / image manifest (registration-operator) (push) Has been skipped
Post / image manifest (work) (push) Has been skipped
Post / trigger clusteradm e2e (push) Has been skipped
Scorecard supply-chain security / Scorecard analysis (push) Failing after 1m35s
Close stale issues and PRs / stale (push) Successful in 42s

Signed-off-by: Morven Cao <lcao@redhat.com>
This commit is contained in:
Morven Cao
2025-12-09 12:04:14 +08:00
committed by GitHub
parent ce60b05e9d
commit 6d5d82c1a2
9 changed files with 72 additions and 25 deletions

2
go.mod
View File

@@ -41,7 +41,7 @@ require (
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
open-cluster-management.io/addon-framework v1.1.1-0.20251126020917-1a0a9be61322
open-cluster-management.io/api v1.1.1-0.20251124092621-2337d27c3b7f
open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848
sigs.k8s.io/controller-runtime v0.22.4

4
go.sum
View File

@@ -592,8 +592,8 @@ open-cluster-management.io/addon-framework v1.1.1-0.20251126020917-1a0a9be61322
open-cluster-management.io/addon-framework v1.1.1-0.20251126020917-1a0a9be61322/go.mod h1:YqG/M9aLM/jhUXZDb2lEi2gGFU8NHAPTsQEFGk/tiS8=
open-cluster-management.io/api v1.1.1-0.20251124092621-2337d27c3b7f h1:aOEXqgXvWSykMvzw9drHsLGUNP3xhsk1PcdRzfOVXoM=
open-cluster-management.io/api v1.1.1-0.20251124092621-2337d27c3b7f/go.mod h1:Hk/3c114t6Ba5qhpqw+RoA93yEbE2CosG+JzzBZ6aCo=
open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5 h1:eaaIq3YPE0PPsjGJQqJGvZETkTEjMiVx18O96KTl1a4=
open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU=
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac h1:Wt7rzenZqrtyYI58+lpe9tmf9e5Ft8Wwd0MyDwuJ4ck=
open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac/go.mod h1:0EZ9M7AtD0b+x9lUo5pYlyFF2aKOk1y88looeOVybwU=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=

2
vendor/modules.txt vendored
View File

@@ -1956,7 +1956,7 @@ open-cluster-management.io/api/operator/v1
open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1
# open-cluster-management.io/sdk-go v1.1.1-0.20251204021848-5e7a48eb49a5
# open-cluster-management.io/sdk-go v1.1.1-0.20251209031938-62521c9935ac
## explicit; go 1.24.0
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1

View File

@@ -3,6 +3,7 @@ package codec
import (
"encoding/json"
"fmt"
"github.com/bwmarrin/snowflake"
cloudevents "github.com/cloudevents/sdk-go/v2"
cloudeventstypes "github.com/cloudevents/sdk-go/v2/types"
@@ -142,9 +143,7 @@ func (c *ManifestBundleCodec) Decode(evt *cloudevents.Event) (*workv1.ManifestWo
metaObj.ResourceVersion = ""
// The resourceVersion in cloudevent actually sematically equals to generation, since it increments when
// spec changes
if metaObj.Generation == 0 {
metaObj.Generation = int64(resourceVersion)
}
metaObj.Generation = int64(resourceVersion)
if metaObj.Annotations == nil {
metaObj.Annotations = map[string]string{}
}

View File

@@ -200,7 +200,12 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.
action, err := c.specAction(evt.Source(), eventType.CloudEventsDataType, obj)
if err != nil {
logger.Error(err, "failed to generate spec action", "event", evt)
if logger.V(4).Enabled() {
evtData, _ := evt.MarshalJSON()
logger.Error(err, "failed to generate spec action", "event", string(evtData))
} else {
logger.Error(err, "failed to generate spec action")
}
return
}
@@ -211,7 +216,12 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.
for _, handler := range handlers {
if err := handler(ctx, action, obj); err != nil {
logger.Error(err, "failed to handle spec event", "event", evt)
if logger.V(4).Enabled() {
evtData, _ := evt.MarshalJSON()
logger.Error(err, "failed to handle spec event", "event", string(evtData))
} else {
logger.Error(err, "failed to handle spec event")
}
}
}
}

View File

@@ -140,9 +140,12 @@ func (c *baseClient) publish(ctx context.Context, evt cloudevents.Event) error {
return fmt.Errorf("the cloudevents client is not ready")
}
logger.V(2).Info("Sending event", "event", evt.Context)
if logger.V(5).Enabled() {
logger.V(5).Info("Sending event", "event", evt.String())
evtData, _ := evt.MarshalJSON()
logger.V(5).Info("Sending event", "event", string(evtData))
} else {
logger.V(2).Info("Sending event",
"eventType", evt.Type(), "extensions", evt.Extensions())
}
if err := c.transport.Send(ctx, evt); err != nil {
return err
@@ -204,9 +207,12 @@ func (c *baseClient) subscribe(ctx context.Context, receive receiveFn) {
if err := c.transport.Receive(receiverCtx, func(ctx context.Context, evt cloudevents.Event) {
receiveLogger := logging.SetLogTracingByCloudEvent(klog.FromContext(ctx), &evt)
ctx = klog.NewContext(ctx, receiveLogger)
receiveLogger.V(2).Info("Received event", "event", evt.Context)
if receiveLogger.V(5).Enabled() {
receiveLogger.V(5).Info("Received event", "event", evt.String())
evtData, _ := evt.MarshalJSON()
receiveLogger.V(5).Info("Received event", "event", string(evtData))
} else {
receiveLogger.V(2).Info("Received event",
"eventType", evt.Type(), "extensions", evt.Extensions())
}
receive(ctx, evt)
}); err != nil {

View File

@@ -204,7 +204,12 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents
for _, handler := range handlers {
if err := handler(ctx, types.StatusModified, obj); err != nil {
logger.Error(err, "failed to handle status event", "event", evt)
if logger.V(4).Enabled() {
evtData, _ := evt.MarshalJSON()
logger.Error(err, "failed to handle status event", "event", string(evtData))
} else {
logger.Error(err, "failed to handle status event")
}
}
}
}

View File

@@ -6,6 +6,7 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"os"
"reflect"
"sync"
"time"
@@ -26,9 +27,32 @@ type Connection interface {
type reloadFunc func(*tls.CertificateRequestInfo) (*tls.Certificate, error)
// CertCallbackRefreshDuration is exposed so that integration tests can crank up the reload speed.
// CertCallbackRefreshDuration controls how frequently certificate callbacks reload.
// Default is 5 minutes.
//
// NOTE: This can be overridden via the environment variable
//
// CERT_CALLBACK_REFRESH_DURATION **for testing only**.
// Do NOT rely on this environment variable in production.
var CertCallbackRefreshDuration = 5 * time.Minute
func init() {
// TEST-ONLY OVERRIDE:
// Allow integration tests to reduce reload intervals by setting
// CERT_CALLBACK_REFRESH_DURATION to a valid Go duration string (e.g., "10s").
//
// If the variable is not set or is invalid, the default (5m) is preserved.
if v := os.Getenv("CERT_CALLBACK_REFRESH_DURATION"); v != "" {
d, err := time.ParseDuration(v)
if err != nil {
// Optional: log or print a warning
klog.Warningf("invalid CERT_CALLBACK_REFRESH_DURATION (%q): %v, using default\n", v, err)
return
}
CertCallbackRefreshDuration = d
}
}
type clientCertRotating struct {
sync.RWMutex

View File

@@ -92,7 +92,7 @@ func (bkr *GRPCBroker) Publish(ctx context.Context, pubReq *pbv1.PublishRequest)
return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to parse cloud event type %s, %v", evt.Type(), err))
}
logger.V(4).Info("receive the event with grpc broker", "eventContext", evt.Context)
logger.V(4).Info("receive the event with grpc broker", "eventType", evt.Type(), "extensions", evt.Extensions())
// handler resync request
if eventType.Action == types.ResyncRequestAction {
@@ -234,7 +234,8 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
}
// send the cloudevent to the subscriber
logger.V(4).Info("sending the event to spec subscribers", "subID", subID, "eventContext", evt.Context)
logger.V(4).Info("sending the event to spec subscribers",
"subID", subID, "eventType", evt.Type(), "extensions", evt.Extensions())
select {
case eventCh <- pbEvt:
case <-subCtx.Done():
@@ -280,7 +281,8 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// - If the requested resource version is older than the source's current maintained resource version, the source
// sends the resource.
func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataType types.CloudEventsDataType, evt *cloudevents.Event) error {
log := klog.FromContext(ctx)
log := klog.FromContext(ctx).WithValues(
"eventDataType", eventDataType, "eventType", evt.Type(), "extensions", evt.Extensions())
resourceVersions, err := payload.DecodeSpecResyncRequest(*evt)
if err != nil {
@@ -304,17 +306,18 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
}
if len(objs) == 0 {
log.V(4).Info("no objs from the lister, do nothing", "eventContext", evt.Context)
log.V(4).Info("no objs from the lister, do nothing")
return nil
}
for _, obj := range objs {
// respond with the deleting resource regardless of the resource version
objLogger := log.WithValues("eventType", obj.Type(), "extensions", obj.Extensions())
if _, ok := obj.Extensions()[types.ExtensionDeletionTimestamp]; ok {
log.V(4).Info("respond spec resync request", "eventContext", evt.Context)
objLogger.V(4).Info("respond spec resync request")
err = bkr.handleRes(ctx, obj, eventDataType, "delete_request")
if err != nil {
log.Error(err, "failed to handle resync spec request")
objLogger.Error(err, "failed to handle resync spec request")
}
continue
}
@@ -322,17 +325,17 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
lastResourceVersion := findResourceVersion(obj.ID(), resourceVersions.Versions)
currentResourceVersion, err := cloudeventstypes.ToInteger(obj.Extensions()[types.ExtensionResourceVersion])
if err != nil {
log.V(4).Info("ignore the event since it has a invalid resourceVersion", "eventContext", obj.Context, "error", err)
objLogger.V(4).Info("ignore the event since it has a invalid resourceVersion", "error", err)
continue
}
// the version of the work is not maintained on source or the source's work is newer than agent, send
// the newer work to agent
if currentResourceVersion == 0 || int64(currentResourceVersion) > lastResourceVersion {
log.V(4).Info("respond spec resync request", "eventContext", evt.Context)
objLogger.V(4).Info("respond spec resync request")
err := bkr.handleRes(ctx, obj, eventDataType, "update_request")
if err != nil {
log.Error(err, "failed to handle resync spec request")
objLogger.Error(err, "failed to handle resync spec request")
}
}
}
@@ -356,7 +359,7 @@ func (bkr *GRPCBroker) respondResyncSpecRequest(ctx context.Context, eventDataTy
NewEvent()
// send a delete event for the current resource
log.V(4).Info("respond spec resync request", "eventContext", evt.Context)
log.V(4).Info("respond spec resync request")
err := bkr.handleRes(ctx, &obj, eventDataType, "delete_request")
if err != nil {
log.Error(err, "failed to handle delete request")