diff --git a/contrib/registry/etcd/etcd.go b/contrib/registry/etcd/etcd.go index 45201a299..d4b12870b 100644 --- a/contrib/registry/etcd/etcd.go +++ b/contrib/registry/etcd/etcd.go @@ -3,8 +3,11 @@ package etcd import ( "time" + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/gsvc" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/text/gstr" etcd3 "go.etcd.io/etcd/client/v3" ) @@ -30,3 +33,60 @@ type Option struct { const ( DefaultKeepAliveTTL = 10 * time.Second ) + +func New(address string, option ...Option) (*Registry, error) { + endpoints := gstr.SplitAndTrim(address, ",") + if len(endpoints) == 0 { + return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `invalid etcd address "%s"`, address) + } + client, err := etcd3.New(etcd3.Config{ + Endpoints: endpoints, + }) + if err != nil { + return nil, gerror.Wrap(err, `create etcd client failed`) + } + return NewWithClient(client, option...), nil +} + +func NewWithClient(client *etcd3.Client, option ...Option) *Registry { + r := &Registry{ + client: client, + kv: etcd3.NewKV(client), + } + if len(option) > 0 { + r.logger = option[0].Logger + r.keepaliveTTL = option[0].KeepaliveTTL + } + if r.keepaliveTTL == 0 { + r.keepaliveTTL = DefaultKeepAliveTTL + } + return r +} + +// extractResponseToServices extracts etcd watch response context to service list. +func extractResponseToServices(res *etcd3.GetResponse) ([]*gsvc.Service, error) { + if res == nil || res.Kvs == nil { + return nil, nil + } + var ( + services []*gsvc.Service + serviceKey string + serviceMap = make(map[string]*gsvc.Service) + ) + for _, kv := range res.Kvs { + service, err := gsvc.NewServiceWithKV(kv.Key, kv.Value) + if err != nil { + return services, err + } + if service != nil { + serviceKey = service.KeyWithoutEndpoints() + if s, ok := serviceMap[serviceKey]; ok { + s.Endpoints = append(s.Endpoints, service.Endpoints...) + } else { + serviceMap[serviceKey] = service + services = append(services, service) + } + } + } + return services, nil +} diff --git a/contrib/registry/etcd/etcd_discovery.go b/contrib/registry/etcd/etcd_discovery.go new file mode 100644 index 000000000..c20178621 --- /dev/null +++ b/contrib/registry/etcd/etcd_discovery.go @@ -0,0 +1,42 @@ +package etcd + +import ( + "context" + + "github.com/gogf/gf/v2/net/gsvc" + etcd3 "go.etcd.io/etcd/client/v3" +) + +func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]*gsvc.Service, error) { + res, err := r.kv.Get(ctx, in.Key(), etcd3.WithPrefix()) + if err != nil { + return nil, err + } + services, err := extractResponseToServices(res) + if err != nil { + return nil, err + } + // Service filter. + filteredServices := make([]*gsvc.Service, 0) + for _, v := range services { + if in.Deployment != "" && in.Deployment != v.Deployment { + continue + } + if in.Namespace != "" && in.Namespace != v.Namespace { + continue + } + if in.Name != "" && in.Name != v.Name { + continue + } + if in.Version != "" && in.Version != v.Version { + continue + } + service := v + filteredServices = append(filteredServices, service) + } + return filteredServices, nil +} + +func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) { + return newWatcher(ctx, key, r.client) +} diff --git a/contrib/registry/etcd/etcd_registry.go b/contrib/registry/etcd/etcd_registry.go index 6db312482..7d6d56145 100644 --- a/contrib/registry/etcd/etcd_registry.go +++ b/contrib/registry/etcd/etcd_registry.go @@ -3,42 +3,10 @@ package etcd import ( "context" - "github.com/gogf/gf/v2/errors/gcode" - "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/gsvc" - "github.com/gogf/gf/v2/text/gstr" etcd3 "go.etcd.io/etcd/client/v3" ) -func New(address string, option ...Option) (*Registry, error) { - endpoints := gstr.SplitAndTrim(address, ",") - if len(endpoints) == 0 { - return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `invalid etcd address "%s"`, address) - } - client, err := etcd3.New(etcd3.Config{ - Endpoints: endpoints, - }) - if err != nil { - return nil, gerror.Wrap(err, `create etcd client failed`) - } - return NewWithClient(client, option...), nil -} - -func NewWithClient(client *etcd3.Client, option ...Option) *Registry { - r := &Registry{ - client: client, - kv: etcd3.NewKV(client), - } - if len(option) > 0 { - r.logger = option[0].Logger - r.keepaliveTTL = option[0].KeepaliveTTL - } - if r.keepaliveTTL == 0 { - r.keepaliveTTL = DefaultKeepAliveTTL - } - return r -} - func (r *Registry) Register(ctx context.Context, service *gsvc.Service) error { r.lease = etcd3.NewLease(r.client) grant, err := r.lease.Grant(ctx, int64(r.keepaliveTTL.Seconds())) @@ -67,40 +35,6 @@ func (r *Registry) Deregister(ctx context.Context, service *gsvc.Service) error return err } -func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]*gsvc.Service, error) { - res, err := r.kv.Get(ctx, in.Key(), etcd3.WithPrefix()) - if err != nil { - return nil, err - } - services, err := extractResponseToServices(res) - if err != nil { - return nil, err - } - // Service filter. - filteredServices := make([]*gsvc.Service, 0) - for _, v := range services { - if in.Deployment != "" && in.Deployment != v.Deployment { - continue - } - if in.Namespace != "" && in.Namespace != v.Namespace { - continue - } - if in.Name != "" && in.Name != v.Name { - continue - } - if in.Version != "" && in.Version != v.Version { - continue - } - service := v - filteredServices = append(filteredServices, service) - } - return filteredServices, nil -} - -func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) { - return newWatcher(ctx, key, r.client) -} - // doKeepAlive continuously keeps alive the lease from ETCD. func (r *Registry) doKeepAlive( ctx context.Context, leaseID etcd3.LeaseID, keepAliceCh <-chan *etcd3.LeaseKeepAliveResponse, @@ -122,21 +56,3 @@ func (r *Registry) doKeepAlive( } } } - -// extractResponseToServices extracts etcd watch response context to service list. -func extractResponseToServices(res *etcd3.GetResponse) ([]*gsvc.Service, error) { - var services []*gsvc.Service - if res == nil || res.Kvs == nil { - return services, nil - } - for _, kv := range res.Kvs { - service, err := gsvc.NewServiceFromKV(kv.Key, kv.Value) - if err != nil { - return services, err - } - if service != nil { - services = append(services, service) - } - } - return services, nil -} diff --git a/contrib/resolver/resolver_builder.go b/contrib/resolver/resolver_builder.go index e980fa92e..e64571148 100644 --- a/contrib/resolver/resolver_builder.go +++ b/contrib/resolver/resolver_builder.go @@ -8,18 +8,14 @@ import ( "google.golang.org/grpc/resolver" ) -const Name = "katyusha" +const Name = "GoFrameResolver" type Builder struct { - registry gsvc.Registry } // NewBuilder creates a builder which is used to factory registry resolvers. -func NewBuilder(registry gsvc.Registry) resolver.Builder { - b := &Builder{ - registry: registry, - } - return b +func NewBuilder() resolver.Builder { + return &Builder{} } func (b *Builder) Build( @@ -30,7 +26,7 @@ func (b *Builder) Build( watcher gsvc.Watcher ctx, cancel = context.WithCancel(context.Background()) ) - if watcher, err = b.registry.Watch(ctx, target.URL.Path); err != nil { + if watcher, err = gsvc.Watch(ctx, target.URL.Path); err != nil { cancel() return nil, gerror.Wrap(err, `registry.Watch failed`) } diff --git a/contrib/resolver/resolver_resolver.go b/contrib/resolver/resolver_resolver.go index 9b480b6c2..45f625471 100644 --- a/contrib/resolver/resolver_resolver.go +++ b/contrib/resolver/resolver_resolver.go @@ -4,7 +4,6 @@ import ( "context" "time" - "github.com/gogf/gf/v2/container/gset" "github.com/gogf/gf/v2/contrib/balancer" "github.com/gogf/gf/v2/net/gsvc" "github.com/gogf/gf/v2/os/glog" @@ -40,21 +39,19 @@ func (r *Resolver) watch() { func (r *Resolver) update(services []*gsvc.Service) { var ( - err error - addresses = make([]resolver.Address, 0) - addressSet = gset.NewStrSet() + err error + addresses = make([]resolver.Address, 0) ) for _, service := range services { - if !addressSet.AddIfNotExist(service.Address) { - continue + for _, endpoint := range service.Endpoints { + addr := resolver.Address{ + Addr: endpoint, + ServerName: service.Name, + Attributes: newAttributesFromMetadata(service.Metadata), + } + addr.Attributes = addr.Attributes.WithValue(balancer.RawSvcKeyInSubConnInfo, service) + addresses = append(addresses, addr) } - addr := resolver.Address{ - Addr: service.Address, - ServerName: service.Name, - Attributes: newAttributesFromMetadata(service.Metadata), - } - addr.Attributes = addr.Attributes.WithValue(balancer.RawSvcKeyInSubConnInfo, service) - addresses = append(addresses, addr) } if len(addresses) == 0 { r.logger.Noticef(r.ctx, "empty addresses parsed from: %+v", services) diff --git a/net/gsvc/gsvc.go b/net/gsvc/gsvc.go index 136758bc1..5f79a20cc 100644 --- a/net/gsvc/gsvc.go +++ b/net/gsvc/gsvc.go @@ -9,16 +9,27 @@ package gsvc import ( "context" + + "github.com/gogf/gf/v2/errors/gerror" ) // Registry interface for service. type Registry interface { + Registrar + Discovery +} + +// Registrar interface for service registrar. +type Registrar interface { // Register registers `service` to Registry. Register(ctx context.Context, service *Service) error // Deregister off-lines and removes `service` from Registry. Deregister(ctx context.Context, service *Service) error +} +// Discovery interface for service discovery. +type Discovery interface { // Search searches and returns services with specified condition. Search(ctx context.Context, in SearchInput) ([]*Service, error) @@ -42,7 +53,7 @@ type Service struct { Namespace string // Service Namespace, to indicate different service in the same environment with the same Name. Name string // Name for the service. Version string // Service version, eg: v1.0.0, v2.1.1, etc. - Address string // Service address, single one, pattern: IP:port, eg: 192.168.1.2:8000. + Endpoints []string // Service Endpoints, pattern: IP:port, eg: 192.168.1.2:8000. Metadata map[string]interface{} // Custom data for this service, which can be set using JSON by environment or command-line. } @@ -63,3 +74,30 @@ type WatchInput struct { Name string // Name for the service. Version string // Service version, eg: v1.0.0, v2.1.1, etc.} } + +const ( + DefaultPrefix = `goframe` + DefaultDeployment = `default` + DefaultNamespace = `default` + DefaultVersion = `latest` +) + +const ( + EnvPrefix = `GF_GSVC_PREFIX` + EnvDeployment = `GF_GSVC_DEPLOYMENT` + EnvNamespace = `GF_GSVC_NAMESPACE` + EnvName = `GF_GSVC_Name` + EnvVersion = `GF_GSVC_VERSION` +) + +var ( + defaultRegistry Registry +) + +// SetRegistry sets the default Registry implements as your own implemented interface. +func SetRegistry(registry Registry) { + if registry == nil { + panic(gerror.New(`invalid Registry value "nil" given`)) + } + defaultRegistry = registry +} diff --git a/net/gsvc/gsvc_discovery.go b/net/gsvc/gsvc_discovery.go new file mode 100644 index 000000000..eea085fad --- /dev/null +++ b/net/gsvc/gsvc_discovery.go @@ -0,0 +1,92 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gsvc + +import ( + "context" + "time" + + "github.com/gogf/gf/v2/container/gmap" + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/os/glog" +) + +var ( + watchedServiceMap = gmap.New(true) +) + +func Get(ctx context.Context, name string) (service *Service, err error) { + v := watchedServiceMap.GetOrSetFuncLock(name, func() interface{} { + var ( + s = NewServiceWithName(name) + services []*Service + ) + services, err = Search(ctx, SearchInput{ + Prefix: s.Prefix, + Deployment: s.Deployment, + Namespace: s.Namespace, + Name: s.Name, + Version: s.Version, + }) + if err != nil { + return nil + } + if len(services) == 0 { + err = gerror.NewCodef(gcode.CodeNotFound, `service not found with name "%s"`, name) + return nil + } + service = services[0] + // Watch the service changes in goroutine. + go watchAndUpdateService(ctx, service) + return service + }) + if v != nil { + service = v.(*Service) + } + return +} + +func watchAndUpdateService(ctx context.Context, service *Service) { + var ( + err error + watcher Watcher + services []*Service + ) + for { + time.Sleep(time.Second) + watcher, err = Watch(ctx, service.KeyWithoutEndpoints()) + if err != nil { + glog.Error(ctx, err) + continue + } + services, err = watcher.Proceed() + if err != nil { + glog.Error(ctx, err) + continue + } + if len(services) > 0 { + watchedServiceMap.Set(service.Name, services[0]) + } + } +} + +// Search searches and returns services with specified condition. +func Search(ctx context.Context, in SearchInput) ([]*Service, error) { + if defaultRegistry == nil { + return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`) + } + return defaultRegistry.Search(ctx, in) +} + +// Watch watches specified condition changes. +func Watch(ctx context.Context, key string) (Watcher, error) { + if defaultRegistry == nil { + return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`) + } + return defaultRegistry.Watch(ctx, key) +} diff --git a/net/gsvc/gsvc_registry.go b/net/gsvc/gsvc_registry.go index 9e1c904b0..06638b777 100644 --- a/net/gsvc/gsvc_registry.go +++ b/net/gsvc/gsvc_registry.go @@ -9,35 +9,22 @@ package gsvc import ( "context" + "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" ) -var defaultRegistry Registry - -// SetRegistry sets the default Registry implements as your own implemented interface. -func SetRegistry(registry Registry) { - if registry == nil { - panic(gerror.New(`invalid Registry value "nil" given`)) - } - defaultRegistry = registry -} - // Register registers `service` to default registry.. func Register(ctx context.Context, service *Service) error { + if defaultRegistry == nil { + return gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`) + } return defaultRegistry.Register(ctx, service) } // Deregister removes `service` from default registry. func Deregister(ctx context.Context, service *Service) error { + if defaultRegistry == nil { + return gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`) + } return defaultRegistry.Deregister(ctx, service) } - -// Search searches and returns services with specified condition. -func Search(ctx context.Context, in SearchInput) ([]*Service, error) { - return defaultRegistry.Search(ctx, in) -} - -// Watch watches specified condition changes. -func Watch(ctx context.Context, key string) (Watcher, error) { - return defaultRegistry.Watch(ctx, key) -} diff --git a/net/gsvc/gsvc_service.go b/net/gsvc/gsvc_service.go index 1c1f77d21..96e16ce38 100644 --- a/net/gsvc/gsvc_service.go +++ b/net/gsvc/gsvc_service.go @@ -13,19 +13,27 @@ import ( "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/internal/intlog" + "github.com/gogf/gf/v2/os/gcmd" "github.com/gogf/gf/v2/text/gstr" ) const ( - DefaultPrefix = `goframe` - DefaultDeployment = `default` - DefaultNamespace = `default` - DefaultVersion = `latest` + separator = "/" ) -// NewServiceFromKV creates and returns service from `key` and `value`. -func NewServiceFromKV(key, value []byte) (s *Service, err error) { - array := gstr.Split(string(key), "/") +// NewServiceWithName creates and returns service from `name`. +func NewServiceWithName(name string) (s *Service) { + s = &Service{ + Name: name, + Metadata: make(map[string]interface{}), + } + s.autoFillDefaultAttributes() + return s +} + +// NewServiceWithKV creates and returns service from `key` and `value`. +func NewServiceWithKV(key, value []byte) (s *Service, err error) { + array := gstr.Split(string(key), separator) if len(array) < 6 { return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `invalid service key "%s"`, key) } @@ -35,7 +43,7 @@ func NewServiceFromKV(key, value []byte) (s *Service, err error) { Namespace: array[2], Name: array[3], Version: array[4], - Address: array[5], + Endpoints: gstr.Split(array[5], ","), Metadata: make(map[string]interface{}), } s.autoFillDefaultAttributes() @@ -49,6 +57,13 @@ func NewServiceFromKV(key, value []byte) (s *Service, err error) { // Key formats the service information and returns the Service as registering key. func (s *Service) Key() string { + serviceNameUnique := s.KeyWithoutEndpoints() + serviceNameUnique += separator + gstr.Join(s.Endpoints, ",") + return serviceNameUnique +} + +// KeyWithoutEndpoints formats the service information and returns a string as unique name of service. +func (s *Service) KeyWithoutEndpoints() string { s.autoFillDefaultAttributes() return gstr.Join([]string{ s.Prefix, @@ -56,8 +71,7 @@ func (s *Service) Key() string { s.Namespace, s.Name, s.Version, - s.Address, - }, "/") + }, separator) } func (s *Service) Value() string { @@ -70,15 +84,18 @@ func (s *Service) Value() string { func (s *Service) autoFillDefaultAttributes() { if s.Prefix == "" { - s.Prefix = DefaultPrefix + s.Prefix = gcmd.GetOptWithEnv(EnvPrefix, DefaultPrefix).String() } if s.Deployment == "" { - s.Deployment = DefaultDeployment + s.Deployment = gcmd.GetOptWithEnv(EnvDeployment, DefaultDeployment).String() } if s.Namespace == "" { - s.Namespace = DefaultNamespace + s.Namespace = gcmd.GetOptWithEnv(EnvNamespace, DefaultNamespace).String() } if s.Version == "" { - s.Version = DefaultVersion + s.Version = gcmd.GetOptWithEnv(EnvVersion, DefaultVersion).String() + } + if s.Name == "" { + s.Name = gcmd.GetOptWithEnv(EnvName).String() } }