starting grpc server with config file (#1071)

Signed-off-by: Wei Liu <liuweixa@redhat.com>
This commit is contained in:
Wei Liu
2025-07-16 10:13:37 +08:00
committed by GitHub
parent 61c8d7f06b
commit 405adb61cd
12 changed files with 274 additions and 168 deletions

1
.gitignore vendored
View File

@@ -4,6 +4,7 @@
/work
/registration-operator
/addon
/server
*.exe
*.dll

View File

@@ -10,9 +10,15 @@ RUN GOOS=${OS} \
GO_BUILD_PACKAGES=./cmd/registration \
make build --warn-undefined-variables
RUN GOOS=${OS} \
GOARCH=${ARCH} \
GO_BUILD_PACKAGES=./cmd/server \
make build --warn-undefined-variables
FROM registry.access.redhat.com/ubi9/ubi-minimal:latest
ENV USER_UID=10001
COPY --from=builder /go/src/open-cluster-management.io/ocm/registration /
COPY --from=builder /go/src/open-cluster-management.io/ocm/server /
USER ${USER_UID}

View File

@@ -17,7 +17,6 @@ import (
"open-cluster-management.io/ocm/pkg/cmd/spoke"
"open-cluster-management.io/ocm/pkg/cmd/webhook"
"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/pkg/server/grpc"
"open-cluster-management.io/ocm/pkg/version"
)
@@ -63,7 +62,6 @@ func newRegistrationCommand() *cobra.Command {
cmd.AddCommand(hub.NewRegistrationController())
cmd.AddCommand(spoke.NewRegistrationAgent())
cmd.AddCommand(webhook.NewRegistrationWebhook())
cmd.AddCommand(grpc.NewGRPCServer())
return cmd
}

54
cmd/server/main.go Normal file
View File

@@ -0,0 +1,54 @@
package main
import (
goflag "flag"
"fmt"
"os"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
utilflag "k8s.io/component-base/cli/flag"
"k8s.io/component-base/logs"
"open-cluster-management.io/ocm/pkg/cmd/hub"
"open-cluster-management.io/ocm/pkg/version"
)
func main() {
pflag.CommandLine.SetNormalizeFunc(utilflag.WordSepNormalizeFunc)
pflag.CommandLine.AddGoFlagSet(goflag.CommandLine)
logs.AddFlags(pflag.CommandLine)
logs.InitLogs()
defer logs.FlushLogs()
command := newServerCommand()
if err := command.Execute(); err != nil {
fmt.Fprintf(os.Stderr, "%v\n", err)
os.Exit(1) //nolint:gocritic
}
}
// newServerCommand creates a new command for the server.
// The server command is responsible for starting the server on the hub.
// The server serves the resources transport between the hub and the managed clusters.
func newServerCommand() *cobra.Command {
cmd := &cobra.Command{
Use: "server",
Short: "Server for transport resources",
Run: func(cmd *cobra.Command, args []string) {
_ = cmd.Help()
os.Exit(1)
},
}
if v := version.Get().String(); len(v) == 0 {
cmd.Version = "<unknown>"
} else {
cmd.Version = v
}
cmd.AddCommand(hub.NewGRPCServerCommand())
return cmd
}

2
go.mod
View File

@@ -41,7 +41,7 @@ require (
k8s.io/utils v0.0.0-20241104100929-3ea5e8cea738
open-cluster-management.io/addon-framework v1.0.0
open-cluster-management.io/api v1.0.1-0.20250703232537-f781272f812e
open-cluster-management.io/sdk-go v1.0.1-0.20250708024404-422b23814b5d
open-cluster-management.io/sdk-go v1.0.1-0.20250714033031-2a092f845650
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03
sigs.k8s.io/cluster-inventory-api v0.0.0-20240730014211-ef0154379848
sigs.k8s.io/controller-runtime v0.20.2

4
go.sum
View File

@@ -493,8 +493,8 @@ open-cluster-management.io/addon-framework v1.0.0 h1:ejTk4hPAJnwCSxQhY/tVDPg3SeH
open-cluster-management.io/addon-framework v1.0.0/go.mod h1:Gw9zRGvuNJJ3XhTYanIuA7FFFw0EjtoE74l5OBZCZf8=
open-cluster-management.io/api v1.0.1-0.20250703232537-f781272f812e h1:CHPatj3lW7pLaI8wWNbXkiWdYScHusu+YV0cVawxMw4=
open-cluster-management.io/api v1.0.1-0.20250703232537-f781272f812e/go.mod h1:KEj/4wbUjdbWktrKLL8+mWzAIzE6Ii3bcRr4CvnBNEg=
open-cluster-management.io/sdk-go v1.0.1-0.20250708024404-422b23814b5d h1:sYgNfYyQ6O7sfiVOUaMuoK/CTeWnTNTfVKY8dWORBgw=
open-cluster-management.io/sdk-go v1.0.1-0.20250708024404-422b23814b5d/go.mod h1:LYX48E3h96XGnm6o+GomV0DSf15w1i9crtggj2HeDvI=
open-cluster-management.io/sdk-go v1.0.1-0.20250714033031-2a092f845650 h1:EY488tDZyXnVn+wuH0TA5kce7GLSvZCargdiVRSwtNw=
open-cluster-management.io/sdk-go v1.0.1-0.20250714033031-2a092f845650/go.mod h1:LYX48E3h96XGnm6o+GomV0DSf15w1i9crtggj2HeDvI=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03 h1:1ShFiMjGQOR/8jTBkmZrk1gORxnvMwm1nOy2/DbHg4U=
sigs.k8s.io/about-api v0.0.0-20250131010323-518069c31c03/go.mod h1:F1pT4mK53U6F16/zuaPSYpBaR7x5Kjym6aKJJC0/DHU=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.31.0 h1:CPT0ExVicCzcpeN4baWEV2ko2Z/AsiZgEdwgcfwLgMo=

View File

@@ -0,0 +1,27 @@
package hub
import (
"context"
"github.com/spf13/cobra"
"k8s.io/utils/clock"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/server/grpc"
"open-cluster-management.io/ocm/pkg/version"
)
func NewGRPCServerCommand() *cobra.Command {
opts := commonoptions.NewOptions()
grpcServerOpts := grpc.NewGRPCServerOptions()
cmdConfig := opts.NewControllerCommandConfig("grpc-server", version.Get(), grpcServerOpts.Run, clock.RealClock{})
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "grpc"
cmd.Short = "Start the gRPC Server"
flags := cmd.Flags()
opts.AddFlags(flags)
grpcServerOpts.AddFlags(flags)
return cmd
}

View File

@@ -0,0 +1,77 @@
package grpc
import (
"context"
"time"
"github.com/openshift/library-go/pkg/controller/controllercmd"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
)
type Clients struct {
KubeClient kubernetes.Interface
ClusterClient clusterv1client.Interface
WorkClient workclientset.Interface
AddOnClient addonv1alpha1client.Interface
KubeInformers kubeinformers.SharedInformerFactory
ClusterInformers clusterv1informers.SharedInformerFactory
WorkInformers workinformers.SharedInformerFactory
AddOnInformers addoninformers.SharedInformerFactory
}
func NewClients(controllerContext *controllercmd.ControllerContext) (*Clients, error) {
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
addonClient, err := addonv1alpha1client.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
workClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
return &Clients{
KubeClient: kubeClient,
ClusterClient: clusterClient,
AddOnClient: addonClient,
WorkClient: workClient,
KubeInformers: kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute,
kubeinformers.WithTweakListOptions(func(listOptions *metav1.ListOptions) {
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: clusterv1.ClusterNameLabelKey,
Operator: metav1.LabelSelectorOpExists,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
})),
ClusterInformers: clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute),
WorkInformers: workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute),
AddOnInformers: addoninformers.NewSharedInformerFactory(addonClient, 30*time.Minute),
}, nil
}
func (h *Clients) Run(ctx context.Context) {
go h.KubeInformers.Start(ctx.Done())
go h.ClusterInformers.Start(ctx.Done())
go h.WorkInformers.Start(ctx.Done())
go h.AddOnInformers.Start(ctx.Done())
}

View File

@@ -0,0 +1,72 @@
package grpc
import (
"context"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/pflag"
addonce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon"
clusterce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/cluster"
csrce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/csr"
eventce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/event"
leasece "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
grpcauthn "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authn"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options"
"open-cluster-management.io/ocm/pkg/server/services/addon"
"open-cluster-management.io/ocm/pkg/server/services/cluster"
"open-cluster-management.io/ocm/pkg/server/services/csr"
"open-cluster-management.io/ocm/pkg/server/services/event"
"open-cluster-management.io/ocm/pkg/server/services/lease"
"open-cluster-management.io/ocm/pkg/server/services/work"
)
type GRPCServerOptions struct {
GRPCServerConfig string
}
func NewGRPCServerOptions() *GRPCServerOptions {
return &GRPCServerOptions{}
}
func (o *GRPCServerOptions) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&o.GRPCServerConfig, "server-config", o.GRPCServerConfig, "Location of the server configuration file.")
}
func (o *GRPCServerOptions) Run(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
serverOptions, err := grpcoptions.LoadGRPCServerOptions(o.GRPCServerConfig)
if err != nil {
return err
}
clients, err := NewClients(controllerContext)
if err != nil {
return err
}
return grpcoptions.NewServer(serverOptions).WithPreStartHooks(clients).WithAuthenticator(
grpcauthn.NewTokenAuthenticator(clients.KubeClient),
).WithAuthenticator(
grpcauthn.NewMtlsAuthenticator(),
).WithService(
clusterce.ManagedClusterEventDataType,
cluster.NewClusterService(clients.ClusterClient, clients.ClusterInformers.Cluster().V1().ManagedClusters()),
).WithService(
csrce.CSREventDataType,
csr.NewCSRService(clients.KubeClient, clients.KubeInformers.Certificates().V1().CertificateSigningRequests()),
).WithService(
addonce.ManagedClusterAddOnEventDataType,
addon.NewAddonService(clients.AddOnClient, clients.AddOnInformers.Addon().V1alpha1().ManagedClusterAddOns()),
).WithService(
eventce.EventEventDataType,
event.NewEventService(clients.KubeClient),
).WithService(
leasece.LeaseEventDataType,
lease.NewLeaseService(clients.KubeClient, clients.KubeInformers.Coordination().V1().Leases()),
).WithService(
payload.ManifestBundleEventDataType,
work.NewWorkService(clients.WorkClient, clients.WorkInformers.Work().V1().ManifestWorks()),
).Run(ctx)
}

View File

@@ -1,146 +0,0 @@
package grpc
import (
"context"
"time"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/utils/clock"
addonv1alpha1client "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
addonce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/addon"
clusterce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/cluster"
csrce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/csr"
eventce "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/event"
leasece "open-cluster-management.io/sdk-go/pkg/cloudevents/clients/lease"
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work/payload"
grpcauthn "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/authn"
grpcoptions "open-cluster-management.io/sdk-go/pkg/cloudevents/server/grpc/options"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/server/services/addon"
"open-cluster-management.io/ocm/pkg/server/services/cluster"
"open-cluster-management.io/ocm/pkg/server/services/csr"
"open-cluster-management.io/ocm/pkg/server/services/event"
"open-cluster-management.io/ocm/pkg/server/services/lease"
"open-cluster-management.io/ocm/pkg/server/services/work"
"open-cluster-management.io/ocm/pkg/version"
)
func NewGRPCServer() *cobra.Command {
opts := commonoptions.NewOptions()
grpcServerOpts := grpcoptions.NewGRPCServerOptions()
cmdConfig := opts.
NewControllerCommandConfig(
"grpc-server",
version.Get(),
func(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
clients, err := newClients(controllerContext)
if err != nil {
return err
}
return grpcoptions.NewServer(grpcServerOpts).WithPreStartHooks(clients).WithAuthenticator(
grpcauthn.NewTokenAuthenticator(clients.kubeClient),
).WithAuthenticator(
grpcauthn.NewMtlsAuthenticator(),
).WithService(
clusterce.ManagedClusterEventDataType,
cluster.NewClusterService(clients.clusterClient, clients.clusterInformers.Cluster().V1().ManagedClusters()),
).WithService(
csrce.CSREventDataType,
csr.NewCSRService(clients.kubeClient, clients.kubeInformers.Certificates().V1().CertificateSigningRequests()),
).WithService(
addonce.ManagedClusterAddOnEventDataType,
addon.NewAddonService(clients.addonClient, clients.addonInformers.Addon().V1alpha1().ManagedClusterAddOns()),
).WithService(
eventce.EventEventDataType,
event.NewEventService(clients.kubeClient),
).WithService(
leasece.LeaseEventDataType,
lease.NewLeaseService(clients.kubeClient, clients.kubeInformers.Coordination().V1().Leases()),
).WithService(
payload.ManifestBundleEventDataType,
work.NewWorkService(clients.workClient, clients.workInformers.Work().V1().ManifestWorks()),
).Run(ctx)
},
clock.RealClock{},
)
cmd := cmdConfig.NewCommandWithContext(context.TODO())
cmd.Use = "grpc-server"
cmd.Short = "Start the gRPC Server"
flags := cmd.Flags()
opts.AddFlags(flags)
grpcServerOpts.AddFlags(flags)
return cmd
}
type clients struct {
kubeClient kubernetes.Interface
clusterClient clusterv1client.Interface
workClient workclientset.Interface
addonClient addonv1alpha1client.Interface
kubeInformers kubeinformers.SharedInformerFactory
clusterInformers clusterv1informers.SharedInformerFactory
workInformers workinformers.SharedInformerFactory
addonInformers addoninformers.SharedInformerFactory
}
func newClients(controllerContext *controllercmd.ControllerContext) (*clients, error) {
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
addonClient, err := addonv1alpha1client.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
workClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
if err != nil {
return nil, err
}
return &clients{
kubeClient: kubeClient,
clusterClient: clusterClient,
addonClient: addonClient,
workClient: workClient,
kubeInformers: kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute,
kubeinformers.WithTweakListOptions(func(listOptions *metav1.ListOptions) {
selector := &metav1.LabelSelector{
MatchExpressions: []metav1.LabelSelectorRequirement{
{
Key: clusterv1.ClusterNameLabelKey,
Operator: metav1.LabelSelectorOpExists,
},
},
}
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
})),
clusterInformers: clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute),
workInformers: workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute),
addonInformers: addoninformers.NewSharedInformerFactory(addonClient, 30*time.Minute),
}, nil
}
func (h *clients) Run(ctx context.Context) {
go h.kubeInformers.Start(ctx.Done())
go h.clusterInformers.Start(ctx.Done())
go h.workInformers.Start(ctx.Done())
go h.addonInformers.Start(ctx.Done())
}

2
vendor/modules.txt vendored
View File

@@ -1786,7 +1786,7 @@ open-cluster-management.io/api/operator/v1
open-cluster-management.io/api/utils/work/v1/workapplier
open-cluster-management.io/api/work/v1
open-cluster-management.io/api/work/v1alpha1
# open-cluster-management.io/sdk-go v1.0.1-0.20250708024404-422b23814b5d
# open-cluster-management.io/sdk-go v1.0.1-0.20250714033031-2a092f845650
## explicit; go 1.23.6
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1alpha1
open-cluster-management.io/sdk-go/pkg/apis/cluster/v1beta1

View File

@@ -1,27 +1,44 @@
package options
import (
"github.com/spf13/pflag"
"math"
"os"
"time"
"github.com/spf13/pflag"
"gopkg.in/yaml.v2"
)
type GRPCServerOptions struct {
TLSCertFile string
TLSKeyFile string
ClientCAFile string
ServerBindPort string
MaxConcurrentStreams uint32
MaxReceiveMessageSize int
MaxSendMessageSize int
ConnectionTimeout time.Duration
WriteBufferSize int
ReadBufferSize int
MaxConnectionAge time.Duration
ClientMinPingInterval time.Duration
ServerPingInterval time.Duration
ServerPingTimeout time.Duration
PermitPingWithoutStream bool
TLSCertFile string `json:"tls_cert_file" yaml:"tls_cert_file"`
TLSKeyFile string `json:"tls_key_file" yaml:"tls_key_file"`
ClientCAFile string `json:"client_ca_file" yaml:"client_ca_file"`
ServerBindPort string `json:"server_bind_port" yaml:"server_bind_port"`
MaxConcurrentStreams uint32 `json:"max_concurrent_streams" yaml:"max_concurrent_streams"`
MaxReceiveMessageSize int `json:"max_receive_message_size" yaml:"max_receive_message_size"`
MaxSendMessageSize int `json:"max_send_message_size" yaml:"max_send_message_size"`
WriteBufferSize int `json:"write_buffer_size" yaml:"write_buffer_size"`
ReadBufferSize int `json:"read_buffer_size" yaml:"read_buffer_size"`
ConnectionTimeout time.Duration `json:"connection_timeout" yaml:"connection_timeout"`
MaxConnectionAge time.Duration `json:"max_connection_age" yaml:"max_connection_age"`
ClientMinPingInterval time.Duration `json:"client_min_ping_interval" yaml:"client_min_ping_interval"`
ServerPingInterval time.Duration `json:"server_ping_interval" yaml:"server_ping_interval"`
ServerPingTimeout time.Duration `json:"server_ping_timeout" yaml:"server_ping_timeout"`
PermitPingWithoutStream bool `json:"permit_ping_without_stream" yaml:"permit_ping_without_stream"`
}
func LoadGRPCServerOptions(configPath string) (*GRPCServerOptions, error) {
opts := NewGRPCServerOptions()
grpcServerConfig, err := os.ReadFile(configPath)
if err != nil {
return nil, err
}
if err := yaml.Unmarshal(grpcServerConfig, opts); err != nil {
return nil, err
}
return opts, nil
}
func NewGRPCServerOptions() *GRPCServerOptions {