Skip to content

Commit

Permalink
optimize grpc server shutdown and add grpc network type
Browse files Browse the repository at this point in the history
  • Loading branch information
daheige committed Oct 3, 2020
1 parent 7d9a08e commit 756dcbd
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 71 deletions.
3 changes: 2 additions & 1 deletion example/clients/go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ import (
)

const (
address = "localhost:8081"
address = "localhost:8081" // grpc server and http gateway on share port
// address = "localhost:50051" // grpc server port without http gateway
// address = "localhost:50050" // nginx grpc_pass port
defaultName = "golang grpc"
)
Expand Down
10 changes: 8 additions & 2 deletions example/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ func main() {
gmicro.WithRouteOpt(route),
gmicro.WithShutdownFunc(shutdownFunc),
gmicro.WithPreShutdownDelay(2*time.Second),
gmicro.WithShutdownTimeout(6*time.Second),
gmicro.WithHandlerFromEndpoint(pb.RegisterGreeterServiceHandlerFromEndpoint),
gmicro.WithLogger(gmicro.LoggerFunc(log.Printf)),
gmicro.WithRequestAccess(true),
gmicro.WithPrometheus(true),
gmicro.WithGRPCServerOption(grpc.ConnectionTimeout(10*time.Second)),
gmicro.WithGRPCNetwork("tcp"), // grpc server start network
)

// register grpc service
Expand Down Expand Up @@ -82,10 +84,14 @@ func main() {

s.AddRoute(newRoute2)

// you can start grpc server and http gateway on one port
log.Fatalln(s.StartGRPCAndHTTPServer(sharePort))

// You can also specify ports for grpc and http gw separately
// log.Fatalln(s.Start(sharePort,50051))
// you can also specify ports for grpc and http gw separately
// log.Fatalln(s.Start(sharePort, 50051))

// you can start server without http gateway
// log.Fatalln(s.StartGRPCWithoutGateway(50051))
}

// rpc service entry
Expand Down
163 changes: 99 additions & 64 deletions micro.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,32 @@ import (
"strings"
"time"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
gRecovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
gValidator "github.com/grpc-ecosystem/go-grpc-middleware/validator"
gPrometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
gRuntime "github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/prometheus/client_golang/prometheus/promhttp"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
"google.golang.org/grpc/reflection"

"github.com/grpc-ecosystem/grpc-gateway/runtime"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
)

const (
// the default timeout before the server shutdown abruptly
defaultShutdownTimeout = 8 * time.Second
defaultShutdownTimeout = 5 * time.Second

// the default time waiting for running goroutines to finish their jobs before the shutdown starts
// the default time waiting for running goroutines to finish their jobs before the shutdown start.
defaultPreShutdownDelay = 2 * time.Second
)

// refer: https://github.com/grpc-ecosystem/grpc-gateway/blob/master/docs/_docs/customizingyourgateway.md
var defaultMuxOption = runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{EmitDefaults: true})
var defaultMuxOption = gRuntime.WithMarshalerOption(gRuntime.MIMEWildcard,
&gRuntime.JSONPb{EmitDefaults: true})

// AnnotatorFunc is the annotator function is for injecting meta data from http request into gRPC context
type AnnotatorFunc func(context.Context, *http.Request) metadata.MD
Expand All @@ -47,10 +47,11 @@ type AnnotatorFunc func(context.Context, *http.Request) metadata.MD
// to steps to reverse-proxy the HTTP/1 requests to gRPC
// handlerFromEndpoint http gw endPoint
// automatically dials to "endpoint" and closes the connection when "ctx" gets done.
type HandlerFromEndpoint func(ctx context.Context, mux *runtime.ServeMux, grpcAddressAndPort string, opts []grpc.DialOption) error
type HandlerFromEndpoint func(ctx context.Context, mux *gRuntime.ServeMux,
grpcAddressAndPort string, opts []grpc.DialOption) error

// HTTPHandlerFunc is the http middleware handler function.
type HTTPHandlerFunc func(*runtime.ServeMux) http.Handler
type HTTPHandlerFunc func(*gRuntime.ServeMux) http.Handler

// Service represents the microservice.
type Service struct {
Expand All @@ -59,16 +60,17 @@ type Service struct {
httpHandler HTTPHandlerFunc // http.Handler
gRPCAddress string // gRPC host eg: ip:port
httpServerAddress string // http server host eg: ip:port
gRPCNetwork string // the gRPC network must be "tcp", "tcp4", "tcp6"
recovery func() // goroutine exec recover catch stack
shutdownFunc func() // shutdown func
shutdownTimeout time.Duration // shutdown wait time
preShutdownDelay time.Duration
interruptSignals []os.Signal // interrupt signal
annotators []AnnotatorFunc
staticDir string // static dir
errorHandler runtime.ProtoErrorHandlerFunc // gRPC error handler
mux *runtime.ServeMux // gRPC gw runtime serverMux
muxOptions []runtime.ServeMuxOption // gRPC mux options
errorHandler gRuntime.ProtoErrorHandlerFunc // gRPC error handler
mux *gRuntime.ServeMux // gRPC gw runtime serverMux
muxOptions []gRuntime.ServeMuxOption // gRPC mux options
routes []Route // gRPC http router
streamInterceptors []grpc.StreamServerInterceptor // gRPC steam interceptor
unaryInterceptors []grpc.UnaryServerInterceptor // gRPC server interceptor
Expand All @@ -80,8 +82,8 @@ type Service struct {
enablePrometheus bool // enable prometheus monitor
}

// DefaultHTTPHandler is the default http handler which does nothing
func DefaultHTTPHandler(mux *runtime.ServeMux) http.Handler {
// DefaultHTTPHandler is the default http handler which does nothing.
func DefaultHTTPHandler(mux *gRuntime.ServeMux) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
})
Expand Down Expand Up @@ -110,7 +112,7 @@ func GRPCHandlerFunc(grpcServer *grpc.Server, otherHandler http.Handler) http.Ha
func defaultService() *Service {
s := Service{}
s.httpHandler = DefaultHTTPHandler
s.errorHandler = runtime.DefaultHTTPError
s.errorHandler = gRuntime.DefaultHTTPError
s.shutdownFunc = func() {}
s.shutdownTimeout = defaultShutdownTimeout
s.preShutdownDelay = defaultPreShutdownDelay
Expand All @@ -132,12 +134,12 @@ func defaultService() *Service {
s.unaryInterceptors = []grpc.UnaryServerInterceptor{}

// install validator interceptor
s.streamInterceptors = append(s.streamInterceptors, grpc_validator.StreamServerInterceptor())
s.unaryInterceptors = append(s.unaryInterceptors, grpc_validator.UnaryServerInterceptor())
s.streamInterceptors = append(s.streamInterceptors, gValidator.StreamServerInterceptor())
s.unaryInterceptors = append(s.unaryInterceptors, gValidator.UnaryServerInterceptor())

// install panic handler which will turn panics into gRPC errors
s.streamInterceptors = append(s.streamInterceptors, grpc_recovery.StreamServerInterceptor())
s.unaryInterceptors = append(s.unaryInterceptors, grpc_recovery.UnaryServerInterceptor())
s.streamInterceptors = append(s.streamInterceptors, gRecovery.StreamServerInterceptor())
s.unaryInterceptors = append(s.unaryInterceptors, gRecovery.UnaryServerInterceptor())

// apply default marshaler option for mux, can be replaced by using MuxOption
s.muxOptions = append(s.muxOptions, defaultMuxOption)
Expand All @@ -150,7 +152,7 @@ func NewService(opts ...Option) *Service {
s := defaultService()

// app option functions.
s.apply(opts...)
s.apply(opts)

// install request interceptor
if s.enableRequestAccess {
Expand All @@ -164,8 +166,8 @@ func NewService(opts ...Option) *Service {

// install prometheus interceptor
if s.enablePrometheus {
s.streamInterceptors = append(s.streamInterceptors, grpc_prometheus.StreamServerInterceptor)
s.unaryInterceptors = append(s.unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
s.streamInterceptors = append(s.streamInterceptors, gPrometheus.StreamServerInterceptor)
s.unaryInterceptors = append(s.unaryInterceptors, gPrometheus.UnaryServerInterceptor)

// add /metrics HTTP/1 endpoint
routeMetrics := Route{
Expand All @@ -180,17 +182,19 @@ func NewService(opts ...Option) *Service {
}

// init gateway mux
s.muxOptions = append(s.muxOptions, runtime.WithProtoErrorHandler(s.errorHandler))
s.muxOptions = append(s.muxOptions, gRuntime.WithProtoErrorHandler(s.errorHandler))

// init annotators
for _, annotator := range s.annotators {
s.muxOptions = append(s.muxOptions, runtime.WithMetadata(annotator))
s.muxOptions = append(s.muxOptions, gRuntime.WithMetadata(annotator))
}

s.mux = runtime.NewServeMux(s.muxOptions...)
s.mux = gRuntime.NewServeMux(s.muxOptions...)

s.gRPCServerOptions = append(s.gRPCServerOptions, grpc_middleware.WithStreamServerChain(s.streamInterceptors...))
s.gRPCServerOptions = append(s.gRPCServerOptions, grpc_middleware.WithUnaryServerChain(s.unaryInterceptors...))
s.gRPCServerOptions = append(s.gRPCServerOptions,
middleware.WithStreamServerChain(s.streamInterceptors...))
s.gRPCServerOptions = append(s.gRPCServerOptions,
middleware.WithUnaryServerChain(s.unaryInterceptors...))

s.GRPCServer = grpc.NewServer(
s.gRPCServerOptions...,
Expand All @@ -200,10 +204,10 @@ func NewService(opts ...Option) *Service {
// http server addr is specified in the startGRPCGateway method below
if s.HTTPServer == nil {
s.HTTPServer = &http.Server{
ReadHeaderTimeout: 5 * time.Second, //read header timeout
ReadTimeout: 5 * time.Second, //read request timeout
WriteTimeout: 10 * time.Second, //write timeout
IdleTimeout: 20 * time.Second, //tcp idle time
ReadHeaderTimeout: 5 * time.Second, // read header timeout
ReadTimeout: 5 * time.Second, // read request timeout
WriteTimeout: 10 * time.Second, // write timeout
IdleTimeout: 20 * time.Second, // tcp idle time
}
}

Expand Down Expand Up @@ -324,7 +328,11 @@ func (s *Service) startGRPCServer() error {
// register reflection service on gRPC server.
reflection.Register(s.GRPCServer)

lis, err := net.Listen("tcp", s.gRPCAddress)
if s.gRPCNetwork == "" {
s.gRPCNetwork = "tcp"
}

lis, err := net.Listen(s.gRPCNetwork, s.gRPCAddress)
if err != nil {
return err
}
Expand All @@ -351,7 +359,8 @@ func (s *Service) startGRPCGateway() error {

// this is the fallback handler that will serve static files,
// if file does not exist, then a 404 error will be returned.
s.mux.Handle("GET", AllPattern(), func(w http.ResponseWriter, r *http.Request, pathParams map[string]string) {
s.mux.Handle("GET", AllPattern(), func(w http.ResponseWriter, r *http.Request,
pathParams map[string]string) {
dir := s.staticDir
if s.staticDir == "" {
dir, _ = os.Getwd()
Expand Down Expand Up @@ -382,22 +391,49 @@ func (s *Service) Stop() {

// we wait for a duration of preShutdownDelay for running goroutines to finish their jobs
if s.preShutdownDelay > 0 {
s.logger.Printf("Waiting for %v before shutdown starts\n", s.preShutdownDelay)
s.logger.Printf("Waiting for %v before shutdown start\n", s.preShutdownDelay)
time.Sleep(s.preShutdownDelay)
}

// gracefully stop gRPC server first
s.GRPCServer.GracefulStop()

// gracefully stop http server
s.httpServerShutdown()
}

// httpServerShutdown http gateway server graceful shutdown.
func (s *Service) httpServerShutdown() {
done := make(chan struct{}, 1)
ctx, cancel := context.WithTimeout(
context.Background(),
s.shutdownTimeout,
)

defer cancel()

// gracefully stop http server
go s.HTTPServer.Shutdown(ctx)
<-ctx.Done()
// Doesn't block if no connections, but will otherwise wait
// until the timeout deadline.
// Optionally, you could run srv.Shutdown in a goroutine and block on
// if your application should wait for other services
// to finalize based on context cancellation.
// gracefully stop http server
go func() {
defer s.recovery()
defer close(done)

if err := s.HTTPServer.Shutdown(ctx); err != nil {
s.logger.Printf("Http server shutdown error: %v", err.Error())
}
}()

select {
case <-ctx.Done():
s.logger.Printf("Server shutdown ctx cancel error: %v", ctx.Err())
case <-done:
s.logger.Printf("Server shutdown success")
}
}

// ===The following method is mainly for grpc server and http gw server to start on one port==//
Expand Down Expand Up @@ -476,24 +512,12 @@ func (s *Service) stopGRPCAndHTTPServer() {

// we wait for a duration of preShutdownDelay for running goroutines to finish their jobs
if s.preShutdownDelay > 0 {
s.logger.Printf("Waiting for %v before shutdown starts\n", s.preShutdownDelay)
s.logger.Printf("Waiting for %v before shutdown start\n", s.preShutdownDelay)
time.Sleep(s.preShutdownDelay)
}

ctx, cancel := context.WithTimeout(
context.Background(),
s.shutdownTimeout,
)
defer cancel()

// gracefully stop http server
// Doesn't block if no connections, but will otherwise wait
// until the timeout deadline.
// Optionally, you could run srv.Shutdown in a goroutine and block on
// if your application should wait for other services
// to finalize based on context cancellation.
go s.HTTPServer.Shutdown(ctx)
<-ctx.Done()
// graceful server shutdown
s.httpServerShutdown()
}

// The following method is only used to start the grpc server, but not start http gw.
Expand All @@ -503,7 +527,7 @@ func NewServiceWithoutGateway(opts ...Option) *Service {
s := defaultService()

// app option functions.
s.apply(opts...)
s.apply(opts)

// install request interceptor
if s.enableRequestAccess {
Expand All @@ -517,14 +541,16 @@ func NewServiceWithoutGateway(opts ...Option) *Service {

// install prometheus interceptor
if s.enablePrometheus {
s.streamInterceptors = append(s.streamInterceptors, grpc_prometheus.StreamServerInterceptor)
s.unaryInterceptors = append(s.unaryInterceptors, grpc_prometheus.UnaryServerInterceptor)
s.streamInterceptors = append(s.streamInterceptors, gPrometheus.StreamServerInterceptor)
s.unaryInterceptors = append(s.unaryInterceptors, gPrometheus.UnaryServerInterceptor)
}

s.muxOptions = nil

s.gRPCServerOptions = append(s.gRPCServerOptions, grpc_middleware.WithStreamServerChain(s.streamInterceptors...))
s.gRPCServerOptions = append(s.gRPCServerOptions, grpc_middleware.WithUnaryServerChain(s.unaryInterceptors...))
s.gRPCServerOptions = append(s.gRPCServerOptions,
middleware.WithStreamServerChain(s.streamInterceptors...))
s.gRPCServerOptions = append(s.gRPCServerOptions,
middleware.WithUnaryServerChain(s.unaryInterceptors...))

s.GRPCServer = grpc.NewServer(
s.gRPCServerOptions...,
Expand Down Expand Up @@ -569,21 +595,30 @@ func (s *Service) StartGRPCWithoutGateway(grpcPort int) error {
func (s *Service) StopGRPCWithoutGateway() {
// we wait for a duration of preShutdownDelay for running goroutines to finish their jobs
if s.preShutdownDelay > 0 {
s.logger.Printf("Waiting for %v before shutdown starts\n", s.preShutdownDelay)
s.logger.Printf("Waiting for %v before shutdown start\n", s.preShutdownDelay)
time.Sleep(s.preShutdownDelay)
}

done := make(chan struct{}, 1)
ctx, cancel := context.WithTimeout(
context.Background(),
s.shutdownTimeout,
)

defer cancel()

// gracefully stop gRPC server first
s.GRPCServer.GracefulStop()
// gracefully stop gRPC server
go func() {
defer s.recovery()
defer close(done)

<-ctx.Done()
s.GRPCServer.GracefulStop()
}()

s.logger.Printf("gRPC server shutdown success")
select {
case <-ctx.Done():
s.logger.Printf("Grpc server shutdown ctx cancel error: %v", ctx.Err())
case <-done:
s.logger.Printf("Grpc server shutdown success")
}
}
Loading

0 comments on commit 756dcbd

Please sign in to comment.