add options to grpc broker (#1326)

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2026-01-12 20:50:14 +08:00
committed by GitHub
parent 532b09a440
commit d5e677414c
9 changed files with 83 additions and 22 deletions

2
go.mod
View File

@@ -41,7 +41,7 @@ require (
k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397
open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9 open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03
sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a sigs.k8s.io/cluster-inventory-api v0.0.0-20251124125836-445319b6307a
sigs.k8s.io/controller-runtime v0.22.4 sigs.k8s.io/controller-runtime v0.22.4

4
go.sum
View File

@@ -587,8 +587,8 @@ open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9
open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9/go.mod h1:St9LTEuZ5ADLY9cVXSp+rVE/ZbPJ+hzNQ7/YcsiQVd8= open-cluster-management.io/addon-framework v1.1.1-0.20251222073158-b5846d76add9/go.mod h1:St9LTEuZ5ADLY9cVXSp+rVE/ZbPJ+hzNQ7/YcsiQVd8=
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 h1:eA/8UpvFuWr79O7/aAT4bcx/tVG9kkl7+4u9o9dRShM= open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643 h1:eA/8UpvFuWr79O7/aAT4bcx/tVG9kkl7+4u9o9dRShM=
open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4= open-cluster-management.io/api v1.1.1-0.20260108015315-68cef17a0643/go.mod h1:YcmA6SpGEekIMxdoeVIIyOaBhMA6ImWRLXP4g8n8T+4=
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 h1:WZrNXgDKxAneVjiiNf0b3ApOyjuEgBcz2akw6vYVxJ8= open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b h1:r5U3cDh6kuBmzKnAUqeoYPwwVU/VS9udvpcDEkxh6g4=
open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0= open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b/go.mod h1:4haPv/uuKqQ3gxi62/PPknlrUFi132ga0KYLwj5tpx0=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU= sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.2 h1:jpcvIRr3GLoUoEKRkHKSmGjxb6lWwrBlJsXc+eUYQHM=

View File

@@ -29,15 +29,19 @@ import (
) )
type GRPCServerOptions struct { type GRPCServerOptions struct {
GRPCServerConfig string GRPCServerConfig string
grpcBrokerOptions *cloudeventsgrpc.BrokerOptions
} }
func NewGRPCServerOptions() *GRPCServerOptions { func NewGRPCServerOptions() *GRPCServerOptions {
return &GRPCServerOptions{} return &GRPCServerOptions{
grpcBrokerOptions: cloudeventsgrpc.NewBrokerOptions(),
}
} }
func (o *GRPCServerOptions) AddFlags(fs *pflag.FlagSet) { func (o *GRPCServerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.GRPCServerConfig, "server-config", o.GRPCServerConfig, "Location of the server configuration file.") fs.StringVar(&o.GRPCServerConfig, "server-config", o.GRPCServerConfig, "Location of the server configuration file.")
o.grpcBrokerOptions.AddFlags(fs)
} }
func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controllercmd.ControllerContext) error { func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
@@ -51,8 +55,8 @@ func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controll
return err return err
} }
// initlize grpc broker and register services // initialize grpc broker and register services
grpcEventServer := cloudeventsgrpc.NewGRPCBroker() grpcEventServer := cloudeventsgrpc.NewGRPCBroker(o.grpcBrokerOptions)
grpcEventServer.RegisterService(ctx, clusterce.ManagedClusterEventDataType, grpcEventServer.RegisterService(ctx, clusterce.ManagedClusterEventDataType,
cluster.NewClusterService(clients.ClusterClient, clients.ClusterInformers.Cluster().V1().ManagedClusters())) cluster.NewClusterService(clients.ClusterClient, clients.ClusterInformers.Cluster().V1().ManagedClusters()))
grpcEventServer.RegisterService(ctx, csrce.CSREventDataType, grpcEventServer.RegisterService(ctx, csrce.CSREventDataType,
@@ -69,7 +73,7 @@ func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controll
// start clients // start clients
go clients.Run(ctx) go clients.Run(ctx)
// initlize and run grpc server // initialize and run grpc server
authorizer := grpcauthz.NewSARAuthorizer(clients.KubeClient) authorizer := grpcauthz.NewSARAuthorizer(clients.KubeClient)
return sdkgrpc.NewGRPCServer(serverOptions). return sdkgrpc.NewGRPCServer(serverOptions).
WithAuthenticator(grpcauthn.NewTokenAuthenticator(clients.KubeClient)). WithAuthenticator(grpcauthn.NewTokenAuthenticator(clients.KubeClient)).

View File

@@ -80,9 +80,8 @@ func TestGRPCServerOptionsFlagTypes(t *testing.T) {
} }
func TestGRPCServerOptionsRunWithInvalidConfig(t *testing.T) { func TestGRPCServerOptionsRunWithInvalidConfig(t *testing.T) {
opts := &GRPCServerOptions{ opts := NewGRPCServerOptions()
GRPCServerConfig: "/nonexistent/path/to/config.yaml", opts.GRPCServerConfig = "/nonexistent/path/to/config.yaml"
}
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel() defer cancel()
@@ -146,9 +145,8 @@ grpc_private_key_file: ` + keyFile + `
t.Fatalf("Failed to create test config file: %v", err) t.Fatalf("Failed to create test config file: %v", err)
} }
opts := &GRPCServerOptions{ opts := NewGRPCServerOptions()
GRPCServerConfig: configFile, opts.GRPCServerConfig = configFile
}
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel() defer cancel()

View File

@@ -83,7 +83,7 @@ var _ = ginkgo.Describe("Registration using GRPC", ginkgo.Ordered, ginkgo.Label(
hook, err := util.NewGRPCServerRegistrationHook(hubCfg) hook, err := util.NewGRPCServerRegistrationHook(hubCfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred()) gomega.Expect(err).NotTo(gomega.HaveOccurred())
grpcEventServer := cloudeventsgrpc.NewGRPCBroker() grpcEventServer := cloudeventsgrpc.NewGRPCBroker(cloudeventsgrpc.NewBrokerOptions())
grpcEventServer.RegisterService(grpcServerCtx, clusterce.ManagedClusterEventDataType, grpcEventServer.RegisterService(grpcServerCtx, clusterce.ManagedClusterEventDataType,
cluster.NewClusterService(hook.ClusterClient, hook.ClusterInformers.Cluster().V1().ManagedClusters())) cluster.NewClusterService(hook.ClusterClient, hook.ClusterInformers.Cluster().V1().ManagedClusters()))
grpcEventServer.RegisterService(grpcServerCtx, csrce.CSREventDataType, grpcEventServer.RegisterService(grpcServerCtx, csrce.CSREventDataType,

View File

@@ -275,7 +275,7 @@ func startGRPCServer(ctx context.Context, temp string, cfg *rest.Config) (string
hook, err := util.NewGRPCServerWorkHook(cfg) hook, err := util.NewGRPCServerWorkHook(cfg)
gomega.Expect(err).NotTo(gomega.HaveOccurred()) gomega.Expect(err).NotTo(gomega.HaveOccurred())
grpcEventServer := cloudeventsgrpc.NewGRPCBroker() grpcEventServer := cloudeventsgrpc.NewGRPCBroker(cloudeventsgrpc.NewBrokerOptions())
grpcEventServer.RegisterService(ctx, payload.ManifestBundleEventDataType, grpcEventServer.RegisterService(ctx, payload.ManifestBundleEventDataType,
serviceswork.NewWorkService(hook.WorkClient, hook.WorkInformers.Work().V1().ManifestWorks())) serviceswork.NewWorkService(hook.WorkClient, hook.WorkInformers.Work().V1().ManifestWorks()))

2
vendor/modules.txt vendored
View File

@@ -1961,7 +1961,7 @@ open-cluster-management.io/api/operator/v1
open-cluster-management.io/api/utils/work/v1/workapplier open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/work/v1 open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1 open-cluster-management.io/api/work/v1alpha1
# open-cluster-management.io/sdk-go v1.1.1-0.20260108080638-c607eaaa5d12 # open-cluster-management.io/sdk-go v1.1.1-0.20260112054941-b6c1a665df1b
## explicit; go 1.25.0 ## explicit; go 1.25.0
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1 open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1

View File

@@ -48,15 +48,18 @@ type GRPCBroker struct {
services map[types.CloudEventsDataType]server.Service services map[types.CloudEventsDataType]server.Service
subscribers map[string]*subscriber // registered subscribers subscribers map[string]*subscriber // registered subscribers
heartbeatCheckInterval time.Duration heartbeatCheckInterval time.Duration
heartbeatDisabled bool
mu sync.RWMutex mu sync.RWMutex
} }
// NewGRPCBroker creates a new gRPC broker with the given gRPC server. // NewGRPCBroker creates a new gRPC broker with the given gRPC server.
func NewGRPCBroker() *GRPCBroker { func NewGRPCBroker(opts *BrokerOptions) *GRPCBroker {
broker := &GRPCBroker{ broker := &GRPCBroker{
subscribers: make(map[string]*subscriber), subscribers: make(map[string]*subscriber),
services: make(map[types.CloudEventsDataType]server.Service), services: make(map[types.CloudEventsDataType]server.Service),
heartbeatCheckInterval: 10 * time.Second, heartbeatCheckInterval: opts.HeartbeatCheckInterval,
heartbeatDisabled: opts.HeartbeatDisabled,
mu: sync.RWMutex{},
} }
return broker return broker
} }
@@ -186,7 +189,10 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// TODO make the channel size configurable // TODO make the channel size configurable
eventCh := make(chan *pbv1.CloudEvent, 100) eventCh := make(chan *pbv1.CloudEvent, 100)
heartbeater := heartbeat.NewHeartbeater(bkr.heartbeatCheckInterval, 10) var heartbeater *heartbeat.Heartbeater
if !bkr.heartbeatDisabled {
heartbeater = heartbeat.NewHeartbeater(bkr.heartbeatCheckInterval, 10)
}
sendErrCh := make(chan error, 1) sendErrCh := make(chan error, 1)
// send events // send events
@@ -195,11 +201,18 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
// For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's // For unrecoverable errors, such as a connection closed by an intermediate proxy, push the error to subscriber's
// error channel to unregister the subscriber. // error channel to unregister the subscriber.
go func() { go func() {
// Get heartbeat channel or nil if heartbeater is disabled
// Reading from a nil channel blocks forever, so it will never be selected
var heartbeatCh chan *pbv1.CloudEvent
if heartbeater != nil {
heartbeatCh = heartbeater.Heartbeat()
}
for { for {
select { select {
case <-subCtx.Done(): case <-subCtx.Done():
return return
case evt := <-heartbeater.Heartbeat(): case evt := <-heartbeatCh:
if err := subServer.Send(evt); err != nil { if err := subServer.Send(evt); err != nil {
logger.Error(err, "failed to send heartbeat") logger.Error(err, "failed to send heartbeat")
// Unblock producers (handler select) and exit heartbeat ticker. // Unblock producers (handler select) and exit heartbeat ticker.
@@ -253,7 +266,9 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
return err return err
} }
go heartbeater.Start(subCtx) if heartbeater != nil {
go heartbeater.Start(subCtx)
}
select { select {
case err := <-sendErrCh: case err := <-sendErrCh:

View File

@@ -0,0 +1,44 @@
package grpc
import (
"fmt"
"time"
"github.com/spf13/pflag"
)
// BrokerOptions contains configuration options for the GRPCBroker.
type BrokerOptions struct {
// HeartbeatDisabled controls whether heartbeat mechanism is disabled.
// Default: false (heartbeat is enabled by default)
HeartbeatDisabled bool
// HeartbeatCheckInterval is the interval for heartbeat checks.
// Default: 10 seconds
HeartbeatCheckInterval time.Duration
}
// NewBrokerOptions creates a new BrokerOptions with default values.
func NewBrokerOptions() *BrokerOptions {
return &BrokerOptions{
HeartbeatDisabled: false,
HeartbeatCheckInterval: 10 * time.Second,
}
}
// AddFlags adds flags for configuring the broker options.
func (o *BrokerOptions) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&o.HeartbeatDisabled, "broker-heartbeat-disabled", o.HeartbeatDisabled,
"Disable heartbeat mechanism for gRPC broker")
fs.DurationVar(&o.HeartbeatCheckInterval, "broker-heartbeat-interval", o.HeartbeatCheckInterval,
"Interval for heartbeat checks in gRPC broker")
}
// Validate checks the broker options for valid values.
func (o *BrokerOptions) Validate() error {
// Validate heartbeat check interval if heartbeat is enabled
if !o.HeartbeatDisabled && o.HeartbeatCheckInterval < 10*time.Second {
return fmt.Errorf("heartbeat_check_interval (%v) must be at least 10 seconds when heartbeat is enabled", o.HeartbeatCheckInterval)
}
return nil
}