From d5e677414ce1e182129e5f86c03b1b0e71861dff Mon Sep 17 00:00:00 2001 From: Wei Liu Date: Mon, 12 Jan 2026 20:50:14 +0800 Subject: [PATCH] add options to grpc broker (#1326) Signed-off-by: Wei Liu --- go.mod | 2 +- go.sum | 4 +- pkg/server/grpc/options.go | 14 +++--- pkg/server/grpc/options_test.go | 10 ++--- .../registration/spokecluster_grpc_test.go | 2 +- test/integration/work/suite_test.go | 2 +- vendor/modules.txt | 2 +- .../pkg/cloudevents/server/grpc/broker.go | 25 ++++++++--- .../pkg/cloudevents/server/grpc/options.go | 44 +++++++++++++++++++ 9 files changed, 83 insertions(+), 22 deletions(-) create mode 100644 vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options.go diff --git a/go.mod b/go.mod index b0a7affe7..95f04d4a5 100644 --- a/go.mod +++ b/go.mod @@ -41,7 +41,7 @@ require ( k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9 open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 - open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 + open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a sigs.k8s.io/controller-runtime v0.22.4 diff --git a/go.sum b/go.sum index 5d461125a..0ea7450a4 100644 --- a/go.sum +++ b/go.sum @@ -587,8 +587,8 @@ open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9 open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9/go.mod h1:St9LTEuZ5ADLY9cVXSp+rVE/ZbPJ+hzNQ7/YcsiQVd8= open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 h1:eA/8UpvFuWr79O7/aAT4bcx/tVG9kkl7+4u9o9dRShM= open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4= -open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 h1:WZrNXgDKxAneVjiiNf0b3ApOyjuEgBcz2akw6vYVxJ8= -open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0= +open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b h1:r5U3cDh6kuBmzKnAUqeoYPwwVU/VS9udvpcDEkxh6g4= +open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM= diff --git a/pkg/server/grpc/options.go b/pkg/server/grpc/options.go index d86fa9075..3fad04d65 100644 --- a/pkg/server/grpc/options.go +++ b/pkg/server/grpc/options.go @@ -29,15 +29,19 @@ import ( ) type GRPCServerOptions struct { - GRPCServerConfig string + GRPCServerConfig string + grpcBrokerOptions *cloudeventsgrpc.BrokerOptions } func NewGRPCServerOptions() *GRPCServerOptions { - return &GRPCServerOptions{} + return &GRPCServerOptions{ + grpcBrokerOptions: cloudeventsgrpc.NewBrokerOptions(), + } } func (o *GRPCServerOptions) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&o.GRPCServerConfig, "server-config", o.GRPCServerConfig, "Location of the server configuration file.") + o.grpcBrokerOptions.AddFlags(fs) } func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controllercmd.ControllerContext) error { @@ -51,8 +55,8 @@ func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controll return err } - // initlize grpc broker and register services - grpcEventServer := cloudeventsgrpc.NewGRPCBroker() + // initialize grpc broker and register services + grpcEventServer := cloudeventsgrpc.NewGRPCBroker(o.grpcBrokerOptions) grpcEventServer.RegisterService(ctx, clusterce.ManagedClusterEventDataType, cluster.NewClusterService(clients.ClusterClient, clients.ClusterInformers.Cluster().V1().ManagedClusters())) grpcEventServer.RegisterService(ctx, csrce.CSREventDataType, @@ -69,7 +73,7 @@ func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controll // start clients go clients.Run(ctx) - // initlize and run grpc server + // initialize and run grpc server authorizer := grpcauthz.NewSARAuthorizer(clients.KubeClient) return sdkgrpc.NewGRPCServer(serverOptions). WithAuthenticator(grpcauthn.NewTokenAuthenticator(clients.KubeClient)). diff --git a/pkg/server/grpc/options_test.go b/pkg/server/grpc/options_test.go index fdb426e35..9b862b9a0 100644 --- a/pkg/server/grpc/options_test.go +++ b/pkg/server/grpc/options_test.go @@ -80,9 +80,8 @@ func TestGRPCServerOptionsFlagTypes(t *testing.T) { } func TestGRPCServerOptionsRunWithInvalidConfig(t *testing.T) { - opts := &GRPCServerOptions{ - GRPCServerConfig: "/nonexistent/path/to/config.yaml", - } + opts := NewGRPCServerOptions() + opts.GRPCServerConfig = "/nonexistent/path/to/config.yaml" ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) defer cancel() @@ -146,9 +145,8 @@ grpc_private_key_file: ` + keyFile + ` t.Fatalf("Failed to create test config file: %v", err) } - opts := &GRPCServerOptions{ - GRPCServerConfig: configFile, - } + opts := NewGRPCServerOptions() + opts.GRPCServerConfig = configFile ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) defer cancel() diff --git a/test/integration/registration/spokecluster_grpc_test.go b/test/integration/registration/spokecluster_grpc_test.go index 076effd55..c0966cd5b 100644 --- a/test/integration/registration/spokecluster_grpc_test.go +++ b/test/integration/registration/spokecluster_grpc_test.go @@ -83,7 +83,7 @@ var _ = ginkgo.Describe("Registration using GRPC", ginkgo.Ordered, ginkgo.Label( hook, err := util.NewGRPCServerRegistrationHook(hubCfg) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - grpcEventServer := cloudeventsgrpc.NewGRPCBroker() + grpcEventServer := cloudeventsgrpc.NewGRPCBroker(cloudeventsgrpc.NewBrokerOptions()) grpcEventServer.RegisterService(grpcServerCtx, clusterce.ManagedClusterEventDataType, cluster.NewClusterService(hook.ClusterClient, hook.ClusterInformers.Cluster().V1().ManagedClusters())) grpcEventServer.RegisterService(grpcServerCtx, csrce.CSREventDataType, diff --git a/test/integration/work/suite_test.go b/test/integration/work/suite_test.go index b94ee0c4d..031ca818d 100644 --- a/test/integration/work/suite_test.go +++ b/test/integration/work/suite_test.go @@ -275,7 +275,7 @@ func startGRPCServer(ctx context.Context, temp string, cfg *rest.Config) (string hook, err := util.NewGRPCServerWorkHook(cfg) gomega.Expect(err).NotTo(gomega.HaveOccurred()) - grpcEventServer := cloudeventsgrpc.NewGRPCBroker() + grpcEventServer := cloudeventsgrpc.NewGRPCBroker(cloudeventsgrpc.NewBrokerOptions()) grpcEventServer.RegisterService(ctx, payload.ManifestBundleEventDataType, serviceswork.NewWorkService(hook.WorkClient, hook.WorkInformers.Work().V1().ManifestWorks())) diff --git a/vendor/modules.txt b/vendor/modules.txt index f296dfc36..1a8b3c66e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1961,7 +1961,7 @@ open-cluster-management.io/api/operator/v1 open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1alpha1 -# open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 +# open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b ## explicit; go 1.25.0 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go index d48d335d9..7ba35b4ae 100644 --- a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/broker.go @@ -48,15 +48,18 @@ type GRPCBroker struct { services map[types.CloudEventsDataType]server.Service subscribers map[string]*subscriber // registered subscribers heartbeatCheckInterval time.Duration + heartbeatDisabled bool mu sync.RWMutex } // NewGRPCBroker creates a new gRPC broker with the given gRPC server. -func NewGRPCBroker() *GRPCBroker { +func NewGRPCBroker(opts *BrokerOptions) *GRPCBroker { broker := &GRPCBroker{ subscribers: make(map[string]*subscriber), services: make(map[types.CloudEventsDataType]server.Service), - heartbeatCheckInterval: 10 * time.Second, + heartbeatCheckInterval: opts.HeartbeatCheckInterval, + heartbeatDisabled: opts.HeartbeatDisabled, + mu: sync.RWMutex{}, } return broker } @@ -186,7 +189,10 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv // TODO make the channel size configurable eventCh := make(chan *pbv1.CloudEvent, 100) - heartbeater := heartbeat.NewHeartbeater(bkr.heartbeatCheckInterval, 10) + var heartbeater *heartbeat.Heartbeater + if !bkr.heartbeatDisabled { + heartbeater = heartbeat.NewHeartbeater(bkr.heartbeatCheckInterval, 10) + } sendErrCh := make(chan error, 1) // send events @@ -195,11 +201,18 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv // For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's // error channel to unregister the subscriber. go func() { + // Get heartbeat channel or nil if heartbeater is disabled + // Reading from a nil channel blocks forever, so it will never be selected + var heartbeatCh chan *pbv1.CloudEvent + if heartbeater != nil { + heartbeatCh = heartbeater.Heartbeat() + } + for { select { case <-subCtx.Done(): return - case evt := <-heartbeater.Heartbeat(): + case evt := <-heartbeatCh: if err := subServer.Send(evt); err != nil { logger.Error(err, "failed to send heartbeat") // Unblock producers (handler select) and exit heartbeat ticker. @@ -253,7 +266,9 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv return err } - go heartbeater.Start(subCtx) + if heartbeater != nil { + go heartbeater.Start(subCtx) + } select { case err := <-sendErrCh: diff --git a/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options.go b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options.go new file mode 100644 index 000000000..1a6c8e941 --- /dev/null +++ b/vendor/open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options.go @@ -0,0 +1,44 @@ +package grpc + +import ( + "fmt" + "time" + + "github.com/spf13/pflag" +) + +// BrokerOptions contains configuration options for the GRPCBroker. +type BrokerOptions struct { + // HeartbeatDisabled controls whether heartbeat mechanism is disabled. + // Default: false (heartbeat is enabled by default) + HeartbeatDisabled bool + + // HeartbeatCheckInterval is the interval for heartbeat checks. + // Default: 10 seconds + HeartbeatCheckInterval time.Duration +} + +// NewBrokerOptions creates a new BrokerOptions with default values. +func NewBrokerOptions() *BrokerOptions { + return &BrokerOptions{ + HeartbeatDisabled: false, + HeartbeatCheckInterval: 10 * time.Second, + } +} + +// AddFlags adds flags for configuring the broker options. +func (o *BrokerOptions) AddFlags(fs *pflag.FlagSet) { + fs.BoolVar(&o.HeartbeatDisabled, "broker-heartbeat-disabled", o.HeartbeatDisabled, + "Disable heartbeat mechanism for gRPC broker") + fs.DurationVar(&o.HeartbeatCheckInterval, "broker-heartbeat-interval", o.HeartbeatCheckInterval, + "Interval for heartbeat checks in gRPC broker") +} + +// Validate checks the broker options for valid values. +func (o *BrokerOptions) Validate() error { + // Validate heartbeat check interval if heartbeat is enabled + if !o.HeartbeatDisabled && o.HeartbeatCheckInterval < 10*time.Second { + return fmt.Errorf("heartbeat_check_interval (%v) must be at least 10 seconds when heartbeat is enabled", o.HeartbeatCheckInterval) + } + return nil +}