Merge pull request #226 from cv65kr/feat/graceful-shutdown

Enable graceful shutdown for gRPC server
This commit is contained in:
Stefan Prodan
2022-10-20 11:48:43 +03:00
committed by GitHub
9 changed files with 145 additions and 108 deletions

View File

@@ -18,6 +18,7 @@ import (
"github.com/stefanprodan/podinfo/pkg/grpc"
"github.com/stefanprodan/podinfo/pkg/signals"
"github.com/stefanprodan/podinfo/pkg/version"
go_grpc "google.golang.org/grpc"
)
func main() {
@@ -33,7 +34,7 @@ func main() {
fs.StringSlice("backend-url", []string{}, "backend service URL")
fs.Duration("http-client-timeout", 2*time.Minute, "client timeout duration")
fs.Duration("http-server-timeout", 30*time.Second, "server read and write timeout duration")
fs.Duration("http-server-shutdown-timeout", 5*time.Second, "server graceful shutdown timeout duration")
fs.Duration("server-shutdown-timeout", 5*time.Second, "server graceful shutdown timeout duration")
fs.String("data-path", "/data", "data local path")
fs.String("config-path", "", "config dir path")
fs.String("cert-path", "/data/cert", "certificate path for HTTPS port")
@@ -135,9 +136,10 @@ func main() {
}
// start gRPC server
var grpcServer *go_grpc.Server
if grpcCfg.Port > 0 {
grpcSrv, _ := grpc.NewServer(&grpcCfg, logger)
go grpcSrv.ListenAndServe()
grpcServer = grpcSrv.ListenAndServe()
}
// load HTTP server config
@@ -155,8 +157,12 @@ func main() {
// start HTTP server
srv, _ := api.NewServer(&srvCfg, logger)
httpServer, httpsServer, healthy, ready := srv.ListenAndServe()
// graceful shutdown
stopCh := signals.SetupSignalHandler()
srv.ListenAndServe(stopCh)
sd, _ := signals.NewShutdown(srvCfg.ServerShutdownTimeout, logger)
sd.Graceful(stopCh, httpServer, httpsServer, grpcServer, healthy, ready)
}
func initZap(logLevel string) (*zap.Logger, error) {

6
go.mod
View File

@@ -30,7 +30,7 @@ require (
go.opentelemetry.io/otel/trace v1.10.0
go.uber.org/zap v1.23.0
golang.org/x/net v0.0.0-20220927171203-f486391704dc
google.golang.org/grpc v1.49.0
google.golang.org/grpc v1.50.1
)
// Fix CVE-2022-28948
@@ -41,9 +41,7 @@ require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/felixge/httpsnoop v1.0.3 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-openapi/jsonpointer v0.19.5 // indirect
@@ -66,13 +64,11 @@ require (
github.com/prometheus/client_model v0.2.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/subosito/gotenv v1.4.1 // indirect
github.com/swaggo/files v0.0.0-20220610200504-28940afbdbfe // indirect
github.com/urfave/cli/v2 v2.3.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.10.0 // indirect
go.opentelemetry.io/otel/metric v0.32.1 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect

12
go.sum
View File

@@ -78,8 +78,6 @@ github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWH
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.2 h1:p1EgwI/C7NhT0JmVkwCD2ZBK8j4aeHQX2pMHHBfMQ6w=
github.com/cpuguy83/go-md2man/v2 v2.0.2/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
@@ -103,7 +101,6 @@ github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSw
github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3ZUKE=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@@ -299,10 +296,7 @@ github.com/prometheus/procfs v0.8.0/go.mod h1:z7EfXMXOkbkqb9IINtpCn86r/to3BnA0ua
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k=
github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/russross/blackfriday/v2 v2.1.0 h1:JIOH55/0cWyOuilr9/qlrm0BSXldqnqwMsf35Ld67mk=
github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@@ -339,8 +333,6 @@ github.com/swaggo/http-swagger v1.3.3 h1:Hu5Z0L9ssyBLofaama21iYaF2VbWyA8jdohaaCG
github.com/swaggo/http-swagger v1.3.3/go.mod h1:sE+4PjD89IxMPm77FnkDz0sdO+p5lbXzrVWT6OTVVGo=
github.com/swaggo/swag v1.8.6 h1:2rgOaLbonWu1PLP6G+/rYjSvPg0jQE0HtrEKuE380eg=
github.com/swaggo/swag v1.8.6/go.mod h1:jMLeXOOmYyjk8PvHTsXBdrubsNd9gUJTTCzL5iBnseg=
github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
@@ -703,8 +695,8 @@ google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw=
google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/grpc v1.50.1 h1:DS/BukOZWp8s6p4Dt/tOaJaTQyPyOoCcrjroHuCeLzY=
google.golang.org/grpc v1.50.1/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=

View File

@@ -141,7 +141,7 @@ func (s *Server) getCacheConn() (redis.Conn, error) {
return redis.Dial("tcp", redisUrl.Host, opts...)
}
func (s *Server) startCachePool(ticker *time.Ticker, stopCh <-chan struct{}) {
func (s *Server) startCachePool(ticker *time.Ticker) {
if s.config.CacheServer == "" {
return
}
@@ -169,8 +169,6 @@ func (s *Server) startCachePool(ticker *time.Ticker, stopCh <-chan struct{}) {
setVersion()
for {
select {
case <-stopCh:
return
case <-ticker.C:
setVersion()
}

View File

@@ -11,17 +11,17 @@ import (
func NewMockServer() *Server {
config := &Config{
Port: "9898",
HttpServerShutdownTimeout: 5 * time.Second,
HttpServerTimeout: 30 * time.Second,
BackendURL: []string{},
ConfigPath: "/config",
DataPath: "/data",
HttpClientTimeout: 30 * time.Second,
UIColor: "blue",
UIPath: ".ui",
UIMessage: "Greetings",
Hostname: "localhost",
Port: "9898",
ServerShutdownTimeout: 5 * time.Second,
HttpServerTimeout: 30 * time.Second,
BackendURL: []string{},
ConfigPath: "/config",
DataPath: "/data",
HttpClientTimeout: 30 * time.Second,
UIColor: "blue",
UIPath: ".ui",
UIMessage: "Greetings",
Hostname: "localhost",
}
logger, _ := zap.NewDevelopment()

View File

@@ -14,7 +14,6 @@ import (
"github.com/gomodule/redigo/redis"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/viper"
_ "github.com/stefanprodan/podinfo/pkg/api/docs"
"github.com/stefanprodan/podinfo/pkg/fscache"
httpSwagger "github.com/swaggo/http-swagger"
@@ -47,32 +46,32 @@ var (
)
type Config struct {
HttpClientTimeout time.Duration `mapstructure:"http-client-timeout"`
HttpServerTimeout time.Duration `mapstructure:"http-server-timeout"`
HttpServerShutdownTimeout time.Duration `mapstructure:"http-server-shutdown-timeout"`
BackendURL []string `mapstructure:"backend-url"`
UILogo string `mapstructure:"ui-logo"`
UIMessage string `mapstructure:"ui-message"`
UIColor string `mapstructure:"ui-color"`
UIPath string `mapstructure:"ui-path"`
DataPath string `mapstructure:"data-path"`
ConfigPath string `mapstructure:"config-path"`
CertPath string `mapstructure:"cert-path"`
Host string `mapstructure:"host"`
Port string `mapstructure:"port"`
SecurePort string `mapstructure:"secure-port"`
PortMetrics int `mapstructure:"port-metrics"`
Hostname string `mapstructure:"hostname"`
H2C bool `mapstructure:"h2c"`
RandomDelay bool `mapstructure:"random-delay"`
RandomDelayUnit string `mapstructure:"random-delay-unit"`
RandomDelayMin int `mapstructure:"random-delay-min"`
RandomDelayMax int `mapstructure:"random-delay-max"`
RandomError bool `mapstructure:"random-error"`
Unhealthy bool `mapstructure:"unhealthy"`
Unready bool `mapstructure:"unready"`
JWTSecret string `mapstructure:"jwt-secret"`
CacheServer string `mapstructure:"cache-server"`
HttpClientTimeout time.Duration `mapstructure:"http-client-timeout"`
HttpServerTimeout time.Duration `mapstructure:"http-server-timeout"`
ServerShutdownTimeout time.Duration `mapstructure:"server-shutdown-timeout"`
BackendURL []string `mapstructure:"backend-url"`
UILogo string `mapstructure:"ui-logo"`
UIMessage string `mapstructure:"ui-message"`
UIColor string `mapstructure:"ui-color"`
UIPath string `mapstructure:"ui-path"`
DataPath string `mapstructure:"data-path"`
ConfigPath string `mapstructure:"config-path"`
CertPath string `mapstructure:"cert-path"`
Host string `mapstructure:"host"`
Port string `mapstructure:"port"`
SecurePort string `mapstructure:"secure-port"`
PortMetrics int `mapstructure:"port-metrics"`
Hostname string `mapstructure:"hostname"`
H2C bool `mapstructure:"h2c"`
RandomDelay bool `mapstructure:"random-delay"`
RandomDelayUnit string `mapstructure:"random-delay-unit"`
RandomDelayMin int `mapstructure:"random-delay-min"`
RandomDelayMax int `mapstructure:"random-delay-max"`
RandomError bool `mapstructure:"random-error"`
Unhealthy bool `mapstructure:"unhealthy"`
Unready bool `mapstructure:"unready"`
JWTSecret string `mapstructure:"jwt-secret"`
CacheServer string `mapstructure:"cache-server"`
}
type Server struct {
@@ -153,7 +152,7 @@ func (s *Server) registerMiddlewares() {
}
}
func (s *Server) ListenAndServe(stopCh <-chan struct{}) {
func (s *Server) ListenAndServe() (*http.Server, *http.Server, *int32, *int32) {
ctx := context.Background()
go s.startMetricsServer()
@@ -183,7 +182,7 @@ func (s *Server) ListenAndServe(stopCh <-chan struct{}) {
// start redis connection pool
ticker := time.NewTicker(30 * time.Second)
s.startCachePool(ticker, stopCh)
s.startCachePool(ticker)
// create the http server
srv := s.startServer()
@@ -199,48 +198,7 @@ func (s *Server) ListenAndServe(stopCh <-chan struct{}) {
atomic.StoreInt32(&ready, 1)
}
// wait for SIGTERM or SIGINT
<-stopCh
ctx, cancel := context.WithTimeout(ctx, s.config.HttpServerShutdownTimeout)
defer cancel()
// all calls to /healthz and /readyz will fail from now on
atomic.StoreInt32(&healthy, 0)
atomic.StoreInt32(&ready, 0)
// close cache pool
if s.pool != nil {
_ = s.pool.Close()
}
s.logger.Info("Shutting down HTTP/HTTPS server", zap.Duration("timeout", s.config.HttpServerShutdownTimeout))
// wait for Kubernetes readiness probe to remove this instance from the load balancer
// the readiness check interval must be lower than the timeout
if viper.GetString("level") != "debug" {
time.Sleep(3 * time.Second)
}
// stop OpenTelemetry tracer provider
if s.tracerProvider != nil {
if err := s.tracerProvider.Shutdown(ctx); err != nil {
s.logger.Warn("stopping tracer provider", zap.Error(err))
}
}
// determine if the http server was started
if srv != nil {
if err := srv.Shutdown(ctx); err != nil {
s.logger.Warn("HTTP server graceful shutdown failed", zap.Error(err))
}
}
// determine if the secure server was started
if secureSrv != nil {
if err := secureSrv.Shutdown(ctx); err != nil {
s.logger.Warn("HTTPS server graceful shutdown failed", zap.Error(err))
}
}
return srv, secureSrv, &healthy, &ready
}
func (s *Server) startServer() *http.Server {

View File

@@ -30,7 +30,7 @@ func NewServer(config *Config, logger *zap.Logger) (*Server, error) {
return srv, nil
}
func (s *Server) ListenAndServe() {
func (s *Server) ListenAndServe() *grpc.Server {
listener, err := net.Listen("tcp", fmt.Sprintf(":%v", s.config.Port))
if err != nil {
s.logger.Fatal("failed to listen", zap.Int("port", s.config.Port))
@@ -42,7 +42,11 @@ func (s *Server) ListenAndServe() {
grpc_health_v1.RegisterHealthServer(srv, server)
server.SetServingStatus(s.config.ServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
if err := srv.Serve(listener); err != nil {
s.logger.Fatal("failed to serve", zap.Error(err))
}
go func() {
if err := srv.Serve(listener); err != nil {
s.logger.Fatal("failed to serve", zap.Error(err))
}
}()
return srv
}

83
pkg/signals/shutdown.go Normal file
View File

@@ -0,0 +1,83 @@
package signals
import (
"context"
"net/http"
"sync/atomic"
"time"
"github.com/gomodule/redigo/redis"
"github.com/spf13/viper"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.uber.org/zap"
"google.golang.org/grpc"
)
type Shutdown struct {
logger *zap.Logger
pool *redis.Pool
tracerProvider *sdktrace.TracerProvider
serverShutdownTimeout time.Duration
}
func NewShutdown(serverShutdownTimeout time.Duration, logger *zap.Logger) (*Shutdown, error) {
srv := &Shutdown{
logger: logger,
serverShutdownTimeout: serverShutdownTimeout,
}
return srv, nil
}
func (s *Shutdown) Graceful(stopCh <-chan struct{}, httpServer *http.Server, httpsServer *http.Server, grpcServer *grpc.Server, healthy *int32, ready *int32) {
ctx := context.Background()
// wait for SIGTERM or SIGINT
<-stopCh
ctx, cancel := context.WithTimeout(ctx, s.serverShutdownTimeout)
defer cancel()
// all calls to /healthz and /readyz will fail from now on
atomic.StoreInt32(healthy, 0)
atomic.StoreInt32(ready, 0)
// close cache pool
if s.pool != nil {
_ = s.pool.Close()
}
s.logger.Info("Shutting down HTTP/HTTPS server", zap.Duration("timeout", s.serverShutdownTimeout))
// wait for Kubernetes readiness probe to remove this instance from the load balancer
// the readiness check interval must be lower than the timeout
if viper.GetString("level") != "debug" {
time.Sleep(3 * time.Second)
}
// stop OpenTelemetry tracer provider
if s.tracerProvider != nil {
if err := s.tracerProvider.Shutdown(ctx); err != nil {
s.logger.Warn("stopping tracer provider", zap.Error(err))
}
}
// determine if the GRPC was started
if grpcServer != nil {
s.logger.Info("Shutting down GRPC server", zap.Duration("timeout", s.serverShutdownTimeout))
grpcServer.GracefulStop()
}
// determine if the http server was started
if httpServer != nil {
if err := httpServer.Shutdown(ctx); err != nil {
s.logger.Warn("HTTP server graceful shutdown failed", zap.Error(err))
}
}
// determine if the secure server was started
if httpsServer != nil {
if err := httpsServer.Shutdown(ctx); err != nil {
s.logger.Warn("HTTPS server graceful shutdown failed", zap.Error(err))
}
}
}

View File

@@ -8,4 +8,4 @@ import (
"syscall"
)
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGINT}