-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathserver.go
123 lines (101 loc) · 2.8 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package grpcbase
import (
"context"
"fmt"
"net"
"sync"
"github.com/go-nacelle/nacelle/v2"
"github.com/go-nacelle/process/v2"
"github.com/go-nacelle/service/v2"
"github.com/google/uuid"
"google.golang.org/grpc"
)
type (
Server struct {
Config *nacelle.Config `service:"config"`
Logger nacelle.Logger `service:"logger"`
Services *nacelle.ServiceContainer `service:"services"`
Health *nacelle.Health `service:"health"`
tagModifiers []nacelle.TagModifier
initializer ServerInitializer
listener *net.TCPListener
server *grpc.Server
once *sync.Once
stopped chan struct{}
host string
port int
serverOptions []grpc.ServerOption
healthToken healthToken
healthStatus *process.HealthComponentStatus
}
ServerInitializer interface {
Init(context.Context, *grpc.Server) error
}
ServerInitializerFunc func(context.Context, *grpc.Server) error
)
func (f ServerInitializerFunc) Init(ctx context.Context, server *grpc.Server) error {
return f(ctx, server)
}
func NewServer(initializer ServerInitializer, configs ...ConfigFunc) *Server {
options := getOptions(configs)
return &Server{
tagModifiers: options.tagModifiers,
initializer: initializer,
once: &sync.Once{},
stopped: make(chan struct{}),
serverOptions: options.serverOptions,
healthToken: healthToken(uuid.New().String()),
}
}
func (s *Server) Init(ctx context.Context) (err error) {
healthStatus, err := s.Health.Register(s.healthToken)
if err != nil {
return err
}
s.healthStatus = healthStatus
grpcConfig := &Config{}
if err = s.Config.Load(grpcConfig, s.tagModifiers...); err != nil {
return err
}
s.listener, err = makeListener(grpcConfig.GRPCHost, grpcConfig.GRPCPort)
if err != nil {
return
}
if err := service.Inject(ctx, s.Services, s.initializer); err != nil {
return err
}
s.host = grpcConfig.GRPCHost
s.port = grpcConfig.GRPCPort
s.server = grpc.NewServer(s.serverOptions...)
err = s.initializer.Init(ctx, s.server)
return
}
func (s *Server) Run(ctx context.Context) error {
defer s.listener.Close()
s.healthStatus.Update(true)
s.Logger.Info("Serving gRPC on %s:%d", s.host, s.port)
if err := s.server.Serve(s.listener); err != nil {
select {
case <-s.stopped:
default:
return err
}
}
s.Logger.Info("No longer serving gRPC on %s:%d", s.host, s.port)
return nil
}
func (s *Server) Stop(ctx context.Context) error {
s.once.Do(func() {
s.Logger.Info("Shutting down gRPC server")
close(s.stopped)
s.server.GracefulStop()
})
return nil
}
func makeListener(host string, port int) (*net.TCPListener, error) {
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("%s:%d", host, port))
if err != nil {
return nil, err
}
return net.ListenTCP("tcp", addr)
}