From 61a0a163b770be8db6a200054f5d2a01fd360bb9 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 18 Jul 2018 11:43:30 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90gtcp/gudp=E5=8C=85=E7=9A=84?= =?UTF-8?q?=E6=94=B9=E8=BF=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO | 3 + g/net/gtcp/{tcp.go => gtcp.go} | 0 g/net/gtcp/gtcp_conn.go | 159 +++++++++++++++++++ g/net/gtcp/gtcp_func.go | 80 ++++++++++ g/net/gtcp/{tcp_pool.go => gtcp_pool.go} | 65 +++----- g/net/gtcp/{tcp_server.go => gtcp_server.go} | 2 +- g/net/gtcp/tcp_func.go | 156 ------------------ g/net/gudp/gudp.go | 11 ++ g/net/gudp/gudp_conn.go | 145 +++++++++++++++++ g/net/gudp/gudp_func.go | 50 ++++++ g/net/gudp/{udp.go => gudp_server.go} | 35 +++- g/net/gudp/udp_server.go | 42 ----- g/os/gproc/gproc_comm_receive.go | 2 +- g/os/gproc/gproc_comm_send.go | 2 +- geg/net/gtcp/gtcp_conn_client.go | 21 --- geg/net/gtcp/gtcp_conn_server.go | 19 --- geg/net/gtcp/gtcp_pool1.go | 12 +- geg/net/gtcp/gtcp_pool2.go | 12 +- geg/net/gtcp/gtcp_server.go | 17 -- geg/net/gtcp/gtcp_server_client.go | 43 +++++ geg/net/gtcp/tcp_server_client.go | 102 +++++++----- geg/net/gudp/gudp_server_client.go | 40 +++++ geg/net/{udp.go => gudp/udp_client.go} | 14 +- geg/net/gudp/udp_server.go | 29 ++++ geg/net/udp_server.go | 16 -- geg/other/test.go | 72 ++++++++- geg/util/gutil/map_to_struct.go | 24 +++ 27 files changed, 790 insertions(+), 383 deletions(-) rename g/net/gtcp/{tcp.go => gtcp.go} (100%) create mode 100644 g/net/gtcp/gtcp_conn.go create mode 100644 g/net/gtcp/gtcp_func.go rename g/net/gtcp/{tcp_pool.go => gtcp_pool.go} (61%) rename g/net/gtcp/{tcp_server.go => gtcp_server.go} (94%) delete mode 100644 g/net/gtcp/tcp_func.go create mode 100644 g/net/gudp/gudp.go create mode 100644 g/net/gudp/gudp_conn.go create mode 100644 g/net/gudp/gudp_func.go rename g/net/gudp/{udp.go => gudp_server.go} (61%) delete mode 100644 g/net/gudp/udp_server.go delete mode 100644 geg/net/gtcp/gtcp_conn_client.go delete mode 100644 geg/net/gtcp/gtcp_conn_server.go delete mode 100644 geg/net/gtcp/gtcp_server.go create mode 100644 geg/net/gtcp/gtcp_server_client.go create mode 100644 geg/net/gudp/gudp_server_client.go rename geg/net/{udp.go => gudp/udp_client.go} (60%) create mode 100644 geg/net/gudp/udp_server.go delete mode 100644 geg/net/udp_server.go create mode 100644 geg/util/gutil/map_to_struct.go diff --git a/TODO b/TODO index 1e37db204..2edb18ee6 100644 --- a/TODO +++ b/TODO @@ -13,6 +13,9 @@ gtime增加对时区转换的封装,并简化失去转换时对类似+80500时 改进gf-orm的where查询功能,参考thinkphp 里的where查询语法; gpage分页增加对自定义后缀的支持,如:2.html, 2.php等等; gvalid包增加tag与校验规则绑定的支持特性; +ghttp获取参数支持直接转struct功能; +map转struct增加对tag的支持; + DONE: 1. gconv完善针对不同类型的判断,例如:尽量减少sprintf("%v", xxx)来执行string类型的转换; diff --git a/g/net/gtcp/tcp.go b/g/net/gtcp/gtcp.go similarity index 100% rename from g/net/gtcp/tcp.go rename to g/net/gtcp/gtcp.go diff --git a/g/net/gtcp/gtcp_conn.go b/g/net/gtcp/gtcp_conn.go new file mode 100644 index 000000000..27a287905 --- /dev/null +++ b/g/net/gtcp/gtcp_conn.go @@ -0,0 +1,159 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gtcp + +import ( + "net" + "time" + "io" + "bufio" +) + +// 封装的链接对象 +type Conn struct { + net.Conn + reader *bufio.Reader // 当前链接的缓冲读取对象 +} + +// 创建TCP链接 +func NewConn(addr string, timeout...int) (*Conn, error) { + if conn, err := NewNetConn(addr, timeout...); err == nil { + return &Conn { + Conn : conn, + }, nil + } else { + return nil, err + } +} + +// 将net.Conn接口对象转换为*gtcp.Conn对象(注意递归影响,因为*gtcp.Conn本身也实现了net.Conn接口) +func NewConnByNetConn(conn net.Conn) *Conn { + return &Conn { Conn: conn } +} + +// 发送数据 +func (c *Conn) Send(data []byte, retry...Retry) error { + length := 0 + for { + n, err := c.Write(data) + if err != nil { + // 链接已关闭 + if err == io.EOF { + return err + } + // 其他错误,重试之后仍不能成功 + if len(retry) == 0 || retry[0].Count == 0 { + return err + } + if len(retry) > 0 { + retry[0].Count-- + if retry[0].Interval == 0 { + retry[0].Interval = gDEFAULT_RETRY_INTERVAL + } + time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) + } + } else { + length += n + if length == len(data) { + return nil + } + } + } +} + +// 获取数据,指定读取的数据长度(length < 1表示获取所有可读数据),以及重试策略(retry) +// 需要注意: +// 1、往往在socket通信中需要指定固定的数据结构,并在设定对应的长度字段,并在读取数据时便于区分包大小; +// 2、当length < 1时表示获取缓冲区所有的数据,但是可能会引起包解析问题(可能出现非完整的包情况),因此需要解析端注意解析策略; +func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) { + var err error // 读取错误 + var size int // 读取长度 + var index int // 已读取长度 + var buffer []byte // 读取缓冲区 + + if c.reader == nil { + c.reader = bufio.NewReader(c) + } + if length > 0 { + buffer = make([]byte, length) + } else { + buffer = make([]byte, gDEFAULT_READ_BUFFER_SIZE) + } + for { + size, err = c.reader.Read(buffer[index:]) + if size > 0 { + index += size + if length > 0 { + // 如果指定了读取大小,那么必须读取到指定长度才返回 + if index == length { + break + } + } else { + // 否则读取所有缓冲区数据,直到没有可读数据为止 + if c.reader.Buffered() < 1 { + break + } + // 如果长度超过了自定义的读取缓冲区,那么自动增长 + if index >= gDEFAULT_READ_BUFFER_SIZE { + buffer = append(buffer, make([]byte, gDEFAULT_READ_BUFFER_SIZE)...) + } + } + } + if err != nil { + // 链接已关闭 + if err == io.EOF { + break + } + if len(retry) > 0 { + // 其他错误,重试之后仍不能成功 + if retry[0].Count == 0 { + break + } + retry[0].Count-- + if retry[0].Interval == 0 { + retry[0].Interval = gDEFAULT_RETRY_INTERVAL + } + time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) + continue + } + break + } + } + return buffer[:index], err +} + +// 带超时时间的数据获取 +func (c *Conn) ReceiveWithTimeout(length int, timeout time.Duration, retry...Retry) ([]byte, error) { + c.SetReadDeadline(time.Now().Add(timeout)) + defer c.SetReadDeadline(time.Time{}) + return c.Receive(length, retry...) +} + +// 带超时时间的数据发送 +func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error { + c.SetWriteDeadline(time.Now().Add(timeout)) + defer c.SetWriteDeadline(time.Time{}) + return c.Send(data, retry...) +} + +// 发送数据并等待接收返回数据 +func (c *Conn) SendReceive(data []byte, receive int, retry...Retry) ([]byte, error) { + if err := c.Send(data, retry...); err == nil { + return c.Receive(receive, retry...) + } else { + return nil, err + } +} + +// 发送数据并等待接收返回数据(带返回超时等待时间) +func (c *Conn) SendReceiveWithTimeout(data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) { + if err := c.Send(data, retry...); err == nil { + return c.ReceiveWithTimeout(receive, timeout, retry...) + } else { + return nil, err + } +} \ No newline at end of file diff --git a/g/net/gtcp/gtcp_func.go b/g/net/gtcp/gtcp_func.go new file mode 100644 index 000000000..8f71eac13 --- /dev/null +++ b/g/net/gtcp/gtcp_func.go @@ -0,0 +1,80 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gtcp + +import ( + "net" + "time" +) + +const ( + gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔 + gDEFAULT_READ_BUFFER_SIZE = 1024 // 默认数据读取缓冲区大小 +) + +type Retry struct { + Count int // 重试次数 + Interval int // 重试间隔(毫秒) +} + +// 常见的二进制数据校验方式,生成校验结果 +func Checksum(buffer []byte) uint32 { + var checksum uint32 + for _, b := range buffer { + checksum += uint32(b) + } + return checksum +} + +// 创建原生TCP链接, addr地址格式形如:127.0.0.1:80 +func NewNetConn(addr string, timeout...int) (net.Conn, error) { + if len(timeout) > 0 { + return net.DialTimeout("tcp", addr, time.Duration(timeout[0]) * time.Millisecond) + } else { + return net.Dial("tcp", addr) + } +} + +// (面向短链接)发送数据 +func Send(addr string, data []byte, retry...Retry) error { + conn, err := NewConn(addr) + if err != nil { + return err + } + defer conn.Close() + return conn.Send(data, retry...) +} + +// (面向短链接)发送数据并等待接收返回数据 +func SendReceive(addr string, data []byte, receive int, retry...Retry) ([]byte, error) { + conn, err := NewConn(addr) + if err != nil { + return nil, err + } + defer conn.Close() + return conn.SendReceive(data, receive, retry...) +} + +// (面向短链接)带超时时间的数据发送 +func SendWithTimeout(addr string, data []byte, timeout time.Duration, retry...Retry) error { + conn, err := NewConn(addr) + if err != nil { + return err + } + defer conn.Close() + return conn.SendWithTimeout(data, timeout, retry...) +} + +// (面向短链接)发送数据并等待接收返回数据(带返回超时等待时间) +func SendReceiveWithTimeout(addr string, data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) { + conn, err := NewConn(addr) + if err != nil { + return nil, err + } + defer conn.Close() + return conn.SendReceiveWithTimeout(data, receive, timeout, retry...) +} diff --git a/g/net/gtcp/tcp_pool.go b/g/net/gtcp/gtcp_pool.go similarity index 61% rename from g/net/gtcp/tcp_pool.go rename to g/net/gtcp/gtcp_pool.go index 935ed8c61..6393c15eb 100644 --- a/g/net/gtcp/tcp_pool.go +++ b/g/net/gtcp/gtcp_pool.go @@ -7,15 +7,14 @@ package gtcp import ( - "net" "time" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gpool" ) -// 封装的链接对象 -type Conn struct { - net.Conn // 继承底层链接接口对象 +// 链接池链接对象 +type PoolConn struct { + *Conn // 继承底层链接接口对象 pool *gpool.Pool // 对应的链接池对象 status int // 当前对象的状态,主要用于失败重连判断 } @@ -33,8 +32,8 @@ var ( pools = gmap.NewStringInterfaceMap() ) -// 创建TCP链接 -func NewConn(addr string, timeout...int) (*Conn, error) { +// 创建TCP链接池对象 +func NewPoolConn(addr string, timeout...int) (*PoolConn, error) { var pool *gpool.Pool if v := pools.Get(addr); v == nil { pools.LockFunc(func(m map[string]interface{}) { @@ -42,11 +41,8 @@ func NewConn(addr string, timeout...int) (*Conn, error) { pool = v.(*gpool.Pool) } else { pool = gpool.New(gDEFAULT_POOL_EXPIRE, func() (interface{}, error) { - if conn, err := NewNetConn(addr, timeout...); err == nil { - return &Conn { - Conn : conn, - pool : pool, - }, nil + if conn, err := NewConn(addr, timeout...); err == nil { + return &PoolConn { conn, pool, gCONN_STATUS_ACTIVE }, nil } else { return nil, err } @@ -59,21 +55,14 @@ func NewConn(addr string, timeout...int) (*Conn, error) { } if v, err := pool.Get(); err == nil { - return v.(*Conn), nil + return v.(*PoolConn), nil } else { return nil, err } } -// 将net.Conn接口对象转换为*gtcp.Conn对象(注意递归影响,因为*gtcp.Conn本身也实现了net.Conn接口) -func NewConnByNetConn(conn net.Conn) *Conn { - return &Conn { - Conn : conn, - } -} - // 覆盖底层接口对象的Close方法 -func (c *Conn) Close() error { +func (c *PoolConn) Close() error { if c.pool != nil && c.status == gCONN_STATUS_ACTIVE { c.status = gCONN_STATUS_UNKNOWN c.pool.Put(c) @@ -84,12 +73,12 @@ func (c *Conn) Close() error { } // 发送数据 -func (c *Conn) Send(data []byte, retry...Retry) error { +func (c *PoolConn) Send(data []byte, retry...Retry) error { var err error - if err = Send(c, data, retry...); err != nil && c.status == gCONN_STATUS_UNKNOWN { + if err = c.Send(data, retry...); err != nil && c.status == gCONN_STATUS_UNKNOWN { if v, e := c.pool.NewFunc(); e == nil { - c.Conn = v.(net.Conn) - err = Send(c, data, retry...) + c.Conn = v.(*PoolConn).Conn + err = c.Send(data, retry...) } else { err = e } @@ -103,16 +92,8 @@ func (c *Conn) Send(data []byte, retry...Retry) error { } // 接收数据 -func (c *Conn) Receive(retry...Retry) ([]byte, error) { - data, err := Receive(c, retry...) - if err != nil && len(data) == 0 && c.status == gCONN_STATUS_UNKNOWN { - if v, e := c.pool.NewFunc(); e == nil { - c.Conn = v.(net.Conn) - data, err = Receive(c, retry...) - } else { - err = e - } - } +func (c *PoolConn) Receive(length int, retry...Retry) ([]byte, error) { + data, err := c.Receive(length, retry...) if err != nil { c.status = gCONN_STATUS_ERROR } else { @@ -122,30 +103,32 @@ func (c *Conn) Receive(retry...Retry) ([]byte, error) { } // 带超时时间的数据获取 -func (c *Conn) ReceiveWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) { +func (c *PoolConn) ReceiveWithTimeout(length int, timeout time.Duration, retry...Retry) ([]byte, error) { c.SetReadDeadline(time.Now().Add(timeout)) - return c.Receive(retry...) + defer c.SetReadDeadline(time.Time{}) + return c.Receive(length, retry...) } // 带超时时间的数据发送 -func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error { +func (c *PoolConn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error { c.SetWriteDeadline(time.Now().Add(timeout)) + defer c.SetWriteDeadline(time.Time{}) return c.Send(data, retry...) } // 发送数据并等待接收返回数据 -func (c *Conn) SendReceive(data []byte, retry...Retry) ([]byte, error) { +func (c *PoolConn) SendReceive(data []byte, receive int, retry...Retry) ([]byte, error) { if err := c.Send(data, retry...); err == nil { - return c.Receive(retry...) + return c.Receive(receive, retry...) } else { return nil, err } } // 发送数据并等待接收返回数据(带返回超时等待时间) -func (c *Conn) SendReceiveWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { +func (c *PoolConn) SendReceiveWithTimeout(data []byte, receive int, timeout time.Duration, retry...Retry) ([]byte, error) { if err := c.Send(data, retry...); err == nil { - return c.ReceiveWithTimeout(timeout, retry...) + return c.ReceiveWithTimeout(receive, timeout, retry...) } else { return nil, err } diff --git a/g/net/gtcp/tcp_server.go b/g/net/gtcp/gtcp_server.go similarity index 94% rename from g/net/gtcp/tcp_server.go rename to g/net/gtcp/gtcp_server.go index b6d2de209..815cadd05 100644 --- a/g/net/gtcp/tcp_server.go +++ b/g/net/gtcp/gtcp_server.go @@ -28,7 +28,7 @@ func (s *Server) Run() error { if s.handler == nil { return errors.New("start running failed: socket handler not defined") } - tcpaddr, err := net.ResolveTCPAddr("tcp4", s.address) + tcpaddr, err := net.ResolveTCPAddr("tcp", s.address) if err != nil { return err } diff --git a/g/net/gtcp/tcp_func.go b/g/net/gtcp/tcp_func.go deleted file mode 100644 index e315aee86..000000000 --- a/g/net/gtcp/tcp_func.go +++ /dev/null @@ -1,156 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package gtcp - -import ( - "io" - "net" - "time" - "bytes" -) - -const ( - gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔 - gDEFAULT_READ_BUFFER_SIZE = 1024 // 默认数据读取缓冲区大小 - -) - -type Retry struct { - Count int // 重试次数 - Interval int // 重试间隔(毫秒) -} - -// 自定义的包分割符号,用于标识包是否读取结束 -// 注意: -// 1. 必须使用gtcp包来发送和接收tcp数据才有效; -// 2. 只有在发送的字节数为buffer size倍数时才有效; -var pkgSplitMark = []byte{0, 'E', 'O', 'P', 0} - -// 常见的二进制数据校验方式,生成校验结果 -func Checksum(buffer []byte) uint32 { - var checksum uint32 - for _, b := range buffer { - checksum += uint32(b) - } - return checksum -} - -// 创建原生TCP链接, addr地址格式形如:127.0.0.1:80 -func NewNetConn(addr string, timeout...int) (net.Conn, error) { - if len(timeout) > 0 { - return net.DialTimeout("tcp", addr, time.Duration(timeout[0]) * time.Millisecond) - } else { - return net.Dial("tcp", addr) - } -} - -// 获取数据 -func Receive(conn net.Conn, retry...Retry) ([]byte, error) { - var err error - var length int - var buffer []byte - size := gDEFAULT_READ_BUFFER_SIZE - data := make([]byte, 0) - for { - buffer = make([]byte, size) - length, err = conn.Read(buffer) - // 这里使用 "&&" 只要有数据不管有无错误都将先进行解析 - if length < 1 && err != nil { - // 链接已关闭 - if err == io.EOF { - break - } - if len(retry) > 0 { - // 其他错误,重试之后仍不能成功 - if retry[0].Count == 0 { - break - } - retry[0].Count-- - if retry[0].Interval == 0 { - retry[0].Interval = gDEFAULT_RETRY_INTERVAL - } - time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) - continue - } - break - } else { - // 自定义结束标识符判断 - if length == len(pkgSplitMark) && bytes.Compare(pkgSplitMark, buffer[0 : length]) == 0 { - break - } - data = append(data, buffer[0 : length]...) - if length < gDEFAULT_READ_BUFFER_SIZE || err == io.EOF { - break - } - } - } - return data, err -} - -// 带超时时间的数据获取 -func ReceiveWithTimeout(conn net.Conn, timeout time.Duration, retry...Retry) ([]byte, error) { - conn.SetReadDeadline(time.Now().Add(timeout)) - return Receive(conn, retry...) -} - -// 发送数据 -func Send(conn net.Conn, data []byte, retry...Retry) error { - if len(data) % gDEFAULT_READ_BUFFER_SIZE == 0 { - data = append(data, pkgSplitMark...) - } - length := 0 - for { - n, err := conn.Write(data) - if err != nil { - // 链接已关闭 - if err == io.EOF { - return err - } - // 其他错误,重试之后仍不能成功 - if len(retry) == 0 || retry[0].Count == 0 { - return err - } - if len(retry) > 0 { - retry[0].Count-- - if retry[0].Interval == 0 { - retry[0].Interval = gDEFAULT_RETRY_INTERVAL - } - time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) - } - } else { - length += n - if length == len(data) { - return nil - } - } - } - return nil -} - -// 带超时时间的数据发送 -func SendWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) error { - conn.SetWriteDeadline(time.Now().Add(timeout)) - return Send(conn, data, retry...) -} - -// 发送数据并等待接收返回数据 -func SendReceive(conn net.Conn, data []byte, retry...Retry) ([]byte, error) { - if err := Send(conn, data, retry...); err == nil { - return Receive(conn, retry...) - } else { - return nil, err - } -} - -// 发送数据并等待接收返回数据(带返回超时等待时间) -func SendReceiveWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { - if err := Send(conn, data, retry...); err == nil { - return ReceiveWithTimeout(conn, timeout, retry...) - } else { - return nil, err - } -} diff --git a/g/net/gudp/gudp.go b/g/net/gudp/gudp.go new file mode 100644 index 000000000..53c66ff28 --- /dev/null +++ b/g/net/gudp/gudp.go @@ -0,0 +1,11 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// UDP +package gudp + + + diff --git a/g/net/gudp/gudp_conn.go b/g/net/gudp/gudp_conn.go new file mode 100644 index 000000000..b1c9fce23 --- /dev/null +++ b/g/net/gudp/gudp_conn.go @@ -0,0 +1,145 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gudp + +import ( + "net" + "time" + "io" +) + +// 封装的链接对象 +type Conn struct { + *net.UDPConn + raddr *net.UDPAddr +} + +const ( + gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔 + gDEFAULT_READ_BUFFER_SIZE = 1024 // 默认数据读取缓冲区大小 +) + +type Retry struct { + Count int // 重试次数 + Interval int // 重试间隔(毫秒) +} + +// 创建TCP链接 +func NewConn(raddr string, laddr...string) (*Conn, error) { + if conn, err := NewNetConn(raddr, laddr...); err == nil { + return NewConnByNetConn(conn), nil + } else { + return nil, err + } +} + +// 将*net.UDPConn对象转换为*Conn对象 +func NewConnByNetConn(udp *net.UDPConn) *Conn { + return &Conn { + UDPConn : udp, + } +} + +// 发送数据 +func (c *Conn) Send(data []byte, retry...Retry) error { + var err error + var size int + var length int + for { + if c.raddr != nil { + size, err = c.WriteToUDP(data, c.raddr) + } else { + size, err = c.Write(data) + } + if err != nil { + // 链接已关闭 + if err == io.EOF { + return err + } + // 其他错误,重试之后仍不能成功 + if len(retry) == 0 || retry[0].Count == 0 { + return err + } + if len(retry) > 0 { + retry[0].Count-- + if retry[0].Interval == 0 { + retry[0].Interval = gDEFAULT_RETRY_INTERVAL + } + time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) + } + } else { + length += size + if length == len(data) { + return nil + } + } + } + return nil +} + +// 接收数据 +func (c *Conn) Receive(length int, retry...Retry) ([]byte, error) { + var err error // 读取错误 + var size int // 读取长度 + var index int // 已读取长度 + var buffer []byte // 读取缓冲区 + + if length > 0 { + buffer = make([]byte, length) + } else { + buffer = make([]byte, gDEFAULT_READ_BUFFER_SIZE) + } + for { + size, c.raddr, err = c.ReadFromUDP(buffer[index:]) + if size > 0 { + index += size + if length > 0 { + // 如果指定了读取大小,那么必须读取到指定长度才返回 + if index == length { + break + } + } else { + // 如果长度超过了自定义的读取缓冲区,那么自动增长 + if index >= gDEFAULT_READ_BUFFER_SIZE { + buffer = append(buffer, make([]byte, gDEFAULT_READ_BUFFER_SIZE)...) + } + if size < gDEFAULT_READ_BUFFER_SIZE { + break + } + } + } + if err != nil { + // 链接已关闭 + if err == io.EOF { + break + } + if len(retry) > 0 { + // 其他错误,重试之后仍不能成功 + if retry[0].Count == 0 { + break + } + retry[0].Count-- + if retry[0].Interval == 0 { + retry[0].Interval = gDEFAULT_RETRY_INTERVAL + } + time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) + continue + } + break + } + } + return buffer[:index], err +} + +// 发送数据并等待接收返回数据 +func (c *Conn) SendReceive(data []byte, receive int, retry...Retry) ([]byte, error) { + if err := c.Send(data, retry...); err == nil { + return c.Receive(receive, retry...) + } else { + return nil, err + } +} \ No newline at end of file diff --git a/g/net/gudp/gudp_func.go b/g/net/gudp/gudp_func.go new file mode 100644 index 000000000..d70502bea --- /dev/null +++ b/g/net/gudp/gudp_func.go @@ -0,0 +1,50 @@ +// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gudp + +import "net" + +// 创建标准库UDP链接操作对象 +func NewNetConn(raddr string, laddr...string) (*net.UDPConn, error) { + var err error + var rudpaddr, ludpaddr *net.UDPAddr + rudpaddr, err = net.ResolveUDPAddr("udp", raddr) + if err != nil { + return nil, err + } + if len(laddr) > 0 { + ludpaddr, err = net.ResolveUDPAddr("udp", laddr[0]) + if err != nil { + return nil, err + } + } + conn, err := net.DialUDP("udp", ludpaddr, rudpaddr) + if err != nil { + return nil, err + } + return conn, nil +} + +// (面向短链接)发送数据 +func Send(addr string, data []byte, retry...Retry) error { + conn, err := NewConn(addr) + if err != nil { + return err + } + defer conn.Close() + return conn.Send(data, retry...) +} + +// (面向短链接)发送数据并等待接收返回数据 +func SendReceive(addr string, data []byte, receive int, retry...Retry) ([]byte, error) { + conn, err := NewConn(addr) + if err != nil { + return nil, err + } + defer conn.Close() + return conn.SendReceive(data, receive, retry...) +} \ No newline at end of file diff --git a/g/net/gudp/udp.go b/g/net/gudp/gudp_server.go similarity index 61% rename from g/net/gudp/udp.go rename to g/net/gudp/gudp_server.go index 967957943..3931b1ccc 100644 --- a/g/net/gudp/udp.go +++ b/g/net/gudp/gudp_server.go @@ -4,11 +4,12 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -// UDP服务端 + package gudp import ( "net" + "errors" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/util/gconv" ) @@ -20,7 +21,7 @@ const ( // tcp server结构体 type Server struct { address string - handler func (*net.UDPConn) + handler func (*Conn) } // Server表,用以存储和检索名称与Server对象之间的关联关系 @@ -42,7 +43,7 @@ func GetServer(name...interface{}) (*Server) { } // 创建一个tcp server对象,并且可以选择指定一个单例名字 -func NewServer (address string, handler func (*net.UDPConn), names...string) *Server { +func NewServer (address string, handler func (*Conn), names...string) *Server { s := &Server{address, handler} if len(names) > 0 { serverMapping.Set(names[0], s) @@ -50,3 +51,31 @@ func NewServer (address string, handler func (*net.UDPConn), names...string) *Se return s } +// 设置参数 - address +func (s *Server) SetAddress (address string) { + s.address = address +} + +// 设置参数 - handler +func (s *Server) SetHandler (handler func (*Conn)) { + s.handler = handler +} + +// 执行监听 +func (s *Server) Run() error { + if s.handler == nil { + return errors.New("start running failed: socket handler not defined") + } + addr, err := net.ResolveUDPAddr("udp", s.address) + if err != nil { + return err + } + conn, err := net.ListenUDP("udp", addr) + if err != nil { + return err + } + for { + s.handler(NewConnByNetConn(conn)) + } + return nil +} diff --git a/g/net/gudp/udp_server.go b/g/net/gudp/udp_server.go deleted file mode 100644 index faeca9e57..000000000 --- a/g/net/gudp/udp_server.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - - -package gudp - -import ( - "net" - "errors" -) - -// 设置参数 - address -func (s *Server) SetAddress (address string) { - s.address = address -} - -// 设置参数 - handler -func (s *Server) SetHandler (handler func (*net.UDPConn)) { - s.handler = handler -} - -// 执行监听 -func (s *Server) Run() error { - if s.handler == nil { - return errors.New("start running failed: socket handler not defined") - } - tcpaddr, err := net.ResolveUDPAddr("udp4", s.address) - if err != nil { - return err - } - listen, err := net.ListenUDP("udp", tcpaddr) - if err != nil { - return err - } - for { - s.handler(listen) - } - return nil -} diff --git a/g/os/gproc/gproc_comm_receive.go b/g/os/gproc/gproc_comm_receive.go index d46ed1a78..2336ce586 100644 --- a/g/os/gproc/gproc_comm_receive.go +++ b/g/os/gproc/gproc_comm_receive.go @@ -65,7 +65,7 @@ func tcpServiceHandler(conn *gtcp.Conn) { var retry = gtcp.Retry{3, 10} for { var result []byte - buffer, err := conn.Receive(retry) + buffer, err := conn.Receive(-1, retry) if len(buffer) > 0 { var msgs []*Msg for _, msg := range bufferToMsgs(buffer) { diff --git a/g/os/gproc/gproc_comm_send.go b/g/os/gproc/gproc_comm_send.go index 324635af9..c18a0d896 100644 --- a/g/os/gproc/gproc_comm_send.go +++ b/g/os/gproc/gproc_comm_send.go @@ -46,7 +46,7 @@ func Send(pid int, data []byte, group...string) error { for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { if conn, err = getConnByPid(pid); err == nil { defer conn.Close() - buf, err = conn.SendReceiveWithTimeout(buffer, gPROC_COMM_SEND_TIMEOUT*time.Millisecond) + buf, err = conn.SendReceiveWithTimeout(buffer, -1, gPROC_COMM_SEND_TIMEOUT*time.Millisecond) if len(buf) > 0 { // 如果有返回值,如果不是"ok",那么表示是错误信息 if !bytes.EqualFold(buf, []byte("ok")) { diff --git a/geg/net/gtcp/gtcp_conn_client.go b/geg/net/gtcp/gtcp_conn_client.go deleted file mode 100644 index 622853ae3..000000000 --- a/geg/net/gtcp/gtcp_conn_client.go +++ /dev/null @@ -1,21 +0,0 @@ -package main - -import ( - "gitee.com/johng/gf/g/net/gtcp" - "gitee.com/johng/gf/g/util/gconv" - "gitee.com/johng/gf/g/os/glog" - "time" -) - -func main() { - conn, err := gtcp.NewConn("127.0.0.1:8999") - if err != nil { - glog.Fatal(err) - } - for i := 0; i < 10000; i++ { - if err := conn.Send([]byte(gconv.String(i))); err != nil { - glog.Error(err) - } - time.Sleep(time.Second) - } -} \ No newline at end of file diff --git a/geg/net/gtcp/gtcp_conn_server.go b/geg/net/gtcp/gtcp_conn_server.go deleted file mode 100644 index 01b4dcbe5..000000000 --- a/geg/net/gtcp/gtcp_conn_server.go +++ /dev/null @@ -1,19 +0,0 @@ -package main - -import ( - "net" - "gitee.com/johng/gf/g/os/glog" - "gitee.com/johng/gf/g/net/gtcp" -) - -func main() { - gtcp.NewServer(":8999", func(conn net.Conn) { - c := gtcp.NewConnByNetConn(conn) - defer c.Close() - for { - if data, err := c.Receive(); err == nil { - glog.Println(string(data)) - } - } - }).Run() -} \ No newline at end of file diff --git a/geg/net/gtcp/gtcp_pool1.go b/geg/net/gtcp/gtcp_pool1.go index ea1fd865e..2166da4cf 100644 --- a/geg/net/gtcp/gtcp_pool1.go +++ b/geg/net/gtcp/gtcp_pool1.go @@ -15,10 +15,12 @@ func main() { c := gtcp.NewConnByNetConn(conn) defer c.Close() for { - if data, _ := c.Receive(); len(data) > 0 { - c.Send(append([]byte("> "), data...)) + if data, _ := c.Receive(-1); len(data) > 0 { + if err := c.Send(append([]byte("> "), data...)); err != nil { + fmt.Println(err) + } } - //return + // return } }).Run() @@ -26,8 +28,8 @@ func main() { // Client for { - if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil { - if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil { + if conn, err := gtcp.NewPoolConn("127.0.0.1:8999"); err == nil { + if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil { fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) } else { fmt.Println(err) diff --git a/geg/net/gtcp/gtcp_pool2.go b/geg/net/gtcp/gtcp_pool2.go index 7c90f43d4..e039cece7 100644 --- a/geg/net/gtcp/gtcp_pool2.go +++ b/geg/net/gtcp/gtcp_pool2.go @@ -15,8 +15,10 @@ func main() { c := gtcp.NewConnByNetConn(conn) defer c.Close() for { - if data, _ := c.Receive(); len(data) > 0 { - c.Send(append([]byte("> "), data...)) + if data, _ := c.Receive(-1); len(data) > 0 { + if err := c.Send(append([]byte("> "), data...)); err != nil { + fmt.Println(err) + } } return } @@ -26,8 +28,8 @@ func main() { // Client for { - if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil { - if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil { + if conn, err := gtcp.NewPoolConn("127.0.0.1:8999"); err == nil { + if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil { fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) } else { fmt.Println(err) @@ -38,4 +40,6 @@ func main() { } time.Sleep(time.Second) } + + select{} } \ No newline at end of file diff --git a/geg/net/gtcp/gtcp_server.go b/geg/net/gtcp/gtcp_server.go deleted file mode 100644 index 1c09d89ea..000000000 --- a/geg/net/gtcp/gtcp_server.go +++ /dev/null @@ -1,17 +0,0 @@ -package main - -import ( - "net" - "gitee.com/johng/gf/g/net/gtcp" -) - -func main() { - gtcp.NewServer(":8999", func(conn net.Conn) { - defer conn.Close() - for { - if data, err := gtcp.Receive(conn); err == nil { - gtcp.Send(conn, append([]byte("> "), data...)) - } - } - }).Run() -} \ No newline at end of file diff --git a/geg/net/gtcp/gtcp_server_client.go b/geg/net/gtcp/gtcp_server_client.go new file mode 100644 index 000000000..506806c8f --- /dev/null +++ b/geg/net/gtcp/gtcp_server_client.go @@ -0,0 +1,43 @@ +package main + +import ( + "fmt" + "time" + "net" + "gitee.com/johng/gf/g/net/gtcp" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" +) + +func main() { + // Server + go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) { + c := gtcp.NewConnByNetConn(conn) + defer c.Close() + for { + if data, _ := c.Receive(-1); len(data) > 0 { + if err := c.Send(append([]byte("> "), data...)); err != nil { + fmt.Println(err) + } + } + //return + } + }).Run() + + time.Sleep(time.Second) + + // Client + for { + if conn, err := gtcp.NewConn("127.0.0.1:8999"); err == nil { + if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil { + fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) + } else { + fmt.Println(err) + } + conn.Close() + } else { + glog.Error(err) + } + time.Sleep(time.Second) + } +} \ No newline at end of file diff --git a/geg/net/gtcp/tcp_server_client.go b/geg/net/gtcp/tcp_server_client.go index 52d2cd028..058f7ac68 100644 --- a/geg/net/gtcp/tcp_server_client.go +++ b/geg/net/gtcp/tcp_server_client.go @@ -1,60 +1,76 @@ -package main + package main -import ( - "fmt" - "time" - "net" -) + import ( + "fmt" + "time" + "net" + ) -func main() { - addr := "127.0.0.1:8999" + func main() { + addr := "127.0.0.1:8999" - // Server - go func() { - tcpaddr, err := net.ResolveTCPAddr("tcp4", addr) - if err != nil { - panic(err) - } - listen, err := net.ListenTCP("tcp", tcpaddr) - if err != nil { - panic(err) - } - for { - if conn, err := listen.Accept(); err != nil { + // Server + go func() { + tcpaddr, err := net.ResolveTCPAddr("tcp4", addr) + if err != nil { panic(err) - } else if conn != nil { - go func(conn net.Conn) { - buffer := make([]byte, 1024) - n, err := conn.Read(buffer) - if err != nil { - fmt.Println(err) - } else { - fmt.Println(">", string(buffer[0 : n])) - } - conn.Close() - }(conn) } - } - }() + listen, err := net.ListenTCP("tcp", tcpaddr) + if err != nil { + panic(err) + } + for { + if conn, err := listen.Accept(); err != nil { + panic(err) + } else if conn != nil { + go func(conn net.Conn) { + for { + buffer := make([]byte, 1024) + n, err := conn.Read(buffer) + if err != nil { + fmt.Println(err) + break + } else { + fmt.Println(">", string(buffer[0 : n])) + conn.Close() + } + } - time.Sleep(time.Second) + }(conn) + } + } + }() - // Client - if conn, err := net.Dial("tcp", addr); err == nil { - for i := 0; i < 2; i++ { - _, err := conn.Write([]byte("hello")) + time.Sleep(time.Second) + + // Client + if conn, err := net.Dial("tcp", addr); err == nil { + // first write + _, err := conn.Write([]byte("hello1")) if err != nil { fmt.Println(err) conn.Close() - break + return + } else { + fmt.Println("ok") + } + + // sleep 10 seconds and re-send + time.Sleep(10*time.Second) + + // second write + _, err = conn.Write([]byte("hello2")) + if err != nil { + fmt.Println(err) + conn.Close() + return } else { fmt.Println("ok") } // sleep 10 seconds and re-send time.Sleep(10*time.Second) + } else { + panic(err) } - } else { - panic(err) - } -} \ No newline at end of file + } \ No newline at end of file diff --git a/geg/net/gudp/gudp_server_client.go b/geg/net/gudp/gudp_server_client.go new file mode 100644 index 000000000..70601b3e3 --- /dev/null +++ b/geg/net/gudp/gudp_server_client.go @@ -0,0 +1,40 @@ +package main + +import ( + "fmt" + "time" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/net/gudp" +) + +func main() { + // Server + go gudp.NewServer("127.0.0.1:8999", func(conn *gudp.Conn) { + defer conn.Close() + for { + if data, _ := conn.Receive(-1); len(data) > 0 { + if err := conn.Send(append([]byte("> "), data...)); err != nil { + glog.Error(err) + } + } + } + }).Run() + + time.Sleep(time.Second) + + // Client + for { + if conn, err := gudp.NewConn("127.0.0.1:8999"); err == nil { + if b, err := conn.SendReceive([]byte(gtime.Datetime()), -1); err == nil { + fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) + } else { + glog.Error(err) + } + conn.Close() + } else { + glog.Error(err) + } + time.Sleep(time.Second) + } +} \ No newline at end of file diff --git a/geg/net/udp.go b/geg/net/gudp/udp_client.go similarity index 60% rename from geg/net/udp.go rename to geg/net/gudp/udp_client.go index da0d30c4f..47167be92 100644 --- a/geg/net/udp.go +++ b/geg/net/gudp/udp_client.go @@ -1,11 +1,9 @@ package main - import ( - "net" "fmt" + "net" "os" ) - func main() { conn, err := net.Dial("udp", "127.0.0.1:8999") defer conn.Close() @@ -13,9 +11,11 @@ func main() { os.Exit(1) } - conn.Write([]byte("")) - var msg [20]byte - n, err := conn.Read(msg[0:]) + conn.Write([]byte("Hello world!")) - fmt.Println(string(msg[0:n])) + buffer := make([]byte, 100) + + conn.Read(buffer) + + fmt.Println(string(buffer)) } \ No newline at end of file diff --git a/geg/net/gudp/udp_server.go b/geg/net/gudp/udp_server.go new file mode 100644 index 000000000..2b63028b2 --- /dev/null +++ b/geg/net/gudp/udp_server.go @@ -0,0 +1,29 @@ +package main +import ( + "fmt" + "net" +) +func main() { + listener, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP("127.0.0.1"), Port: 8999}) + if err != nil { + fmt.Println(err) + return + } + fmt.Println("Local:", listener.LocalAddr().String()) + + data := make([]byte, 1024) + for { + n, remoteAddr, err := listener.ReadFromUDP(data) + if err != nil { + fmt.Println(err) + } + fmt.Println(remoteAddr, string(data[ : n])) + + + + _, err = listener.WriteToUDP([]byte("world"), remoteAddr) + if err != nil { + fmt.Printf(err.Error()) + } + } +} \ No newline at end of file diff --git a/geg/net/udp_server.go b/geg/net/udp_server.go deleted file mode 100644 index cd573277e..000000000 --- a/geg/net/udp_server.go +++ /dev/null @@ -1,16 +0,0 @@ -package main - -import ( - "fmt" - "net" - "gitee.com/johng/gf/g/net/gudp" -) - -func main() { - gudp.NewServer(":8999", func(conn *net.UDPConn) { - buffer := make([]byte, 1024) - if length, addr, err := conn.ReadFromUDP(buffer); err == nil { - fmt.Println(string(buffer[0 : length]), "from", addr.String()) - } - }).Run() -} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index cbfd08f05..f25e19111 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,13 +2,73 @@ package main import ( "fmt" + "log" + "net" "time" ) func main() { - //t, err := gtime.StrToTime("2018-01-02 13:08:01") - //fmt.Println(err) - //fmt.Println(t.Format("3:4")) - //fmt.Println(time.Now().Format(".000")) - fmt.Println(time.Now().Format(".000")) -} + addr := "127.0.0.1:8999" + + tcpaddr, err := net.ResolveTCPAddr("tcp4", addr) + if err != nil { + log.Fatal(err) + } + listener, err := net.ListenTCP("tcp", tcpaddr) + if err != nil { + log.Fatal(err) + } + + // Server + done := make(chan error) + go func(listener net.Listener, done chan<- error) { + for { + conn, err := listener.Accept() + if err != nil { + done <- err + return + } + go func(conn net.Conn) { + var buffer [1024]byte + n, err := conn.Read(buffer[:]) + if err != nil { + log.Println(err) + } else { + log.Println(">", string(buffer[0:n])) + } + if err := conn.Close(); err != nil { + log.Println("error closing server conn:", err) + } + }(conn) + } + }(listener, done) + + // Client + conn, err := net.Dial("tcp", addr) + if err != nil { + log.Fatal(err) + } + for i := 0; i < 2; i++ { + _, err := conn.Write([]byte("hello")) + if err != nil { + log.Println(err) + err = conn.Close() + if err != nil { + log.Println("error closing client conn:", err) + } + break + } + fmt.Println("ok") + time.Sleep(2 * time.Second) + } + + // Shut the server down and wait for it to report back + err = listener.Close() + if err != nil { + log.Fatal("error closing listener:", err) + } + err = <-done + if err != nil { + log.Println("server returned:", err) + } +} \ No newline at end of file diff --git a/geg/util/gutil/map_to_struct.go b/geg/util/gutil/map_to_struct.go new file mode 100644 index 000000000..668d4586c --- /dev/null +++ b/geg/util/gutil/map_to_struct.go @@ -0,0 +1,24 @@ +package main + +import ( + "gitee.com/johng/gf/g/util/gutil" + "fmt" +) + +type User struct { + Name string + Age int + Adds string +} + +func main() { + m := map[string]interface{} { + "name" : "john", + "age" : 16, + "adds" : "test", + } + o := User{} + e := gutil.MapToStruct(m, &o) + fmt.Println(e) + fmt.Println(o) +}