mirror of
https://gitee.com/johng/gf
synced 2026-06-06 16:21:40 +08:00
add tracing feature for package gredis
This commit is contained in:
10
.example/net/gtrace/3.http+redis/config.toml
Normal file
10
.example/net/gtrace/3.http+redis/config.toml
Normal file
@ -0,0 +1,10 @@
|
||||
|
||||
# Redis.
|
||||
[redis]
|
||||
default = "127.0.0.1:6379,0?tracing=1"
|
||||
cache = "127.0.0.1:6379,1?tracing=1"
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
63
.example/net/gtrace/3.http+redis/main.go
Normal file
63
.example/net/gtrace/3.http+redis/main.go
Normal file
@ -0,0 +1,63 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"github.com/gogf/gf/errors/gerror"
|
||||
"github.com/gogf/gf/frame/g"
|
||||
"github.com/gogf/gf/net/ghttp"
|
||||
"go.opentelemetry.io/otel/exporters/trace/jaeger"
|
||||
sdkTrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
)
|
||||
|
||||
const (
|
||||
JaegerEndpoint = "http://localhost:14268/api/traces"
|
||||
ServiceName = "TracingHttpServerWithRedis"
|
||||
)
|
||||
|
||||
// initTracer creates a new trace provider instance and registers it as global trace provider.
|
||||
func initTracer() func() {
|
||||
// Create and install Jaeger export pipeline.
|
||||
flush, err := jaeger.InstallNewPipeline(
|
||||
jaeger.WithCollectorEndpoint(JaegerEndpoint),
|
||||
jaeger.WithProcess(jaeger.Process{
|
||||
ServiceName: ServiceName,
|
||||
}),
|
||||
jaeger.WithSDK(&sdkTrace.Config{DefaultSampler: sdkTrace.AlwaysSample()}),
|
||||
)
|
||||
if err != nil {
|
||||
g.Log().Fatal(err)
|
||||
}
|
||||
return flush
|
||||
}
|
||||
|
||||
func main() {
|
||||
flush := initTracer()
|
||||
defer flush()
|
||||
|
||||
s := g.Server()
|
||||
s.Group("/", func(group *ghttp.RouterGroup) {
|
||||
group.Middleware(ghttp.MiddlewareServerTracing)
|
||||
group.ALL("/redis", new(redisTracingApi))
|
||||
})
|
||||
s.SetPort(8199)
|
||||
s.Run()
|
||||
}
|
||||
|
||||
type redisTracingApi struct{}
|
||||
|
||||
func (api *redisTracingApi) Set(r *ghttp.Request) {
|
||||
_, err := g.Redis().Ctx(r.Context()).Do("SET", r.GetString("key"), r.GetString("value"))
|
||||
if err != nil {
|
||||
r.Response.WriteExit(gerror.Current(err))
|
||||
}
|
||||
r.Response.Write("ok")
|
||||
}
|
||||
|
||||
func (api *redisTracingApi) Get(r *ghttp.Request) {
|
||||
value, err := g.Redis().Ctx(r.Context()).DoVar(
|
||||
"GET", r.GetString("key"),
|
||||
)
|
||||
if err != nil {
|
||||
r.Response.WriteExit(gerror.Current(err))
|
||||
}
|
||||
r.Response.Write(value.String())
|
||||
}
|
||||
@ -51,7 +51,7 @@ type ConfigNode struct {
|
||||
UpdatedAt string `json:"updatedAt"` // (Optional) The filed name of table for automatic-filled updated datetime.
|
||||
DeletedAt string `json:"deletedAt"` // (Optional) The filed name of table for automatic-filled updated datetime.
|
||||
TimeMaintainDisabled bool `json:"timeMaintainDisabled"` // (Optional) Disable the automatic time maintaining feature.
|
||||
Tracing bool `json:"tracing"` // (Optional) Tracing enable the tracing feature of database.
|
||||
Tracing bool `json:"tracing"` // (Optional) Tracing enable the tracing feature for database.
|
||||
}
|
||||
|
||||
// configs is internal used configuration object.
|
||||
|
||||
@ -14,6 +14,7 @@
|
||||
package gredis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@ -24,14 +25,17 @@ import (
|
||||
|
||||
// Redis client.
|
||||
type Redis struct {
|
||||
pool *redis.Pool // Underlying connection pool.
|
||||
group string // Configuration group.
|
||||
config Config // Configuration.
|
||||
pool *redis.Pool // Underlying connection pool.
|
||||
group string // Configuration group.
|
||||
config *Config // Configuration.
|
||||
ctx context.Context // Context.
|
||||
}
|
||||
|
||||
// Redis connection.
|
||||
type Conn struct {
|
||||
redis.Conn
|
||||
ctx context.Context
|
||||
redis *Redis
|
||||
}
|
||||
|
||||
// Redis configuration.
|
||||
@ -46,7 +50,8 @@ type Config struct {
|
||||
MaxConnLifetime time.Duration `json:"maxConnLifetime"` // Maximum lifetime of the connection (default is 30 seconds, not allowed to be set to 0)
|
||||
ConnectTimeout time.Duration `json:"connectTimeout"` // Dial connection timeout.
|
||||
TLS bool `json:"tls"` // Specifies the config to use when a TLS connection is dialed.
|
||||
TLSSkipVerify bool `json:"tlsSkipVerify"` // Disables server name verification when connecting over TLS
|
||||
TLSSkipVerify bool `json:"tlsSkipVerify"` // Disables server name verification when connecting over TLS.
|
||||
Tracing bool `json:"tracing"` // Tracing enable the tracing feature for redis.
|
||||
}
|
||||
|
||||
// Pool statistics.
|
||||
@ -55,11 +60,11 @@ type PoolStats struct {
|
||||
}
|
||||
|
||||
const (
|
||||
gDEFAULT_POOL_IDLE_TIMEOUT = 10 * time.Second
|
||||
gDEFAULT_POOL_CONN_TIMEOUT = 10 * time.Second
|
||||
gDEFAULT_POOL_MAX_IDLE = 10
|
||||
gDEFAULT_POOL_MAX_ACTIVE = 100
|
||||
gDEFAULT_POOL_MAX_LIFE_TIME = 30 * time.Second
|
||||
defaultPoolIdleTimeout = 10 * time.Second
|
||||
defaultPoolConnTimeout = 10 * time.Second
|
||||
defaultPoolMaxIdle = 10
|
||||
defaultPoolMaxActive = 100
|
||||
defaultPoolMaxLifeTime = 30 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
@ -69,25 +74,25 @@ var (
|
||||
|
||||
// New creates a redis client object with given configuration.
|
||||
// Redis client maintains a connection pool automatically.
|
||||
func New(config Config) *Redis {
|
||||
func New(config *Config) *Redis {
|
||||
// The MaxIdle is the most important attribute of the connection pool.
|
||||
// Only if this attribute is set, the created connections from client
|
||||
// can not exceed the limit of the server.
|
||||
if config.MaxIdle == 0 {
|
||||
config.MaxIdle = gDEFAULT_POOL_MAX_IDLE
|
||||
config.MaxIdle = defaultPoolMaxIdle
|
||||
}
|
||||
// This value SHOULD NOT exceed the connection limit of redis server.
|
||||
if config.MaxActive == 0 {
|
||||
config.MaxActive = gDEFAULT_POOL_MAX_ACTIVE
|
||||
config.MaxActive = defaultPoolMaxActive
|
||||
}
|
||||
if config.IdleTimeout == 0 {
|
||||
config.IdleTimeout = gDEFAULT_POOL_IDLE_TIMEOUT
|
||||
config.IdleTimeout = defaultPoolIdleTimeout
|
||||
}
|
||||
if config.ConnectTimeout == 0 {
|
||||
config.ConnectTimeout = gDEFAULT_POOL_CONN_TIMEOUT
|
||||
config.ConnectTimeout = defaultPoolConnTimeout
|
||||
}
|
||||
if config.MaxConnLifetime == 0 {
|
||||
config.MaxConnLifetime = gDEFAULT_POOL_MAX_LIFE_TIME
|
||||
config.MaxConnLifetime = defaultPoolMaxLifeTime
|
||||
}
|
||||
return &Redis{
|
||||
config: config,
|
||||
@ -158,11 +163,29 @@ func (r *Redis) Close() error {
|
||||
return r.pool.Close()
|
||||
}
|
||||
|
||||
// Clone clones and returns a new Redis object, which is a shallow copy of current one.
|
||||
func (r *Redis) Clone() *Redis {
|
||||
newRedis := New(r.config)
|
||||
*newRedis = *r
|
||||
return newRedis
|
||||
}
|
||||
|
||||
// Ctx is a channing function which sets the context for next operation.
|
||||
func (r *Redis) Ctx(ctx context.Context) *Redis {
|
||||
newRedis := r.Clone()
|
||||
newRedis.ctx = ctx
|
||||
return newRedis
|
||||
}
|
||||
|
||||
// Conn returns a raw underlying connection object,
|
||||
// which expose more methods to communicate with server.
|
||||
// **You should call Close function manually if you do not use this connection any further.**
|
||||
func (r *Redis) Conn() *Conn {
|
||||
return &Conn{r.pool.Get()}
|
||||
return &Conn{
|
||||
Conn: r.pool.Get(),
|
||||
ctx: r.ctx,
|
||||
redis: r,
|
||||
}
|
||||
}
|
||||
|
||||
// Alias of Conn, see Conn.
|
||||
@ -209,7 +232,11 @@ func (r *Redis) Stats() *PoolStats {
|
||||
// Do automatically get a connection from pool, and close it when the reply received.
|
||||
// It does not really "close" the connection, but drops it back to the connection pool.
|
||||
func (r *Redis) Do(commandName string, args ...interface{}) (interface{}, error) {
|
||||
conn := &Conn{r.pool.Get()}
|
||||
conn := &Conn{
|
||||
Conn: r.pool.Get(),
|
||||
ctx: r.ctx,
|
||||
redis: r,
|
||||
}
|
||||
defer conn.Close()
|
||||
return conn.Do(commandName, args...)
|
||||
}
|
||||
@ -217,7 +244,11 @@ func (r *Redis) Do(commandName string, args ...interface{}) (interface{}, error)
|
||||
// DoWithTimeout sends a command to the server and returns the received reply.
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (r *Redis) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) {
|
||||
conn := &Conn{r.pool.Get()}
|
||||
conn := &Conn{
|
||||
Conn: r.pool.Get(),
|
||||
ctx: r.ctx,
|
||||
redis: r,
|
||||
}
|
||||
defer conn.Close()
|
||||
return conn.DoWithTimeout(timeout, commandName, args...)
|
||||
}
|
||||
|
||||
@ -30,7 +30,7 @@ var (
|
||||
|
||||
// SetConfig sets the global configuration for specified group.
|
||||
// If <name> is not passed, it sets configuration for the default group name.
|
||||
func SetConfig(config Config, name ...string) {
|
||||
func SetConfig(config *Config, name ...string) {
|
||||
group := DefaultGroupName
|
||||
if len(name) > 0 {
|
||||
group = name[0]
|
||||
@ -59,15 +59,15 @@ func SetConfigByStr(str string, name ...string) error {
|
||||
|
||||
// GetConfig returns the global configuration with specified group name.
|
||||
// If <name> is not passed, it returns configuration of the default group name.
|
||||
func GetConfig(name ...string) (config Config, ok bool) {
|
||||
func GetConfig(name ...string) (config *Config, ok bool) {
|
||||
group := DefaultGroupName
|
||||
if len(name) > 0 {
|
||||
group = name[0]
|
||||
}
|
||||
if v := configs.Get(group); v != nil {
|
||||
return v.(Config), true
|
||||
return v.(*Config), true
|
||||
}
|
||||
return Config{}, false
|
||||
return &Config{}, false
|
||||
}
|
||||
|
||||
// RemoveConfig removes the global configuration with specified group.
|
||||
@ -85,11 +85,11 @@ func RemoveConfig(name ...string) {
|
||||
|
||||
// ConfigFromStr parses and returns config from given str.
|
||||
// Eg: host:port[,db,pass?maxIdle=x&maxActive=x&idleTimeout=x&maxConnLifetime=x]
|
||||
func ConfigFromStr(str string) (config Config, err error) {
|
||||
array, _ := gregex.MatchString(`([^:]+):*(\d*),{0,1}(\d*),{0,1}(.*)\?(.+?)`, str)
|
||||
func ConfigFromStr(str string) (config *Config, err error) {
|
||||
array, _ := gregex.MatchString(`^([^:]+):*(\d*),{0,1}(\d*),{0,1}(.*)\?(.+)$`, str)
|
||||
if len(array) == 6 {
|
||||
parse, _ := gstr.Parse(array[5])
|
||||
config = Config{
|
||||
config = &Config{
|
||||
Host: array[1],
|
||||
Port: gconv.Int(array[2]),
|
||||
Db: gconv.Int(array[3]),
|
||||
@ -116,11 +116,14 @@ func ConfigFromStr(str string) (config Config, err error) {
|
||||
if v, ok := parse["skipVerify"]; ok {
|
||||
config.TLSSkipVerify = gconv.Bool(v)
|
||||
}
|
||||
if v, ok := parse["tracing"]; ok {
|
||||
config.Tracing = gconv.Bool(v)
|
||||
}
|
||||
return
|
||||
}
|
||||
array, _ = gregex.MatchString(`([^:]+):*(\d*),{0,1}(\d*),{0,1}(.*)`, str)
|
||||
if len(array) == 5 {
|
||||
config = Config{
|
||||
config = &Config{
|
||||
Host: array[1],
|
||||
Port: gconv.Int(array[2]),
|
||||
Db: gconv.Int(array[3]),
|
||||
|
||||
@ -7,11 +7,19 @@
|
||||
package gredis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"github.com/gogf/gf"
|
||||
"github.com/gogf/gf/container/gvar"
|
||||
"github.com/gogf/gf/internal/json"
|
||||
"github.com/gogf/gf/os/gtime"
|
||||
"github.com/gogf/gf/util/gconv"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"go.opentelemetry.io/otel"
|
||||
"go.opentelemetry.io/otel/codes"
|
||||
"go.opentelemetry.io/otel/label"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
@ -52,7 +60,40 @@ func (c *Conn) do(timeout time.Duration, commandName string, args ...interface{}
|
||||
}
|
||||
return conn.DoWithTimeout(timeout, commandName, args...)
|
||||
}
|
||||
return c.Conn.Do(commandName, args...)
|
||||
timestampMilli1 := gtime.TimestampMilli()
|
||||
reply, err = c.Conn.Do(commandName, args...)
|
||||
timestampMilli2 := gtime.TimestampMilli()
|
||||
// Tracing.
|
||||
if !c.redis.config.Tracing {
|
||||
return
|
||||
}
|
||||
tr := otel.GetTracerProvider().Tracer(
|
||||
"github.com/gogf/gf/database/gredis",
|
||||
trace.WithInstrumentationVersion(fmt.Sprintf(`%s`, gf.VERSION)),
|
||||
)
|
||||
_, span := tr.Start(c.ctx, commandName)
|
||||
defer span.End()
|
||||
if err != nil {
|
||||
span.SetStatus(codes.Error, fmt.Sprintf(`%+v`, err))
|
||||
}
|
||||
span.SetAttributes(
|
||||
label.String("redis.host", c.redis.config.Host),
|
||||
label.Int("redis.port", c.redis.config.Port),
|
||||
label.Int("redis.db", c.redis.config.Db),
|
||||
)
|
||||
jsonBytes, _ := json.Marshal(args)
|
||||
span.AddEvent("redis.execution", trace.WithAttributes(
|
||||
label.String(`redis.execution.command`, commandName),
|
||||
label.String(`redis.execution.cost`, fmt.Sprintf(`%d ms`, timestampMilli2-timestampMilli1)),
|
||||
label.String(`redis.execution.arguments`, string(jsonBytes)),
|
||||
))
|
||||
return
|
||||
}
|
||||
|
||||
// Ctx is a channing function which sets the context for next operation.
|
||||
func (c *Conn) Ctx(ctx context.Context) *Conn {
|
||||
c.ctx = ctx
|
||||
return c
|
||||
}
|
||||
|
||||
// Do sends a command to the server and returns the received reply.
|
||||
|
||||
@ -22,7 +22,7 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
config = gredis.Config{
|
||||
config = &gredis.Config{
|
||||
Host: "127.0.0.1",
|
||||
Port: 6379,
|
||||
Db: 1,
|
||||
@ -139,7 +139,7 @@ func Test_Instance(t *testing.T) {
|
||||
|
||||
func Test_Error(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
config1 := gredis.Config{
|
||||
config1 := &gredis.Config{
|
||||
Host: "127.0.0.2",
|
||||
Port: 6379,
|
||||
Db: 1,
|
||||
@ -149,7 +149,7 @@ func Test_Error(t *testing.T) {
|
||||
_, err := redis.Do("info")
|
||||
t.AssertNE(err, nil)
|
||||
|
||||
config1 = gredis.Config{
|
||||
config1 = &gredis.Config{
|
||||
Host: "127.0.0.1",
|
||||
Port: 6379,
|
||||
Db: 1,
|
||||
@ -159,7 +159,7 @@ func Test_Error(t *testing.T) {
|
||||
_, err = redis.Do("info")
|
||||
t.AssertNE(err, nil)
|
||||
|
||||
config1 = gredis.Config{
|
||||
config1 = &gredis.Config{
|
||||
Host: "127.0.0.1",
|
||||
Port: 6379,
|
||||
Db: 100,
|
||||
|
||||
Reference in New Issue
Block a user