mirror of
https://gitee.com/johng/gf
synced 2026-06-07 10:22:11 +08:00
完善gtcp连接池示例代码
This commit is contained in:
@ -18,9 +18,9 @@ import (
|
||||
// 对象池
|
||||
type Pool struct {
|
||||
list *glist.List // 可用/闲置的文件指针链表
|
||||
idle int64 // (毫秒)闲置最大时间,超过该时间则被系统回收
|
||||
closed *gtype.Bool // 连接池是否已关闭
|
||||
newFunc func()(interface{}, error) // 创建对象的方法定义
|
||||
Expire int64 // (毫秒)闲置最大时间,超过该时间则被系统回收
|
||||
NewFunc func()(interface{}, error) // 创建对象的方法定义
|
||||
}
|
||||
|
||||
// 对象池数据项
|
||||
@ -33,11 +33,11 @@ type poolItem struct {
|
||||
func New(expire int, newFunc...func() (interface{}, error)) *Pool {
|
||||
r := &Pool {
|
||||
list : glist.New(),
|
||||
idle : int64(expire),
|
||||
closed : gtype.NewBool(),
|
||||
Expire : int64(expire),
|
||||
}
|
||||
if len(newFunc) > 0 {
|
||||
r.newFunc = newFunc[0]
|
||||
r.NewFunc = newFunc[0]
|
||||
}
|
||||
go r.expireCheckingLoop()
|
||||
return r
|
||||
@ -46,7 +46,7 @@ func New(expire int, newFunc...func() (interface{}, error)) *Pool {
|
||||
// 放一个临时对象到池中
|
||||
func (p *Pool) Put(item interface{}) {
|
||||
p.list.PushBack(&poolItem{
|
||||
expire : gtime.Millisecond() + p.idle,
|
||||
expire : gtime.Millisecond() + p.Expire,
|
||||
value : item,
|
||||
})
|
||||
}
|
||||
@ -63,8 +63,8 @@ func (p *Pool) Get() (interface{}, error) {
|
||||
break
|
||||
}
|
||||
}
|
||||
if p.newFunc != nil {
|
||||
return p.newFunc()
|
||||
if p.NewFunc != nil {
|
||||
return p.NewFunc()
|
||||
}
|
||||
return nil, errors.New("pool is empty")
|
||||
}
|
||||
|
||||
@ -140,7 +140,7 @@ func SendWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...
|
||||
// 发送数据并等待接收返回数据
|
||||
func SendReceive(conn net.Conn, data []byte, retry...Retry) ([]byte, error) {
|
||||
if err := Send(conn, data, retry...); err == nil {
|
||||
return Receive(conn)
|
||||
return Receive(conn, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
@ -149,7 +149,7 @@ func SendReceive(conn net.Conn, data []byte, retry...Retry) ([]byte, error) {
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func SendReceiveWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
if err := Send(conn, data, retry...); err == nil {
|
||||
return ReceiveWithTimeout(conn, timeout)
|
||||
return ReceiveWithTimeout(conn, timeout, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
@ -15,13 +15,17 @@ import (
|
||||
|
||||
// 封装的链接对象
|
||||
type Conn struct {
|
||||
net.Conn // 继承底层链接接口对象
|
||||
pool *gpool.Pool // 对应的链接池对象
|
||||
err error // 是否有错误产生
|
||||
net.Conn // 继承底层链接接口对象
|
||||
pool *gpool.Pool // 对应的链接池对象
|
||||
status int // 当前对象的状态,主要用于失败重连判断
|
||||
}
|
||||
|
||||
const (
|
||||
gDEFAULT_POOL_EXPIRE = 3000 // (毫秒)默认链接对象过期时间
|
||||
gDEFAULT_POOL_EXPIRE = 60000 // (毫秒)默认链接对象过期时间
|
||||
gCONN_STATUS_UNKNOWN = 0 // 未知,表示未经过连通性操作;
|
||||
gCONN_STATUS_ACTIVE = 1 // 正常,表示已经经过连通性操作
|
||||
gCONN_STATUS_ERROR = 2 // 错误,表示该接口操作产生了错误,不应当被循环使用了
|
||||
|
||||
)
|
||||
|
||||
var (
|
||||
@ -40,8 +44,8 @@ func NewConn(addr string, timeout...int) (*Conn, error) {
|
||||
pool = gpool.New(gDEFAULT_POOL_EXPIRE, func() (interface{}, error) {
|
||||
if conn, err := NewNetConn(addr, timeout...); err == nil {
|
||||
return &Conn {
|
||||
Conn : conn,
|
||||
pool : pool,
|
||||
Conn : conn,
|
||||
pool : pool,
|
||||
}, nil
|
||||
} else {
|
||||
return nil, err
|
||||
@ -70,7 +74,8 @@ func NewConnByNetConn(conn net.Conn) *Conn {
|
||||
|
||||
// 覆盖底层接口对象的Close方法
|
||||
func (c *Conn) Close() error {
|
||||
if c.pool != nil && c.err == nil {
|
||||
if c.pool != nil && c.status == gCONN_STATUS_ACTIVE {
|
||||
c.status = gCONN_STATUS_UNKNOWN
|
||||
c.pool.Put(c)
|
||||
} else {
|
||||
c.Conn.Close()
|
||||
@ -80,9 +85,19 @@ func (c *Conn) Close() error {
|
||||
|
||||
// 发送数据
|
||||
func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
err := Send(c, data, retry...)
|
||||
var err error
|
||||
if err = Send(c, data, retry...); err != nil && c.status == gCONN_STATUS_UNKNOWN {
|
||||
if v, e := c.pool.NewFunc(); e == nil {
|
||||
c.Conn = v.(net.Conn)
|
||||
err = Send(c, data, retry...)
|
||||
} else {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
c.status = gCONN_STATUS_ERROR
|
||||
} else {
|
||||
c.status = gCONN_STATUS_ACTIVE
|
||||
}
|
||||
return err
|
||||
}
|
||||
@ -90,44 +105,48 @@ func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
// 接收数据
|
||||
func (c *Conn) Receive(retry...Retry) ([]byte, error) {
|
||||
data, err := Receive(c, retry...)
|
||||
if err != nil && len(data) == 0 && c.status == gCONN_STATUS_UNKNOWN {
|
||||
if v, e := c.pool.NewFunc(); e == nil {
|
||||
c.Conn = v.(net.Conn)
|
||||
data, err = Receive(c, retry...)
|
||||
} else {
|
||||
err = e
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
c.err = err
|
||||
c.status = gCONN_STATUS_ERROR
|
||||
} else {
|
||||
c.status = gCONN_STATUS_ACTIVE
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 带超时时间的数据获取
|
||||
func (c *Conn) ReceiveWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
data, err := ReceiveWithTimeout(c, timeout, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return data, err
|
||||
c.SetReadDeadline(time.Now().Add(timeout))
|
||||
return c.Receive(retry...)
|
||||
}
|
||||
|
||||
// 带超时时间的数据发送
|
||||
func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error {
|
||||
err := SendWithTimeout(c, data, timeout, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return err
|
||||
c.SetWriteDeadline(time.Now().Add(timeout))
|
||||
return c.Send(data, retry...)
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据
|
||||
func (c *Conn) SendReceive(data []byte, retry...Retry) ([]byte, error) {
|
||||
data, err := SendReceive(c, data, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.Receive(retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func (c *Conn) SendReceiveWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
data, err := SendReceiveWithTimeout(c, data, timeout, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.ReceiveWithTimeout(timeout, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
@ -68,7 +68,7 @@ func Send(pid int, data []byte, group...string) error {
|
||||
func getConnByPid(pid int) (*gtcp.Conn, error) {
|
||||
port := getPortByPid(pid)
|
||||
if port > 0 {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1", port); err == nil {
|
||||
if conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil {
|
||||
return conn, nil
|
||||
} else {
|
||||
return nil, err
|
||||
|
||||
21
geg/net/gtcp/gtcp_conn_client.go
Normal file
21
geg/net/gtcp/gtcp_conn_client.go
Normal file
@ -0,0 +1,21 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
conn, err := gtcp.NewConn("127.0.0.1:8999")
|
||||
if err != nil {
|
||||
glog.Fatal(err)
|
||||
}
|
||||
for i := 0; i < 10000; i++ {
|
||||
if err := conn.Send([]byte(gconv.String(i))); err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
19
geg/net/gtcp/gtcp_conn_server.go
Normal file
19
geg/net/gtcp/gtcp_conn_server.go
Normal file
@ -0,0 +1,19 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
)
|
||||
|
||||
func main() {
|
||||
gtcp.NewServer(":8999", func(conn net.Conn) {
|
||||
c := gtcp.NewConnByNetConn(conn)
|
||||
defer c.Close()
|
||||
for {
|
||||
if data, err := c.Receive(); err == nil {
|
||||
glog.Println(string(data))
|
||||
}
|
||||
}
|
||||
}).Run()
|
||||
}
|
||||
41
geg/net/gtcp/gtcp_pool1.go
Normal file
41
geg/net/gtcp/gtcp_pool1.go
Normal file
@ -0,0 +1,41 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Server
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) {
|
||||
c := gtcp.NewConnByNetConn(conn)
|
||||
defer c.Close()
|
||||
for {
|
||||
if data, _ := c.Receive(); len(data) > 0 {
|
||||
c.Send(append([]byte("> "), data...))
|
||||
}
|
||||
//return
|
||||
}
|
||||
}).Run()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// Client
|
||||
for {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
} else {
|
||||
fmt.Println(err)
|
||||
}
|
||||
conn.Close()
|
||||
} else {
|
||||
glog.Error(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
@ -1,35 +1,38 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"fmt"
|
||||
"time"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// Server
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) {
|
||||
c := gtcp.NewConnByNetConn(conn)
|
||||
defer c.Close()
|
||||
for {
|
||||
buffer := make([]byte, 1024)
|
||||
if length, err := conn.Read(buffer); err == nil {
|
||||
conn.Write(append([]byte("> "), buffer[0 : length]...))
|
||||
if data, _ := c.Receive(); len(data) > 0 {
|
||||
c.Send(append([]byte("> "), data...))
|
||||
}
|
||||
//conn.Close()
|
||||
return
|
||||
}
|
||||
}).Run()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// Client
|
||||
for {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1", 8999); err == nil {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
conn.Close()
|
||||
} else {
|
||||
glog.Error(err)
|
||||
fmt.Println(err)
|
||||
}
|
||||
conn.Close()
|
||||
} else {
|
||||
glog.Error(err)
|
||||
}
|
||||
@ -1,10 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
)
|
||||
|
||||
func main() {
|
||||
gtcp.NewNetConn("127.0.0.1", )
|
||||
}
|
||||
60
geg/net/gtcp/tcp_server_client.go
Normal file
60
geg/net/gtcp/tcp_server_client.go
Normal file
@ -0,0 +1,60 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"net"
|
||||
)
|
||||
|
||||
func main() {
|
||||
addr := "127.0.0.1:8999"
|
||||
|
||||
// Server
|
||||
go func() {
|
||||
tcpaddr, err := net.ResolveTCPAddr("tcp4", addr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
listen, err := net.ListenTCP("tcp", tcpaddr)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
for {
|
||||
if conn, err := listen.Accept(); err != nil {
|
||||
panic(err)
|
||||
} else if conn != nil {
|
||||
go func(conn net.Conn) {
|
||||
buffer := make([]byte, 1024)
|
||||
n, err := conn.Read(buffer)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
} else {
|
||||
fmt.Println(">", string(buffer[0 : n]))
|
||||
}
|
||||
conn.Close()
|
||||
}(conn)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
// Client
|
||||
if conn, err := net.Dial("tcp", addr); err == nil {
|
||||
for i := 0; i < 2; i++ {
|
||||
_, err := conn.Write([]byte("hello"))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
conn.Close()
|
||||
break
|
||||
} else {
|
||||
fmt.Println("ok")
|
||||
}
|
||||
// sleep 10 seconds and re-send
|
||||
time.Sleep(10*time.Second)
|
||||
}
|
||||
} else {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user