From 9171585b2c1f704faa1f90a70136d59cf3b6cab0 Mon Sep 17 00:00:00 2001 From: John Guo Date: Sun, 23 Apr 2023 21:58:17 +0800 Subject: [PATCH] improve auto retrieving of ip addresses for service registering (#2593) --- .../{file_z_test.go => file_z_basic_test.go} | 0 contrib/registry/file/file_z_http_test.go | 50 ++++++++++ contrib/rpc/grpcx/grpcx_grpc_server.go | 94 ++++++++++++++----- net/ghttp/ghttp_server.go | 8 +- net/ghttp/ghttp_server_graceful.go | 1 + net/ghttp/ghttp_server_registry.go | 82 +++++++++++----- 6 files changed, 185 insertions(+), 50 deletions(-) rename contrib/registry/file/{file_z_test.go => file_z_basic_test.go} (100%) create mode 100644 contrib/registry/file/file_z_http_test.go diff --git a/contrib/registry/file/file_z_test.go b/contrib/registry/file/file_z_basic_test.go similarity index 100% rename from contrib/registry/file/file_z_test.go rename to contrib/registry/file/file_z_basic_test.go diff --git a/contrib/registry/file/file_z_http_test.go b/contrib/registry/file/file_z_http_test.go new file mode 100644 index 000000000..0401ca8af --- /dev/null +++ b/contrib/registry/file/file_z_http_test.go @@ -0,0 +1,50 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package file_test + +import ( + "fmt" + "testing" + "time" + + "github.com/gogf/gf/contrib/registry/file/v2" + "github.com/gogf/gf/v2/frame/g" + "github.com/gogf/gf/v2/net/ghttp" + "github.com/gogf/gf/v2/net/gsvc" + "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/os/gfile" + "github.com/gogf/gf/v2/test/gtest" + "github.com/gogf/gf/v2/util/guid" +) + +var ctx = gctx.GetInitCtx() + +func Test_HTTP_Registry(t *testing.T) { + var ( + svcName = guid.S() + dirPath = gfile.Temp(svcName) + ) + defer gfile.Remove(dirPath) + gsvc.SetRegistry(file.New(dirPath)) + + s := g.Server(svcName) + s.BindHandler("/http-registry", func(r *ghttp.Request) { + r.Response.Write(svcName) + }) + s.SetDumpRouterMap(false) + s.Start() + defer s.Shutdown() + + time.Sleep(100 * time.Millisecond) + + gtest.C(t, func(t *gtest.T) { + client := g.Client() + client.SetPrefix(fmt.Sprintf("http://%s", svcName)) + // GET + t.Assert(client.GetContent(ctx, "/http-registry"), svcName) + }) +} diff --git a/contrib/rpc/grpcx/grpcx_grpc_server.go b/contrib/rpc/grpcx/grpcx_grpc_server.go index 7831c09b6..ff16dee8e 100644 --- a/contrib/rpc/grpcx/grpcx_grpc_server.go +++ b/contrib/rpc/grpcx/grpcx_grpc_server.go @@ -7,6 +7,7 @@ package grpcx import ( + "context" "fmt" "net" "os" @@ -25,6 +26,7 @@ import ( "github.com/gogf/gf/v2/os/glog" "github.com/gogf/gf/v2/os/gproc" "github.com/gogf/gf/v2/text/gstr" + "github.com/gogf/gf/v2/util/gconv" ) // GrpcServer is the server for GRPC protocol. @@ -104,19 +106,19 @@ func (s *GrpcServer) Run() { // Create listener to bind listening ip and port. s.listener, err = net.Listen("tcp", s.config.Address) if err != nil { - s.config.Logger.Fatalf(ctx, `%+v`, err) + s.Logger().Fatalf(ctx, `%+v`, err) } // Start listening. go func() { if err = s.Server.Serve(s.listener); err != nil { - s.config.Logger.Fatalf(ctx, `%+v`, err) + s.Logger().Fatalf(ctx, `%+v`, err) } }() // Service register. s.doServiceRegister() - s.config.Logger.Infof( + s.Logger().Infof( ctx, "pid[%d]: grpc server started listening on [%s]", gproc.Pid(), s.GetListenedAddress(), @@ -147,7 +149,7 @@ func (s *GrpcServer) doSignalListen() { syscall.SIGKILL, syscall.SIGTERM, syscall.SIGABRT: - s.config.Logger.Infof(ctx, "signal received: %s, gracefully shutting down", sig.String()) + s.Logger().Infof(ctx, "signal received: %s, gracefully shutting down", sig.String()) s.doServiceDeregister() time.Sleep(time.Second) s.Stop() @@ -156,6 +158,11 @@ func (s *GrpcServer) doSignalListen() { } } +// Logger is alias of GetLogger. +func (s *GrpcServer) Logger() *glog.Logger { + return s.config.Logger +} + // doServiceRegister registers current service to Registry. func (s *GrpcServer) doServiceRegister() { if s.registrar == nil { @@ -176,15 +183,19 @@ func (s *GrpcServer) doServiceRegister() { for i, service := range s.services { service = &gsvc.LocalService{ Name: service.GetName(), - Endpoints: s.calculateListenedEndpoints(), + Endpoints: s.calculateListenedEndpoints(ctx), Metadata: service.GetMetadata(), } service.GetMetadata().Sets(gsvc.Metadata{ gsvc.MDProtocol: protocol, }) - s.config.Logger.Debugf(ctx, `service register: %+v`, service) + s.Logger().Debugf(ctx, `service register: %+v`, service) + if len(service.GetEndpoints()) == 0 { + s.Logger().Warningf(ctx, `no endpoints found to register service, abort service registering`) + return + } if service, err = s.registrar.Register(ctx, service); err != nil { - s.config.Logger.Fatalf(ctx, `%+v`, err) + s.Logger().Fatalf(ctx, `%+v`, err) } s.services[i] = service } @@ -197,9 +208,9 @@ func (s *GrpcServer) doServiceDeregister() { } var ctx = gctx.GetInitCtx() for _, service := range s.services { - s.config.Logger.Debugf(ctx, `service deregister: %+v`, service) + s.Logger().Debugf(ctx, `service deregister: %+v`, service) if err := s.registrar.Deregister(ctx, service); err != nil { - s.config.Logger.Errorf(ctx, `%+v`, err) + s.Logger().Errorf(ctx, `%+v`, err) } } } @@ -249,22 +260,59 @@ func (s *GrpcServer) GetListenedPort() int { return -1 } -func (s *GrpcServer) calculateListenedEndpoints() gsvc.Endpoints { +func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoints { var ( - address = s.config.Address - endpoints = make(gsvc.Endpoints, 0) - listenedPort = s.GetListenedPort() - listenedIps []string + configAddr = s.config.Address + endpoints = make(gsvc.Endpoints, 0) ) - var addrArray = gstr.Split(address, ":") - switch addrArray[0] { - case "0.0.0.0", "": - listenedIps = []string{gipv4.MustGetIntranetIp()} - default: - listenedIps = []string{addrArray[0]} - } - for _, ip := range listenedIps { - endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, listenedPort))) + for _, address := range gstr.SplitAndTrim(configAddr, ",") { + var ( + addrArray = gstr.Split(address, ":") + listenedIps []string + listenedPorts []int + ) + // IPs. + switch addrArray[0] { + case "127.0.0.1": + // Nothing to do. + case "0.0.0.0", "": + intranetIps, err := gipv4.GetIntranetIpArray() + if err != nil { + s.Logger().Errorf(ctx, `error retrieving intranet ip: %+v`, err) + return nil + } + // If no intranet ips found, it uses all ips that can be retrieved, + // it may include internet ip. + if len(intranetIps) == 0 { + allIps, err := gipv4.GetIpArray() + if err != nil { + s.Logger().Errorf(ctx, `error retrieving ip from current node: %+v`, err) + return nil + } + s.Logger().Noticef( + ctx, + `no intranet ip found, using internet ip to register service: %v`, + allIps, + ) + listenedIps = allIps + break + } + listenedIps = intranetIps + default: + listenedIps = []string{addrArray[0]} + } + // Ports. + switch addrArray[1] { + case "0": + listenedPorts = []int{s.GetListenedPort()} + default: + listenedPorts = []int{gconv.Int(addrArray[1])} + } + for _, ip := range listenedIps { + for _, port := range listenedPorts { + endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port))) + } + } } return endpoints } diff --git a/net/ghttp/ghttp_server.go b/net/ghttp/ghttp_server.go index 81f6464c2..03219d53c 100644 --- a/net/ghttp/ghttp_server.go +++ b/net/ghttp/ghttp_server.go @@ -518,9 +518,9 @@ func (s *Server) startServer(fdMap listenerFdMap) { } var array []string if v, ok := fdMap["http"]; ok && len(v) > 0 { - array = strings.Split(v, ",") + array = gstr.SplitAndTrim(v, ",") } else { - array = strings.Split(s.config.Address, ",") + array = gstr.SplitAndTrim(s.config.Address, ",") } for _, v := range array { if len(v) == 0 { @@ -555,7 +555,9 @@ func (s *Server) startServer(fdMap listenerFdMap) { var err error // Create listener. if server.isHttps { - err = server.CreateListenerTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig) + err = server.CreateListenerTLS( + s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig, + ) } else { err = server.CreateListener() } diff --git a/net/ghttp/ghttp_server_graceful.go b/net/ghttp/ghttp_server_graceful.go index 37dc6b787..7cca44adf 100644 --- a/net/ghttp/ghttp_server_graceful.go +++ b/net/ghttp/ghttp_server_graceful.go @@ -186,6 +186,7 @@ func (s *gracefulServer) GetListenedAddress() string { } // GetListenedPort retrieves and returns one port which is listened to by current server. +// Note that this method is only available if the server is listening on one port. func (s *gracefulServer) GetListenedPort() int { if ln := s.getRawListener(); ln != nil { return ln.Addr().(*net.TCPAddr).Port diff --git a/net/ghttp/ghttp_server_registry.go b/net/ghttp/ghttp_server_registry.go index f682fd19f..bd6c21906 100644 --- a/net/ghttp/ghttp_server_registry.go +++ b/net/ghttp/ghttp_server_registry.go @@ -7,6 +7,7 @@ package ghttp import ( + "context" "fmt" "github.com/gogf/gf/v2/net/gipv4" @@ -37,10 +38,14 @@ func (s *Server) doServiceRegister() { } s.service = &gsvc.LocalService{ Name: s.GetName(), - Endpoints: s.calculateListenedEndpoints(), + Endpoints: s.calculateListenedEndpoints(ctx), Metadata: metadata, } s.Logger().Debugf(ctx, `service register: %+v`, s.service) + if len(s.service.GetEndpoints()) == 0 { + s.Logger().Warningf(ctx, `no endpoints found to register service, abort service registering`) + return + } if s.service, err = s.registrar.Register(ctx, s.service); err != nil { s.Logger().Fatalf(ctx, `%+v`, err) } @@ -58,32 +63,61 @@ func (s *Server) doServiceDeregister() { } } -func (s *Server) calculateListenedEndpoints() gsvc.Endpoints { +func (s *Server) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoints { var ( - address = s.config.Address - endpoints = make(gsvc.Endpoints, 0) - listenedIps []string - listenedPorts []int + configAddr = s.config.Address + endpoints = make(gsvc.Endpoints, 0) ) - if address == "" { - address = s.config.HTTPSAddr + if configAddr == "" { + configAddr = s.config.HTTPSAddr } - var addrArray = gstr.Split(address, ":") - switch addrArray[0] { - case "0.0.0.0", "": - listenedIps = []string{gipv4.MustGetIntranetIp()} - default: - listenedIps = []string{addrArray[0]} - } - switch addrArray[1] { - case "0": - listenedPorts = s.GetListenedPorts() - default: - listenedPorts = []int{gconv.Int(addrArray[1])} - } - for _, ip := range listenedIps { - for _, port := range listenedPorts { - endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port))) + for _, address := range gstr.SplitAndTrim(configAddr, ",") { + var ( + addrArray = gstr.Split(address, ":") + listenedIps []string + listenedPorts []int + ) + // IPs. + switch addrArray[0] { + case "127.0.0.1": + // Nothing to do. + case "0.0.0.0", "": + intranetIps, err := gipv4.GetIntranetIpArray() + if err != nil { + s.Logger().Errorf(ctx, `error retrieving intranet ip: %+v`, err) + return nil + } + // If no intranet ips found, it uses all ips that can be retrieved, + // it may include internet ip. + if len(intranetIps) == 0 { + allIps, err := gipv4.GetIpArray() + if err != nil { + s.Logger().Errorf(ctx, `error retrieving ip from current node: %+v`, err) + return nil + } + s.Logger().Noticef( + ctx, + `no intranet ip found, using internet ip to register service: %v`, + allIps, + ) + listenedIps = allIps + break + } + listenedIps = intranetIps + default: + listenedIps = []string{addrArray[0]} + } + // Ports. + switch addrArray[1] { + case "0": + listenedPorts = s.GetListenedPorts() + default: + listenedPorts = []int{gconv.Int(addrArray[1])} + } + for _, ip := range listenedIps { + for _, port := range listenedPorts { + endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port))) + } } } return endpoints