mirror of
https://gitee.com/johng/gf
synced 2026-07-04 04:52:48 +08:00
add timeout feature for Do/Receive functions of package gredis
This commit is contained in:
@ -15,7 +15,6 @@ package gredis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/gogf/gf/util/gconv"
|
||||
"time"
|
||||
|
||||
"github.com/gogf/gf/container/gmap"
|
||||
@ -167,6 +166,7 @@ func (r *Redis) Conn() *Conn {
|
||||
}
|
||||
|
||||
// Alias of Conn, see Conn.
|
||||
// Deprecated.
|
||||
func (r *Redis) GetConn() *Conn {
|
||||
return r.Conn()
|
||||
}
|
||||
@ -208,21 +208,27 @@ func (r *Redis) Stats() *PoolStats {
|
||||
// Do sends a command to the server and returns the received reply.
|
||||
// Do automatically get a connection from pool, and close it when the reply received.
|
||||
// It does not really "close" the connection, but drops it back to the connection pool.
|
||||
func (r *Redis) Do(command string, args ...interface{}) (interface{}, error) {
|
||||
func (r *Redis) Do(commandName string, args ...interface{}) (interface{}, error) {
|
||||
conn := &Conn{r.pool.Get()}
|
||||
defer conn.Close()
|
||||
return conn.Do(command, args...)
|
||||
return conn.Do(commandName, args...)
|
||||
}
|
||||
|
||||
// DoWithTimeout sends a command to the server and returns the received reply.
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (r *Redis) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (interface{}, error) {
|
||||
conn := &Conn{r.pool.Get()}
|
||||
defer conn.Close()
|
||||
return conn.DoWithTimeout(timeout, commandName, args...)
|
||||
}
|
||||
|
||||
// DoVar returns value from Do as gvar.Var.
|
||||
func (r *Redis) DoVar(command string, args ...interface{}) (*gvar.Var, error) {
|
||||
v, err := r.Do(command, args...)
|
||||
if result, ok := v.([]byte); ok {
|
||||
return gvar.New(gconv.UnsafeBytesToStr(result)), err
|
||||
}
|
||||
// It treats all returned slice as string slice.
|
||||
if result, ok := v.([]interface{}); ok {
|
||||
return gvar.New(gconv.Strings(result)), err
|
||||
}
|
||||
return gvar.New(v), err
|
||||
func (r *Redis) DoVar(commandName string, args ...interface{}) (*gvar.Var, error) {
|
||||
return resultToVar(r.Do(commandName, args...))
|
||||
}
|
||||
|
||||
// DoVarWithTimeout returns value from Do as gvar.Var.
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (r *Redis) DoVarWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (*gvar.Var, error) {
|
||||
return resultToVar(r.DoWithTimeout(timeout, commandName, args...))
|
||||
}
|
||||
|
||||
@ -7,14 +7,19 @@
|
||||
package gredis
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"github.com/gogf/gf/container/gvar"
|
||||
"github.com/gogf/gf/internal/json"
|
||||
"github.com/gogf/gf/util/gconv"
|
||||
"github.com/gomodule/redigo/redis"
|
||||
"reflect"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Do sends a command to the server and returns the received reply.
|
||||
// It uses json.Marshal for struct/slice/map type values before committing them to redis.
|
||||
func (c *Conn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (c *Conn) do(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
var (
|
||||
reflectValue reflect.Value
|
||||
reflectKind reflect.Kind
|
||||
@ -40,17 +45,64 @@ func (c *Conn) Do(commandName string, args ...interface{}) (reply interface{}, e
|
||||
}
|
||||
}
|
||||
}
|
||||
if timeout > 0 {
|
||||
conn, ok := c.Conn.(redis.ConnWithTimeout)
|
||||
if !ok {
|
||||
return gvar.New(nil), errors.New(`current connection does not support "ConnWithTimeout"`)
|
||||
}
|
||||
return conn.DoWithTimeout(timeout, commandName, args...)
|
||||
}
|
||||
return c.Conn.Do(commandName, args...)
|
||||
}
|
||||
|
||||
// Do sends a command to the server and returns the received reply.
|
||||
// It uses json.Marshal for struct/slice/map type values before committing them to redis.
|
||||
func (c *Conn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
return c.do(0, commandName, args...)
|
||||
}
|
||||
|
||||
// DoWithTimeout sends a command to the server and returns the received reply.
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (c *Conn) DoWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
return c.do(timeout, commandName, args...)
|
||||
}
|
||||
|
||||
// DoVar retrieves and returns the result from command as gvar.Var.
|
||||
func (c *Conn) DoVar(command string, args ...interface{}) (*gvar.Var, error) {
|
||||
v, err := c.Do(command, args...)
|
||||
return gvar.New(v), err
|
||||
func (c *Conn) DoVar(commandName string, args ...interface{}) (*gvar.Var, error) {
|
||||
return resultToVar(c.Do(commandName, args...))
|
||||
}
|
||||
|
||||
// DoVarWithTimeout retrieves and returns the result from command as gvar.Var.
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (c *Conn) DoVarWithTimeout(timeout time.Duration, commandName string, args ...interface{}) (*gvar.Var, error) {
|
||||
return resultToVar(c.DoWithTimeout(timeout, commandName, args...))
|
||||
}
|
||||
|
||||
// ReceiveVar receives a single reply as gvar.Var from the Redis server.
|
||||
func (c *Conn) ReceiveVar() (*gvar.Var, error) {
|
||||
v, err := c.Receive()
|
||||
return gvar.New(v), err
|
||||
return resultToVar(c.Receive())
|
||||
}
|
||||
|
||||
// ReceiveVarWithTimeout receives a single reply as gvar.Var from the Redis server.
|
||||
// The timeout overrides the read timeout set when dialing the connection.
|
||||
func (c *Conn) ReceiveVarWithTimeout(timeout time.Duration) (*gvar.Var, error) {
|
||||
conn, ok := c.Conn.(redis.ConnWithTimeout)
|
||||
if !ok {
|
||||
return gvar.New(nil), errors.New(`current connection does not support "ConnWithTimeout"`)
|
||||
}
|
||||
return resultToVar(conn.ReceiveWithTimeout(timeout))
|
||||
}
|
||||
|
||||
// resultToVar converts redis operation result to gvar.Var.
|
||||
func resultToVar(result interface{}, err error) (*gvar.Var, error) {
|
||||
if err == nil {
|
||||
if result, ok := result.([]byte); ok {
|
||||
return gvar.New(gconv.UnsafeBytesToStr(result)), err
|
||||
}
|
||||
// It treats all returned slice as string slice.
|
||||
if result, ok := result.([]interface{}); ok {
|
||||
return gvar.New(gconv.Strings(result)), err
|
||||
}
|
||||
}
|
||||
return gvar.New(result), err
|
||||
}
|
||||
|
||||
51
database/gredis/gredis_z_unit_conn_test.go
Normal file
51
database/gredis/gredis_z_unit_conn_test.go
Normal file
@ -0,0 +1,51 @@
|
||||
// Copyright 2019 gf Author(https://github.com/gogf/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
package gredis_test
|
||||
|
||||
import (
|
||||
"github.com/gogf/gf/database/gredis"
|
||||
"github.com/gogf/gf/test/gtest"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestConn_DoWithTimeout(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
redis := gredis.New(config)
|
||||
t.AssertNE(redis, nil)
|
||||
conn := redis.Conn()
|
||||
defer conn.Close()
|
||||
|
||||
_, err := conn.DoWithTimeout(time.Second, "set", "test", "123")
|
||||
t.Assert(err, nil)
|
||||
defer conn.DoWithTimeout(time.Second, "del", "test")
|
||||
|
||||
r, err := conn.DoWithTimeout(time.Second, "get", "test")
|
||||
t.Assert(err, nil)
|
||||
t.Assert(r, "123")
|
||||
})
|
||||
}
|
||||
|
||||
func TestConn_ReceiveVarWithTimeout(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
redis := gredis.New(config)
|
||||
t.AssertNE(redis, nil)
|
||||
conn := redis.Conn()
|
||||
defer conn.Close()
|
||||
|
||||
_, err := conn.DoVarWithTimeout(time.Second, "Subscribe", "gf")
|
||||
t.Assert(err, nil)
|
||||
|
||||
v, err := redis.DoVarWithTimeout(time.Second, "PUBLISH", "gf", "test")
|
||||
t.Assert(err, nil)
|
||||
t.Assert(v.String(), "1")
|
||||
|
||||
v, _ = conn.ReceiveVar()
|
||||
t.Assert(len(v.Strings()), 3)
|
||||
t.Assert(v.Strings()[2], "test")
|
||||
})
|
||||
}
|
||||
@ -18,7 +18,6 @@ import (
|
||||
|
||||
"github.com/gogf/gf/database/gredis"
|
||||
"github.com/gogf/gf/test/gtest"
|
||||
redis2 "github.com/gomodule/redigo/redis"
|
||||
)
|
||||
|
||||
var (
|
||||
@ -181,28 +180,20 @@ func Test_Error(t *testing.T) {
|
||||
t.Assert(err, nil)
|
||||
t.Assert(v.String(), "v")
|
||||
|
||||
conn := redis.GetConn()
|
||||
conn := redis.Conn()
|
||||
defer conn.Close()
|
||||
_, err = conn.DoVar("SET", "k", "v")
|
||||
t.Assert(err, nil)
|
||||
|
||||
//v, err = conn.ReceiveVar()
|
||||
//t.Assert(err, nil)
|
||||
//t.Assert(v.String(), "v")
|
||||
_, err = conn.DoVar("Subscribe", "gf")
|
||||
t.Assert(err, nil)
|
||||
|
||||
psc := redis2.PubSubConn{Conn: conn}
|
||||
psc.Subscribe("gf")
|
||||
redis.DoVar("PUBLISH", "gf", "gf test")
|
||||
go func() {
|
||||
for {
|
||||
v, _ := conn.ReceiveVar()
|
||||
switch obj := v.Val().(type) {
|
||||
case redis2.Message:
|
||||
t.Assert(string(obj.Data), "gf test")
|
||||
case redis2.Subscription:
|
||||
_, err = redis.DoVar("PUBLISH", "gf", "test")
|
||||
t.Assert(err, nil)
|
||||
|
||||
}
|
||||
}
|
||||
}()
|
||||
v, _ = conn.ReceiveVar()
|
||||
t.Assert(len(v.Strings()), 3)
|
||||
t.Assert(v.Strings()[2], "test")
|
||||
|
||||
time.Sleep(time.Second)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user