From 4903ef7887d41091849b8efc603797e6e1e10c7e Mon Sep 17 00:00:00 2001 From: Hunk Zhu Date: Mon, 30 Oct 2023 20:27:48 +0800 Subject: [PATCH] fix issue #3100 (#3106) --- contrib/registry/nacos/nacos_watcher.go | 25 ++++++-------- contrib/registry/nacos/nacos_z_test.go | 46 +++++++++++++++++++++---- 2 files changed, 49 insertions(+), 22 deletions(-) diff --git a/contrib/registry/nacos/nacos_watcher.go b/contrib/registry/nacos/nacos_watcher.go index 782e1dc96..fe9fc85b3 100644 --- a/contrib/registry/nacos/nacos_watcher.go +++ b/contrib/registry/nacos/nacos_watcher.go @@ -9,6 +9,8 @@ package nacos import ( "context" + "github.com/gogf/gf/v2/errors/gcode" + "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/net/gsvc" "github.com/joy999/nacos-sdk-go/model" ) @@ -38,23 +40,16 @@ func newWatcher(ctx context.Context) *Watcher { // Proceed proceeds watch in blocking way. // It returns all complete services that watched by `key` if any change. func (w *Watcher) Proceed() (services []gsvc.Service, err error) { - n := len(w.event) - servicesMap := map[string]gsvc.Service{} - for i := 0; i < n; i++ { - e := <-w.event - if e.Err != nil { - err = e.Err - return - } - newServices := NewServicesFromInstances(e.Services) - for _, s := range newServices { - servicesMap[s.GetName()] = s - } + e, ok := <-w.event + if !ok || e == nil { + err = gerror.NewCode(gcode.CodeNil) + return } - services = make([]gsvc.Service, 0, len(servicesMap)) - for _, s := range servicesMap { - services = append(services, s) + if e.Err != nil { + err = e.Err + return } + services = NewServicesFromInstances(e.Services) return } diff --git a/contrib/registry/nacos/nacos_z_test.go b/contrib/registry/nacos/nacos_z_test.go index 8e5a7e1bc..959078c32 100644 --- a/contrib/registry/nacos/nacos_z_test.go +++ b/contrib/registry/nacos/nacos_z_test.go @@ -7,10 +7,13 @@ package nacos_test import ( + "context" + "sync/atomic" "testing" "time" "github.com/gogf/gf/contrib/registry/nacos/v2" + "github.com/gogf/gf/v2/frame/g" "github.com/gogf/gf/v2/net/gsvc" "github.com/gogf/gf/v2/os/gctx" "github.com/gogf/gf/v2/test/gtest" @@ -121,9 +124,25 @@ func TestWatch(t *testing.T) { }) gtest.C(t, func(t *gtest.T) { + ctx := gctx.New() watcher, err := registry.Watch(ctx, svc1.GetPrefix()) t.AssertNil(err) + var latestProceedResult atomic.Value + g.Go(ctx, func(ctx context.Context) { + var ( + err error + res []gsvc.Service + ) + for err == nil { + res, err = watcher.Proceed() + t.AssertNil(err) + latestProceedResult.Store(res) + } + }, func(ctx context.Context, exception error) { + t.Fatal(exception) + }) + // Register another service. svc2 := &gsvc.LocalService{ Name: svc1.Name, @@ -140,12 +159,11 @@ func TestWatch(t *testing.T) { // Watch and retrieve the service changes: // svc1 and svc2 is the same service name, which has 2 endpoints. - proceedResult, err := watcher.Proceed() - - t.AssertNil(err) + proceedResult, ok := latestProceedResult.Load().([]gsvc.Service) + t.Assert(ok, true) t.Assert(len(proceedResult), 1) t.Assert( - sortEndpoints(proceedResult[0].GetEndpoints()), + allEndpoints(proceedResult), gsvc.Endpoints{svc1.GetEndpoints()[0], svc2.GetEndpoints()[0]}, ) @@ -155,11 +173,11 @@ func TestWatch(t *testing.T) { t.AssertNil(err) time.Sleep(time.Second * 10) - proceedResult, err = watcher.Proceed() - t.AssertNil(err) + proceedResult, ok = latestProceedResult.Load().([]gsvc.Service) + t.Assert(ok, true) t.Assert(len(proceedResult), 1) t.Assert( - sortEndpoints(proceedResult[0].GetEndpoints()), + allEndpoints(proceedResult), gsvc.Endpoints{svc1.GetEndpoints()[0]}, ) t.AssertNil(watcher.Close()) @@ -171,6 +189,20 @@ func TestWatch(t *testing.T) { }) } +func allEndpoints(services []gsvc.Service) gsvc.Endpoints { + m := map[gsvc.Endpoint]struct{}{} + for _, s := range services { + for _, ep := range s.GetEndpoints() { + m[ep] = struct{}{} + } + } + var endpoints gsvc.Endpoints + for ep := range m { + endpoints = append(endpoints, ep) + } + return sortEndpoints(endpoints) +} + func sortEndpoints(in gsvc.Endpoints) gsvc.Endpoints { var endpoints gsvc.Endpoints endpoints = append(endpoints, in...)