From 30040332a73f53e00a82f39788f62516bc4599c3 Mon Sep 17 00:00:00 2001 From: John Guo Date: Wed, 27 Sep 2023 14:40:32 +0800 Subject: [PATCH] improve address configuration for grpc server (#2982) --- contrib/config/polaris/go.sum | 3 -- contrib/registry/polaris/go.sum | 3 -- contrib/rpc/grpcx/grpcx.go | 4 +-- contrib/rpc/grpcx/grpcx_grpc_server.go | 25 +++++++------ contrib/rpc/grpcx/grpcx_grpc_server_config.go | 2 +- .../grpcx/grpcx_grpc_server_config_test.go | 36 +++++++++++++++++++ os/gproc/gproc_signal.go | 28 ++++++++++++--- 7 files changed, 76 insertions(+), 25 deletions(-) diff --git a/contrib/config/polaris/go.sum b/contrib/config/polaris/go.sum index 727a099c5..0f6c3d247 100644 --- a/contrib/config/polaris/go.sum +++ b/contrib/config/polaris/go.sum @@ -400,11 +400,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/polaris-go v1.5.3 h1:RL1m6FThsYCzKYGOLp5HXNCnzeqa5NEsgO0h5kxZXRM= -github.com/polarismesh/polaris-go v1.5.3/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8= github.com/polarismesh/polaris-go v1.5.4 h1:Y/FaZk7OpdjVeRh3b4ZHYXF6xtjTkP0oCmVNVdm/GoQ= github.com/polarismesh/polaris-go v1.5.4/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8= -github.com/polarismesh/specification v1.4.0 h1:fm7sUtFZC2g9+lLmRCtjGrUow47CY5JDFoZXwwCQGGY= github.com/polarismesh/specification v1.4.0/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A= github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= diff --git a/contrib/registry/polaris/go.sum b/contrib/registry/polaris/go.sum index 727a099c5..0f6c3d247 100644 --- a/contrib/registry/polaris/go.sum +++ b/contrib/registry/polaris/go.sum @@ -400,11 +400,8 @@ github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/polarismesh/polaris-go v1.5.3 h1:RL1m6FThsYCzKYGOLp5HXNCnzeqa5NEsgO0h5kxZXRM= -github.com/polarismesh/polaris-go v1.5.3/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8= github.com/polarismesh/polaris-go v1.5.4 h1:Y/FaZk7OpdjVeRh3b4ZHYXF6xtjTkP0oCmVNVdm/GoQ= github.com/polarismesh/polaris-go v1.5.4/go.mod h1:KVMjcp6P2R8MFPKfBPX3kzykyzH0iX8fHCiITcqKda8= -github.com/polarismesh/specification v1.4.0 h1:fm7sUtFZC2g9+lLmRCtjGrUow47CY5JDFoZXwwCQGGY= github.com/polarismesh/specification v1.4.0/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= github.com/polarismesh/specification v1.4.1 h1:lTZqeyUhhWuKyr6NDKBwmUrNfcUDvKLxWT/uOq71T5A= github.com/polarismesh/specification v1.4.1/go.mod h1:rDvMMtl5qebPmqiBLNa5Ps0XtwkP31ZLirbH4kXA0YU= diff --git a/contrib/rpc/grpcx/grpcx.go b/contrib/rpc/grpcx/grpcx.go index f40cf44f4..fce4bdda6 100644 --- a/contrib/rpc/grpcx/grpcx.go +++ b/contrib/rpc/grpcx/grpcx.go @@ -22,8 +22,8 @@ type ( ) const ( - // FreePortAddress marks the server listens using random free port. - FreePortAddress = ":0" + FreePortAddress = ":0" // FreePortAddress marks the server listens using random free port. + defaultListenAddress = ":0" // Default listening address for grpc server if no address configured. ) const ( diff --git a/contrib/rpc/grpcx/grpcx_grpc_server.go b/contrib/rpc/grpcx/grpcx_grpc_server.go index cbef23b1a..c99fd433e 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server.go @@ -21,7 +21,6 @@ import ( "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gipv4" "github.com/gogf/gf/v2/net/gsvc" - "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gproc" @@ -37,6 +36,7 @@ type GrpcServer struct { services []gsvc.Service waitGroup sync.WaitGroup registrar gsvc.Registrar + serviceMu sync.Mutex } // Service implements gsvc.Service interface. @@ -59,11 +59,7 @@ func (s modServer) New(conf ...*GrpcServerConfig) *GrpcServer { config = s.NewConfig() } if config.Address == "" { - randomPort, err := gtcp.GetFreePort() - if err != nil { - g.Log().Fatalf(ctx, `%+v`, err) - } - config.Address = fmt.Sprintf(`:%d`, randomPort) + config.Address = defaultListenAddress } if !gstr.Contains(config.Address, ":") { g.Log().Fatal(ctx, "invalid service address, should contain listening port") @@ -94,6 +90,8 @@ func (s modServer) New(conf ...*GrpcServerConfig) *GrpcServer { // Service binds service list to current server. // Server will automatically register the service list after it starts. func (s *GrpcServer) Service(services ...gsvc.Service) { + s.serviceMu.Lock() + defer s.serviceMu.Unlock() s.services = append(s.services, services...) } @@ -148,6 +146,8 @@ func (s *GrpcServer) doServiceRegister() { if s.registrar == nil { return } + s.serviceMu.Lock() + defer s.serviceMu.Unlock() if len(s.services) == 0 { s.services = []gsvc.Service{&gsvc.LocalService{ Name: s.config.Name, @@ -186,6 +186,8 @@ func (s *GrpcServer) doServiceDeregister() { if s.registrar == nil { return } + s.serviceMu.Lock() + defer s.serviceMu.Unlock() var ctx = gctx.GetInitCtx() for _, service := range s.services { s.Logger().Debugf(ctx, `service deregister: %+v`, service) @@ -257,19 +259,17 @@ func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoi ) if len(addrArray) == 1 { configItemName := "address" - if len(addresses) != 0 { + if len(s.config.Endpoints) != 0 { configItemName = "endpoint" } panic(gerror.NewCodef( gcode.CodeInvalidConfiguration, - `invalid %s configuration "%s", missing port`, + `invalid "%s" configuration "%s", missing port`, configItemName, address, )) } // IPs. switch addrArray[0] { - case "127.0.0.1": - // Nothing to do. case "0.0.0.0", "": intranetIps, err := gipv4.GetIntranetIpArray() if err != nil { @@ -305,7 +305,10 @@ func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoi } for _, ip := range listenedIps { for _, port := range listenedPorts { - endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port))) + endpoints = append( + endpoints, + gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)), + ) } } } diff --git a/contrib/rpc/grpcx/grpcx_grpc_server_config.go b/contrib/rpc/grpcx/grpcx_grpc_server_config.go index 42bd08ae3..bcc24b121 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server_config.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server_config.go @@ -18,7 +18,7 @@ import ( // GrpcServerConfig is the configuration for server. type GrpcServerConfig struct { - Address string // (optional) Address for server listening. + Address string // (optional) Single address for server listening, use `:0` or `ip:0` to serve random port. Name string // (optional) Name for current service. Logger *glog.Logger // (optional) Logger for server. LogPath string // (optional) LogPath specifies the directory for storing logging files. diff --git a/contrib/rpc/grpcx/grpcx_grpc_server_config_test.go b/contrib/rpc/grpcx/grpcx_grpc_server_config_test.go index 7049205c5..21ace494f 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server_config_test.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server_config_test.go @@ -1,11 +1,47 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + package grpcx import ( "testing" + "time" "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/text/gstr" ) +func Test_Grpcx_Grpc_Server(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + s := Server.New() + s.Start() + time.Sleep(time.Millisecond * 100) + defer s.Stop() + s.serviceMu.Lock() + defer s.serviceMu.Unlock() + t.Assert(len(s.services) != 0, true) + }) +} + +func Test_Grpcx_Grpc_Server_Address(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + c := Server.NewConfig() + c.Address = "127.0.0.1:0" + s := Server.New(c) + s.Start() + time.Sleep(time.Millisecond * 100) + defer s.Stop() + + s.serviceMu.Lock() + defer s.serviceMu.Unlock() + t.Assert(len(s.services) != 0, true) + t.Assert(gstr.Contains(s.services[0].GetEndpoints().String(), "127.0.0.1:"), true) + }) +} + func Test_Grpcx_Grpc_Server_Config(t *testing.T) { cfg := Server.NewConfig() addr := "10.0.0.29:80" diff --git a/os/gproc/gproc_signal.go b/os/gproc/gproc_signal.go index d8f690b0d..8c1e10e42 100644 --- a/os/gproc/gproc_signal.go +++ b/os/gproc/gproc_signal.go @@ -24,6 +24,7 @@ var ( // Use internal variable to guarantee concurrent safety // when multiple Listen happen. signalChan = make(chan os.Signal, 1) + signalHandlerMu sync.Mutex signalHandlerMap = make(map[os.Signal][]SigHandler) shutdownSignalMap = map[os.Signal]struct{}{ syscall.SIGINT: {}, @@ -42,6 +43,8 @@ func init() { // AddSigHandler adds custom signal handler for custom one or more signals. func AddSigHandler(handler SigHandler, signals ...os.Signal) { + signalHandlerMu.Lock() + defer signalHandlerMu.Unlock() for _, sig := range signals { signalHandlerMap[sig] = append(signalHandlerMap[sig], handler) } @@ -54,6 +57,8 @@ func AddSigHandler(handler SigHandler, signals ...os.Signal) { // syscall.SIGTERM, // syscall.SIGABRT. func AddSigHandlerShutdown(handler ...SigHandler) { + signalHandlerMu.Lock() + defer signalHandlerMu.Unlock() for _, h := range handler { for sig := range shutdownSignalMap { signalHandlerMap[sig] = append(signalHandlerMap[sig], h) @@ -64,19 +69,16 @@ func AddSigHandlerShutdown(handler ...SigHandler) { // Listen blocks and does signal listening and handling. func Listen() { var ( - signals = make([]os.Signal, 0) + signals = getHandlerSignals() ctx = context.Background() wg = sync.WaitGroup{} sig os.Signal ) - for s := range signalHandlerMap { - signals = append(signals, s) - } signal.Notify(signalChan, signals...) for { sig = <-signalChan intlog.Printf(ctx, `signal received: %s`, sig.String()) - if handlers, ok := signalHandlerMap[sig]; ok { + if handlers := getHandlersBySignal(sig); len(handlers) > 0 { for _, handler := range handlers { wg.Add(1) var ( @@ -105,3 +107,19 @@ func Listen() { } } } + +func getHandlerSignals() []os.Signal { + signalHandlerMu.Lock() + defer signalHandlerMu.Unlock() + var signals = make([]os.Signal, 0) + for s := range signalHandlerMap { + signals = append(signals, s) + } + return signals +} + +func getHandlersBySignal(sig os.Signal) []SigHandler { + signalHandlerMu.Lock() + defer signalHandlerMu.Unlock() + return signalHandlerMap[sig] +}