diff --git a/g/container/gpool/gpool.go b/g/container/gpool/gpool.go index 8fde9264b..0d9959830 100644 --- a/g/container/gpool/gpool.go +++ b/g/container/gpool/gpool.go @@ -18,9 +18,9 @@ import ( // 对象池 type Pool struct { list *glist.List // 可用/闲置的文件指针链表 - idle int64 // (毫秒)闲置最大时间,超过该时间则被系统回收 closed *gtype.Bool // 连接池是否已关闭 - newFunc func()(interface{}, error) // 创建对象的方法定义 + Expire int64 // (毫秒)闲置最大时间,超过该时间则被系统回收 + NewFunc func()(interface{}, error) // 创建对象的方法定义 } // 对象池数据项 @@ -33,11 +33,11 @@ type poolItem struct { func New(expire int, newFunc...func() (interface{}, error)) *Pool { r := &Pool { list : glist.New(), - idle : int64(expire), closed : gtype.NewBool(), + Expire : int64(expire), } if len(newFunc) > 0 { - r.newFunc = newFunc[0] + r.NewFunc = newFunc[0] } go r.expireCheckingLoop() return r @@ -46,7 +46,7 @@ func New(expire int, newFunc...func() (interface{}, error)) *Pool { // 放一个临时对象到池中 func (p *Pool) Put(item interface{}) { p.list.PushBack(&poolItem{ - expire : gtime.Millisecond() + p.idle, + expire : gtime.Millisecond() + p.Expire, value : item, }) } @@ -63,8 +63,8 @@ func (p *Pool) Get() (interface{}, error) { break } } - if p.newFunc != nil { - return p.newFunc() + if p.NewFunc != nil { + return p.NewFunc() } return nil, errors.New("pool is empty") } diff --git a/g/net/gtcp/tcp_func.go b/g/net/gtcp/tcp_func.go index 0ef6e93e7..e315aee86 100644 --- a/g/net/gtcp/tcp_func.go +++ b/g/net/gtcp/tcp_func.go @@ -140,7 +140,7 @@ func SendWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry... // 发送数据并等待接收返回数据 func SendReceive(conn net.Conn, data []byte, retry...Retry) ([]byte, error) { if err := Send(conn, data, retry...); err == nil { - return Receive(conn) + return Receive(conn, retry...) } else { return nil, err } @@ -149,7 +149,7 @@ func SendReceive(conn net.Conn, data []byte, retry...Retry) ([]byte, error) { // 发送数据并等待接收返回数据(带返回超时等待时间) func SendReceiveWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { if err := Send(conn, data, retry...); err == nil { - return ReceiveWithTimeout(conn, timeout) + return ReceiveWithTimeout(conn, timeout, retry...) } else { return nil, err } diff --git a/g/net/gtcp/tcp_pool.go b/g/net/gtcp/tcp_pool.go index 46968e603..935ed8c61 100644 --- a/g/net/gtcp/tcp_pool.go +++ b/g/net/gtcp/tcp_pool.go @@ -15,13 +15,17 @@ import ( // 封装的链接对象 type Conn struct { - net.Conn // 继承底层链接接口对象 - pool *gpool.Pool // 对应的链接池对象 - err error // 是否有错误产生 + net.Conn // 继承底层链接接口对象 + pool *gpool.Pool // 对应的链接池对象 + status int // 当前对象的状态,主要用于失败重连判断 } const ( - gDEFAULT_POOL_EXPIRE = 3000 // (毫秒)默认链接对象过期时间 + gDEFAULT_POOL_EXPIRE = 60000 // (毫秒)默认链接对象过期时间 + gCONN_STATUS_UNKNOWN = 0 // 未知,表示未经过连通性操作; + gCONN_STATUS_ACTIVE = 1 // 正常,表示已经经过连通性操作 + gCONN_STATUS_ERROR = 2 // 错误,表示该接口操作产生了错误,不应当被循环使用了 + ) var ( @@ -40,8 +44,8 @@ func NewConn(addr string, timeout...int) (*Conn, error) { pool = gpool.New(gDEFAULT_POOL_EXPIRE, func() (interface{}, error) { if conn, err := NewNetConn(addr, timeout...); err == nil { return &Conn { - Conn : conn, - pool : pool, + Conn : conn, + pool : pool, }, nil } else { return nil, err @@ -70,7 +74,8 @@ func NewConnByNetConn(conn net.Conn) *Conn { // 覆盖底层接口对象的Close方法 func (c *Conn) Close() error { - if c.pool != nil && c.err == nil { + if c.pool != nil && c.status == gCONN_STATUS_ACTIVE { + c.status = gCONN_STATUS_UNKNOWN c.pool.Put(c) } else { c.Conn.Close() @@ -80,9 +85,19 @@ func (c *Conn) Close() error { // 发送数据 func (c *Conn) Send(data []byte, retry...Retry) error { - err := Send(c, data, retry...) + var err error + if err = Send(c, data, retry...); err != nil && c.status == gCONN_STATUS_UNKNOWN { + if v, e := c.pool.NewFunc(); e == nil { + c.Conn = v.(net.Conn) + err = Send(c, data, retry...) + } else { + err = e + } + } if err != nil { - c.err = err + c.status = gCONN_STATUS_ERROR + } else { + c.status = gCONN_STATUS_ACTIVE } return err } @@ -90,44 +105,48 @@ func (c *Conn) Send(data []byte, retry...Retry) error { // 接收数据 func (c *Conn) Receive(retry...Retry) ([]byte, error) { data, err := Receive(c, retry...) + if err != nil && len(data) == 0 && c.status == gCONN_STATUS_UNKNOWN { + if v, e := c.pool.NewFunc(); e == nil { + c.Conn = v.(net.Conn) + data, err = Receive(c, retry...) + } else { + err = e + } + } if err != nil { - c.err = err + c.status = gCONN_STATUS_ERROR + } else { + c.status = gCONN_STATUS_ACTIVE } return data, err } // 带超时时间的数据获取 func (c *Conn) ReceiveWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) { - data, err := ReceiveWithTimeout(c, timeout, retry...) - if err != nil { - c.err = err - } - return data, err + c.SetReadDeadline(time.Now().Add(timeout)) + return c.Receive(retry...) } // 带超时时间的数据发送 func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error { - err := SendWithTimeout(c, data, timeout, retry...) - if err != nil { - c.err = err - } - return err + c.SetWriteDeadline(time.Now().Add(timeout)) + return c.Send(data, retry...) } // 发送数据并等待接收返回数据 func (c *Conn) SendReceive(data []byte, retry...Retry) ([]byte, error) { - data, err := SendReceive(c, data, retry...) - if err != nil { - c.err = err + if err := c.Send(data, retry...); err == nil { + return c.Receive(retry...) + } else { + return nil, err } - return data, err } // 发送数据并等待接收返回数据(带返回超时等待时间) func (c *Conn) SendReceiveWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { - data, err := SendReceiveWithTimeout(c, data, timeout, retry...) - if err != nil { - c.err = err + if err := c.Send(data, retry...); err == nil { + return c.ReceiveWithTimeout(timeout, retry...) + } else { + return nil, err } - return data, err } \ No newline at end of file diff --git a/g/os/gproc/gproc_comm_send.go b/g/os/gproc/gproc_comm_send.go index 7bb2a8360..324635af9 100644 --- a/g/os/gproc/gproc_comm_send.go +++ b/g/os/gproc/gproc_comm_send.go @@ -68,7 +68,7 @@ func Send(pid int, data []byte, group...string) error { func getConnByPid(pid int) (*gtcp.Conn, error) { port := getPortByPid(pid) if port > 0 { - if conn, err := gtcp.NewConn("127.0.0.1", port); err == nil { + if conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil { return conn, nil } else { return nil, err diff --git a/geg/net/gtcp/gtcp_conn_client.go b/geg/net/gtcp/gtcp_conn_client.go new file mode 100644 index 000000000..622853ae3 --- /dev/null +++ b/geg/net/gtcp/gtcp_conn_client.go @@ -0,0 +1,21 @@ +package main + +import ( + "gitee.com/johng/gf/g/net/gtcp" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/os/glog" + "time" +) + +func main() { + conn, err := gtcp.NewConn("127.0.0.1:8999") + if err != nil { + glog.Fatal(err) + } + for i := 0; i < 10000; i++ { + if err := conn.Send([]byte(gconv.String(i))); err != nil { + glog.Error(err) + } + time.Sleep(time.Second) + } +} \ No newline at end of file diff --git a/geg/net/gtcp/gtcp_conn_server.go b/geg/net/gtcp/gtcp_conn_server.go new file mode 100644 index 000000000..01b4dcbe5 --- /dev/null +++ b/geg/net/gtcp/gtcp_conn_server.go @@ -0,0 +1,19 @@ +package main + +import ( + "net" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/net/gtcp" +) + +func main() { + gtcp.NewServer(":8999", func(conn net.Conn) { + c := gtcp.NewConnByNetConn(conn) + defer c.Close() + for { + if data, err := c.Receive(); err == nil { + glog.Println(string(data)) + } + } + }).Run() +} \ No newline at end of file diff --git a/geg/net/gtcp/gtcp_pool1.go b/geg/net/gtcp/gtcp_pool1.go new file mode 100644 index 000000000..ea1fd865e --- /dev/null +++ b/geg/net/gtcp/gtcp_pool1.go @@ -0,0 +1,41 @@ +package main + +import ( + "fmt" + "time" + "net" + "gitee.com/johng/gf/g/net/gtcp" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" +) + +func main() { + // Server + go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) { + c := gtcp.NewConnByNetConn(conn) + defer c.Close() + for { + if data, _ := c.Receive(); len(data) > 0 { + c.Send(append([]byte("> "), data...)) + } + //return + } + }).Run() + + time.Sleep(time.Second) + + // Client + for { + if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil { + if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil { + fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) + } else { + fmt.Println(err) + } + conn.Close() + } else { + glog.Error(err) + } + time.Sleep(time.Second) + } +} \ No newline at end of file diff --git a/geg/net/gtcp/tcp_pool.go b/geg/net/gtcp/gtcp_pool2.go similarity index 63% rename from geg/net/gtcp/tcp_pool.go rename to geg/net/gtcp/gtcp_pool2.go index db19c5635..7c90f43d4 100644 --- a/geg/net/gtcp/tcp_pool.go +++ b/geg/net/gtcp/gtcp_pool2.go @@ -1,35 +1,38 @@ package main import ( - "gitee.com/johng/gf/g/net/gtcp" + "fmt" "time" "net" - "gitee.com/johng/gf/g/os/gtime" - "fmt" + "gitee.com/johng/gf/g/net/gtcp" "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" ) func main() { + // Server go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) { + c := gtcp.NewConnByNetConn(conn) + defer c.Close() for { - buffer := make([]byte, 1024) - if length, err := conn.Read(buffer); err == nil { - conn.Write(append([]byte("> "), buffer[0 : length]...)) + if data, _ := c.Receive(); len(data) > 0 { + c.Send(append([]byte("> "), data...)) } - //conn.Close() + return } }).Run() time.Sleep(time.Second) + // Client for { - if conn, err := gtcp.NewConn("127.0.0.1", 8999); err == nil { + if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil { if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil { fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) - conn.Close() } else { - glog.Error(err) + fmt.Println(err) } + conn.Close() } else { glog.Error(err) } diff --git a/geg/net/gtcp/tcp_server.go b/geg/net/gtcp/gtcp_server.go similarity index 100% rename from geg/net/gtcp/tcp_server.go rename to geg/net/gtcp/gtcp_server.go diff --git a/geg/net/gtcp/tcp_conn.go b/geg/net/gtcp/tcp_conn.go deleted file mode 100644 index b86431686..000000000 --- a/geg/net/gtcp/tcp_conn.go +++ /dev/null @@ -1,10 +0,0 @@ -package main - -import ( - "net" - "gitee.com/johng/gf/g/net/gtcp" -) - -func main() { - gtcp.NewNetConn("127.0.0.1", ) -} \ No newline at end of file diff --git a/geg/net/gtcp/tcp_server_client.go b/geg/net/gtcp/tcp_server_client.go new file mode 100644 index 000000000..52d2cd028 --- /dev/null +++ b/geg/net/gtcp/tcp_server_client.go @@ -0,0 +1,60 @@ +package main + +import ( + "fmt" + "time" + "net" +) + +func main() { + addr := "127.0.0.1:8999" + + // Server + go func() { + tcpaddr, err := net.ResolveTCPAddr("tcp4", addr) + if err != nil { + panic(err) + } + listen, err := net.ListenTCP("tcp", tcpaddr) + if err != nil { + panic(err) + } + for { + if conn, err := listen.Accept(); err != nil { + panic(err) + } else if conn != nil { + go func(conn net.Conn) { + buffer := make([]byte, 1024) + n, err := conn.Read(buffer) + if err != nil { + fmt.Println(err) + } else { + fmt.Println(">", string(buffer[0 : n])) + } + conn.Close() + }(conn) + } + } + }() + + time.Sleep(time.Second) + + // Client + if conn, err := net.Dial("tcp", addr); err == nil { + for i := 0; i < 2; i++ { + _, err := conn.Write([]byte("hello")) + if err != nil { + fmt.Println(err) + conn.Close() + break + } else { + fmt.Println("ok") + } + // sleep 10 seconds and re-send + time.Sleep(10*time.Second) + } + } else { + panic(err) + } + +} \ No newline at end of file