improve auto retrieving of ip addresses for service registering (#2593)

This commit is contained in:
John Guo
2023-04-23 21:58:17 +08:00
committed by GitHub
parent 23d8ef32a3
commit 9171585b2c
6 changed files with 185 additions and 50 deletions

View File

@ -0,0 +1,50 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package file_test
import (
"fmt"
"testing"
"time"
"github.com/gogf/gf/contrib/registry/file/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/os/gfile"
"github.com/gogf/gf/v2/test/gtest"
"github.com/gogf/gf/v2/util/guid"
)
var ctx = gctx.GetInitCtx()
func Test_HTTP_Registry(t *testing.T) {
var (
svcName = guid.S()
dirPath = gfile.Temp(svcName)
)
defer gfile.Remove(dirPath)
gsvc.SetRegistry(file.New(dirPath))
s := g.Server(svcName)
s.BindHandler("/http-registry", func(r *ghttp.Request) {
r.Response.Write(svcName)
})
s.SetDumpRouterMap(false)
s.Start()
defer s.Shutdown()
time.Sleep(100 * time.Millisecond)
gtest.C(t, func(t *gtest.T) {
client := g.Client()
client.SetPrefix(fmt.Sprintf("http://%s", svcName))
// GET
t.Assert(client.GetContent(ctx, "/http-registry"), svcName)
})
}

View File

@ -7,6 +7,7 @@
package grpcx
import (
"context"
"fmt"
"net"
"os"
@ -25,6 +26,7 @@ import (
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/os/gproc"
"github.com/gogf/gf/v2/text/gstr"
"github.com/gogf/gf/v2/util/gconv"
)
// GrpcServer is the server for GRPC protocol.
@ -104,19 +106,19 @@ func (s *GrpcServer) Run() {
// Create listener to bind listening ip and port.
s.listener, err = net.Listen("tcp", s.config.Address)
if err != nil {
s.config.Logger.Fatalf(ctx, `%+v`, err)
s.Logger().Fatalf(ctx, `%+v`, err)
}
// Start listening.
go func() {
if err = s.Server.Serve(s.listener); err != nil {
s.config.Logger.Fatalf(ctx, `%+v`, err)
s.Logger().Fatalf(ctx, `%+v`, err)
}
}()
// Service register.
s.doServiceRegister()
s.config.Logger.Infof(
s.Logger().Infof(
ctx,
"pid[%d]: grpc server started listening on [%s]",
gproc.Pid(), s.GetListenedAddress(),
@ -147,7 +149,7 @@ func (s *GrpcServer) doSignalListen() {
syscall.SIGKILL,
syscall.SIGTERM,
syscall.SIGABRT:
s.config.Logger.Infof(ctx, "signal received: %s, gracefully shutting down", sig.String())
s.Logger().Infof(ctx, "signal received: %s, gracefully shutting down", sig.String())
s.doServiceDeregister()
time.Sleep(time.Second)
s.Stop()
@ -156,6 +158,11 @@ func (s *GrpcServer) doSignalListen() {
}
}
// Logger is alias of GetLogger.
func (s *GrpcServer) Logger() *glog.Logger {
return s.config.Logger
}
// doServiceRegister registers current service to Registry.
func (s *GrpcServer) doServiceRegister() {
if s.registrar == nil {
@ -176,15 +183,19 @@ func (s *GrpcServer) doServiceRegister() {
for i, service := range s.services {
service = &gsvc.LocalService{
Name: service.GetName(),
Endpoints: s.calculateListenedEndpoints(),
Endpoints: s.calculateListenedEndpoints(ctx),
Metadata: service.GetMetadata(),
}
service.GetMetadata().Sets(gsvc.Metadata{
gsvc.MDProtocol: protocol,
})
s.config.Logger.Debugf(ctx, `service register: %+v`, service)
s.Logger().Debugf(ctx, `service register: %+v`, service)
if len(service.GetEndpoints()) == 0 {
s.Logger().Warningf(ctx, `no endpoints found to register service, abort service registering`)
return
}
if service, err = s.registrar.Register(ctx, service); err != nil {
s.config.Logger.Fatalf(ctx, `%+v`, err)
s.Logger().Fatalf(ctx, `%+v`, err)
}
s.services[i] = service
}
@ -197,9 +208,9 @@ func (s *GrpcServer) doServiceDeregister() {
}
var ctx = gctx.GetInitCtx()
for _, service := range s.services {
s.config.Logger.Debugf(ctx, `service deregister: %+v`, service)
s.Logger().Debugf(ctx, `service deregister: %+v`, service)
if err := s.registrar.Deregister(ctx, service); err != nil {
s.config.Logger.Errorf(ctx, `%+v`, err)
s.Logger().Errorf(ctx, `%+v`, err)
}
}
}
@ -249,22 +260,59 @@ func (s *GrpcServer) GetListenedPort() int {
return -1
}
func (s *GrpcServer) calculateListenedEndpoints() gsvc.Endpoints {
func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoints {
var (
address = s.config.Address
endpoints = make(gsvc.Endpoints, 0)
listenedPort = s.GetListenedPort()
listenedIps []string
configAddr = s.config.Address
endpoints = make(gsvc.Endpoints, 0)
)
var addrArray = gstr.Split(address, ":")
switch addrArray[0] {
case "0.0.0.0", "":
listenedIps = []string{gipv4.MustGetIntranetIp()}
default:
listenedIps = []string{addrArray[0]}
}
for _, ip := range listenedIps {
endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, listenedPort)))
for _, address := range gstr.SplitAndTrim(configAddr, ",") {
var (
addrArray = gstr.Split(address, ":")
listenedIps []string
listenedPorts []int
)
// IPs.
switch addrArray[0] {
case "127.0.0.1":
// Nothing to do.
case "0.0.0.0", "":
intranetIps, err := gipv4.GetIntranetIpArray()
if err != nil {
s.Logger().Errorf(ctx, `error retrieving intranet ip: %+v`, err)
return nil
}
// If no intranet ips found, it uses all ips that can be retrieved,
// it may include internet ip.
if len(intranetIps) == 0 {
allIps, err := gipv4.GetIpArray()
if err != nil {
s.Logger().Errorf(ctx, `error retrieving ip from current node: %+v`, err)
return nil
}
s.Logger().Noticef(
ctx,
`no intranet ip found, using internet ip to register service: %v`,
allIps,
)
listenedIps = allIps
break
}
listenedIps = intranetIps
default:
listenedIps = []string{addrArray[0]}
}
// Ports.
switch addrArray[1] {
case "0":
listenedPorts = []int{s.GetListenedPort()}
default:
listenedPorts = []int{gconv.Int(addrArray[1])}
}
for _, ip := range listenedIps {
for _, port := range listenedPorts {
endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)))
}
}
}
return endpoints
}

View File

@ -518,9 +518,9 @@ func (s *Server) startServer(fdMap listenerFdMap) {
}
var array []string
if v, ok := fdMap["http"]; ok && len(v) > 0 {
array = strings.Split(v, ",")
array = gstr.SplitAndTrim(v, ",")
} else {
array = strings.Split(s.config.Address, ",")
array = gstr.SplitAndTrim(s.config.Address, ",")
}
for _, v := range array {
if len(v) == 0 {
@ -555,7 +555,9 @@ func (s *Server) startServer(fdMap listenerFdMap) {
var err error
// Create listener.
if server.isHttps {
err = server.CreateListenerTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig)
err = server.CreateListenerTLS(
s.config.HTTPSCertPath, s.config.HTTPSKeyPath, s.config.TLSConfig,
)
} else {
err = server.CreateListener()
}

View File

@ -186,6 +186,7 @@ func (s *gracefulServer) GetListenedAddress() string {
}
// GetListenedPort retrieves and returns one port which is listened to by current server.
// Note that this method is only available if the server is listening on one port.
func (s *gracefulServer) GetListenedPort() int {
if ln := s.getRawListener(); ln != nil {
return ln.Addr().(*net.TCPAddr).Port

View File

@ -7,6 +7,7 @@
package ghttp
import (
"context"
"fmt"
"github.com/gogf/gf/v2/net/gipv4"
@ -37,10 +38,14 @@ func (s *Server) doServiceRegister() {
}
s.service = &gsvc.LocalService{
Name: s.GetName(),
Endpoints: s.calculateListenedEndpoints(),
Endpoints: s.calculateListenedEndpoints(ctx),
Metadata: metadata,
}
s.Logger().Debugf(ctx, `service register: %+v`, s.service)
if len(s.service.GetEndpoints()) == 0 {
s.Logger().Warningf(ctx, `no endpoints found to register service, abort service registering`)
return
}
if s.service, err = s.registrar.Register(ctx, s.service); err != nil {
s.Logger().Fatalf(ctx, `%+v`, err)
}
@ -58,32 +63,61 @@ func (s *Server) doServiceDeregister() {
}
}
func (s *Server) calculateListenedEndpoints() gsvc.Endpoints {
func (s *Server) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoints {
var (
address = s.config.Address
endpoints = make(gsvc.Endpoints, 0)
listenedIps []string
listenedPorts []int
configAddr = s.config.Address
endpoints = make(gsvc.Endpoints, 0)
)
if address == "" {
address = s.config.HTTPSAddr
if configAddr == "" {
configAddr = s.config.HTTPSAddr
}
var addrArray = gstr.Split(address, ":")
switch addrArray[0] {
case "0.0.0.0", "":
listenedIps = []string{gipv4.MustGetIntranetIp()}
default:
listenedIps = []string{addrArray[0]}
}
switch addrArray[1] {
case "0":
listenedPorts = s.GetListenedPorts()
default:
listenedPorts = []int{gconv.Int(addrArray[1])}
}
for _, ip := range listenedIps {
for _, port := range listenedPorts {
endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)))
for _, address := range gstr.SplitAndTrim(configAddr, ",") {
var (
addrArray = gstr.Split(address, ":")
listenedIps []string
listenedPorts []int
)
// IPs.
switch addrArray[0] {
case "127.0.0.1":
// Nothing to do.
case "0.0.0.0", "":
intranetIps, err := gipv4.GetIntranetIpArray()
if err != nil {
s.Logger().Errorf(ctx, `error retrieving intranet ip: %+v`, err)
return nil
}
// If no intranet ips found, it uses all ips that can be retrieved,
// it may include internet ip.
if len(intranetIps) == 0 {
allIps, err := gipv4.GetIpArray()
if err != nil {
s.Logger().Errorf(ctx, `error retrieving ip from current node: %+v`, err)
return nil
}
s.Logger().Noticef(
ctx,
`no intranet ip found, using internet ip to register service: %v`,
allIps,
)
listenedIps = allIps
break
}
listenedIps = intranetIps
default:
listenedIps = []string{addrArray[0]}
}
// Ports.
switch addrArray[1] {
case "0":
listenedPorts = s.GetListenedPorts()
default:
listenedPorts = []int{gconv.Int(addrArray[1])}
}
for _, ip := range listenedIps {
for _, port := range listenedPorts {
endpoints = append(endpoints, gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)))
}
}
}
return endpoints