improve package feature for gtcp

This commit is contained in:
John
2019-12-11 14:50:25 +08:00
parent 2804834540
commit d7e19bc3f3
4 changed files with 272 additions and 79 deletions

View File

@ -19,7 +19,6 @@ import (
type Conn struct {
net.Conn // Underlying TCP connection object.
reader *bufio.Reader // Buffer reader for connection.
buffer []byte // Buffer object.
recvDeadline time.Time // Timeout point for reading.
sendDeadline time.Time // Timeout point for writing.
recvBufferWait time.Duration // Interval duration for reading buffer.

View File

@ -13,34 +13,23 @@ import (
)
const (
gPKG_DEFAULT_MAX_DATA_SIZE = 65535 // (Byte) Max package size.
gPKG_DEFAULT_HEADER_SIZE = 2 // Header size for simple package protocol.
gPKG_MAX_HEADER_SIZE = 4 // Max header size for simple package protocol.
gPKG_HEADER_SIZE_DEFAULT = 2 // Header size for simple package protocol.
gPKG_HEADER_SIZE_MAX = 4 // Max header size for simple package protocol.
)
// Package option for simple protocol.
type PkgOption struct {
HeaderSize int // It's 2 bytes in default, 4 bytes max.
MaxDataSize int // (Byte) Data field size, it's 2 bytes in default, which means 65535 bytes.
Retry Retry // Retry policy.
}
// HeaderSize is used to mark the data length for next data receiving.
// It's 2 bytes in default, 4 bytes max, which stands for the max data length
// from 65535 to 4294967295 bytes.
HeaderSize int
// getPkgOption wraps and returns the PkgOption.
// If no option given, it returns a new option with default value.
func getPkgOption(option ...PkgOption) (*PkgOption, error) {
pkgOption := PkgOption{}
if len(option) > 0 {
pkgOption = option[0]
}
if pkgOption.HeaderSize == 0 {
pkgOption.HeaderSize = gPKG_DEFAULT_HEADER_SIZE
}
if pkgOption.MaxDataSize == 0 {
pkgOption.MaxDataSize = gPKG_DEFAULT_MAX_DATA_SIZE
} else if pkgOption.MaxDataSize > 0xFFFFFF {
return nil, fmt.Errorf(`package size %d exceeds allowed max size %d`, pkgOption.MaxDataSize, 0xFFFFFF)
}
return &pkgOption, nil
// MaxDataSize is the data field size in bytes for data length validation.
// If it's not manually set, it'll automatically be set correspondingly with the HeaderSize.
MaxDataSize int
// Retry policy when operation fails.
Retry Retry
}
// SendPkg send data using simple package protocol.
@ -48,7 +37,7 @@ func getPkgOption(option ...PkgOption) (*PkgOption, error) {
// Simple package protocol: DataLength(24bit)|DataField(variant)。
//
// Note that,
// 1. The DataLength is the length of DataField, which does not contain the header size 2 bytes.
// 1. The DataLength is the length of DataField, which does not contain the header size.
// 2. The integer bytes of the package are encoded using BigEndian order.
func (c *Conn) SendPkg(data []byte, option ...PkgOption) error {
pkgOption, err := getPkgOption(option...)
@ -57,16 +46,18 @@ func (c *Conn) SendPkg(data []byte, option ...PkgOption) error {
}
length := len(data)
if length > pkgOption.MaxDataSize {
return fmt.Errorf(`data size %d exceeds max pkg size %d`, length, pkgOption.MaxDataSize)
return fmt.Errorf(
`data too long, data size %d exceeds allowed max data size %d`,
length, pkgOption.MaxDataSize,
)
}
offset := gPKG_MAX_HEADER_SIZE - pkgOption.HeaderSize
buffer := make([]byte, gPKG_MAX_HEADER_SIZE+len(data))
offset := gPKG_HEADER_SIZE_MAX - pkgOption.HeaderSize
buffer := make([]byte, gPKG_HEADER_SIZE_MAX+len(data))
binary.BigEndian.PutUint32(buffer[0:], uint32(length))
copy(buffer[gPKG_MAX_HEADER_SIZE:], data)
copy(buffer[gPKG_HEADER_SIZE_MAX:], data)
if pkgOption.Retry.Count > 0 {
return c.Send(buffer[offset:], pkgOption.Retry)
}
//fmt.Println("SendPkg:", buffer[offset:])
return c.Send(buffer[offset:])
}
@ -100,56 +91,39 @@ func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option
// RecvPkg receives data from connection using simple package protocol.
func (c *Conn) RecvPkg(option ...PkgOption) (result []byte, err error) {
var temp []byte
var buffer []byte
var length int
pkgOption, err := getPkgOption(option...)
if err != nil {
return nil, err
}
for {
for {
if len(c.buffer) >= pkgOption.HeaderSize {
if length <= 0 {
switch pkgOption.HeaderSize {
case 1:
// It fills with zero if the header size is lesser than 4 bytes (uint32).
length = int(binary.BigEndian.Uint32([]byte{0, 0, 0, c.buffer[0]}))
case 2:
length = int(binary.BigEndian.Uint32([]byte{0, 0, c.buffer[0], c.buffer[1]}))
case 3:
length = int(binary.BigEndian.Uint32([]byte{0, c.buffer[0], c.buffer[1], c.buffer[2]}))
default:
length = int(binary.BigEndian.Uint32([]byte{c.buffer[0], c.buffer[1], c.buffer[2], c.buffer[3]}))
}
}
// It here validates the size of the package.
// It clears the buffer and returns error immediately if it validates failed.
if length < 0 || length > pkgOption.MaxDataSize {
c.buffer = c.buffer[:0]
return nil, fmt.Errorf(`invalid package size %d`, length)
}
// It continues reading until it receives complete bytes of the package.
if len(c.buffer) < length+pkgOption.HeaderSize {
break
}
result = c.buffer[pkgOption.HeaderSize : pkgOption.HeaderSize+length]
c.buffer = c.buffer[pkgOption.HeaderSize+length:]
length = 0
return
} else {
break
}
}
temp, err = c.Recv(0, pkgOption.Retry)
if err != nil {
break
}
if len(temp) > 0 {
c.buffer = append(c.buffer, temp...)
}
//fmt.Println("RecvPkg:", c.buffer)
// Header field.
buffer, err = c.Recv(pkgOption.HeaderSize, pkgOption.Retry)
if err != nil {
return nil, err
}
return
switch pkgOption.HeaderSize {
case 1:
// It fills with zero if the header size is lesser than 4 bytes (uint32).
length = int(binary.BigEndian.Uint32([]byte{0, 0, 0, buffer[0]}))
case 2:
length = int(binary.BigEndian.Uint32([]byte{0, 0, buffer[0], buffer[1]}))
case 3:
length = int(binary.BigEndian.Uint32([]byte{0, buffer[0], buffer[1], buffer[2]}))
default:
length = int(binary.BigEndian.Uint32([]byte{buffer[0], buffer[1], buffer[2], buffer[3]}))
}
// It here validates the size of the package.
// It clears the buffer and returns error immediately if it validates failed.
if length < 0 || length > pkgOption.MaxDataSize {
return nil, fmt.Errorf(`invalid package size %d`, length)
}
// Empty package.
if length == 0 {
return nil, nil
}
// Data field.
return c.Recv(length, pkgOption.Retry)
}
// RecvPkgWithTimeout reads data from connection with timeout using simple package protocol.
@ -161,3 +135,40 @@ func (c *Conn) RecvPkgWithTimeout(timeout time.Duration, option ...PkgOption) (d
data, err = c.RecvPkg(option...)
return
}
// getPkgOption wraps and returns the PkgOption.
// If no option given, it returns a new option with default value.
func getPkgOption(option ...PkgOption) (*PkgOption, error) {
pkgOption := PkgOption{}
if len(option) > 0 {
pkgOption = option[0]
}
if pkgOption.HeaderSize == 0 {
pkgOption.HeaderSize = gPKG_HEADER_SIZE_DEFAULT
}
if pkgOption.HeaderSize > gPKG_HEADER_SIZE_MAX {
return nil, fmt.Errorf(
`package header size %d definition exceeds max header size %d`,
pkgOption.HeaderSize, gPKG_HEADER_SIZE_MAX,
)
}
if pkgOption.MaxDataSize == 0 {
switch pkgOption.HeaderSize {
case 1:
pkgOption.MaxDataSize = 0xFF
case 2:
pkgOption.MaxDataSize = 0xFFFF
case 3:
pkgOption.MaxDataSize = 0xFFFFFF
case 4:
pkgOption.MaxDataSize = 0xFFFFFFFF
}
}
if pkgOption.MaxDataSize > 0xFFFFFFFF {
return nil, fmt.Errorf(
`package data size %d definition exceeds allowed max data size %d`,
pkgOption.MaxDataSize, 0xFFFFFFFF,
)
}
return &pkgOption, nil
}

View File

@ -110,32 +110,32 @@ func (s *Server) Close() error {
func (s *Server) Run() (err error) {
if s.handler == nil {
err = errors.New("start running failed: socket handler not defined")
glog.Error(err)
glog.Fatal(err)
return
}
if s.tlsConfig != nil {
// TLS Server
s.listen, err = tls.Listen("tcp", s.address, s.tlsConfig)
if err != nil {
glog.Error(err)
glog.Fatal(err)
return
}
} else {
// Normal Server
addr, err := net.ResolveTCPAddr("tcp", s.address)
if err != nil {
glog.Error(err)
glog.Fatal(err)
return err
}
s.listen, err = net.ListenTCP("tcp", addr)
if err != nil {
glog.Error(err)
glog.Fatal(err)
return err
}
}
// Listening loop.
for {
if conn, err := s.listen.Accept(); err != nil {
glog.Error(err)
return err
} else if conn != nil {
go s.handler(NewConnByNetConn(conn))

183
net/gtcp/gtcp_unit_test.go Normal file
View File

@ -0,0 +1,183 @@
// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gtcp_test
import (
"fmt"
"github.com/gogf/gf/container/garray"
"github.com/gogf/gf/net/gtcp"
"github.com/gogf/gf/test/gtest"
"github.com/gogf/gf/util/gconv"
"testing"
"time"
)
var (
ports = garray.NewIntArray(true)
)
func init() {
for i := 9000; i <= 10000; i++ {
ports.Append(i)
}
}
func Test_Package_Basic(t *testing.T) {
p := ports.PopRand()
s := gtcp.NewServer(fmt.Sprintf(`:%d`, p), func(conn *gtcp.Conn) {
defer conn.Close()
for {
data, err := conn.RecvPkg()
if err != nil {
break
}
conn.SendPkg(data)
}
})
go s.Run()
defer s.Close()
time.Sleep(100 * time.Millisecond)
// SendPkg
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
for i := 0; i < 100; i++ {
err := conn.SendPkg([]byte(gconv.String(i)))
gtest.Assert(err, nil)
}
for i := 0; i < 100; i++ {
err := conn.SendPkgWithTimeout([]byte(gconv.String(i)), time.Second)
gtest.Assert(err, nil)
}
})
// SendPkg with big data - failure.
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := make([]byte, 65536)
err = conn.SendPkg(data)
gtest.AssertNE(err, nil)
})
// SendRecvPkg
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
for i := 100; i < 200; i++ {
data := []byte(gconv.String(i))
result, err := conn.SendRecvPkg(data)
gtest.Assert(err, nil)
gtest.Assert(result, data)
}
for i := 100; i < 200; i++ {
data := []byte(gconv.String(i))
result, err := conn.SendRecvPkgWithTimeout(data, time.Second)
gtest.Assert(err, nil)
gtest.Assert(result, data)
}
})
// SendRecvPkg with big data - failure.
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := make([]byte, 65536)
result, err := conn.SendRecvPkg(data)
gtest.AssertNE(err, nil)
gtest.Assert(result, nil)
})
// SendRecvPkg with big data - success.
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := make([]byte, 65500)
data[100] = byte(65)
data[65400] = byte(85)
result, err := conn.SendRecvPkg(data)
gtest.Assert(err, nil)
gtest.Assert(result, data)
})
}
func Test_Package_Timeout(t *testing.T) {
p := ports.PopRand()
s := gtcp.NewServer(fmt.Sprintf(`:%d`, p), func(conn *gtcp.Conn) {
defer conn.Close()
for {
data, err := conn.RecvPkg()
if err != nil {
break
}
time.Sleep(time.Second)
gtest.Assert(conn.SendPkg(data), nil)
}
})
go s.Run()
defer s.Close()
time.Sleep(100 * time.Millisecond)
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := []byte("10000")
result, err := conn.SendRecvPkgWithTimeout(data, time.Millisecond*500)
gtest.AssertNE(err, nil)
gtest.Assert(result, nil)
})
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := []byte("10000")
result, err := conn.SendRecvPkgWithTimeout(data, time.Second*2)
gtest.Assert(err, nil)
gtest.Assert(result, data)
})
}
func Test_Package_Option(t *testing.T) {
p := ports.PopRand()
s := gtcp.NewServer(fmt.Sprintf(`:%d`, p), func(conn *gtcp.Conn) {
defer conn.Close()
option := gtcp.PkgOption{HeaderSize: 1}
for {
data, err := conn.RecvPkg(option)
if err != nil {
break
}
gtest.Assert(conn.SendPkg(data, option), nil)
}
})
go s.Run()
defer s.Close()
time.Sleep(100 * time.Millisecond)
// SendRecvPkg with big data - failure.
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := make([]byte, 0xFF+1)
result, err := conn.SendRecvPkg(data, gtcp.PkgOption{HeaderSize: 1})
gtest.AssertNE(err, nil)
gtest.Assert(result, nil)
})
// SendRecvPkg with big data - success.
gtest.Case(t, func() {
conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p))
gtest.Assert(err, nil)
defer conn.Close()
data := make([]byte, 0xFF)
data[100] = byte(65)
data[200] = byte(85)
result, err := conn.SendRecvPkg(data, gtcp.PkgOption{HeaderSize: 1})
gtest.Assert(err, nil)
gtest.Assert(result, data)
})
}