This commit is contained in:
Hunk Zhu
2023-10-30 20:27:48 +08:00
committed by GitHub
parent ab5ab4c675
commit 4903ef7887
2 changed files with 49 additions and 22 deletions

View File

@ -9,6 +9,8 @@ package nacos
import (
"context"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/joy999/nacos-sdk-go/model"
)
@ -38,23 +40,16 @@ func newWatcher(ctx context.Context) *Watcher {
// Proceed proceeds watch in blocking way.
// It returns all complete services that watched by `key` if any change.
func (w *Watcher) Proceed() (services []gsvc.Service, err error) {
n := len(w.event)
servicesMap := map[string]gsvc.Service{}
for i := 0; i < n; i++ {
e := <-w.event
if e.Err != nil {
err = e.Err
return
}
newServices := NewServicesFromInstances(e.Services)
for _, s := range newServices {
servicesMap[s.GetName()] = s
}
e, ok := <-w.event
if !ok || e == nil {
err = gerror.NewCode(gcode.CodeNil)
return
}
services = make([]gsvc.Service, 0, len(servicesMap))
for _, s := range servicesMap {
services = append(services, s)
if e.Err != nil {
err = e.Err
return
}
services = NewServicesFromInstances(e.Services)
return
}

View File

@ -7,10 +7,13 @@
package nacos_test
import (
"context"
"sync/atomic"
"testing"
"time"
"github.com/gogf/gf/contrib/registry/nacos/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/gogf/gf/v2/os/gctx"
"github.com/gogf/gf/v2/test/gtest"
@ -121,9 +124,25 @@ func TestWatch(t *testing.T) {
})
gtest.C(t, func(t *gtest.T) {
ctx := gctx.New()
watcher, err := registry.Watch(ctx, svc1.GetPrefix())
t.AssertNil(err)
var latestProceedResult atomic.Value
g.Go(ctx, func(ctx context.Context) {
var (
err error
res []gsvc.Service
)
for err == nil {
res, err = watcher.Proceed()
t.AssertNil(err)
latestProceedResult.Store(res)
}
}, func(ctx context.Context, exception error) {
t.Fatal(exception)
})
// Register another service.
svc2 := &gsvc.LocalService{
Name: svc1.Name,
@ -140,12 +159,11 @@ func TestWatch(t *testing.T) {
// Watch and retrieve the service changes:
// svc1 and svc2 is the same service name, which has 2 endpoints.
proceedResult, err := watcher.Proceed()
t.AssertNil(err)
proceedResult, ok := latestProceedResult.Load().([]gsvc.Service)
t.Assert(ok, true)
t.Assert(len(proceedResult), 1)
t.Assert(
sortEndpoints(proceedResult[0].GetEndpoints()),
allEndpoints(proceedResult),
gsvc.Endpoints{svc1.GetEndpoints()[0], svc2.GetEndpoints()[0]},
)
@ -155,11 +173,11 @@ func TestWatch(t *testing.T) {
t.AssertNil(err)
time.Sleep(time.Second * 10)
proceedResult, err = watcher.Proceed()
t.AssertNil(err)
proceedResult, ok = latestProceedResult.Load().([]gsvc.Service)
t.Assert(ok, true)
t.Assert(len(proceedResult), 1)
t.Assert(
sortEndpoints(proceedResult[0].GetEndpoints()),
allEndpoints(proceedResult),
gsvc.Endpoints{svc1.GetEndpoints()[0]},
)
t.AssertNil(watcher.Close())
@ -171,6 +189,20 @@ func TestWatch(t *testing.T) {
})
}
func allEndpoints(services []gsvc.Service) gsvc.Endpoints {
m := map[gsvc.Endpoint]struct{}{}
for _, s := range services {
for _, ep := range s.GetEndpoints() {
m[ep] = struct{}{}
}
}
var endpoints gsvc.Endpoints
for ep := range m {
endpoints = append(endpoints, ep)
}
return sortEndpoints(endpoints)
}
func sortEndpoints(in gsvc.Endpoints) gsvc.Endpoints {
var endpoints gsvc.Endpoints
endpoints = append(endpoints, in...)