diff --git a/contrib/balancer/balancer.go b/contrib/grpc/balancer/balancer.go similarity index 77% rename from contrib/balancer/balancer.go rename to contrib/grpc/balancer/balancer.go index 85c16d002..9768ff103 100644 --- a/contrib/balancer/balancer.go +++ b/contrib/grpc/balancer/balancer.go @@ -21,16 +21,16 @@ const ( ) func init() { - Register(gsel.SelectorRandom, gsel.NewSelectorRandom()) - Register(gsel.SelectorWeight, gsel.NewSelectorWeight()) - Register(gsel.SelectorRoundRobin, gsel.NewSelectorRoundRobin()) - Register(gsel.SelectorLeastConnection, gsel.NewSelectorLeastConnection()) + Register(gsel.SelectorRandom, gsel.NewBuilderRandom()) + Register(gsel.SelectorWeight, gsel.NewBuilderWeight()) + Register(gsel.SelectorRoundRobin, gsel.NewBuilderRoundRobin()) + Register(gsel.SelectorLeastConnection, gsel.NewBuilderLeastConnection()) } -func Register(name string, selector gsel.Selector) { +func Register(name string, builder gsel.Builder) { balancer.Register(base.NewBalancerBuilder( name, - &Builder{selector: selector}, + &Builder{builder: builder}, base.Config{HealthCheck: true}, )) } diff --git a/contrib/balancer/balancer_builder.go b/contrib/grpc/balancer/balancer_builder.go similarity index 96% rename from contrib/balancer/balancer_builder.go rename to contrib/grpc/balancer/balancer_builder.go index 0b02a4349..e77c727ea 100644 --- a/contrib/balancer/balancer_builder.go +++ b/contrib/grpc/balancer/balancer_builder.go @@ -18,7 +18,7 @@ import ( // Builder implements grpc balancer base.PickerBuilder, // which returns a picker that will be used by gRPC to pick a SubConn. type Builder struct { - selector gsel.Selector + builder gsel.Builder } // Build returns a picker that will be used by gRPC to pick a SubConn. @@ -42,7 +42,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) balancer.Picker { }) } p := &Picker{ - selector: b.selector, + selector: b.builder.Build(), } if err := p.selector.Update(nodes); err != nil { panic(err) diff --git a/contrib/balancer/balancer_node.go b/contrib/grpc/balancer/balancer_node.go similarity index 100% rename from contrib/balancer/balancer_node.go rename to contrib/grpc/balancer/balancer_node.go diff --git a/contrib/balancer/balancer_picker.go b/contrib/grpc/balancer/balancer_picker.go similarity index 100% rename from contrib/balancer/balancer_picker.go rename to contrib/grpc/balancer/balancer_picker.go diff --git a/contrib/balancer/balancer_test.go b/contrib/grpc/balancer/balancer_test.go similarity index 79% rename from contrib/balancer/balancer_test.go rename to contrib/grpc/balancer/balancer_test.go index 1c4c6672b..44134fb2b 100644 --- a/contrib/balancer/balancer_test.go +++ b/contrib/grpc/balancer/balancer_test.go @@ -9,10 +9,9 @@ package balancer_test import ( "testing" - "github.com/gogf/gf/contrib/balancer/v2" "github.com/gogf/gf/v2/net/gsel" ) func Test_Register(t *testing.T) { - balancer.Register("test", gsel.NewSelectorRandom()) + Register("test", gsel.NewSelectorRandom()) } diff --git a/contrib/balancer/go.mod b/contrib/grpc/balancer/go.mod similarity index 60% rename from contrib/balancer/go.mod rename to contrib/grpc/balancer/go.mod index 6659e228e..0b5a9133c 100644 --- a/contrib/balancer/go.mod +++ b/contrib/grpc/balancer/go.mod @@ -1,4 +1,4 @@ -module github.com/gogf/gf/contrib/balancer/v2 +module github.com/gogf/gf/contrib/grpc/balancer/v2 go 1.15 @@ -8,4 +8,4 @@ require ( google.golang.org/grpc v1.43.0 ) -replace github.com/gogf/gf/v2 => ../../ +replace github.com/gogf/gf/v2 => ../../../ diff --git a/contrib/balancer/go.sum b/contrib/grpc/balancer/go.sum similarity index 100% rename from contrib/balancer/go.sum rename to contrib/grpc/balancer/go.sum diff --git a/contrib/grpc/resolver/go.mod b/contrib/grpc/resolver/go.mod new file mode 100644 index 000000000..e779c8a28 --- /dev/null +++ b/contrib/grpc/resolver/go.mod @@ -0,0 +1,10 @@ +module github.com/gogf/gf/contrib/grpc/resolver/v2 + +go 1.15 + +require ( + github.com/gogf/gf/v2 v2.0.0-rc2 + google.golang.org/grpc v1.43.0 +) + +replace github.com/gogf/gf/v2 => ../../../ diff --git a/contrib/resolver/go.sum b/contrib/grpc/resolver/go.sum similarity index 100% rename from contrib/resolver/go.sum rename to contrib/grpc/resolver/go.sum diff --git a/contrib/resolver/resolver.go b/contrib/grpc/resolver/resolver.go similarity index 100% rename from contrib/resolver/resolver.go rename to contrib/grpc/resolver/resolver.go diff --git a/contrib/resolver/resolver_builder.go b/contrib/grpc/resolver/resolver_builder.go similarity index 100% rename from contrib/resolver/resolver_builder.go rename to contrib/grpc/resolver/resolver_builder.go diff --git a/contrib/resolver/resolver_resolver.go b/contrib/grpc/resolver/resolver_resolver.go similarity index 100% rename from contrib/resolver/resolver_resolver.go rename to contrib/grpc/resolver/resolver_resolver.go diff --git a/contrib/registry/etcd/etcd_watcher.go b/contrib/registry/etcd/etcd_watcher.go index f5197956b..db8fea8cc 100644 --- a/contrib/registry/etcd/etcd_watcher.go +++ b/contrib/registry/etcd/etcd_watcher.go @@ -18,21 +18,19 @@ var ( ) type watcher struct { - key string - ctx context.Context - cancel context.CancelFunc - watchChan etcd3.WatchChan - watcher etcd3.Watcher - kv etcd3.KV - initialized bool + key string + ctx context.Context + cancel context.CancelFunc + watchChan etcd3.WatchChan + watcher etcd3.Watcher + kv etcd3.KV } func newWatcher(ctx context.Context, key string, client *etcd3.Client) (*watcher, error) { w := &watcher{ - key: key, - watcher: etcd3.NewWatcher(client), - kv: etcd3.NewKV(client), - initialized: false, + key: key, + watcher: etcd3.NewWatcher(client), + kv: etcd3.NewKV(client), } w.ctx, w.cancel = context.WithCancel(ctx) w.watchChan = w.watcher.Watch(w.ctx, key, etcd3.WithPrefix(), etcd3.WithRev(0)) @@ -44,10 +42,6 @@ func newWatcher(ctx context.Context, key string, client *etcd3.Client) (*watcher } func (w *watcher) Proceed() ([]*gsvc.Service, error) { - if !w.initialized { - w.initialized = true - return w.getServicesByPrefix() - } select { case <-w.ctx.Done(): return nil, w.ctx.Err() diff --git a/contrib/resolver/go.mod b/contrib/resolver/go.mod deleted file mode 100644 index d610b1a29..000000000 --- a/contrib/resolver/go.mod +++ /dev/null @@ -1,10 +0,0 @@ -module github.com/gogf/gf/contrib/resolver/v2 - -go 1.15 - -require ( - github.com/gogf/gf/v2 v2.0.0-rc2 - google.golang.org/grpc v1.43.0 -) - -replace github.com/gogf/gf/v2 => ../../ diff --git a/example/go.mod b/example/go.mod index affcdb4e1..3d7e716eb 100644 --- a/example/go.mod +++ b/example/go.mod @@ -4,8 +4,8 @@ go 1.15 require ( github.com/gogf/gf/contrib/registry/etcd/v2 v2.0.0-rc2 - github.com/gogf/gf/contrib/resolver/v2 v2.0.0-rc2 - github.com/gogf/gf/contrib/balancer/v2 v2.0.0-rc2 + github.com/gogf/gf/contrib/grpc/resolver/v2 v2.0.0-rc2 + github.com/gogf/gf/contrib/grpc/balancer/v2 v2.0.0-rc2 github.com/gogf/gf/v2 v2.0.0-rc2 github.com/golang/protobuf v1.5.2 google.golang.org/grpc v1.43.0 @@ -13,8 +13,8 @@ require ( ) replace ( - github.com/gogf/gf/contrib/balancer/v2 => ../contrib/balancer/ - github.com/gogf/gf/contrib/registry/etcd/v2 => ../contrib/registry/etcd/ - github.com/gogf/gf/contrib/resolver/v2 => ../contrib/resolver/ +github.com/gogf/gf/contrib/registry/etcd/v2 => ../contrib/registry/etcd/ + github.com/gogf/gf/contrib/grpc/balancer/v2 => ../contrib/grpc/balancer/ + github.com/gogf/gf/contrib/grpc/resolver/v2 => ../contrib/grpc/resolver/ github.com/gogf/gf/v2 => ../ ) diff --git a/example/registry/http/client/main.go b/example/registry/http/client/main.go index 8282fb11b..86fd60478 100644 --- a/example/registry/http/client/main.go +++ b/example/registry/http/client/main.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "time" "github.com/gogf/gf/contrib/registry/etcd/v2" "github.com/gogf/gf/v2/frame/g" @@ -12,10 +13,13 @@ import ( func main() { gsvc.SetRegistry(etcd.New(`127.0.0.1:2379`)) - res, err := g.Client().Get(gctx.New(), `http://hello.svc:12345/`) - if err != nil { - panic(err) + for i := 0; i < 100; i++ { + res, err := g.Client().Get(gctx.New(), `http://hello.svc/`) + if err != nil { + panic(err) + } + fmt.Println(res.ReadAllString()) + res.Close() + time.Sleep(time.Second) } - defer res.Close() - fmt.Println(res.ReadAllString()) } diff --git a/example/registry/http/server/main.go b/example/registry/http/server/main.go index 619b487ae..355d9cbb0 100644 --- a/example/registry/http/server/main.go +++ b/example/registry/http/server/main.go @@ -12,6 +12,7 @@ func main() { s := g.Server(`hello.svc`) s.BindHandler("/", func(r *ghttp.Request) { + g.Log().Info(r.Context(), `request received`) r.Response.Write(`Hello world`) }) s.Run() diff --git a/net/gclient/gclient.go b/net/gclient/gclient.go index 6881f82d2..93dcdd0c7 100644 --- a/net/gclient/gclient.go +++ b/net/gclient/gclient.go @@ -17,6 +17,7 @@ import ( "github.com/gogf/gf/v2" "github.com/gogf/gf/v2/errors/gerror" + "github.com/gogf/gf/v2/net/gsel" "github.com/gogf/gf/v2/os/gfile" ) @@ -31,6 +32,7 @@ type Client struct { retryCount int // Retry count when request fails. retryInterval time.Duration // Retry interval when request fails. middlewareHandler []HandlerFunc // Interceptor handlers + selectorBuilder gsel.Builder // Builder for request balance. } const ( diff --git a/net/gclient/gclient_discovery.go b/net/gclient/gclient_discovery.go index c6fe5cbc1..43e4f204d 100644 --- a/net/gclient/gclient_discovery.go +++ b/net/gclient/gclient_discovery.go @@ -9,14 +9,35 @@ package gclient import ( "net/http" + "github.com/gogf/gf/v2/container/gmap" + "github.com/gogf/gf/v2/internal/intlog" + "github.com/gogf/gf/v2/net/gsel" "github.com/gogf/gf/v2/net/gsvc" "github.com/gogf/gf/v2/os/gctx" + "github.com/gogf/gf/v2/os/glog" ) const ( discoveryMiddlewareHandled gctx.StrKey = `MiddlewareClientDiscoveryHandled` ) +type discoveryNode struct { + service *gsvc.Service + address string +} + +func (n *discoveryNode) Service() *gsvc.Service { + return n.service +} + +func (n *discoveryNode) Address() string { + return n.address +} + +var ( + clientSelectorMap = gmap.New(true) +) + // internalMiddlewareDiscovery is a client middleware that enables service discovery feature for client. func internalMiddlewareDiscovery(c *Client, r *http.Request) (response *Response, err error) { var ctx = r.Context() @@ -25,15 +46,51 @@ func internalMiddlewareDiscovery(c *Client, r *http.Request) (response *Response if ctx.Value(discoveryMiddlewareHandled) != nil { return c.Next(r) } - if gsvc.GetRegistry() != nil { - service, err := gsvc.Get(ctx, r.URL.Host) - if err != nil { - return nil, err - } - if service != nil { - r.URL.Host = service.Address() - r.Host = service.Address() - } + if gsvc.GetRegistry() == nil { + return c.Next(r) } + var service *gsvc.Service + service, err = gsvc.GetWithWatch(ctx, r.URL.Host, func(service *gsvc.Service) { + intlog.Printf(ctx, `http client watching service "%s" changed`, service.KeyWithoutEndpoints()) + // If service changed, it removes it from map cache, + // which makes it re-cache later. + clientSelectorMap.Remove(service.KeyWithoutEndpoints()) + }) + if err != nil { + return nil, err + } + if service == nil { + return c.Next(r) + } + // Balancer. + selector := clientSelectorMap.GetOrSetFuncLock( + service.KeyWithoutEndpoints(), + func() interface{} { + intlog.Printf(ctx, `http client create selector for service "%s"`, service.KeyWithoutEndpoints()) + // Build selector and cache it in internal map. + nodes := make([]gsel.Node, 0) + for _, address := range service.Endpoints { + nodes = append(nodes, &discoveryNode{ + service: service, + address: address, + }) + } + selector := gsel.GetBuilder().Build() + if err = selector.Update(nodes); err != nil { + glog.Error(ctx, err) + } + return selector + }, + ).(gsel.Selector) + // Pick one node from multiple addresses. + node, done, err := selector.Pick(ctx) + if err != nil { + return nil, err + } + if done != nil { + defer done(ctx, gsel.DoneInfo{}) + } + r.URL.Host = node.Address() + r.Host = node.Address() return c.Next(r) } diff --git a/net/gsel/gsel.go b/net/gsel/gsel.go index 096e03baa..3b6d1249a 100644 --- a/net/gsel/gsel.go +++ b/net/gsel/gsel.go @@ -13,6 +13,11 @@ import ( "github.com/gogf/gf/v2/net/gsvc" ) +// Builder creates and returns selector in runtime. +type Builder interface { + Build() Selector +} + // Selector for service balancer. type Selector interface { // Pick selects and returns service. diff --git a/net/gsel/gsel_builder.go b/net/gsel/gsel_builder.go new file mode 100644 index 000000000..31035a343 --- /dev/null +++ b/net/gsel/gsel_builder.go @@ -0,0 +1,20 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gsel + +// defaultBuilder is the default Builder for globally used purpose. +var defaultBuilder = NewBuilderRoundRobin() + +// SetBuilder sets the default builder for globally used purpose. +func SetBuilder(builder Builder) { + defaultBuilder = builder +} + +// GetBuilder returns the default builder for globally used purpose. +func GetBuilder() Builder { + return defaultBuilder +} diff --git a/net/gsel/gsel_builder_least_connection.go b/net/gsel/gsel_builder_least_connection.go new file mode 100644 index 000000000..d13b8c70b --- /dev/null +++ b/net/gsel/gsel_builder_least_connection.go @@ -0,0 +1,17 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gsel + +type builderLeastConnection struct{} + +func NewBuilderLeastConnection() Builder { + return &builderLeastConnection{} +} + +func (*builderLeastConnection) Build() Selector { + return NewSelectorLeastConnection() +} diff --git a/net/gsel/gsel_builder_random.go b/net/gsel/gsel_builder_random.go new file mode 100644 index 000000000..49448f033 --- /dev/null +++ b/net/gsel/gsel_builder_random.go @@ -0,0 +1,17 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gsel + +type builderRandom struct{} + +func NewBuilderRandom() Builder { + return &builderRandom{} +} + +func (*builderRandom) Build() Selector { + return NewSelectorRandom() +} diff --git a/net/gsel/gsel_builder_round_robin.go b/net/gsel/gsel_builder_round_robin.go new file mode 100644 index 000000000..bff17a337 --- /dev/null +++ b/net/gsel/gsel_builder_round_robin.go @@ -0,0 +1,17 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gsel + +type builderRoundRobin struct{} + +func NewBuilderRoundRobin() Builder { + return &builderRoundRobin{} +} + +func (*builderRoundRobin) Build() Selector { + return NewSelectorRoundRobin() +} diff --git a/net/gsel/gsel_builder_weight.go b/net/gsel/gsel_builder_weight.go new file mode 100644 index 000000000..a2d3b6b8f --- /dev/null +++ b/net/gsel/gsel_builder_weight.go @@ -0,0 +1,17 @@ +// Copyright GoFrame Author(https://goframe.org). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gsel + +type builderWeight struct{} + +func NewBuilderWeight() Builder { + return &builderWeight{} +} + +func (*builderWeight) Build() Selector { + return NewSelectorWeight() +} diff --git a/net/gsvc/gsvc_discovery.go b/net/gsvc/gsvc_discovery.go index eea085fad..f4908feee 100644 --- a/net/gsvc/gsvc_discovery.go +++ b/net/gsvc/gsvc_discovery.go @@ -14,17 +14,25 @@ import ( "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/os/glog" + "github.com/gogf/gf/v2/util/gutil" ) var ( watchedServiceMap = gmap.New(true) ) +type ServiceWatch func(service *Service) + func Get(ctx context.Context, name string) (service *Service, err error) { + return GetWithWatch(ctx, name, nil) +} + +func GetWithWatch(ctx context.Context, name string, watch ServiceWatch) (service *Service, err error) { v := watchedServiceMap.GetOrSetFuncLock(name, func() interface{} { var ( s = NewServiceWithName(name) services []*Service + watcher Watcher ) services, err = Search(ctx, SearchInput{ Prefix: s.Prefix, @@ -42,7 +50,11 @@ func Get(ctx context.Context, name string) (service *Service, err error) { } service = services[0] // Watch the service changes in goroutine. - go watchAndUpdateService(ctx, service) + watcher, err = Watch(ctx, service.KeyWithoutEndpoints()) + if err != nil { + return nil + } + go watchAndUpdateService(watcher, service, watch) return service }) if v != nil { @@ -51,19 +63,14 @@ func Get(ctx context.Context, name string) (service *Service, err error) { return } -func watchAndUpdateService(ctx context.Context, service *Service) { +func watchAndUpdateService(watcher Watcher, service *Service, watchFunc ServiceWatch) { var ( + ctx = context.Background() err error - watcher Watcher services []*Service ) for { time.Sleep(time.Second) - watcher, err = Watch(ctx, service.KeyWithoutEndpoints()) - if err != nil { - glog.Error(ctx, err) - continue - } services, err = watcher.Proceed() if err != nil { glog.Error(ctx, err) @@ -71,6 +78,13 @@ func watchAndUpdateService(ctx context.Context, service *Service) { } if len(services) > 0 { watchedServiceMap.Set(service.Name, services[0]) + if watchFunc != nil { + gutil.TryCatch(func() { + watchFunc(services[0]) + }, func(exception error) { + glog.Error(ctx, exception) + }) + } } } }