mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
# Description
`etcd.NewWithClient()` never sets `r.etcdConfig.DialTimeout`, so the field
keeps its zero value. As a result, every
`context.WithTimeout(context.Background(), r.etcdConfig.DialTimeout)`
call in the registry returns a context that has already expired, and each
etcd request fails immediately with `context deadline exceeded`.
# Example
```go
package main
import (
"context"
"testing"
"time"
"github.com/gogf/gf/contrib/registry/etcd/v2"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/net/gsvc"
clientv3 "go.etcd.io/etcd/client/v3"
)
// TestEtcdWithClient registers a local service through a registry built with
// etcd.NewWithClient and fails the test if the client cannot be created or the
// registration returns an error.
func TestEtcdWithClient(t *testing.T) {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"http://127.0.0.1:2379"},
		DialTimeout: 2 * time.Second,
	})
	// Check the construction error before deferring Close, so a failed
	// construction is reported instead of closing a nil client.
	if err != nil {
		t.Fatal(gerror.Stack(err))
	}
	defer cli.Close()

	registry := etcd.NewWithClient(cli)
	_, err = registry.Register(context.Background(), &gsvc.LocalService{
		Name:      "test",
		Endpoints: gsvc.NewEndpoints("127.0.0.1:8888"),
	})
	if err != nil {
		t.Fatal(gerror.Stack(err))
	}
}
```
Running tool: /opt/homebrew/bin/go test -test.fullpath=true -timeout 30s
-run ^TestEtcdWithClient$ etop.roommanageserver
=== RUN TestEtcdWithClient
{"level":"warn","ts":"2026-01-31T09:59:06.994867+0800","logger":"etcd-client","caller":"v3@v3.6.7/retry_interceptor.go:65","msg":"retrying
of unary invoker
failed","target":"etcd-endpoints://0x14000262f00/127.0.0.1:2379","method":"/etcdserverpb.Lease/LeaseGrant","attempt":0,"error":"rpc
error: code = DeadlineExceeded desc = context deadline exceeded"}
/Users/guolihui/projects/mpl-poker/room-manage-server/main_test.go:27:
1. etcd grant failed with keepalive ttl "10s"
1).
github.com/gogf/gf/contrib/registry/etcd/v2.(*Registry).doRegisterLease
/Users/guolihui/projects/mpl-poker/room-manage-server/gfv2/contrib/registry/etcd/etcd_registrar.go:38
2). github.com/gogf/gf/contrib/registry/etcd/v2.(*Registry).Register
/Users/guolihui/projects/mpl-poker/room-manage-server/gfv2/contrib/registry/etcd/etcd_registrar.go:24
3). etop%2eroommanageserver.TestEtcdWithClient
/Users/guolihui/projects/mpl-poker/room-manage-server/main_test.go:22
2. context deadline exceeded
--- FAIL: TestEtcdWithClient (0.00s)
168 lines
4.3 KiB
Go
168 lines
4.3 KiB
Go
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
|
//
|
|
// This Source Code Form is subject to the terms of the MIT License.
|
|
// If a copy of the MIT was not distributed with this file,
|
|
// You can obtain one at https://github.com/gogf/gf.
|
|
|
|
// Package etcd implements service Registry and Discovery using etcd.
|
|
package etcd
|
|
|
|
import (
|
|
"strings"
|
|
"time"
|
|
|
|
etcd3 "go.etcd.io/etcd/client/v3"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/gogf/gf/v2/errors/gcode"
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
"github.com/gogf/gf/v2/net/gsvc"
|
|
"github.com/gogf/gf/v2/os/glog"
|
|
"github.com/gogf/gf/v2/text/gstr"
|
|
)
|
|
|
|
var (
|
|
_ gsvc.Registry = &Registry{}
|
|
)
|
|
|
|
// Registry implements gsvc.Registry interface.
|
|
type Registry struct {
|
|
client *etcd3.Client
|
|
kv etcd3.KV
|
|
lease etcd3.Lease
|
|
keepaliveTTL time.Duration
|
|
logger glog.ILogger
|
|
etcdConfig etcd3.Config
|
|
}
|
|
|
|
// Option is the option for the etcd registry.
|
|
type Option struct {
|
|
Logger glog.ILogger
|
|
KeepaliveTTL time.Duration
|
|
|
|
// DialTimeout is the timeout for failing to establish a connection.
|
|
DialTimeout time.Duration
|
|
|
|
// AutoSyncInterval is the interval to update endpoints with its latest members.
|
|
AutoSyncInterval time.Duration
|
|
|
|
DialOptions []grpc.DialOption
|
|
}
|
|
|
|
// Default values applied by the registry constructors when the corresponding
// option is not provided.
const (
	// DefaultKeepAliveTTL is the default keepalive TTL.
	DefaultKeepAliveTTL = 10 * time.Second

	// DefaultDialTimeout is the timeout for failing to establish a connection.
	DefaultDialTimeout = 5 * time.Second
)
|
|
|
|
// New creates and returns a new etcd registry.
|
|
// Support Etcd Address format: ip:port,ip:port...,ip:port@username:password
|
|
func New(address string, option ...Option) *Registry {
|
|
if address == "" {
|
|
panic(gerror.NewCode(gcode.CodeInvalidParameter, `invalid etcd address ""`))
|
|
}
|
|
addressAndAuth := gstr.SplitAndTrim(address, "@")
|
|
var (
|
|
endpoints []string
|
|
userName, password string
|
|
)
|
|
switch len(addressAndAuth) {
|
|
case 1:
|
|
endpoints = gstr.SplitAndTrim(address, ",")
|
|
default:
|
|
endpoints = gstr.SplitAndTrim(addressAndAuth[0], ",")
|
|
parts := gstr.SplitAndTrim(strings.Join(addressAndAuth[1:], "@"), ":")
|
|
switch len(parts) {
|
|
case 2:
|
|
userName = parts[0]
|
|
password = parts[1]
|
|
default:
|
|
panic(gerror.NewCode(gcode.CodeInvalidParameter, `invalid etcd auth not support ":" at username or password `))
|
|
}
|
|
}
|
|
if len(endpoints) == 0 {
|
|
panic(gerror.NewCodef(gcode.CodeInvalidParameter, `invalid etcd address "%s"`, address))
|
|
}
|
|
cfg := etcd3.Config{Endpoints: endpoints}
|
|
if userName != "" {
|
|
cfg.Username = userName
|
|
}
|
|
if password != "" {
|
|
cfg.Password = password
|
|
}
|
|
|
|
cfg.DialTimeout = DefaultDialTimeout
|
|
|
|
var usedOption Option
|
|
if len(option) > 0 {
|
|
usedOption = option[0]
|
|
}
|
|
if usedOption.DialTimeout > 0 {
|
|
cfg.DialTimeout = usedOption.DialTimeout
|
|
}
|
|
if usedOption.AutoSyncInterval > 0 {
|
|
cfg.AutoSyncInterval = usedOption.AutoSyncInterval
|
|
}
|
|
|
|
client, err := etcd3.New(cfg)
|
|
if err != nil {
|
|
panic(gerror.Wrap(err, `create etcd client failed`))
|
|
}
|
|
r := NewWithClient(client, option...)
|
|
r.etcdConfig = cfg
|
|
return r
|
|
}
|
|
|
|
// NewWithClient creates and returns a new etcd registry with the given client.
|
|
func NewWithClient(client *etcd3.Client, option ...Option) *Registry {
|
|
r := &Registry{
|
|
client: client,
|
|
kv: etcd3.NewKV(client),
|
|
}
|
|
r.etcdConfig.DialTimeout = DefaultDialTimeout
|
|
if len(option) > 0 {
|
|
r.logger = option[0].Logger
|
|
r.keepaliveTTL = option[0].KeepaliveTTL
|
|
if option[0].DialTimeout > 0 {
|
|
r.etcdConfig.DialTimeout = option[0].DialTimeout
|
|
}
|
|
}
|
|
if r.logger == nil {
|
|
r.logger = g.Log()
|
|
}
|
|
if r.keepaliveTTL == 0 {
|
|
r.keepaliveTTL = DefaultKeepAliveTTL
|
|
}
|
|
return r
|
|
}
|
|
|
|
// extractResponseToServices extracts etcd watch response context to service list.
|
|
func extractResponseToServices(res *etcd3.GetResponse) ([]gsvc.Service, error) {
|
|
if res == nil || res.Kvs == nil {
|
|
return nil, nil
|
|
}
|
|
var (
|
|
services []gsvc.Service
|
|
servicePrefixMap = make(map[string]*Service)
|
|
)
|
|
for _, kv := range res.Kvs {
|
|
service, err := gsvc.NewServiceWithKV(
|
|
string(kv.Key), string(kv.Value),
|
|
)
|
|
if err != nil {
|
|
return services, err
|
|
}
|
|
s := NewService(service)
|
|
if v, ok := servicePrefixMap[service.GetPrefix()]; ok {
|
|
v.Endpoints = append(v.Endpoints, service.GetEndpoints()...)
|
|
} else {
|
|
servicePrefixMap[s.GetPrefix()] = s
|
|
services = append(services, s)
|
|
}
|
|
}
|
|
return services, nil
|
|
}
|