mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
完善gtcp包示例,修复由于缓存组件改动引起的ghttp.Session缓存问题
This commit is contained in:
@ -105,5 +105,5 @@ func (s *Session) Remove (k string) {
|
||||
|
||||
// 更新过期时间(如果用在守护进程中长期使用,需要手动调用进行更新,防止超时被清除)
|
||||
func (s *Session) UpdateExpire() {
|
||||
s.server.sessions.Set(s.id, s, int64(s.server.sessionMaxAge.Val()*1000))
|
||||
s.server.sessions.Set(s.id, s, s.server.sessionMaxAge.Val()*1000)
|
||||
}
|
||||
@ -11,36 +11,53 @@ import (
|
||||
"time"
|
||||
"io"
|
||||
"bufio"
|
||||
"fmt"
|
||||
"strings"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
// 封装的链接对象
|
||||
type Conn struct {
|
||||
net.Conn
|
||||
reader *bufio.Reader // 当前链接的缓冲读取对象
|
||||
conn net.Conn // 底层tcp对象
|
||||
reader *bufio.Reader // 当前链接的缓冲读取对象
|
||||
recvDeadline time.Time // 读取超时时间
|
||||
sendDeadline time.Time // 写入超时时间
|
||||
recvBufferWait time.Duration // 读取全部缓冲区数据时,读取完毕后的写入等待间隔
|
||||
}
|
||||
|
||||
const (
|
||||
gRECV_ALL_WAIT_TIMEOUT = time.Millisecond // 读取全部缓冲数据时,没有缓冲数据时的等待间隔
|
||||
)
|
||||
|
||||
// 创建TCP链接
|
||||
func NewConn(addr string, timeout...int) (*Conn, error) {
|
||||
if conn, err := NewNetConn(addr, timeout...); err == nil {
|
||||
return &Conn {
|
||||
Conn : conn,
|
||||
}, nil
|
||||
return NewConnByNetConn(conn), nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 将net.Conn接口对象转换为*gtcp.Conn对象(注意递归影响,因为*gtcp.Conn本身也实现了net.Conn接口)
|
||||
// 将net.Conn接口对象转换为*gtcp.Conn对象
|
||||
func NewConnByNetConn(conn net.Conn) *Conn {
|
||||
return &Conn { Conn: conn }
|
||||
return &Conn {
|
||||
conn : conn,
|
||||
reader : bufio.NewReader(conn),
|
||||
recvDeadline : time.Time{},
|
||||
sendDeadline : time.Time{},
|
||||
recvBufferWait : gRECV_ALL_WAIT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭连接
|
||||
func (c *Conn) Close() {
|
||||
c.conn.Close()
|
||||
}
|
||||
|
||||
// 发送数据
|
||||
func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
length := 0
|
||||
for {
|
||||
n, err := c.Write(data)
|
||||
n, err := c.conn.Write(data)
|
||||
if err != nil {
|
||||
// 链接已关闭
|
||||
if err == io.EOF {
|
||||
@ -70,15 +87,13 @@ func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
// 需要注意:
|
||||
// 1、往往在socket通信中需要指定固定的数据结构,并在设定对应的长度字段,并在读取数据时便于区分包大小;
|
||||
// 2、当length < 1时表示获取缓冲区所有的数据,但是可能会引起包解析问题(可能出现非完整的包情况),因此需要解析端注意解析策略;
|
||||
func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
var err error // 读取错误
|
||||
var size int // 读取长度
|
||||
var index int // 已读取长度
|
||||
var buffer []byte // 读取缓冲区
|
||||
func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
|
||||
var err error // 读取错误
|
||||
var size int // 读取长度
|
||||
var index int // 已读取长度
|
||||
var buffer []byte // 读取缓冲区
|
||||
var bufferWait bool // 是否设置读取的超时时间
|
||||
|
||||
if c.reader == nil {
|
||||
c.reader = bufio.NewReader(c)
|
||||
}
|
||||
if length > 0 {
|
||||
buffer = make([]byte, length)
|
||||
} else {
|
||||
@ -86,8 +101,14 @@ func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
}
|
||||
|
||||
for {
|
||||
time.Sleep(time.Second)
|
||||
fmt.Println(c.reader.Buffered())
|
||||
// 缓冲区数据写入等待处理。
|
||||
// 如果已经读取到数据(这点很关键,表明缓冲区已经有数据,剩下的操作就是将所有数据读取完毕),
|
||||
// 那么可以设置读取全部缓冲数据的超时时间;如果没有接收到任何数据,那么将会进入读取阻塞(或者自定义的超时阻塞);
|
||||
// 仅对读取全部缓冲数据操作有效
|
||||
if length <= 0 && index > 0 {
|
||||
bufferWait = true
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.recvBufferWait))
|
||||
}
|
||||
size, err = c.reader.Read(buffer[index:])
|
||||
if size > 0 {
|
||||
index += size
|
||||
@ -97,10 +118,6 @@ func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
// 否则读取所有缓冲区数据,直到没有可读数据为止
|
||||
//if c.reader.Buffered() < 1 {
|
||||
// break
|
||||
//}
|
||||
// 如果长度超过了自定义的读取缓冲区,那么自动增长
|
||||
if index >= gDEFAULT_READ_BUFFER_SIZE {
|
||||
buffer = append(buffer, make([]byte, gDEFAULT_READ_BUFFER_SIZE)...)
|
||||
@ -112,6 +129,12 @@ func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
// 判断数据是否全部读取完毕(由于超时机制的存在,获取的数据完整性不可靠)
|
||||
if bufferWait && isTimeout(err) {
|
||||
c.conn.SetReadDeadline(c.recvDeadline)
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
if len(retry) > 0 {
|
||||
// 其他错误,重试之后仍不能成功
|
||||
if retry[0].Count == 0 {
|
||||
@ -130,34 +153,104 @@ func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
return buffer[:index], err
|
||||
}
|
||||
|
||||
// 按行读取数据,阻塞读取,直到完成一行读取位置(末尾以'\n'结尾,返回数据不包含换行符)
|
||||
func (c *Conn) RecvLine(retry...Retry) ([]byte, error) {
|
||||
var err error
|
||||
var buffer []byte
|
||||
data := make([]byte, 0)
|
||||
for {
|
||||
buffer, err = c.Recv(1, retry...)
|
||||
if len(buffer) > 0 {
|
||||
data = append(data, buffer...)
|
||||
if buffer[0] == '\n' {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(data) > 0 {
|
||||
data = bytes.TrimRight(data, "\n\r")
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 带超时时间的数据获取
|
||||
func (c *Conn) ReceiveWithTimeout(length int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
c.SetReadDeadline(time.Now().Add(timeout))
|
||||
defer c.SetReadDeadline(time.Time{})
|
||||
return c.Receive(length, retry...)
|
||||
func (c *Conn) RecvWithTimeout(length int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
c.SetRecvDeadline(time.Now().Add(timeout))
|
||||
defer c.SetRecvDeadline(time.Time{})
|
||||
return c.Recv(length, retry...)
|
||||
}
|
||||
|
||||
// 带超时时间的数据发送
|
||||
func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error {
|
||||
c.SetWriteDeadline(time.Now().Add(timeout))
|
||||
defer c.SetWriteDeadline(time.Time{})
|
||||
c.SetSendDeadline(time.Now().Add(timeout))
|
||||
defer c.SetSendDeadline(time.Time{})
|
||||
return c.Send(data, retry...)
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据
|
||||
func (c *Conn) SendReceive(data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
func (c *Conn) SendRecv(data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.Receive(receive, retry...)
|
||||
return c.Recv(receive, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func (c *Conn) SendReceiveWithTimeout(data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
func (c *Conn) SendRecvWithTimeout(data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.ReceiveWithTimeout(receive, timeout, retry...)
|
||||
return c.RecvWithTimeout(receive, timeout, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) SetDeadline(t time.Time) error {
|
||||
err := c.conn.SetDeadline(t)
|
||||
if err == nil {
|
||||
c.recvDeadline = t
|
||||
c.sendDeadline = t
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) SetRecvDeadline(t time.Time) error {
|
||||
err := c.conn.SetReadDeadline(t)
|
||||
if err == nil {
|
||||
c.recvDeadline = t
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) SetSendDeadline(t time.Time) error {
|
||||
err := c.conn.SetWriteDeadline(t)
|
||||
if err == nil {
|
||||
c.sendDeadline = t
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 读取全部缓冲区数据时,读取完毕后的写入等待间隔,如果超过该等待时间后仍无可读数据,那么读取操作返回。
|
||||
// 该时间间隔不能设置得太大,会影响Recv读取时长(默认为1毫秒)。
|
||||
func (c *Conn) SetRecvBufferWait(d time.Duration) {
|
||||
c.recvBufferWait = d
|
||||
}
|
||||
|
||||
func (c *Conn) LocalAddr() net.Addr {
|
||||
return c.conn.LocalAddr()
|
||||
}
|
||||
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
// 判断是否是超时错误
|
||||
func isTimeout(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(err.Error(), "timeout")
|
||||
}
|
||||
@ -50,13 +50,13 @@ func Send(addr string, data []byte, retry...Retry) error {
|
||||
}
|
||||
|
||||
// (面向短链接)发送数据并等待接收返回数据
|
||||
func SendReceive(addr string, data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
func SendRecv(addr string, data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
conn, err := NewConn(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
return conn.SendReceive(data, receive, retry...)
|
||||
return conn.SendRecv(data, receive, retry...)
|
||||
}
|
||||
|
||||
// (面向短链接)带超时时间的数据发送
|
||||
@ -70,11 +70,11 @@ func SendWithTimeout(addr string, data []byte, timeout time.Duration, retry...Re
|
||||
}
|
||||
|
||||
// (面向短链接)发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func SendReceiveWithTimeout(addr string, data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
func SendRecvWithTimeout(addr string, data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
conn, err := NewConn(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
return conn.SendReceiveWithTimeout(data, receive, timeout, retry...)
|
||||
return conn.SendRecvWithTimeout(data, receive, timeout, retry...)
|
||||
}
|
||||
|
||||
@ -7,7 +7,6 @@
|
||||
package gtcp
|
||||
|
||||
import (
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/gpool"
|
||||
)
|
||||
@ -92,44 +91,12 @@ func (c *PoolConn) Send(data []byte, retry...Retry) error {
|
||||
}
|
||||
|
||||
// 接收数据
|
||||
func (c *PoolConn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
data, err := c.Conn.Receive(length, retry...)
|
||||
func (c *PoolConn) Recv(length int, retry...Retry) ([]byte, error) {
|
||||
data, err := c.Conn.Recv(length, retry...)
|
||||
if err != nil {
|
||||
c.status = gCONN_STATUS_ERROR
|
||||
} else {
|
||||
c.status = gCONN_STATUS_ACTIVE
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 带超时时间的数据获取
|
||||
func (c *PoolConn) ReceiveWithTimeout(length int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
c.SetReadDeadline(time.Now().Add(timeout))
|
||||
defer c.SetReadDeadline(time.Time{})
|
||||
return c.Receive(length, retry...)
|
||||
}
|
||||
|
||||
// 带超时时间的数据发送
|
||||
func (c *PoolConn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error {
|
||||
c.SetWriteDeadline(time.Now().Add(timeout))
|
||||
defer c.SetWriteDeadline(time.Time{})
|
||||
return c.Send(data, retry...)
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据
|
||||
func (c *PoolConn) SendReceive(data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.Receive(receive, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func (c *PoolConn) SendReceiveWithTimeout(data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.ReceiveWithTimeout(receive, timeout, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
@ -10,17 +10,22 @@ import (
|
||||
"net"
|
||||
"time"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// 封装的链接对象
|
||||
type Conn struct {
|
||||
*net.UDPConn
|
||||
raddr *net.UDPAddr
|
||||
conn *net.UDPConn // 底层链接对象
|
||||
raddr *net.UDPAddr // 远程地址
|
||||
recvDeadline time.Time // 读取超时时间
|
||||
sendDeadline time.Time // 写入超时时间
|
||||
recvBufferWait time.Duration // 读取全部缓冲区数据时,读取完毕后的写入等待间隔
|
||||
}
|
||||
|
||||
const (
|
||||
gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔
|
||||
gDEFAULT_READ_BUFFER_SIZE = 1024 // 默认数据读取缓冲区大小
|
||||
gRECV_ALL_WAIT_TIMEOUT = time.Millisecond // 读取全部缓冲数据时,没有缓冲数据时的等待间隔
|
||||
)
|
||||
|
||||
type Retry struct {
|
||||
@ -40,7 +45,10 @@ func NewConn(raddr string, laddr...string) (*Conn, error) {
|
||||
// 将*net.UDPConn对象转换为*Conn对象
|
||||
func NewConnByNetConn(udp *net.UDPConn) *Conn {
|
||||
return &Conn {
|
||||
UDPConn : udp,
|
||||
conn : udp,
|
||||
recvDeadline : time.Time{},
|
||||
sendDeadline : time.Time{},
|
||||
recvBufferWait : gRECV_ALL_WAIT_TIMEOUT,
|
||||
}
|
||||
}
|
||||
|
||||
@ -51,9 +59,9 @@ func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
var length int
|
||||
for {
|
||||
if c.raddr != nil {
|
||||
size, err = c.WriteToUDP(data, c.raddr)
|
||||
size, err = c.conn.WriteToUDP(data, c.raddr)
|
||||
} else {
|
||||
size, err = c.Write(data)
|
||||
size, err = c.conn.Write(data)
|
||||
}
|
||||
if err != nil {
|
||||
// 链接已关闭
|
||||
@ -82,19 +90,25 @@ func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
}
|
||||
|
||||
// 接收数据
|
||||
func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
var err error // 读取错误
|
||||
var size int // 读取长度
|
||||
var index int // 已读取长度
|
||||
var buffer []byte // 读取缓冲区
|
||||
func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
|
||||
var err error // 读取错误
|
||||
var size int // 读取长度
|
||||
var index int // 已读取长度
|
||||
var buffer []byte // 读取缓冲区
|
||||
var bufferWait bool // 是否设置读取的超时时间
|
||||
|
||||
if length > 0 {
|
||||
buffer = make([]byte, length)
|
||||
} else {
|
||||
buffer = make([]byte, gDEFAULT_READ_BUFFER_SIZE)
|
||||
}
|
||||
|
||||
for {
|
||||
size, c.raddr, err = c.ReadFromUDP(buffer[index:])
|
||||
if length <= 0 && index > 0 {
|
||||
bufferWait = true
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.recvBufferWait))
|
||||
}
|
||||
size, c.raddr, err = c.conn.ReadFromUDP(buffer[index:])
|
||||
if size > 0 {
|
||||
index += size
|
||||
if length > 0 {
|
||||
@ -117,6 +131,12 @@ func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
// 判断数据是否全部读取完毕(由于超时机制的存在,获取的数据完整性不可靠)
|
||||
if bufferWait && isTimeout(err) {
|
||||
c.conn.SetReadDeadline(c.recvDeadline)
|
||||
err = nil
|
||||
break
|
||||
}
|
||||
if len(retry) > 0 {
|
||||
// 其他错误,重试之后仍不能成功
|
||||
if retry[0].Count == 0 {
|
||||
@ -136,10 +156,84 @@ func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) {
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据
|
||||
func (c *Conn) SendReceive(data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
func (c *Conn) SendRecv(data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.Receive(receive, retry...)
|
||||
return c.Recv(receive, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 带超时时间的数据获取
|
||||
func (c *Conn) RecvWithTimeout(length int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
c.SetRecvDeadline(time.Now().Add(timeout))
|
||||
defer c.SetRecvDeadline(time.Time{})
|
||||
return c.Recv(length, retry...)
|
||||
}
|
||||
|
||||
// 带超时时间的数据发送
|
||||
func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error {
|
||||
c.SetSendDeadline(time.Now().Add(timeout))
|
||||
defer c.SetSendDeadline(time.Time{})
|
||||
return c.Send(data, retry...)
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func (c *Conn) SendRecvWithTimeout(data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
if err := c.Send(data, retry...); err == nil {
|
||||
return c.RecvWithTimeout(receive, timeout, retry...)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) SetDeadline(t time.Time) error {
|
||||
err := c.conn.SetDeadline(t)
|
||||
if err == nil {
|
||||
c.recvDeadline = t
|
||||
c.sendDeadline = t
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) SetRecvDeadline(t time.Time) error {
|
||||
err := c.conn.SetReadDeadline(t)
|
||||
if err == nil {
|
||||
c.recvDeadline = t
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Conn) SetSendDeadline(t time.Time) error {
|
||||
err := c.conn.SetWriteDeadline(t)
|
||||
if err == nil {
|
||||
c.sendDeadline = t
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 读取全部缓冲区数据时,读取完毕后的写入等待间隔,如果超过该等待时间后仍无可读数据,那么读取操作返回。
|
||||
// 该时间间隔不能设置得太大,会影响Recv读取时长(默认为1毫秒)。
|
||||
func (c *Conn) SetRecvBufferWait(d time.Duration) {
|
||||
c.recvBufferWait = d
|
||||
}
|
||||
|
||||
func (c *Conn) LocalAddr() net.Addr {
|
||||
return c.conn.LocalAddr()
|
||||
}
|
||||
|
||||
func (c *Conn) RemoteAddr() net.Addr {
|
||||
return c.conn.RemoteAddr()
|
||||
}
|
||||
|
||||
func (c *Conn) Close() error {
|
||||
return c.conn.Close()
|
||||
}
|
||||
|
||||
// 判断是否是超时错误
|
||||
func isTimeout(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return strings.Contains(err.Error(), "timeout")
|
||||
}
|
||||
@ -40,11 +40,11 @@ func Send(addr string, data []byte, retry...Retry) error {
|
||||
}
|
||||
|
||||
// (面向短链接)发送数据并等待接收返回数据
|
||||
func SendReceive(addr string, data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
func SendRecv(addr string, data []byte, receive int, retry...Retry) ([]byte, error) {
|
||||
conn, err := NewConn(addr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer conn.Close()
|
||||
return conn.SendReceive(data, receive, retry...)
|
||||
return conn.SendRecv(data, receive, retry...)
|
||||
}
|
||||
@ -38,7 +38,7 @@ func startTcpListening() {
|
||||
}
|
||||
var listen *net.TCPListener
|
||||
for i := gPROC_DEFAULT_TCP_PORT; ; i++ {
|
||||
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i))
|
||||
addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", i))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
@ -65,7 +65,7 @@ func tcpServiceHandler(conn *gtcp.Conn) {
|
||||
var retry = gtcp.Retry{3, 10}
|
||||
for {
|
||||
var result []byte
|
||||
buffer, err := conn.Receive(-1, retry)
|
||||
buffer, err := conn.Recv(-1, retry)
|
||||
if len(buffer) > 0 {
|
||||
var msgs []*Msg
|
||||
for _, msg := range bufferToMsgs(buffer) {
|
||||
|
||||
@ -15,6 +15,7 @@ import (
|
||||
"errors"
|
||||
"time"
|
||||
"bytes"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -46,7 +47,7 @@ func Send(pid int, data []byte, group...string) error {
|
||||
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
|
||||
if conn, err = getConnByPid(pid); err == nil {
|
||||
defer conn.Close()
|
||||
buf, err = conn.SendReceiveWithTimeout(buffer, -1, gPROC_COMM_SEND_TIMEOUT*time.Millisecond)
|
||||
buf, err = conn.SendRecvWithTimeout(buffer, -1, gPROC_COMM_SEND_TIMEOUT*time.Millisecond)
|
||||
if len(buf) > 0 {
|
||||
// 如果有返回值,如果不是"ok",那么表示是错误信息
|
||||
if !bytes.EqualFold(buf, []byte("ok")) {
|
||||
@ -54,8 +55,11 @@ func Send(pid int, data []byte, group...string) error {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if err == nil {
|
||||
break
|
||||
} else {
|
||||
glog.Error(err)
|
||||
}
|
||||
}
|
||||
time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond)
|
||||
|
||||
53
geg/net/gtcp/gtcp_conn.go
Normal file
53
geg/net/gtcp/gtcp_conn.go
Normal file
@ -0,0 +1,53 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"bytes"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
)
|
||||
|
||||
func main() {
|
||||
conn, err := gtcp.NewConn("johng.cn:80")
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
if err := conn.Send([]byte("GET / HTTP/1.1\n\n")); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
|
||||
header := make([]byte, 0)
|
||||
content := make([]byte, 0)
|
||||
contentLength := 0
|
||||
for {
|
||||
data, err := conn.RecvLine()
|
||||
// header读取,解析文本长度
|
||||
if len(data) > 0 {
|
||||
array := bytes.Split(data, []byte(": "))
|
||||
// 获得页面内容长度
|
||||
if contentLength == 0 && len(array) == 2 && bytes.EqualFold([]byte("Content-Length"), array[0]) {
|
||||
contentLength = gconv.Int(array[1])
|
||||
}
|
||||
header = append(header, data...)
|
||||
header = append(header, '\n')
|
||||
}
|
||||
// header读取完毕,读取文本内容
|
||||
if contentLength > 0 && len(data) == 0 {
|
||||
content, _ = conn.Recv(contentLength)
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Errorf("ERROR: %s\n", err.Error())
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(header) > 0 {
|
||||
fmt.Println(string(header))
|
||||
}
|
||||
if len(content) > 0 {
|
||||
fmt.Println(string(content))
|
||||
}
|
||||
}
|
||||
@ -9,7 +9,7 @@ func main() {
|
||||
gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
data, err := conn.Receive(-1)
|
||||
data, err := conn.Recv(-1)
|
||||
if len(data) > 0 {
|
||||
if err := conn.Send(append([]byte("> "), data...)); err != nil {
|
||||
fmt.Println(err)
|
||||
|
||||
@ -1,14 +1,16 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
func main() {
|
||||
data, err := gtcp.SendReceive("www.baidu.com:80", []byte("GET / HTTP/1.1\n\n"), -1)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
data, err := gtcp.SendRecv("www.baidu.com:80", []byte("GET / HTTP/1.1\n\n"), -1)
|
||||
if len(data) > 0 {
|
||||
fmt.Println(string(data))
|
||||
}
|
||||
if err != nil {
|
||||
fmt.Errorf("ERROR: %s\n", err.Error())
|
||||
}
|
||||
fmt.Println(string(data))
|
||||
}
|
||||
@ -13,7 +13,7 @@ func main() {
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
data, err := conn.Receive(-1)
|
||||
data, err := conn.Recv(-1)
|
||||
if len(data) > 0 {
|
||||
if err := conn.Send(append([]byte("> "), data...)); err != nil {
|
||||
fmt.Println(err)
|
||||
@ -30,7 +30,7 @@ func main() {
|
||||
// Client
|
||||
for {
|
||||
if conn, err := gtcp.NewPoolConn("127.0.0.1:8999"); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil {
|
||||
if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
} else {
|
||||
fmt.Println(err)
|
||||
|
||||
@ -13,7 +13,7 @@ func main() {
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
data, err := conn.Receive(-1)
|
||||
data, err := conn.Recv(-1)
|
||||
if len(data) > 0 {
|
||||
if err := conn.Send(append([]byte("> "), data...)); err != nil {
|
||||
fmt.Println(err)
|
||||
@ -31,7 +31,7 @@ func main() {
|
||||
// Client
|
||||
for {
|
||||
if conn, err := gtcp.NewPoolConn("127.0.0.1:8999"); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil {
|
||||
if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
} else {
|
||||
fmt.Println(err)
|
||||
|
||||
@ -13,7 +13,7 @@ func main() {
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
data, err := conn.Receive(-1)
|
||||
data, err := conn.Recv(-1)
|
||||
if len(data) > 0 {
|
||||
fmt.Println(string(data))
|
||||
}
|
||||
@ -30,6 +30,7 @@ func main() {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer conn.Close()
|
||||
for i := 0; i < 10000; i++ {
|
||||
if err := conn.Send([]byte(gconv.String(i))); err != nil {
|
||||
glog.Error(err)
|
||||
|
||||
@ -13,7 +13,7 @@ func main() {
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
data, err := conn.Receive(-1)
|
||||
data, err := conn.Recv(-1)
|
||||
if len(data) > 0 {
|
||||
if err := conn.Send(append([]byte("> "), data...)); err != nil {
|
||||
fmt.Println(err)
|
||||
@ -30,7 +30,7 @@ func main() {
|
||||
// Client
|
||||
for {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil {
|
||||
if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
} else {
|
||||
fmt.Println(err)
|
||||
|
||||
@ -13,7 +13,7 @@ func main() {
|
||||
go gudp.NewServer("127.0.0.1:8999", func(conn *gudp.Conn) {
|
||||
defer conn.Close()
|
||||
for {
|
||||
if data, _ := conn.Receive(-1); len(data) > 0 {
|
||||
if data, _ := conn.Recv(-1); len(data) > 0 {
|
||||
if err := conn.Send(append([]byte("> "), data...)); err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
@ -26,7 +26,7 @@ func main() {
|
||||
// Client
|
||||
for {
|
||||
if conn, err := gudp.NewConn("127.0.0.1:8999"); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil {
|
||||
if b, err := conn.SendRecv([]byte(gtime.Datetime()), -1); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
} else {
|
||||
glog.Error(err)
|
||||
|
||||
@ -2,17 +2,11 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
"errors"
|
||||
"gitee.com/johng/gf/g/net/ghttp"
|
||||
)
|
||||
|
||||
func test() (err error) {
|
||||
defer func() {
|
||||
fmt.Println(err)
|
||||
}()
|
||||
time.Sleep(time.Second)
|
||||
return errors.New("111")
|
||||
}
|
||||
|
||||
func main() {
|
||||
test()
|
||||
r, _ := ghttp.Get("http://johng.cn")
|
||||
fmt.Println(string(r.ReadAll()))
|
||||
}
|
||||
@ -1,25 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/gpm"
|
||||
"os"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
func main() {
|
||||
m := gproc.New()
|
||||
env := os.Environ()
|
||||
env = append(env, "child=1")
|
||||
p := m.NewProcess(os.Args[0], os.Args, env)
|
||||
if os.Getenv("child") != "" {
|
||||
time.Sleep(3*time.Second)
|
||||
glog.Error("error")
|
||||
} else {
|
||||
pid, err := p.Run()
|
||||
fmt.Println(pid)
|
||||
fmt.Println(err)
|
||||
fmt.Println(p.Wait())
|
||||
}
|
||||
}
|
||||
@ -2,36 +2,21 @@ package main
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"strings"
|
||||
)
|
||||
|
||||
var s = "arwerwerwerwerwerwrwerwerwerwersefsdgsdfgsddsfgsdfsd timeout"
|
||||
|
||||
|
||||
|
||||
func BenchmarkSet1(b *testing.B) {
|
||||
a := gmap.NewStringStringMap()
|
||||
m1 := make(map[string]string)
|
||||
m2 := make(map[string]string)
|
||||
for i := 0; i < 1000000; i ++ {
|
||||
m1[string(i)] = string(i)
|
||||
func Benchmark_Contains(b *testing.B) {
|
||||
for i := 0; i < b.N; i ++ {
|
||||
strings.Contains(s, "timeout")
|
||||
}
|
||||
for i := 0; i < 1000000; i ++ {
|
||||
m2[string(i)] = string(i) + "_2"
|
||||
}
|
||||
a.BatchSet(m1)
|
||||
a.BatchSet(m2)
|
||||
}
|
||||
|
||||
func BenchmarkSet2 (b *testing.B) {
|
||||
a := gmap.NewStringStringMap()
|
||||
m1 := make(map[string]string)
|
||||
m2 := make(map[string]string)
|
||||
for i := 0; i < 1000000; i ++ {
|
||||
m1[string(i)] = string(i)
|
||||
func Benchmark_EqualFold(b *testing.B) {
|
||||
for i := 0; i < b.N; i ++ {
|
||||
strings.EqualFold(s[len(s) - 7:], "timeout")
|
||||
}
|
||||
for i := 0; i < 1000000; i ++ {
|
||||
m2[string(i)] = string(i) + "_2"
|
||||
}
|
||||
a.BatchSet2(m1)
|
||||
a.BatchSet2(m2)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user