improve gtcp

This commit is contained in:
John
2019-06-03 23:53:59 +08:00
parent 6ac437a3a5
commit 00a8ef63b6
13 changed files with 138 additions and 95 deletions

View File

@ -25,18 +25,18 @@ func Encrypt(plainText []byte, key []byte, iv...[]byte) ([]byte, error) {
return nil, err
}
blockSize := block.BlockSize()
plainText = PKCS5Padding(plainText, blockSize)
ivValue := ([]byte)(nil)
plainText = PKCS5Padding(plainText, blockSize)
ivValue := ([]byte)(nil)
if len(iv) > 0 {
ivValue = iv[0]
} else {
ivValue = []byte(ivDefValue)
}
blockMode := cipher.NewCBCEncrypter(block, ivValue)
ciphertext := make([]byte, len(plainText))
blockMode.CryptBlocks(ciphertext, plainText)
cipherText := make([]byte, len(plainText))
blockMode.CryptBlocks(cipherText, plainText)
return ciphertext, nil
return cipherText, nil
}
// AES解密, 使用CBC模式注意key必须为16/24/32位长度iv初始化向量为非必需参数

View File

@ -84,10 +84,12 @@ func (c *Conn) Send(data []byte, retry...Retry) error {
}
}
// 获取数据,指定读取的数据长度(length < 1表示获取所有可读数据),以及重试策略(retry)
// 阻塞等待获取指定读取的数据长度,并给定重试策略。
//
// 需要注意:
// 1、往往在socket通信中需要指定固定的数据结构并在设定对应的长度字段并在读取数据时便于区分包大小
// 2、当length < 1时表示获取缓冲区所有的数据,但是可能会引起包解析问题(可能出现断包情况),因此需要解析端注意解析策略;
// 2、当length < 0时表示获取缓冲区所有的数据,但是可能会引起包解析问题(可能出现粘包/断包情况),因此需要解析端注意解析策略;
// 3、当length = 0时表示获取一次缓冲区的数据后立即返回
func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
var err error // 读取错误
var size int // 读取长度
@ -106,7 +108,7 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
// 如果已经读取到数据(这点很关键,表明缓冲区已经有数据,剩下的操作就是将所有数据读取完毕)
// 那么可以设置读取全部缓冲数据的超时时间;如果没有接收到任何数据,那么将会进入读取阻塞(或者自定义的超时阻塞);
// 仅对读取全部缓冲区数据操作有效
if length <= 0 && index > 0 {
if length < 0 && index > 0 {
bufferWait = true
c.conn.SetReadDeadline(time.Now().Add(c.recvBufferWait))
}
@ -155,6 +157,10 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
}
break
}
// 只获取一次数据
if length == 0 {
break
}
}
return buffer[:index], err
}

View File

@ -10,31 +10,31 @@ import (
"encoding/binary"
"errors"
"fmt"
"github.com/gogf/gf/g/crypto/gcrc32"
"time"
)
const (
// 默认允许最大的简单协议包大小(byte), 65535 byte
gPKG_MAX_SIZE = 65535
gPKG_MAX_DATA_SIZE = 65535
// 简单协议包头大小
gPKG_HEADER_SIZE = 3
)
// 数据读取选项
type Option struct {
MaxSize int // (byte)数据读取的最大包大小最大不能超过3字节(0xFFFFFF,15MB)默认为65535byte
Secret []byte // (可选)安全通信密钥
Retry Retry // 失败重试
type PkgOption struct {
MaxSize int // (byte)数据读取的最大包大小最大不能超过3字节(0xFFFFFF,15MB)默认为65535byte
Retry Retry // 失败重试
}
// getPkgOption wraps and returns the option.
// getPkgOption wraps and returns the PkgOption.
// If no option given, it returns a new option with default value.
func getPkgOption(option...Option) (*Option, error) {
pkgOption := Option{}
func getPkgOption(option...PkgOption) (*PkgOption, error) {
pkgOption := PkgOption{}
if len(option) > 0 {
pkgOption = option[0]
}
if pkgOption.MaxSize == 0 {
pkgOption.MaxSize = gPKG_MAX_SIZE
pkgOption.MaxSize = gPKG_MAX_DATA_SIZE
} else if pkgOption.MaxSize > 0xFFFFFF {
return nil, fmt.Errorf(`package size %d exceeds allowed max size %d`, pkgOption.MaxSize, 0xFFFFFF)
}
@ -42,47 +42,40 @@ func getPkgOption(option...Option) (*Option, error) {
}
// 根据简单协议发送数据包。
// 简单协议数据格式:总长度(24bit)|校验位(32bit,可选)|数据(变长)。
//
// 简单协议数据格式:数据长度(24bit)|数据字段(变长)。
//
// 注意:
// 1. "长度"包含自身3字节及"校验位"4字节(可选)
// 2. 当Secret有提供时"校验位"才会存在,否则该字段为空
// 3. "校验位"提供简单的数据完整性及防篡改校验,默认没有开启。
// 4. 由于"总长度"为3字节并且使用的BigEndian字节序因此这里最后返回的buffer使用了buffer[1:]。
func (c *Conn) SendPkg(data []byte, option...Option) error {
// 1. "数据长度"仅为"数据字段"的长度不包含头信息的长度字段3字节
// 2. 由于"数据长度"为3字节并且使用的BigEndian字节序因此这里最后返回的buffer使用了buffer[1:]
func (c *Conn) SendPkg(data []byte, option...PkgOption) error {
pkgOption, err := getPkgOption(option...)
if err != nil {
return err
}
headerSize := 3
if len(pkgOption.Secret) > 0 {
headerSize = 7
}
length := len(data)
if length > pkgOption.MaxSize - headerSize {
return errors.New(fmt.Sprintf(`data size %d exceeds max pkg size %d`, length, gPKG_MAX_SIZE - headerSize))
}
buffer := make([]byte, headerSize + 1 + len(data))
copy(buffer[headerSize + 1 : ], data)
binary.BigEndian.PutUint32(buffer[0 : ], uint32(headerSize + length))
if len(pkgOption.Secret) > 0 {
binary.BigEndian.PutUint32(buffer[4 : ], gcrc32.Encrypt(append(data, pkgOption.Secret...)))
if length > pkgOption.MaxSize {
return errors.New(fmt.Sprintf(`data size %d exceeds max pkg size %d`, length, gPKG_MAX_DATA_SIZE))
}
buffer := make([]byte, gPKG_HEADER_SIZE + 1 + len(data))
binary.BigEndian.PutUint32(buffer[0 : ], uint32(length))
copy(buffer[gPKG_HEADER_SIZE + 1 : ], data)
if pkgOption.Retry.Count > 0 {
c.Send(buffer[1:], pkgOption.Retry)
}
//fmt.Println("SendPkg:", buffer[1:])
return c.Send(buffer[1:])
}
// 简单协议: 带超时时间的数据发送
func (c *Conn) SendPkgWithTimeout(data []byte, timeout time.Duration, option...Option) error {
func (c *Conn) SendPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) error {
c.SetSendDeadline(time.Now().Add(timeout))
defer c.SetSendDeadline(time.Time{})
return c.SendPkg(data, option...)
}
// 简单协议: 发送数据并等待接收返回数据
func (c *Conn) SendRecvPkg(data []byte, option...Option) ([]byte, error) {
func (c *Conn) SendRecvPkg(data []byte, option...PkgOption) ([]byte, error) {
if err := c.SendPkg(data, option...); err == nil {
return c.RecvPkg(option...)
} else {
@ -91,7 +84,7 @@ func (c *Conn) SendRecvPkg(data []byte, option...Option) ([]byte, error) {
}
// 简单协议: 发送数据并等待接收返回数据(带返回超时等待时间)
func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option...Option) ([]byte, error) {
func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) ([]byte, error) {
if err := c.SendPkg(data, option...); err == nil {
return c.RecvPkgWithTimeout(timeout, option...)
} else {
@ -100,48 +93,37 @@ func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option
}
// 简单协议: 获取一个数据包。
func (c *Conn) RecvPkg(option...Option) (result []byte, err error) {
func (c *Conn) RecvPkg(option...PkgOption) (result []byte, err error) {
var temp []byte
var length int
pkgOption, err := getPkgOption(option...)
if err != nil {
return nil, err
}
headerSize := 3
if len(pkgOption.Secret) > 0 {
headerSize = 7
}
for {
// 先根据对象的缓冲区数据进行计算
for {
if len(c.buffer) >= headerSize {
// 注意"长度"为3个字节不满足4个字节的uint32类型因此这里"低位"补0
if len(c.buffer) >= gPKG_HEADER_SIZE {
// 注意"数据长度"为3个字节不满足4个字节的uint32类型因此这里"低位"补0
length = int(binary.BigEndian.Uint32([]byte{0, c.buffer[0], c.buffer[1], c.buffer[2]}))
// 解析的大小是否符合规范,清空从该连接接收到的所有数据包
if length <= 0 || length + headerSize > pkgOption.MaxSize {
if length < 0 || length > pkgOption.MaxSize {
c.buffer = c.buffer[:0]
return nil, fmt.Errorf(`invalid package size %d`, length)
}
// 不满足包大小,需要继续读取
if len(c.buffer) < length {
if len(c.buffer) < length + gPKG_HEADER_SIZE {
break
}
// 数据校验,如果失败,丢弃该数据包
receivedCrc32 := binary.BigEndian.Uint32(c.buffer[3 : headerSize])
calculatedCrc32 := gcrc32.Encrypt(c.buffer[headerSize : length])
if receivedCrc32 != calculatedCrc32 {
c.buffer = c.buffer[length: ]
return nil, fmt.Errorf(`data CRC32 validates failed, received %d, caculated %d`, receivedCrc32, calculatedCrc32)
}
result = c.buffer[headerSize : length]
c.buffer = c.buffer[length: ]
result = c.buffer[gPKG_HEADER_SIZE : gPKG_HEADER_SIZE + length]
c.buffer = c.buffer[gPKG_HEADER_SIZE + length: ]
return
} else {
break
}
}
// 读取系统socket缓冲区的完整数据
temp, err = c.Recv(-1, option...)
temp, err = c.Recv(0, pkgOption.Retry)
if err != nil {
break
}
@ -154,8 +136,8 @@ func (c *Conn) RecvPkg(option...Option) (result []byte, err error) {
}
// 简单协议: 带超时时间的消息包获取
func (c *Conn) RecvPkgWithTimeout(timeout time.Duration, option...Option) ([]byte, error) {
func (c *Conn) RecvPkgWithTimeout(timeout time.Duration, option...PkgOption) ([]byte, error) {
c.SetRecvDeadline(time.Now().Add(timeout))
defer c.SetRecvDeadline(time.Time{})
return c.RecvPkg(retry...)
return c.RecvPkg(option...)
}

View File

@ -9,41 +9,41 @@ package gtcp
import "time"
// 简单协议: (面向短链接)发送消息包
func SendPkg(addr string, data []byte, retry...Retry) error {
func SendPkg(addr string, data []byte, option...PkgOption) error {
conn, err := NewConn(addr)
if err != nil {
return err
}
defer conn.Close()
return conn.SendPkg(data, retry...)
return conn.SendPkg(data, option...)
}
// 简单协议: (面向短链接)发送数据并等待接收返回数据
func SendRecvPkg(addr string, data []byte, retry...Retry) ([]byte, error) {
func SendRecvPkg(addr string, data []byte, option...PkgOption) ([]byte, error) {
conn, err := NewConn(addr)
if err != nil {
return nil, err
}
defer conn.Close()
return conn.SendRecvPkg(data, retry...)
return conn.SendRecvPkg(data, option...)
}
// 简单协议: (面向短链接)带超时时间的数据发送
func SendPkgWithTimeout(addr string, data []byte, timeout time.Duration, retry...Retry) error {
func SendPkgWithTimeout(addr string, data []byte, timeout time.Duration, option...PkgOption) error {
conn, err := NewConn(addr)
if err != nil {
return err
}
defer conn.Close()
return conn.SendPkgWithTimeout(data, timeout, retry...)
return conn.SendPkgWithTimeout(data, timeout, option...)
}
// 简单协议: (面向短链接)发送数据并等待接收返回数据(带返回超时等待时间)
func SendRecvPkgWithTimeout(addr string, data []byte, timeout time.Duration, retry...Retry) ([]byte, error) {
func SendRecvPkgWithTimeout(addr string, data []byte, timeout time.Duration, option...PkgOption) ([]byte, error) {
conn, err := NewConn(addr)
if err != nil {
return nil, err
}
defer conn.Close()
return conn.SendRecvPkgWithTimeout(data, timeout, retry...)
return conn.SendRecvPkgWithTimeout(data, timeout, option...)
}

View File

@ -11,11 +11,11 @@ import (
)
// 简单协议: (方法覆盖)发送数据
func (c *PoolConn) SendPkg(data []byte, retry...Retry) (err error) {
if err = c.Conn.SendPkg(data, retry...); err != nil && c.status == gCONN_STATUS_UNKNOWN {
func (c *PoolConn) SendPkg(data []byte, option...PkgOption) (err error) {
if err = c.Conn.SendPkg(data, option...); err != nil && c.status == gCONN_STATUS_UNKNOWN {
if v, e := c.pool.NewFunc(); e == nil {
c.Conn = v.(*PoolConn).Conn
err = c.Conn.SendPkg(data, retry...)
err = c.Conn.SendPkg(data, option...)
} else {
err = e
}
@ -29,8 +29,8 @@ func (c *PoolConn) SendPkg(data []byte, retry...Retry) (err error) {
}
// 简单协议: (方法覆盖)接收数据
func (c *PoolConn) RecvPkg(retry...Retry) ([]byte, error) {
data, err := c.Conn.RecvPkg(retry...)
func (c *PoolConn) RecvPkg(option...PkgOption) ([]byte, error) {
data, err := c.Conn.RecvPkg(option...)
if err != nil {
c.status = gCONN_STATUS_ERROR
} else {
@ -40,32 +40,32 @@ func (c *PoolConn) RecvPkg(retry...Retry) ([]byte, error) {
}
// 简单协议: (方法覆盖)带超时时间的数据获取
func (c *PoolConn) RecvPkgWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) {
func (c *PoolConn) RecvPkgWithTimeout(timeout time.Duration, option...PkgOption) ([]byte, error) {
c.SetRecvDeadline(time.Now().Add(timeout))
defer c.SetRecvDeadline(time.Time{})
return c.RecvPkg(retry...)
return c.RecvPkg(option...)
}
// 简单协议: (方法覆盖)带超时时间的数据发送
func (c *PoolConn) SendPkgWithTimeout(data []byte, timeout time.Duration, retry...Retry) error {
func (c *PoolConn) SendPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) error {
c.SetSendDeadline(time.Now().Add(timeout))
defer c.SetSendDeadline(time.Time{})
return c.SendPkg(data, retry...)
return c.SendPkg(data, option...)
}
// 简单协议: (方法覆盖)发送数据并等待接收返回数据
func (c *PoolConn) SendRecvPkg(data []byte, retry...Retry) ([]byte, error) {
if err := c.SendPkg(data, retry...); err == nil {
return c.RecvPkg(retry...)
func (c *PoolConn) SendRecvPkg(data []byte, option...PkgOption) ([]byte, error) {
if err := c.SendPkg(data, option...); err == nil {
return c.RecvPkg(option...)
} else {
return nil, err
}
}
// 简单协议: (方法覆盖)发送数据并等待接收返回数据(带返回超时等待时间)
func (c *PoolConn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) {
if err := c.SendPkg(data, retry...); err == nil {
return c.RecvPkgWithTimeout(timeout, retry...)
func (c *PoolConn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option...PkgOption) ([]byte, error) {
if err := c.SendPkg(data, option...); err == nil {
return c.RecvPkgWithTimeout(timeout, option...)
} else {
return nil, err
}

View File

@ -88,8 +88,11 @@ func (c *Conn) Send(data []byte, retry...Retry) error {
}
}
// 接收数据.
// 注意UDP协议存在消息边界因此使用 length<=0 可以获取缓冲区所有消息包数据,即一个完整包。
// 接收UDP协议数据.
//
// 注意事项:
// 1、UDP协议存在消息边界因此使用 length < 0 可以获取缓冲区所有消息包数据,即一个完整包;
// 2、当length = 0时表示获取当前的缓冲区数据获取一次后立即返回
func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
var err error // 读取错误
var size int // 读取长度
@ -105,7 +108,7 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
}
for {
if length <= 0 && index > 0 {
if length < 0 && index > 0 {
bufferWait = true
c.conn.SetReadDeadline(time.Now().Add(c.recvBufferWait))
}
@ -157,6 +160,10 @@ func (c *Conn) Recv(length int, retry...Retry) ([]byte, error) {
}
break
}
// 只获取一次数据
if length == 0 {
break
}
}
return buffer[:index], err
}

View File

@ -20,6 +20,11 @@ type apiString interface {
String() string
}
// Type assert api for Error().
type apiError interface {
Error() string
}
var (
// Empty strings.
emptyStringMap = map[string]struct{}{
@ -143,6 +148,10 @@ func String(i interface{}) string {
// If the variable implements the String() interface,
// then use that interface to perform the conversion
return f.String()
} else if f, ok := value.(apiError); ok {
// If the variable implements the Error() interface,
// then use that interface to perform the conversion
return f.Error()
} else {
// Finally we use json.Marshal to convert.
jsonContent, _ := json.Marshal(value)