improve package gsvc

This commit is contained in:
John Guo
2022-01-26 22:23:54 +08:00
parent 65c385c013
commit 5f87591407
9 changed files with 285 additions and 140 deletions

View File

@ -3,8 +3,11 @@ package etcd
import (
"time"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/glog"
"github.com/gogf/gf/v2/text/gstr"
etcd3 "go.etcd.io/etcd/client/v3"
)
@ -30,3 +33,60 @@ type Option struct {
const (
DefaultKeepAliveTTL = 10 * time.Second
)
func New(address string, option ...Option) (*Registry, error) {
endpoints := gstr.SplitAndTrim(address, ",")
if len(endpoints) == 0 {
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `invalid etcd address "%s"`, address)
}
client, err := etcd3.New(etcd3.Config{
Endpoints: endpoints,
})
if err != nil {
return nil, gerror.Wrap(err, `create etcd client failed`)
}
return NewWithClient(client, option...), nil
}
func NewWithClient(client *etcd3.Client, option ...Option) *Registry {
r := &Registry{
client: client,
kv: etcd3.NewKV(client),
}
if len(option) > 0 {
r.logger = option[0].Logger
r.keepaliveTTL = option[0].KeepaliveTTL
}
if r.keepaliveTTL == 0 {
r.keepaliveTTL = DefaultKeepAliveTTL
}
return r
}
// extractResponseToServices extracts etcd watch response context to service list.
func extractResponseToServices(res *etcd3.GetResponse) ([]*gsvc.Service, error) {
if res == nil || res.Kvs == nil {
return nil, nil
}
var (
services []*gsvc.Service
serviceKey string
serviceMap = make(map[string]*gsvc.Service)
)
for _, kv := range res.Kvs {
service, err := gsvc.NewServiceWithKV(kv.Key, kv.Value)
if err != nil {
return services, err
}
if service != nil {
serviceKey = service.KeyWithoutEndpoints()
if s, ok := serviceMap[serviceKey]; ok {
s.Endpoints = append(s.Endpoints, service.Endpoints...)
} else {
serviceMap[serviceKey] = service
services = append(services, service)
}
}
}
return services, nil
}

View File

@ -0,0 +1,42 @@
package etcd
import (
"context"
"github.com/gogf/gf/v2/net/gsvc"
etcd3 "go.etcd.io/etcd/client/v3"
)
func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]*gsvc.Service, error) {
res, err := r.kv.Get(ctx, in.Key(), etcd3.WithPrefix())
if err != nil {
return nil, err
}
services, err := extractResponseToServices(res)
if err != nil {
return nil, err
}
// Service filter.
filteredServices := make([]*gsvc.Service, 0)
for _, v := range services {
if in.Deployment != "" && in.Deployment != v.Deployment {
continue
}
if in.Namespace != "" && in.Namespace != v.Namespace {
continue
}
if in.Name != "" && in.Name != v.Name {
continue
}
if in.Version != "" && in.Version != v.Version {
continue
}
service := v
filteredServices = append(filteredServices, service)
}
return filteredServices, nil
}
func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) {
return newWatcher(ctx, key, r.client)
}

View File

@ -3,42 +3,10 @@ package etcd
import (
"context"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/text/gstr"
etcd3 "go.etcd.io/etcd/client/v3"
)
func New(address string, option ...Option) (*Registry, error) {
endpoints := gstr.SplitAndTrim(address, ",")
if len(endpoints) == 0 {
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `invalid etcd address "%s"`, address)
}
client, err := etcd3.New(etcd3.Config{
Endpoints: endpoints,
})
if err != nil {
return nil, gerror.Wrap(err, `create etcd client failed`)
}
return NewWithClient(client, option...), nil
}
func NewWithClient(client *etcd3.Client, option ...Option) *Registry {
r := &Registry{
client: client,
kv: etcd3.NewKV(client),
}
if len(option) > 0 {
r.logger = option[0].Logger
r.keepaliveTTL = option[0].KeepaliveTTL
}
if r.keepaliveTTL == 0 {
r.keepaliveTTL = DefaultKeepAliveTTL
}
return r
}
func (r *Registry) Register(ctx context.Context, service *gsvc.Service) error {
r.lease = etcd3.NewLease(r.client)
grant, err := r.lease.Grant(ctx, int64(r.keepaliveTTL.Seconds()))
@ -67,40 +35,6 @@ func (r *Registry) Deregister(ctx context.Context, service *gsvc.Service) error
return err
}
func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]*gsvc.Service, error) {
res, err := r.kv.Get(ctx, in.Key(), etcd3.WithPrefix())
if err != nil {
return nil, err
}
services, err := extractResponseToServices(res)
if err != nil {
return nil, err
}
// Service filter.
filteredServices := make([]*gsvc.Service, 0)
for _, v := range services {
if in.Deployment != "" && in.Deployment != v.Deployment {
continue
}
if in.Namespace != "" && in.Namespace != v.Namespace {
continue
}
if in.Name != "" && in.Name != v.Name {
continue
}
if in.Version != "" && in.Version != v.Version {
continue
}
service := v
filteredServices = append(filteredServices, service)
}
return filteredServices, nil
}
func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) {
return newWatcher(ctx, key, r.client)
}
// doKeepAlive continuously keeps alive the lease from ETCD.
func (r *Registry) doKeepAlive(
ctx context.Context, leaseID etcd3.LeaseID, keepAliceCh <-chan *etcd3.LeaseKeepAliveResponse,
@ -122,21 +56,3 @@ func (r *Registry) doKeepAlive(
}
}
}
// extractResponseToServices extracts etcd watch response context to service list.
func extractResponseToServices(res *etcd3.GetResponse) ([]*gsvc.Service, error) {
var services []*gsvc.Service
if res == nil || res.Kvs == nil {
return services, nil
}
for _, kv := range res.Kvs {
service, err := gsvc.NewServiceFromKV(kv.Key, kv.Value)
if err != nil {
return services, err
}
if service != nil {
services = append(services, service)
}
}
return services, nil
}

View File

@ -8,18 +8,14 @@ import (
"google.golang.org/grpc/resolver"
)
const Name = "katyusha"
const Name = "GoFrameResolver"
type Builder struct {
registry gsvc.Registry
}
// NewBuilder creates a builder which is used to factory registry resolvers.
func NewBuilder(registry gsvc.Registry) resolver.Builder {
b := &Builder{
registry: registry,
}
return b
func NewBuilder() resolver.Builder {
return &Builder{}
}
func (b *Builder) Build(
@ -30,7 +26,7 @@ func (b *Builder) Build(
watcher gsvc.Watcher
ctx, cancel = context.WithCancel(context.Background())
)
if watcher, err = b.registry.Watch(ctx, target.URL.Path); err != nil {
if watcher, err = gsvc.Watch(ctx, target.URL.Path); err != nil {
cancel()
return nil, gerror.Wrap(err, `registry.Watch failed`)
}

View File

@ -4,7 +4,6 @@ import (
"context"
"time"
"github.com/gogf/gf/v2/container/gset"
"github.com/gogf/gf/v2/contrib/balancer"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/glog"
@ -40,21 +39,19 @@ func (r *Resolver) watch() {
func (r *Resolver) update(services []*gsvc.Service) {
var (
err error
addresses = make([]resolver.Address, 0)
addressSet = gset.NewStrSet()
err error
addresses = make([]resolver.Address, 0)
)
for _, service := range services {
if !addressSet.AddIfNotExist(service.Address) {
continue
for _, endpoint := range service.Endpoints {
addr := resolver.Address{
Addr: endpoint,
ServerName: service.Name,
Attributes: newAttributesFromMetadata(service.Metadata),
}
addr.Attributes = addr.Attributes.WithValue(balancer.RawSvcKeyInSubConnInfo, service)
addresses = append(addresses, addr)
}
addr := resolver.Address{
Addr: service.Address,
ServerName: service.Name,
Attributes: newAttributesFromMetadata(service.Metadata),
}
addr.Attributes = addr.Attributes.WithValue(balancer.RawSvcKeyInSubConnInfo, service)
addresses = append(addresses, addr)
}
if len(addresses) == 0 {
r.logger.Noticef(r.ctx, "empty addresses parsed from: %+v", services)

View File

@ -9,16 +9,27 @@ package gsvc
import (
"context"
"github.com/gogf/gf/v2/errors/gerror"
)
// Registry interface for service.
type Registry interface {
Registrar
Discovery
}
// Registrar interface for service registrar.
type Registrar interface {
// Register registers `service` to Registry.
Register(ctx context.Context, service *Service) error
// Deregister off-lines and removes `service` from Registry.
Deregister(ctx context.Context, service *Service) error
}
// Discovery interface for service discovery.
type Discovery interface {
// Search searches and returns services with specified condition.
Search(ctx context.Context, in SearchInput) ([]*Service, error)
@ -42,7 +53,7 @@ type Service struct {
Namespace string // Service Namespace, to indicate different service in the same environment with the same Name.
Name string // Name for the service.
Version string // Service version, eg: v1.0.0, v2.1.1, etc.
Address string // Service address, single one, pattern: IP:port, eg: 192.168.1.2:8000.
Endpoints []string // Service Endpoints, pattern: IP:port, eg: 192.168.1.2:8000.
Metadata map[string]interface{} // Custom data for this service, which can be set using JSON by environment or command-line.
}
@ -63,3 +74,30 @@ type WatchInput struct {
Name string // Name for the service.
Version string // Service version, eg: v1.0.0, v2.1.1, etc.}
}
const (
DefaultPrefix = `goframe`
DefaultDeployment = `default`
DefaultNamespace = `default`
DefaultVersion = `latest`
)
const (
EnvPrefix = `GF_GSVC_PREFIX`
EnvDeployment = `GF_GSVC_DEPLOYMENT`
EnvNamespace = `GF_GSVC_NAMESPACE`
EnvName = `GF_GSVC_Name`
EnvVersion = `GF_GSVC_VERSION`
)
var (
defaultRegistry Registry
)
// SetRegistry sets the default Registry implements as your own implemented interface.
func SetRegistry(registry Registry) {
if registry == nil {
panic(gerror.New(`invalid Registry value "nil" given`))
}
defaultRegistry = registry
}

View File

@ -0,0 +1,92 @@
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gsvc
import (
"context"
"time"
"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/os/glog"
)
var (
watchedServiceMap = gmap.New(true)
)
func Get(ctx context.Context, name string) (service *Service, err error) {
v := watchedServiceMap.GetOrSetFuncLock(name, func() interface{} {
var (
s = NewServiceWithName(name)
services []*Service
)
services, err = Search(ctx, SearchInput{
Prefix: s.Prefix,
Deployment: s.Deployment,
Namespace: s.Namespace,
Name: s.Name,
Version: s.Version,
})
if err != nil {
return nil
}
if len(services) == 0 {
err = gerror.NewCodef(gcode.CodeNotFound, `service not found with name "%s"`, name)
return nil
}
service = services[0]
// Watch the service changes in goroutine.
go watchAndUpdateService(ctx, service)
return service
})
if v != nil {
service = v.(*Service)
}
return
}
func watchAndUpdateService(ctx context.Context, service *Service) {
var (
err error
watcher Watcher
services []*Service
)
for {
time.Sleep(time.Second)
watcher, err = Watch(ctx, service.KeyWithoutEndpoints())
if err != nil {
glog.Error(ctx, err)
continue
}
services, err = watcher.Proceed()
if err != nil {
glog.Error(ctx, err)
continue
}
if len(services) > 0 {
watchedServiceMap.Set(service.Name, services[0])
}
}
}
// Search searches and returns services with specified condition.
func Search(ctx context.Context, in SearchInput) ([]*Service, error) {
if defaultRegistry == nil {
return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
}
return defaultRegistry.Search(ctx, in)
}
// Watch watches specified condition changes.
func Watch(ctx context.Context, key string) (Watcher, error) {
if defaultRegistry == nil {
return nil, gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
}
return defaultRegistry.Watch(ctx, key)
}

View File

@ -9,35 +9,22 @@ package gsvc
import (
"context"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
)
var defaultRegistry Registry
// SetRegistry sets the default Registry implements as your own implemented interface.
func SetRegistry(registry Registry) {
if registry == nil {
panic(gerror.New(`invalid Registry value "nil" given`))
}
defaultRegistry = registry
}
// Register registers `service` to default registry..
func Register(ctx context.Context, service *Service) error {
if defaultRegistry == nil {
return gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
}
return defaultRegistry.Register(ctx, service)
}
// Deregister removes `service` from default registry.
func Deregister(ctx context.Context, service *Service) error {
if defaultRegistry == nil {
return gerror.NewCodef(gcode.CodeNotImplemented, `no Registry is registered`)
}
return defaultRegistry.Deregister(ctx, service)
}
// Search searches and returns services with specified condition.
func Search(ctx context.Context, in SearchInput) ([]*Service, error) {
return defaultRegistry.Search(ctx, in)
}
// Watch watches specified condition changes.
func Watch(ctx context.Context, key string) (Watcher, error) {
return defaultRegistry.Watch(ctx, key)
}

View File

@ -13,19 +13,27 @@ import (
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/internal/intlog"
"github.com/gogf/gf/v2/os/gcmd"
"github.com/gogf/gf/v2/text/gstr"
)
const (
DefaultPrefix = `goframe`
DefaultDeployment = `default`
DefaultNamespace = `default`
DefaultVersion = `latest`
separator = "/"
)
// NewServiceFromKV creates and returns service from `key` and `value`.
func NewServiceFromKV(key, value []byte) (s *Service, err error) {
array := gstr.Split(string(key), "/")
// NewServiceWithName creates and returns service from `name`.
func NewServiceWithName(name string) (s *Service) {
s = &Service{
Name: name,
Metadata: make(map[string]interface{}),
}
s.autoFillDefaultAttributes()
return s
}
// NewServiceWithKV creates and returns service from `key` and `value`.
func NewServiceWithKV(key, value []byte) (s *Service, err error) {
array := gstr.Split(string(key), separator)
if len(array) < 6 {
return nil, gerror.NewCodef(gcode.CodeInvalidParameter, `invalid service key "%s"`, key)
}
@ -35,7 +43,7 @@ func NewServiceFromKV(key, value []byte) (s *Service, err error) {
Namespace: array[2],
Name: array[3],
Version: array[4],
Address: array[5],
Endpoints: gstr.Split(array[5], ","),
Metadata: make(map[string]interface{}),
}
s.autoFillDefaultAttributes()
@ -49,6 +57,13 @@ func NewServiceFromKV(key, value []byte) (s *Service, err error) {
// Key formats the service information and returns the Service as registering key.
func (s *Service) Key() string {
serviceNameUnique := s.KeyWithoutEndpoints()
serviceNameUnique += separator + gstr.Join(s.Endpoints, ",")
return serviceNameUnique
}
// KeyWithoutEndpoints formats the service information and returns a string as unique name of service.
func (s *Service) KeyWithoutEndpoints() string {
s.autoFillDefaultAttributes()
return gstr.Join([]string{
s.Prefix,
@ -56,8 +71,7 @@ func (s *Service) Key() string {
s.Namespace,
s.Name,
s.Version,
s.Address,
}, "/")
}, separator)
}
func (s *Service) Value() string {
@ -70,15 +84,18 @@ func (s *Service) Value() string {
func (s *Service) autoFillDefaultAttributes() {
if s.Prefix == "" {
s.Prefix = DefaultPrefix
s.Prefix = gcmd.GetOptWithEnv(EnvPrefix, DefaultPrefix).String()
}
if s.Deployment == "" {
s.Deployment = DefaultDeployment
s.Deployment = gcmd.GetOptWithEnv(EnvDeployment, DefaultDeployment).String()
}
if s.Namespace == "" {
s.Namespace = DefaultNamespace
s.Namespace = gcmd.GetOptWithEnv(EnvNamespace, DefaultNamespace).String()
}
if s.Version == "" {
s.Version = DefaultVersion
s.Version = gcmd.GetOptWithEnv(EnvVersion, DefaultVersion).String()
}
if s.Name == "" {
s.Name = gcmd.GetOptWithEnv(EnvName).String()
}
}