From 00a8ef63b6a1dfd3f66390c6efaa6fc81dca5a48 Mon Sep 17 00:00:00 2001 From: John Date: Mon, 3 Jun 2019 23:53:59 +0800 Subject: [PATCH] improve gtcp --- g/crypto/gaes/gaes.go | 10 +-- g/net/gtcp/gtcp_conn.go | 12 ++- g/net/gtcp/gtcp_conn_pkg.go | 88 ++++++++----------- g/net/gtcp/gtcp_func_pkg.go | 16 ++-- g/net/gtcp/gtcp_pool_pkg.go | 30 +++---- g/net/gudp/gudp_conn.go | 13 ++- g/util/gconv/gconv.go | 9 ++ .../common/gtcp_common_client.go | 8 +- .../common/gtcp_common_server.go | 4 +- .../{gtcp_server_client1.go => gtcp_basic.go} | 2 +- ...p_server_client2.go => gtcp_empty_data.go} | 0 .../gtcp/pkg_operations/gtcp_pkg_option.go | 39 ++++++++ .../monitor/gtcp_monitor_server.go | 2 +- 13 files changed, 138 insertions(+), 95 deletions(-) rename geg/net/gtcp/pkg_operations/{gtcp_server_client1.go => gtcp_basic.go} (96%) rename geg/net/gtcp/pkg_operations/{gtcp_server_client2.go => gtcp_empty_data.go} (100%) create mode 100644 geg/net/gtcp/pkg_operations/gtcp_pkg_option.go diff --git a/g/crypto/gaes/gaes.go b/g/crypto/gaes/gaes.go index 746deba74..14f34f44b 100644 --- a/g/crypto/gaes/gaes.go +++ b/g/crypto/gaes/gaes.go @@ -25,18 +25,18 @@ func Encrypt(plainText []byte, key []byte, iv...[]byte) ([]byte, error) { return nil, err } blockSize := block.BlockSize() - plainText = PKCS5Padding(plainText, blockSize) - ivValue := ([]byte)(nil) + plainText = PKCS5Padding(plainText, blockSize) + ivValue := ([]byte)(nil) if len(iv) > 0 { ivValue = iv[0] } else { ivValue = []byte(ivDefValue) } blockMode := cipher.NewCBCEncrypter(block, ivValue) - ciphertext := make([]byte, len(plainText)) - blockMode.CryptBlocks(ciphertext, plainText) + cipherText := make([]byte, len(plainText)) + blockMode.CryptBlocks(cipherText, plainText) - return ciphertext, nil + return cipherText, nil } // AES解密, 使用CBC模式,注意key必须为16/24/32位长度,iv初始化向量为非必需参数 diff --git a/g/net/gtcp/gtcp_conn.go b/g/net/gtcp/gtcp_conn.go index e77b4554d..a3778f11c 100644 --- a/g/net/gtcp/gtcp_conn.go +++ b/g/net/gtcp/gtcp_conn.go @@ -84,10 +84,12 @@ func (c *Conn) Send(data []byte, retry...Retry) error { } } -// 获取数据,指定读取的数据长度(length < 1表示获取所有可读数据),以及重试策略(retry) +// 阻塞等待获取指定读取的数据长度,并给定重试策略。 +// // 需要注意: // 1、往往在socket通信中需要指定固定的数据结构,并在设定对应的长度字段,并在读取数据时便于区分包大小; -// 2、当length < 1时表示获取缓冲区所有的数据,但是可能会引起包解析问题(可能出现断包情况),因此需要解析端注意解析策略; +// 2、当length < 0时表示获取缓冲区所有的数据,但是可能会引起包解析问题(可能出现粘包/断包情况),因此需要解析端注意解析策略; +// 3、当length = 0时表示获取一次缓冲区的数据后立即返回; func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) { var err error // 读取错误 var size int // 读取长度 @@ -106,7 +108,7 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) { // 如果已经读取到数据(这点很关键,表明缓冲区已经有数据,剩下的操作就是将所有数据读取完毕), // 那么可以设置读取全部缓冲数据的超时时间;如果没有接收到任何数据,那么将会进入读取阻塞(或者自定义的超时阻塞); // 仅对读取全部缓冲区数据操作有效 - if length <= 0 && index > 0 { + if length < 0 && index > 0 { bufferWait = true c.conn.SetReadDeadline(time.Now().Add(c.recvBufferWait)) } @@ -155,6 +157,10 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) { } break } + // 只获取一次数据 + if length == 0 { + break + } } return buffer[:index], err } diff --git a/g/net/gtcp/gtcp_conn_pkg.go b/g/net/gtcp/gtcp_conn_pkg.go index 2d88d2d1f..f9d00cda9 100644 --- a/g/net/gtcp/gtcp_conn_pkg.go +++ b/g/net/gtcp/gtcp_conn_pkg.go @@ -10,31 +10,31 @@ import ( "encoding/binary" "errors" "fmt" - "github.com/gogf/gf/g/crypto/gcrc32" "time" ) const ( // 默认允许最大的简单协议包大小(byte), 65535 byte - gPKG_MAX_SIZE = 65535 + gPKG_MAX_DATA_SIZE = 65535 + // 简单协议包头大小 + gPKG_HEADER_SIZE = 3 ) // 数据读取选项 -type Option struct { - MaxSize int // (byte)数据读取的最大包大小,最大不能超过3字节(0xFFFFFF,15MB),默认为65535byte - Secret []byte // (可选)安全通信密钥 - Retry Retry // 失败重试 +type PkgOption struct { + MaxSize int // (byte)数据读取的最大包大小,最大不能超过3字节(0xFFFFFF,15MB),默认为65535byte + Retry Retry // 失败重试 } -// getPkgOption wraps and returns the option. +// getPkgOption wraps and returns the PkgOption. // If no option given, it returns a new option with default value. -func getPkgOption(option...Option) (*Option, error) { - pkgOption := Option{} +func getPkgOption(option...PkgOption) (*PkgOption, error) { + pkgOption := PkgOption{} if len(option) > 0 { pkgOption = option[0] } if pkgOption.MaxSize == 0 { - pkgOption.MaxSize = gPKG_MAX_SIZE + pkgOption.MaxSize = gPKG_MAX_DATA_SIZE } else if pkgOption.MaxSize > 0xFFFFFF { return nil, fmt.Errorf(`package size %d exceeds allowed max size %d`, pkgOption.MaxSize, 0xFFFFFF) } @@ -42,47 +42,40 @@ func getPkgOption(option...Option) (*Option, error) { } // 根据简单协议发送数据包。 -// 简单协议数据格式:总长度(24bit)|校验位(32bit,可选)|数据(变长)。 +// +// 简单协议数据格式:数据长度(24bit)|数据字段(变长)。 +// // 注意: -// 1. "总长度"包含自身3字节及"校验位"4字节(可选)。 -// 2. 当Secret有提供时,"校验位"才会存在,否则该字段为空。 -// 3. "校验位"提供简单的数据完整性及防篡改校验,默认没有开启。 -// 4. 由于"总长度"为3字节,并且使用的BigEndian字节序,因此这里最后返回的buffer使用了buffer[1:]。 -func (c *Conn) SendPkg(data []byte, option...Option) error { +// 1. "数据长度"仅为"数据字段"的长度,不包含头信息的长度字段3字节。 +// 2. 由于"数据长度"为3字节,并且使用的BigEndian字节序,因此这里最后返回的buffer使用了buffer[1:]。 +func (c *Conn) SendPkg(data []byte, option...PkgOption) error { pkgOption, err := getPkgOption(option...) if err != nil { return err } - headerSize := 3 - if len(pkgOption.Secret) > 0 { - headerSize = 7 - } length := len(data) - if length > pkgOption.MaxSize - headerSize { - return errors.New(fmt.Sprintf(`data size %d exceeds max pkg size %d`, length, gPKG_MAX_SIZE - headerSize)) - } - - buffer := make([]byte, headerSize + 1 + len(data)) - copy(buffer[headerSize + 1 : ], data) - binary.BigEndian.PutUint32(buffer[0 : ], uint32(headerSize + length)) - if len(pkgOption.Secret) > 0 { - binary.BigEndian.PutUint32(buffer[4 : ], gcrc32.Encrypt(append(data, pkgOption.Secret...))) + if length > pkgOption.MaxSize { + return errors.New(fmt.Sprintf(`data size %d exceeds max pkg size %d`, length, gPKG_MAX_DATA_SIZE)) } + buffer := make([]byte, gPKG_HEADER_SIZE + 1 + len(data)) + binary.BigEndian.PutUint32(buffer[0 : ], uint32(length)) + copy(buffer[gPKG_HEADER_SIZE + 1 : ], data) if pkgOption.Retry.Count > 0 { c.Send(buffer[1:], pkgOption.Retry) } + //fmt.Println("SendPkg:", buffer[1:]) return c.Send(buffer[1:]) } // 简单协议: 带超时时间的数据发送 -func (c *Conn) SendPkgWithTimeout(data []byte, timeout time.Duration, option...Option) error { +func (c *Conn) SendPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) error { c.SetSendDeadline(time.Now().Add(timeout)) defer c.SetSendDeadline(time.Time{}) return c.SendPkg(data, option...) } // 简单协议: 发送数据并等待接收返回数据 -func (c *Conn) SendRecvPkg(data []byte, option...Option) ([]byte, error) { +func (c *Conn) SendRecvPkg(data []byte, option...PkgOption) ([]byte, error) { if err := c.SendPkg(data, option...); err == nil { return c.RecvPkg(option...) } else { @@ -91,7 +84,7 @@ func (c *Conn) SendRecvPkg(data []byte, option...Option) ([]byte, error) { } // 简单协议: 发送数据并等待接收返回数据(带返回超时等待时间) -func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option...Option) ([]byte, error) { +func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) ([]byte, error) { if err := c.SendPkg(data, option...); err == nil { return c.RecvPkgWithTimeout(timeout, option...) } else { @@ -100,48 +93,37 @@ func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option } // 简单协议: 获取一个数据包。 -func (c *Conn) RecvPkg(option...Option) (result []byte, err error) { +func (c *Conn) RecvPkg(option...PkgOption) (result []byte, err error) { var temp []byte var length int pkgOption, err := getPkgOption(option...) if err != nil { return nil, err } - headerSize := 3 - if len(pkgOption.Secret) > 0 { - headerSize = 7 - } for { // 先根据对象的缓冲区数据进行计算 for { - if len(c.buffer) >= headerSize { - // 注意"总长度"为3个字节,不满足4个字节的uint32类型,因此这里"低位"补0 + if len(c.buffer) >= gPKG_HEADER_SIZE { + // 注意"数据长度"为3个字节,不满足4个字节的uint32类型,因此这里"低位"补0 length = int(binary.BigEndian.Uint32([]byte{0, c.buffer[0], c.buffer[1], c.buffer[2]})) // 解析的大小是否符合规范,清空从该连接接收到的所有数据包 - if length <= 0 || length + headerSize > pkgOption.MaxSize { + if length < 0 || length > pkgOption.MaxSize { c.buffer = c.buffer[:0] return nil, fmt.Errorf(`invalid package size %d`, length) } // 不满足包大小,需要继续读取 - if len(c.buffer) < length { + if len(c.buffer) < length + gPKG_HEADER_SIZE { break } - // 数据校验,如果失败,丢弃该数据包 - receivedCrc32 := binary.BigEndian.Uint32(c.buffer[3 : headerSize]) - calculatedCrc32 := gcrc32.Encrypt(c.buffer[headerSize : length]) - if receivedCrc32 != calculatedCrc32 { - c.buffer = c.buffer[length: ] - return nil, fmt.Errorf(`data CRC32 validates failed, received %d, caculated %d`, receivedCrc32, calculatedCrc32) - } - result = c.buffer[headerSize : length] - c.buffer = c.buffer[length: ] + result = c.buffer[gPKG_HEADER_SIZE : gPKG_HEADER_SIZE + length] + c.buffer = c.buffer[gPKG_HEADER_SIZE + length: ] return } else { break } } // 读取系统socket缓冲区的完整数据 - temp, err = c.Recv(-1, option...) + temp, err = c.Recv(0, pkgOption.Retry) if err != nil { break } @@ -154,8 +136,8 @@ func (c *Conn) RecvPkg(option...Option) (result []byte, err error) { } // 简单协议: 带超时时间的消息包获取 -func (c *Conn) RecvPkgWithTimeout(timeout time.Duration, option...Option) ([]byte, error) { +func (c *Conn) RecvPkgWithTimeout(timeout time.Duration, option...PkgOption) ([]byte, error) { c.SetRecvDeadline(time.Now().Add(timeout)) defer c.SetRecvDeadline(time.Time{}) - return c.RecvPkg(retry...) + return c.RecvPkg(option...) } \ No newline at end of file diff --git a/g/net/gtcp/gtcp_func_pkg.go b/g/net/gtcp/gtcp_func_pkg.go index 164d513cb..a5d50f234 100644 --- a/g/net/gtcp/gtcp_func_pkg.go +++ b/g/net/gtcp/gtcp_func_pkg.go @@ -9,41 +9,41 @@ package gtcp import "time" // 简单协议: (面向短链接)发送消息包 -func SendPkg(addr string, data []byte, retry...Retry) error { +func SendPkg(addr string, data []byte, option...PkgOption) error { conn, err := NewConn(addr) if err != nil { return err } defer conn.Close() - return conn.SendPkg(data, retry...) + return conn.SendPkg(data, option...) } // 简单协议: (面向短链接)发送数据并等待接收返回数据 -func SendRecvPkg(addr string, data []byte, retry...Retry) ([]byte, error) { +func SendRecvPkg(addr string, data []byte, option...PkgOption) ([]byte, error) { conn, err := NewConn(addr) if err != nil { return nil, err } defer conn.Close() - return conn.SendRecvPkg(data, retry...) + return conn.SendRecvPkg(data, option...) } // 简单协议: (面向短链接)带超时时间的数据发送 -func SendPkgWithTimeout(addr string, data []byte, timeout time.Duration, retry...Retry) error { +func SendPkgWithTimeout(addr string, data []byte, timeout time.Duration, option...PkgOption) error { conn, err := NewConn(addr) if err != nil { return err } defer conn.Close() - return conn.SendPkgWithTimeout(data, timeout, retry...) + return conn.SendPkgWithTimeout(data, timeout, option...) } // 简单协议: (面向短链接)发送数据并等待接收返回数据(带返回超时等待时间) -func SendRecvPkgWithTimeout(addr string, data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { +func SendRecvPkgWithTimeout(addr string, data []byte, timeout time.Duration, option...PkgOption) ([]byte, error) { conn, err := NewConn(addr) if err != nil { return nil, err } defer conn.Close() - return conn.SendRecvPkgWithTimeout(data, timeout, retry...) + return conn.SendRecvPkgWithTimeout(data, timeout, option...) } \ No newline at end of file diff --git a/g/net/gtcp/gtcp_pool_pkg.go b/g/net/gtcp/gtcp_pool_pkg.go index f22b60da6..08eb566d4 100644 --- a/g/net/gtcp/gtcp_pool_pkg.go +++ b/g/net/gtcp/gtcp_pool_pkg.go @@ -11,11 +11,11 @@ import ( ) // 简单协议: (方法覆盖)发送数据 -func (c *PoolConn) SendPkg(data []byte, retry...Retry) (err error) { - if err = c.Conn.SendPkg(data, retry...); err != nil && c.status == gCONN_STATUS_UNKNOWN { +func (c *PoolConn) SendPkg(data []byte, option...PkgOption) (err error) { + if err = c.Conn.SendPkg(data, option...); err != nil && c.status == gCONN_STATUS_UNKNOWN { if v, e := c.pool.NewFunc(); e == nil { c.Conn = v.(*PoolConn).Conn - err = c.Conn.SendPkg(data, retry...) + err = c.Conn.SendPkg(data, option...) } else { err = e } @@ -29,8 +29,8 @@ func (c *PoolConn) SendPkg(data []byte, retry...Retry) (err error) { } // 简单协议: (方法覆盖)接收数据 -func (c *PoolConn) RecvPkg(retry...Retry) ([]byte, error) { - data, err := c.Conn.RecvPkg(retry...) +func (c *PoolConn) RecvPkg(option...PkgOption) ([]byte, error) { + data, err := c.Conn.RecvPkg(option...) if err != nil { c.status = gCONN_STATUS_ERROR } else { @@ -40,32 +40,32 @@ func (c *PoolConn) RecvPkg(retry...Retry) ([]byte, error) { } // 简单协议: (方法覆盖)带超时时间的数据获取 -func (c *PoolConn) RecvPkgWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) { +func (c *PoolConn) RecvPkgWithTimeout(timeout time.Duration, option...PkgOption) ([]byte, error) { c.SetRecvDeadline(time.Now().Add(timeout)) defer c.SetRecvDeadline(time.Time{}) - return c.RecvPkg(retry...) + return c.RecvPkg(option...) } // 简单协议: (方法覆盖)带超时时间的数据发送 -func (c *PoolConn) SendPkgWithTimeout(data []byte, timeout time.Duration, retry...Retry) error { +func (c *PoolConn) SendPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) error { c.SetSendDeadline(time.Now().Add(timeout)) defer c.SetSendDeadline(time.Time{}) - return c.SendPkg(data, retry...) + return c.SendPkg(data, option...) } // 简单协议: (方法覆盖)发送数据并等待接收返回数据 -func (c *PoolConn) SendRecvPkg(data []byte, retry...Retry) ([]byte, error) { - if err := c.SendPkg(data, retry...); err == nil { - return c.RecvPkg(retry...) +func (c *PoolConn) SendRecvPkg(data []byte, option...PkgOption) ([]byte, error) { + if err := c.SendPkg(data, option...); err == nil { + return c.RecvPkg(option...) } else { return nil, err } } // 简单协议: (方法覆盖)发送数据并等待接收返回数据(带返回超时等待时间) -func (c *PoolConn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { - if err := c.SendPkg(data, retry...); err == nil { - return c.RecvPkgWithTimeout(timeout, retry...) +func (c *PoolConn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) ([]byte, error) { + if err := c.SendPkg(data, option...); err == nil { + return c.RecvPkgWithTimeout(timeout, option...) } else { return nil, err } diff --git a/g/net/gudp/gudp_conn.go b/g/net/gudp/gudp_conn.go index 0d924734f..32bb952d7 100644 --- a/g/net/gudp/gudp_conn.go +++ b/g/net/gudp/gudp_conn.go @@ -88,8 +88,11 @@ func (c *Conn) Send(data []byte, retry...Retry) error { } } -// 接收数据. -// 注意:UDP协议存在消息边界,因此使用 length<=0 可以获取缓冲区所有消息包数据,即一个完整包。 +// 接收UDP协议数据. +// +// 注意事项: +// 1、UDP协议存在消息边界,因此使用 length < 0 可以获取缓冲区所有消息包数据,即一个完整包; +// 2、当length = 0时,表示获取当前的缓冲区数据,获取一次后立即返回; func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) { var err error // 读取错误 var size int // 读取长度 @@ -105,7 +108,7 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) { } for { - if length <= 0 && index > 0 { + if length < 0 && index > 0 { bufferWait = true c.conn.SetReadDeadline(time.Now().Add(c.recvBufferWait)) } @@ -157,6 +160,10 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) { } break } + // 只获取一次数据 + if length == 0 { + break + } } return buffer[:index], err } diff --git a/g/util/gconv/gconv.go b/g/util/gconv/gconv.go index 6b517ca37..635645a6d 100644 --- a/g/util/gconv/gconv.go +++ b/g/util/gconv/gconv.go @@ -20,6 +20,11 @@ type apiString interface { String() string } +// Type assert api for Error(). +type apiError interface { + Error() string +} + var ( // Empty strings. emptyStringMap = map[string]struct{}{ @@ -143,6 +148,10 @@ func String(i interface{}) string { // If the variable implements the String() interface, // then use that interface to perform the conversion return f.String() + } else if f, ok := value.(apiError); ok { + // If the variable implements the Error() interface, + // then use that interface to perform the conversion + return f.Error() } else { // Finally we use json.Marshal to convert. jsonContent, _ := json.Marshal(value) diff --git a/geg/net/gtcp/pkg_operations/common/gtcp_common_client.go b/geg/net/gtcp/pkg_operations/common/gtcp_common_client.go index e4d9ed49c..1669d8784 100644 --- a/geg/net/gtcp/pkg_operations/common/gtcp_common_client.go +++ b/geg/net/gtcp/pkg_operations/common/gtcp_common_client.go @@ -40,21 +40,21 @@ func main() { case "doexit": onServerDoExit(conn, msg) case "heartbeat": onServerHeartBeat(conn, msg) default: - glog.Errorfln("invalid message: %v", msg) + glog.Errorf("invalid message: %v", msg) break } } } func onServerHello(conn *gtcp.Conn, msg *types.Msg) { - glog.Printfln("hello response message from [%s]: %s", conn.RemoteAddr().String(), msg.Data) + glog.Printf("hello response message from [%s]: %s", conn.RemoteAddr().String(), msg.Data) } func onServerHeartBeat(conn *gtcp.Conn, msg *types.Msg) { - glog.Printfln("heartbeat from [%s]", conn.RemoteAddr().String()) + glog.Printf("heartbeat from [%s]", conn.RemoteAddr().String()) } func onServerDoExit(conn *gtcp.Conn, msg *types.Msg) { - glog.Printfln("exit command from [%s]", conn.RemoteAddr().String()) + glog.Printf("exit command from [%s]", conn.RemoteAddr().String()) conn.Close() } \ No newline at end of file diff --git a/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go b/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go index 51bffe544..375c66adb 100644 --- a/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go +++ b/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go @@ -36,10 +36,10 @@ func main() { } func onClientHello(conn *gtcp.Conn, msg *types.Msg) { - glog.Printfln("hello message from [%s]: %s", conn.RemoteAddr().String(), msg.Data) + glog.Printf("hello message from [%s]: %s", conn.RemoteAddr().String(), msg.Data) funcs.SendPkg(conn, msg.Act, "Nice to meet you!") } func onClientHeartBeat(conn *gtcp.Conn, msg *types.Msg) { - glog.Printfln("heartbeat from [%s]", conn.RemoteAddr().String()) + glog.Printf("heartbeat from [%s]", conn.RemoteAddr().String()) } diff --git a/geg/net/gtcp/pkg_operations/gtcp_server_client1.go b/geg/net/gtcp/pkg_operations/gtcp_basic.go similarity index 96% rename from geg/net/gtcp/pkg_operations/gtcp_server_client1.go rename to geg/net/gtcp/pkg_operations/gtcp_basic.go index 867a7e404..f390e1224 100644 --- a/geg/net/gtcp/pkg_operations/gtcp_server_client1.go +++ b/geg/net/gtcp/pkg_operations/gtcp_basic.go @@ -32,7 +32,7 @@ func main() { defer conn.Close() for i := 0; i < 10000; i++ { if err := conn.SendPkg([]byte(gconv.String(i))); err != nil { - glog.Error(err) + glog.Error(err.Error()) } time.Sleep(1*time.Second) } diff --git a/geg/net/gtcp/pkg_operations/gtcp_server_client2.go b/geg/net/gtcp/pkg_operations/gtcp_empty_data.go similarity index 100% rename from geg/net/gtcp/pkg_operations/gtcp_server_client2.go rename to geg/net/gtcp/pkg_operations/gtcp_empty_data.go diff --git a/geg/net/gtcp/pkg_operations/gtcp_pkg_option.go b/geg/net/gtcp/pkg_operations/gtcp_pkg_option.go new file mode 100644 index 000000000..31f603ae5 --- /dev/null +++ b/geg/net/gtcp/pkg_operations/gtcp_pkg_option.go @@ -0,0 +1,39 @@ +package main + +import ( + "fmt" + "github.com/gogf/gf/g/net/gtcp" + "github.com/gogf/gf/g/os/glog" + "github.com/gogf/gf/g/util/gconv" + "time" +) + +func main() { + // Server + go gtcp.NewServer("127.0.0.1:8999", func(conn *gtcp.Conn) { + defer conn.Close() + for { + data, err := conn.RecvPkg(gtcp.PkgOption{MaxSize : 1}) + if err != nil { + fmt.Println(err) + break + } + fmt.Println("RecvPkg:", string(data)) + } + }).Run() + + time.Sleep(time.Second) + + // Client + conn, err := gtcp.NewConn("127.0.0.1:8999") + if err != nil { + panic(err) + } + defer conn.Close() + for i := 0; i < 10000; i++ { + if err := conn.SendPkg([]byte(gconv.String(i))); err != nil { + glog.Error(err) + } + time.Sleep(1*time.Second) + } +} diff --git a/geg/net/gtcp/pkg_operations/monitor/gtcp_monitor_server.go b/geg/net/gtcp/pkg_operations/monitor/gtcp_monitor_server.go index e91f1b34b..d8b2e2979 100644 --- a/geg/net/gtcp/pkg_operations/monitor/gtcp_monitor_server.go +++ b/geg/net/gtcp/pkg_operations/monitor/gtcp_monitor_server.go @@ -21,7 +21,7 @@ func main() { } info := &types.NodeInfo{} if err := json.Unmarshal(data, info); err != nil { - glog.Errorfln("invalid package structure: %s", err.Error()) + glog.Errorf("invalid package structure: %s", err.Error()) } else { glog.Println(info) conn.SendPkg([]byte("ok"))