diff --git a/net/gtcp/gtcp_conn.go b/net/gtcp/gtcp_conn.go index 53dc1dadd..ca9cc57a7 100644 --- a/net/gtcp/gtcp_conn.go +++ b/net/gtcp/gtcp_conn.go @@ -19,7 +19,6 @@ import ( type Conn struct { net.Conn // Underlying TCP connection object. reader *bufio.Reader // Buffer reader for connection. - buffer []byte // Buffer object. recvDeadline time.Time // Timeout point for reading. sendDeadline time.Time // Timeout point for writing. recvBufferWait time.Duration // Interval duration for reading buffer. diff --git a/net/gtcp/gtcp_conn_pkg.go b/net/gtcp/gtcp_conn_pkg.go index 2abac9bb4..6890f5694 100644 --- a/net/gtcp/gtcp_conn_pkg.go +++ b/net/gtcp/gtcp_conn_pkg.go @@ -13,34 +13,23 @@ import ( ) const ( - gPKG_DEFAULT_MAX_DATA_SIZE = 65535 // (Byte) Max package size. - gPKG_DEFAULT_HEADER_SIZE = 2 // Header size for simple package protocol. - gPKG_MAX_HEADER_SIZE = 4 // Max header size for simple package protocol. + gPKG_HEADER_SIZE_DEFAULT = 2 // Header size for simple package protocol. + gPKG_HEADER_SIZE_MAX = 4 // Max header size for simple package protocol. ) // Package option for simple protocol. type PkgOption struct { - HeaderSize int // It's 2 bytes in default, 4 bytes max. - MaxDataSize int // (Byte) Data field size, it's 2 bytes in default, which means 65535 bytes. - Retry Retry // Retry policy. -} + // HeaderSize is used to mark the data length for next data receiving. + // It's 2 bytes in default, 4 bytes max, which stands for the max data length + // from 65535 to 4294967295 bytes. + HeaderSize int -// getPkgOption wraps and returns the PkgOption. -// If no option given, it returns a new option with default value. -func getPkgOption(option ...PkgOption) (*PkgOption, error) { - pkgOption := PkgOption{} - if len(option) > 0 { - pkgOption = option[0] - } - if pkgOption.HeaderSize == 0 { - pkgOption.HeaderSize = gPKG_DEFAULT_HEADER_SIZE - } - if pkgOption.MaxDataSize == 0 { - pkgOption.MaxDataSize = gPKG_DEFAULT_MAX_DATA_SIZE - } else if pkgOption.MaxDataSize > 0xFFFFFF { - return nil, fmt.Errorf(`package size %d exceeds allowed max size %d`, pkgOption.MaxDataSize, 0xFFFFFF) - } - return &pkgOption, nil + // MaxDataSize is the data field size in bytes for data length validation. + // If it's not manually set, it'll automatically be set correspondingly with the HeaderSize. + MaxDataSize int + + // Retry policy when operation fails. + Retry Retry } // SendPkg send data using simple package protocol. @@ -48,7 +37,7 @@ func getPkgOption(option ...PkgOption) (*PkgOption, error) { // Simple package protocol: DataLength(24bit)|DataField(variant)。 // // Note that, -// 1. The DataLength is the length of DataField, which does not contain the header size 2 bytes. +// 1. The DataLength is the length of DataField, which does not contain the header size. // 2. The integer bytes of the package are encoded using BigEndian order. func (c *Conn) SendPkg(data []byte, option ...PkgOption) error { pkgOption, err := getPkgOption(option...) @@ -57,16 +46,18 @@ func (c *Conn) SendPkg(data []byte, option ...PkgOption) error { } length := len(data) if length > pkgOption.MaxDataSize { - return fmt.Errorf(`data size %d exceeds max pkg size %d`, length, pkgOption.MaxDataSize) + return fmt.Errorf( + `data too long, data size %d exceeds allowed max data size %d`, + length, pkgOption.MaxDataSize, + ) } - offset := gPKG_MAX_HEADER_SIZE - pkgOption.HeaderSize - buffer := make([]byte, gPKG_MAX_HEADER_SIZE+len(data)) + offset := gPKG_HEADER_SIZE_MAX - pkgOption.HeaderSize + buffer := make([]byte, gPKG_HEADER_SIZE_MAX+len(data)) binary.BigEndian.PutUint32(buffer[0:], uint32(length)) - copy(buffer[gPKG_MAX_HEADER_SIZE:], data) + copy(buffer[gPKG_HEADER_SIZE_MAX:], data) if pkgOption.Retry.Count > 0 { return c.Send(buffer[offset:], pkgOption.Retry) } - //fmt.Println("SendPkg:", buffer[offset:]) return c.Send(buffer[offset:]) } @@ -100,56 +91,39 @@ func (c *Conn) SendRecvPkgWithTimeout(data []byte, timeout time.Duration, option // RecvPkg receives data from connection using simple package protocol. func (c *Conn) RecvPkg(option ...PkgOption) (result []byte, err error) { - var temp []byte + var buffer []byte var length int pkgOption, err := getPkgOption(option...) if err != nil { return nil, err } - for { - for { - if len(c.buffer) >= pkgOption.HeaderSize { - if length <= 0 { - switch pkgOption.HeaderSize { - case 1: - // It fills with zero if the header size is lesser than 4 bytes (uint32). - length = int(binary.BigEndian.Uint32([]byte{0, 0, 0, c.buffer[0]})) - case 2: - length = int(binary.BigEndian.Uint32([]byte{0, 0, c.buffer[0], c.buffer[1]})) - case 3: - length = int(binary.BigEndian.Uint32([]byte{0, c.buffer[0], c.buffer[1], c.buffer[2]})) - default: - length = int(binary.BigEndian.Uint32([]byte{c.buffer[0], c.buffer[1], c.buffer[2], c.buffer[3]})) - } - } - // It here validates the size of the package. - // It clears the buffer and returns error immediately if it validates failed. - if length < 0 || length > pkgOption.MaxDataSize { - c.buffer = c.buffer[:0] - return nil, fmt.Errorf(`invalid package size %d`, length) - } - // It continues reading until it receives complete bytes of the package. - if len(c.buffer) < length+pkgOption.HeaderSize { - break - } - result = c.buffer[pkgOption.HeaderSize : pkgOption.HeaderSize+length] - c.buffer = c.buffer[pkgOption.HeaderSize+length:] - length = 0 - return - } else { - break - } - } - temp, err = c.Recv(0, pkgOption.Retry) - if err != nil { - break - } - if len(temp) > 0 { - c.buffer = append(c.buffer, temp...) - } - //fmt.Println("RecvPkg:", c.buffer) + // Header field. + buffer, err = c.Recv(pkgOption.HeaderSize, pkgOption.Retry) + if err != nil { + return nil, err } - return + switch pkgOption.HeaderSize { + case 1: + // It fills with zero if the header size is lesser than 4 bytes (uint32). + length = int(binary.BigEndian.Uint32([]byte{0, 0, 0, buffer[0]})) + case 2: + length = int(binary.BigEndian.Uint32([]byte{0, 0, buffer[0], buffer[1]})) + case 3: + length = int(binary.BigEndian.Uint32([]byte{0, buffer[0], buffer[1], buffer[2]})) + default: + length = int(binary.BigEndian.Uint32([]byte{buffer[0], buffer[1], buffer[2], buffer[3]})) + } + // It here validates the size of the package. + // It clears the buffer and returns error immediately if it validates failed. + if length < 0 || length > pkgOption.MaxDataSize { + return nil, fmt.Errorf(`invalid package size %d`, length) + } + // Empty package. + if length == 0 { + return nil, nil + } + // Data field. + return c.Recv(length, pkgOption.Retry) } // RecvPkgWithTimeout reads data from connection with timeout using simple package protocol. @@ -161,3 +135,40 @@ func (c *Conn) RecvPkgWithTimeout(timeout time.Duration, option ...PkgOption) (d data, err = c.RecvPkg(option...) return } + +// getPkgOption wraps and returns the PkgOption. +// If no option given, it returns a new option with default value. +func getPkgOption(option ...PkgOption) (*PkgOption, error) { + pkgOption := PkgOption{} + if len(option) > 0 { + pkgOption = option[0] + } + if pkgOption.HeaderSize == 0 { + pkgOption.HeaderSize = gPKG_HEADER_SIZE_DEFAULT + } + if pkgOption.HeaderSize > gPKG_HEADER_SIZE_MAX { + return nil, fmt.Errorf( + `package header size %d definition exceeds max header size %d`, + pkgOption.HeaderSize, gPKG_HEADER_SIZE_MAX, + ) + } + if pkgOption.MaxDataSize == 0 { + switch pkgOption.HeaderSize { + case 1: + pkgOption.MaxDataSize = 0xFF + case 2: + pkgOption.MaxDataSize = 0xFFFF + case 3: + pkgOption.MaxDataSize = 0xFFFFFF + case 4: + pkgOption.MaxDataSize = 0xFFFFFFFF + } + } + if pkgOption.MaxDataSize > 0xFFFFFFFF { + return nil, fmt.Errorf( + `package data size %d definition exceeds allowed max data size %d`, + pkgOption.MaxDataSize, 0xFFFFFFFF, + ) + } + return &pkgOption, nil +} diff --git a/net/gtcp/gtcp_server.go b/net/gtcp/gtcp_server.go index 763f72e01..87c263447 100644 --- a/net/gtcp/gtcp_server.go +++ b/net/gtcp/gtcp_server.go @@ -110,32 +110,32 @@ func (s *Server) Close() error { func (s *Server) Run() (err error) { if s.handler == nil { err = errors.New("start running failed: socket handler not defined") - glog.Error(err) + glog.Fatal(err) return } if s.tlsConfig != nil { // TLS Server s.listen, err = tls.Listen("tcp", s.address, s.tlsConfig) if err != nil { - glog.Error(err) + glog.Fatal(err) return } } else { // Normal Server addr, err := net.ResolveTCPAddr("tcp", s.address) if err != nil { - glog.Error(err) + glog.Fatal(err) return err } s.listen, err = net.ListenTCP("tcp", addr) if err != nil { - glog.Error(err) + glog.Fatal(err) return err } } + // Listening loop. for { if conn, err := s.listen.Accept(); err != nil { - glog.Error(err) return err } else if conn != nil { go s.handler(NewConnByNetConn(conn)) diff --git a/net/gtcp/gtcp_unit_test.go b/net/gtcp/gtcp_unit_test.go new file mode 100644 index 000000000..b4e8b1417 --- /dev/null +++ b/net/gtcp/gtcp_unit_test.go @@ -0,0 +1,183 @@ +// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gtcp_test + +import ( + "fmt" + "github.com/gogf/gf/container/garray" + "github.com/gogf/gf/net/gtcp" + "github.com/gogf/gf/test/gtest" + "github.com/gogf/gf/util/gconv" + "testing" + "time" +) + +var ( + ports = garray.NewIntArray(true) +) + +func init() { + for i := 9000; i <= 10000; i++ { + ports.Append(i) + } +} + +func Test_Package_Basic(t *testing.T) { + p := ports.PopRand() + s := gtcp.NewServer(fmt.Sprintf(`:%d`, p), func(conn *gtcp.Conn) { + defer conn.Close() + for { + data, err := conn.RecvPkg() + if err != nil { + break + } + conn.SendPkg(data) + } + }) + go s.Run() + defer s.Close() + time.Sleep(100 * time.Millisecond) + // SendPkg + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + for i := 0; i < 100; i++ { + err := conn.SendPkg([]byte(gconv.String(i))) + gtest.Assert(err, nil) + } + for i := 0; i < 100; i++ { + err := conn.SendPkgWithTimeout([]byte(gconv.String(i)), time.Second) + gtest.Assert(err, nil) + } + }) + // SendPkg with big data - failure. + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := make([]byte, 65536) + err = conn.SendPkg(data) + gtest.AssertNE(err, nil) + }) + // SendRecvPkg + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + for i := 100; i < 200; i++ { + data := []byte(gconv.String(i)) + result, err := conn.SendRecvPkg(data) + gtest.Assert(err, nil) + gtest.Assert(result, data) + } + for i := 100; i < 200; i++ { + data := []byte(gconv.String(i)) + result, err := conn.SendRecvPkgWithTimeout(data, time.Second) + gtest.Assert(err, nil) + gtest.Assert(result, data) + } + }) + // SendRecvPkg with big data - failure. + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := make([]byte, 65536) + result, err := conn.SendRecvPkg(data) + gtest.AssertNE(err, nil) + gtest.Assert(result, nil) + }) + // SendRecvPkg with big data - success. + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := make([]byte, 65500) + data[100] = byte(65) + data[65400] = byte(85) + result, err := conn.SendRecvPkg(data) + gtest.Assert(err, nil) + gtest.Assert(result, data) + }) +} + +func Test_Package_Timeout(t *testing.T) { + p := ports.PopRand() + s := gtcp.NewServer(fmt.Sprintf(`:%d`, p), func(conn *gtcp.Conn) { + defer conn.Close() + for { + data, err := conn.RecvPkg() + if err != nil { + break + } + time.Sleep(time.Second) + gtest.Assert(conn.SendPkg(data), nil) + } + }) + go s.Run() + defer s.Close() + time.Sleep(100 * time.Millisecond) + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := []byte("10000") + result, err := conn.SendRecvPkgWithTimeout(data, time.Millisecond*500) + gtest.AssertNE(err, nil) + gtest.Assert(result, nil) + }) + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := []byte("10000") + result, err := conn.SendRecvPkgWithTimeout(data, time.Second*2) + gtest.Assert(err, nil) + gtest.Assert(result, data) + }) +} + +func Test_Package_Option(t *testing.T) { + p := ports.PopRand() + s := gtcp.NewServer(fmt.Sprintf(`:%d`, p), func(conn *gtcp.Conn) { + defer conn.Close() + option := gtcp.PkgOption{HeaderSize: 1} + for { + data, err := conn.RecvPkg(option) + if err != nil { + break + } + gtest.Assert(conn.SendPkg(data, option), nil) + } + }) + go s.Run() + defer s.Close() + time.Sleep(100 * time.Millisecond) + // SendRecvPkg with big data - failure. + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := make([]byte, 0xFF+1) + result, err := conn.SendRecvPkg(data, gtcp.PkgOption{HeaderSize: 1}) + gtest.AssertNE(err, nil) + gtest.Assert(result, nil) + }) + // SendRecvPkg with big data - success. + gtest.Case(t, func() { + conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", p)) + gtest.Assert(err, nil) + defer conn.Close() + data := make([]byte, 0xFF) + data[100] = byte(65) + data[200] = byte(85) + result, err := conn.SendRecvPkg(data, gtcp.PkgOption{HeaderSize: 1}) + gtest.Assert(err, nil) + gtest.Assert(result, data) + }) +}