From 0048cb4ecabacc858c00e63253529679c150a327 Mon Sep 17 00:00:00 2001 From: John Date: Wed, 4 Jul 2018 19:32:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0gpool=E5=AF=B9=E8=B1=A1?= =?UTF-8?q?=E6=B1=A0=EF=BC=9B=E5=A2=9E=E5=8A=A0gtcp.Conn=E8=BF=9E=E6=8E=A5?= =?UTF-8?q?=E6=B1=A0=E5=AF=B9=E8=B1=A1=EF=BC=9B=E6=94=B9=E8=BF=9Bgproc?= =?UTF-8?q?=E8=BF=9B=E7=A8=8B=E9=97=B4=E9=80=9A=E4=BF=A1=E6=9C=BA=E5=88=B6?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=8A=A0=E8=BF=9B=E7=A8=8B=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E5=88=86=E7=BB=84=E7=89=B9=E6=80=A7=EF=BC=8C=E5=B9=B6=E9=99=90?= =?UTF-8?q?=E5=AE=9A=E9=98=9F=E5=88=97=E5=A4=A7=E5=B0=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO | 6 +- g/container/gchan/gchan_test.go | 2 +- g/container/glist/glist_test.go | 46 ++++++ g/container/gpool/gpool.go | 94 +++++++++++ g/container/gpool/gpool_test.go | 41 +++++ g/encoding/gbinary/gbinary.go | 12 ++ g/net/ghttp/ghttp_server.go | 2 +- g/net/ghttp/ghttp_server_admin.go | 3 +- g/net/gtcp/tcp_func.go | 34 ++-- g/net/gtcp/tcp_pool.go | 135 ++++++++++++++++ g/os/gfilepool/gfilepool.go | 1 - g/os/gproc/gproc.go | 1 - g/os/gproc/gproc_comm.go | 29 ++-- g/os/gproc/gproc_comm_receive.go | 148 +++++++++++++++++- g/os/gproc/gproc_comm_send.go | 73 +++++---- g/os/gproc/gproc_comm_tcp.go | 96 ------------ g/os/gproc/gproc_proccess.go | 6 - g/os/gtime/gtime_test.go | 18 +++ geg/container/{ => garray}/garray.go | 0 geg/container/gpool/gpool.go | 17 ++ geg/net/gtcp/tcp_pool.go | 38 +++++ geg/net/{ => gtcp}/tcp_server.go | 0 geg/os/gproc/gproc_comm.go | 10 +- geg/os/gproc/gproc_comm_group.go | 33 ++++ .../{gproc_comm2.go => gproc_comm_send.go} | 3 +- geg/other/test.go | 9 +- 26 files changed, 675 insertions(+), 182 deletions(-) create mode 100644 g/container/glist/glist_test.go create mode 100644 g/container/gpool/gpool.go create mode 100644 g/container/gpool/gpool_test.go create mode 100644 g/net/gtcp/tcp_pool.go delete mode 100644 g/os/gproc/gproc_comm_tcp.go rename geg/container/{ => garray}/garray.go (100%) create mode 100644 geg/container/gpool/gpool.go create mode 100644 geg/net/gtcp/tcp_pool.go rename geg/net/{ => gtcp}/tcp_server.go (100%) create mode 100644 geg/os/gproc/gproc_comm_group.go rename geg/os/gproc/{gproc_comm2.go => gproc_comm_send.go} (57%) diff --git a/TODO b/TODO index 861470d72..16078e2d7 100644 --- a/TODO +++ b/TODO @@ -11,8 +11,8 @@ Cookie&Session数据池化处理; ghttp.Client增加proxy特性; gtime增加对时区转换的封装,并简化失去转换时对类似+80500时区的支持; 改进gf-orm的where查询功能,参考thinkphp 里的where查询语法; -gproc进程间通信增加分组特性,不同的进程见可以通过进程ID以及分组名称发送/获取消息; - +ORM增加获取被执行的sql语句的方法; +gpage分页增加对自定义后缀的支持,如:2.html, 2.php等等; DONE: 1. gconv完善针对不同类型的判断,例如:尽量减少sprintf("%v", xxx)来执行string类型的转换; @@ -41,7 +41,7 @@ DONE: 24. 对grpool进行优化改进,包括属性原子操作封装采用gtype实现,修正设计BUG:https://github.com/johng-cn/gf/issues/6; 25. gredis增加redis密码支持; 26. 改进ghttp.Server平滑重启机制,当新进程接管服务后,再使用进程间通信方式通知父进程销毁; - +27. gproc进程间通信增加分组特性,不同的进程间可以通过进程ID以及分组名称发送/获取进程消息; diff --git a/g/container/gchan/gchan_test.go b/g/container/gchan/gchan_test.go index 59413a168..ce6819ce4 100644 --- a/g/container/gchan/gchan_test.go +++ b/g/container/gchan/gchan_test.go @@ -17,7 +17,7 @@ var length = 10000000 var q1 = gchan.New(length) var q2 = make(chan int, length) -func BenchmarkGqueuePushAndPop(b *testing.B) { +func BenchmarkGchanPushAndPop(b *testing.B) { for i := 0; i < b.N; i++ { q1.Push(i) q1.Pop() diff --git a/g/container/glist/glist_test.go b/g/container/glist/glist_test.go new file mode 100644 index 000000000..bbd07bf31 --- /dev/null +++ b/g/container/glist/glist_test.go @@ -0,0 +1,46 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + +package glist + +import ( + "testing" +) + +var l = New() + +func BenchmarkPushBack(b *testing.B) { + for i := 0; i < b.N; i++ { + l.PushBack(i) + } +} + +func BenchmarkPopFront(b *testing.B) { + for i := 0; i < b.N; i++ { + l.PopFront() + } +} + +func BenchmarkPushFront(b *testing.B) { + for i := 0; i < b.N; i++ { + l.PushFront(i) + } +} + +func BenchmarkPopBack(b *testing.B) { + for i := 0; i < b.N; i++ { + l.PopBack() + } +} + +func BenchmarkLen(b *testing.B) { + for i := 0; i < b.N; i++ { + l.Len() + } +} + diff --git a/g/container/gpool/gpool.go b/g/container/gpool/gpool.go new file mode 100644 index 000000000..8fde9264b --- /dev/null +++ b/g/container/gpool/gpool.go @@ -0,0 +1,94 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 对象复用池. +package gpool + +import ( + "time" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/container/gtype" + "errors" +) + +// 对象池 +type Pool struct { + list *glist.List // 可用/闲置的文件指针链表 + idle int64 // (毫秒)闲置最大时间,超过该时间则被系统回收 + closed *gtype.Bool // 连接池是否已关闭 + newFunc func()(interface{}, error) // 创建对象的方法定义 +} + +// 对象池数据项 +type poolItem struct { + expire int64 // (毫秒)过期时间 + value interface{} // 对象值 +} + +// 创建一个对象池,为保证执行效率,过期时间一旦设定之后无法修改 +func New(expire int, newFunc...func() (interface{}, error)) *Pool { + r := &Pool { + list : glist.New(), + idle : int64(expire), + closed : gtype.NewBool(), + } + if len(newFunc) > 0 { + r.newFunc = newFunc[0] + } + go r.expireCheckingLoop() + return r +} + +// 放一个临时对象到池中 +func (p *Pool) Put(item interface{}) { + p.list.PushBack(&poolItem{ + expire : gtime.Millisecond() + p.idle, + value : item, + }) +} + +// 从池中获得一个临时对象 +func (p *Pool) Get() (interface{}, error) { + for !p.closed.Val() { + if r := p.list.PopFront(); r != nil { + f := r.(*poolItem) + if f.expire > gtime.Millisecond() { + return f.value, nil + } + } else { + break + } + } + if p.newFunc != nil { + return p.newFunc() + } + return nil, errors.New("pool is empty") +} + +// 查询当前池中的对象数量 +func (p *Pool) Size() int { + return p.list.Len() +} + +// 关闭池 +func (p *Pool) Close() { + p.closed.Set(true) +} + +// 超时检测循环 +func (p *Pool) expireCheckingLoop() { + for !p.closed.Val() { + if r := p.list.PopFront(); r != nil { + f := r.(*poolItem) + if f.expire > gtime.Millisecond() { + p.list.PushFront(f) + break + } + } + time.Sleep(3 * time.Second) + } +} \ No newline at end of file diff --git a/g/container/gpool/gpool_test.go b/g/container/gpool/gpool_test.go new file mode 100644 index 000000000..cf569ddf9 --- /dev/null +++ b/g/container/gpool/gpool_test.go @@ -0,0 +1,41 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + +package gpool + +import ( + "testing" + "sync" +) + +var pool = New(99999999) +var syncp = sync.Pool{} + +func BenchmarkGPoolPut(b *testing.B) { + for i := 0; i < b.N; i++ { + pool.Put(i) + } +} + +func BenchmarkGPoolGet(b *testing.B) { + for i := 0; i < b.N; i++ { + pool.Get() + } +} + +func BenchmarkSyncPoolPut(b *testing.B) { + for i := 0; i < b.N; i++ { + syncp.Put(i) + } +} + +func BenchmarkGpoolGet(b *testing.B) { + for i := 0; i < b.N; i++ { + syncp.Get() + } +} \ No newline at end of file diff --git a/g/encoding/gbinary/gbinary.go b/g/encoding/gbinary/gbinary.go index 42013644d..9dcab67f5 100644 --- a/g/encoding/gbinary/gbinary.go +++ b/g/encoding/gbinary/gbinary.go @@ -47,6 +47,18 @@ func Encode(vs ...interface{}) []byte { return buf.Bytes() } +// 将变量转换为二进制[]byte,并指定固定的[]byte长度返回,长度单位为字节(byte); +// 如果转换的二进制长度超过指定长度,那么进行截断处理 +func EncodeByLength(length int, vs ...interface{}) []byte { + b := Encode(vs...) + if len(b) < length { + b = append(b, make([]byte, length - len(b))...) + } else if len(b) > length { + b = b[0 : length] + } + return b +} + // 整形二进制解包,注意第二个及其后参数为字长确定的整形变量的指针地址,以便确定解析的[]byte长度, // 例如:int8/16/32/64、uint8/16/32/64、float32/64等等 func Decode(b []byte, vs ...interface{}) error { diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index eae67dac5..1bea05337 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -220,7 +220,7 @@ func (s *Server) Start() error { // 如果是子进程,那么服务开启后通知父进程销毁 if gproc.IsChild() { gtime.SetTimeout(2*time.Second, func() { - gproc.Send(gproc.PPid(), []byte("exit")) + gproc.Send(gproc.PPid(), []byte("exit"), gADMIN_GPROC_COMM_GROUP) }) } diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go index bd1de1edb..abce91fc1 100644 --- a/g/net/ghttp/ghttp_server_admin.go +++ b/g/net/ghttp/ghttp_server_admin.go @@ -31,6 +31,7 @@ const ( gADMIN_ACTION_SHUTINGDOWN = 2 gADMIN_ACTION_RELOAD_ENVKEY = "GF_SERVER_RELOAD" gADMIN_ACTION_RESTART_ENVKEY = "GF_SERVER_RESTART" + gADMIN_GPROC_COMM_GROUP = "GF_GPROC_HTTP_SERVER" ) // 用于服务管理的对象 @@ -282,7 +283,7 @@ func forcedlyCloseWebServers() { // 异步监听进程间消息 func handleProcessMessage() { for { - if msg := gproc.Receive(); msg != nil { + if msg := gproc.Receive(gADMIN_GPROC_COMM_GROUP); msg != nil { if bytes.EqualFold(msg.Data, []byte("exit")) { gracefulShutdownWebServers() doneChan <- struct{}{} diff --git a/g/net/gtcp/tcp_func.go b/g/net/gtcp/tcp_func.go index 43e95911e..766eff985 100644 --- a/g/net/gtcp/tcp_func.go +++ b/g/net/gtcp/tcp_func.go @@ -16,7 +16,7 @@ import ( const ( gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔 - gDEFAULT_READ_BUFFER_SIZE = 10 // 默认数据读取缓冲区大小 + gDEFAULT_READ_BUFFER_SIZE = 1024 // 默认数据读取缓冲区大小 ) @@ -40,8 +40,8 @@ func Checksum(buffer []byte) uint32 { return checksum } -// 创建TCP链接 -func Conn(ip string, port int, timeout...int) (net.Conn, error) { +// 创建原生TCP链接 +func NewNetConn(ip string, port int, timeout...int) (net.Conn, error) { addr := fmt.Sprintf("%s:%d", ip, port) if len(timeout) > 0 { return net.DialTimeout("tcp", addr, time.Duration(timeout[0]) * time.Millisecond) @@ -52,19 +52,23 @@ func Conn(ip string, port int, timeout...int) (net.Conn, error) { // 获取数据 func Receive(conn net.Conn, retry...Retry) ([]byte, error) { - var err error = nil - size := gDEFAULT_READ_BUFFER_SIZE - data := make([]byte, 0) + var err error + var length int + var buffer []byte + size := gDEFAULT_READ_BUFFER_SIZE + data := make([]byte, 0) for { - buffer := make([]byte, size) - length, e := conn.Read(buffer) - if length < 1 || e != nil { - if e == io.EOF { + buffer = make([]byte, size) + length, err = conn.Read(buffer) + // 这里使用 "&&" 只要有数据不管有无错误都将先进行解析 + if length < 1 && err != nil { + // 链接已关闭 + if err == io.EOF { break } if len(retry) > 0 { + // 其他错误,重试之后仍不能成功 if retry[0].Count == 0 { - err = e break } retry[0].Count-- @@ -81,7 +85,7 @@ func Receive(conn net.Conn, retry...Retry) ([]byte, error) { break } data = append(data, buffer[0 : length]...) - if length < gDEFAULT_READ_BUFFER_SIZE || e == io.EOF { + if length < gDEFAULT_READ_BUFFER_SIZE || err == io.EOF { break } } @@ -104,6 +108,11 @@ func Send(conn net.Conn, data []byte, retry...Retry) error { for { n, err := conn.Write(data) if err != nil { + // 链接已关闭 + if err == io.EOF { + return err + } + // 其他错误,重试之后仍不能成功 if len(retry) == 0 || retry[0].Count == 0 { return err } @@ -121,6 +130,7 @@ func Send(conn net.Conn, data []byte, retry...Retry) error { } } } + return nil } // 带超时时间的数据发送 diff --git a/g/net/gtcp/tcp_pool.go b/g/net/gtcp/tcp_pool.go new file mode 100644 index 000000000..f7570324c --- /dev/null +++ b/g/net/gtcp/tcp_pool.go @@ -0,0 +1,135 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gtcp + +import ( + "net" + "fmt" + "time" + "gitee.com/johng/gf/g/container/gmap" + "gitee.com/johng/gf/g/container/gpool" +) + +// 封装的链接对象 +type Conn struct { + net.Conn // 继承底层链接接口对象 + pool *gpool.Pool // 对应的链接池对象 + err error // 是否有错误产生 +} + +const ( + gDEFAULT_POOL_EXPIRE = 3000 // (毫秒)默认链接对象过期时间 +) + +var ( + // 连接池对象map,键名为地址端口,键值为对应的连接池对象 + pools = gmap.NewStringInterfaceMap() +) + +// 创建TCP链接 +func NewConn(ip string, port int, timeout...int) (*Conn, error) { + var pool *gpool.Pool + addr := fmt.Sprintf("%s:%d", ip, port) + if v := pools.Get(addr); v == nil { + pools.LockFunc(func(m map[string]interface{}) { + if v, ok := m[addr]; ok { + pool = v.(*gpool.Pool) + } else { + pool = gpool.New(gDEFAULT_POOL_EXPIRE, func() (interface{}, error) { + if conn, err := NewNetConn(ip, port, timeout...); err == nil { + return &Conn { + Conn : conn, + pool : pool, + }, nil + } else { + return nil, err + } + }) + m[addr] = pool + } + }) + } else { + pool = v.(*gpool.Pool) + } + + if v, err := pool.Get(); err == nil { + return v.(*Conn), nil + } else { + return nil, err + } +} + +// 将net.Conn接口对象转换为*gtcp.Conn对象(注意递归影响,因为*gtcp.Conn本身也实现了net.Conn接口) +func NewConnByNetConn(conn net.Conn) *Conn { + return &Conn { + Conn : conn, + } +} + +// 覆盖底层接口对象的Close方法 +func (c *Conn) Close() error { + if c.pool != nil && c.err == nil { + c.pool.Put(c) + } else { + c.Conn.Close() + } + return nil +} + +// 发送数据 +func (c *Conn) Send(data []byte, retry...Retry) error { + err := Send(c, data, retry...) + if err != nil { + c.err = err + } + return err +} + +// 接收数据 +func (c *Conn) Receive(retry...Retry) ([]byte, error) { + data, err := Receive(c, retry...) + if err != nil { + c.err = err + } + return data, err +} + +// 带超时时间的数据获取 +func (c *Conn) ReceiveWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) { + data, err := ReceiveWithTimeout(c, timeout, retry...) + if err != nil { + c.err = err + } + return data, err +} + +// 带超时时间的数据发送 +func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error { + err := SendWithTimeout(c, data, timeout, retry...) + if err != nil { + c.err = err + } + return err +} + +// 发送数据并等待接收返回数据 +func (c *Conn) SendReceive(data []byte, retry...Retry) ([]byte, error) { + data, err := SendReceive(c, data, retry...) + if err != nil { + c.err = err + } + return data, err +} + +// 发送数据并等待接收返回数据(带返回超时等待时间) +func (c *Conn) SendReceiveWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) { + data, err := SendReceiveWithTimeout(c, data, timeout, retry...) + if err != nil { + c.err = err + } + return data, err +} \ No newline at end of file diff --git a/g/os/gfilepool/gfilepool.go b/g/os/gfilepool/gfilepool.go index df4aaee0b..7cb0fe313 100644 --- a/g/os/gfilepool/gfilepool.go +++ b/g/os/gfilepool/gfilepool.go @@ -62,7 +62,6 @@ func New(path string, flag int, expire int) *Pool { // 独立的线程执行过期清理工作 if expire != -1 { go func(p *Pool) { - // 遍历可用指针列表,判断是否过期 for !p.closed.Val() { if r := p.list.PopFront(); r != nil { f := r.(*PoolItem) diff --git a/g/os/gproc/gproc.go b/g/os/gproc/gproc.go index ac17cf2b0..8fd029d11 100644 --- a/g/os/gproc/gproc.go +++ b/g/os/gproc/gproc.go @@ -18,7 +18,6 @@ import ( const ( gPROC_ENV_KEY_PPID_KEY = "GPROC_PPID" - gPROC_ENV_KEY_COMM_KEY = "GPROC_COMM_ENABLED" gPROC_TEMP_DIR_ENV_KEY = "GPROC_TEMP_DIR" ) diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index a58efb822..8ccee4038 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -13,20 +13,23 @@ import ( "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/container/gmap" - "gitee.com/johng/gf/g/container/gqueue" ) -// 本地进程通信发送消息队列 -var commSendQueue = gqueue.New() -// 本地进程通信接收消息队列 -var commReceiveQueue = gqueue.New() -// (用于发送)已建立的PID对应的Conn通信对象 -var commPidConnMap = gmap.NewIntInterfaceMap() +const ( + gPROC_MSG_QUEUE_MAX_LENGTH = 10000 // 进程消息队列最大长度(每个分组) +) + +// 本地进程通信接收消息队列(按照分组进行构建的map,键值为*gqueue.Queue对象) +var commReceiveQueues = gmap.NewStringInterfaceMap() + +// (用于发送)已建立的PID对应的Conn通信对象,键值为一个Pool,防止并行使用同一个通信对象造成数据重叠 +var commPidConnMap = gmap.NewIntInterfaceMap() // TCP通信数据结构定义 type Msg struct { - Pid int // PID,来源哪个进程 - Data []byte // 数据 + Pid int // PID,来源哪个进程 + Data []byte // 数据 + Group string // 分组名称 } // TCP通信数据结构定义 @@ -35,14 +38,6 @@ type sendQueueItem struct { Data []byte // 数据 } -// 进程管理/通信初始化操作 -func init() { - // 默认下为空("") - if os.Getenv(gPROC_ENV_KEY_COMM_KEY) != "0" { - go startTcpListening() - } -} - // 获取指定进程的通信文件地址 func getCommFilePath(pid int) string { return getCommDirPath() + gfile.Separator + gconv.String(pid) diff --git a/g/os/gproc/gproc_comm_receive.go b/g/os/gproc/gproc_comm_receive.go index 90664ccdc..6db7a94c7 100644 --- a/g/os/gproc/gproc_comm_receive.go +++ b/g/os/gproc/gproc_comm_receive.go @@ -3,12 +3,156 @@ // This Source Code Form is subject to the terms of the MIT License. // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. +// "不要通过共享内存来通信,而应该通过通信来共享内存" + package gproc +import ( + "fmt" + "net" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/net/gtcp" + "gitee.com/johng/gf/g/os/gfile" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/encoding/gbinary" + "gitee.com/johng/gf/g/container/gqueue" + "gitee.com/johng/gf/g/container/gtype" +) + +const ( + gPROC_DEFAULT_TCP_PORT = 10000 // 默认开始监听的TCP端口号,如果占用则递增 +) + +var ( + // 是否已开启TCP端口监听服务(使用int而非bool,以便于使用原子操作判断是否开启) + tcpListeningCount = gtype.NewInt() +) + +// 创建本地进程TCP通信服务 +func startTcpListening() { + // 一个进程只能开启一个监听goroutine + if tcpListeningCount.Add(1) != 1 { + return + } + var listen *net.TCPListener + for i := gPROC_DEFAULT_TCP_PORT; ; i++ { + addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i)) + if err != nil { + continue + } + listen, err = net.ListenTCP("tcp", addr) + if err != nil { + continue + } + // 将监听的端口保存到通信文件中(字符串类型存放) + gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)) + //glog.Printfln("%d: gproc listening on [%s]", Pid(), addr) + break + } + for { + if conn, err := listen.Accept(); err != nil { + glog.Error(err) + } else if conn != nil { + go tcpServiceHandler(gtcp.NewConnByNetConn(conn)) + } + } +} + +// TCP数据通信处理回调函数 +func tcpServiceHandler(conn *gtcp.Conn) { + var retry = gtcp.Retry{3, 10} + for { + var result []byte + buffer, err := conn.Receive(retry) + if len(buffer) > 0 { + var msgs []*Msg + for _, msg := range bufferToMsgs(buffer) { + if v := commReceiveQueues.Get(msg.Group); v != nil { + msgs = append(msgs, msg) + } else { + result = []byte(fmt.Sprintf("group [%s] does not exist", msg.Group)) + break + } + } + if len(result) == 0 { + result = []byte("ok") + for _, msg := range msgs { + if v := commReceiveQueues.Get(msg.Group); v != nil { + v.(*gqueue.Queue).PushBack(msg) + } + } + } + } + // 产生错误(或者对方已经关闭链接)时,退出接收循环 + if err == nil { + conn.Send(result, retry) + } else { + conn.Close() + return + } + } +} + +// 数据解包,防止黏包 +// 数据格式:总长度(24bit)|发送进程PID(16bit)|接收进程PID(16bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长) +func bufferToMsgs(buffer []byte) []*Msg { + s := 0 + msgs := make([]*Msg, 0) + for s < len(buffer) { + // 长度解析及校验 + length := gbinary.DecodeToInt(buffer[s : s + 3]) + if length < 12 || length > len(buffer) { + s++ + continue + } + // 分组信息解析 + groupLen := gbinary.DecodeToInt(buffer[s + 7 : s + 8]) + // checksum校验(仅对参数做校验,提高校验效率) + checksum1 := gbinary.DecodeToUint32(buffer[s + 8 + groupLen : s + 8 + groupLen + 4]) + checksum2 := gtcp.Checksum(buffer[s + 8 + groupLen + 4 : s + length]) + if checksum1 != checksum2 { + s++ + continue + } + // 接收进程PID校验 + if Pid() == gbinary.DecodeToInt(buffer[s + 5 : s + 7]) { + msgs = append(msgs, &Msg { + Pid : gbinary.DecodeToInt(buffer[s + 3 : s + 5]), + Data : buffer[s + 8 + groupLen + 4 : s + length], + Group : string(buffer[s + 8 : s + 8 + groupLen]), + }) + } + s += length + } + return msgs +} + // 获取其他进程传递到当前进程的消息包,阻塞执行 -func Receive() *Msg { - if v := commReceiveQueue.PopFront(); v != nil { +func Receive(group...string) *Msg { + // 开启端口监听 + go startTcpListening() + + var queue *gqueue.Queue + groupName := gPROC_COMM_DEAFULT_GRUOP_NAME + if len(group) > 0 { + groupName = group[0] + } + + if v := commReceiveQueues.Get(groupName); v == nil { + commReceiveQueues.LockFunc(func(m map[string]interface{}) { + if v, ok := m[groupName]; ok { + queue = v.(*gqueue.Queue) + } else { + queue = gqueue.New(gPROC_MSG_QUEUE_MAX_LENGTH) + m[groupName] = queue + } + }) + } else { + queue = v.(*gqueue.Queue) + } + + if v := queue.PopFront(); v != nil { return v.(*Msg) } return nil diff --git a/g/os/gproc/gproc_comm_send.go b/g/os/gproc/gproc_comm_send.go index 0fcfcec20..7bb2a8360 100644 --- a/g/os/gproc/gproc_comm_send.go +++ b/g/os/gproc/gproc_comm_send.go @@ -7,69 +7,74 @@ package gproc import ( - "net" "gitee.com/johng/gf/g/net/gtcp" "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/encoding/gbinary" "fmt" "errors" + "time" + "bytes" ) const ( - gPROC_COMM_FAILURE_RETRY_COUNT = 3 // 失败重试次数 + gPROC_COMM_FAILURE_RETRY_COUNT = 3 // 失败重试次数 + gPROC_COMM_FAILURE_RETRY_TIMEOUT = 1000 // (毫秒)失败重试间隔 + gPROC_COMM_SEND_TIMEOUT = 5000 // (毫秒)发送超时时间 + gPROC_COMM_DEAFULT_GRUOP_NAME = "" // 默认分组名称 ) // 向指定gproc进程发送数据 -// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长) -func Send(pid int, data []byte) error { +// 数据格式:总长度(24bit)|发送进程PID(16bit)|接收进程PID(16bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长) +func Send(pid int, data []byte, group...string) error { + groupName := gPROC_COMM_DEAFULT_GRUOP_NAME + if len(group) > 0 { + groupName = group[0] + } buffer := make([]byte, 0) - buffer = append(buffer, gbinary.EncodeInt32(int32(len(data) + 16))...) - buffer = append(buffer, gbinary.EncodeInt32(int32(Pid()))...) - buffer = append(buffer, gbinary.EncodeInt32(int32(pid))...) + buffer = append(buffer, gbinary.EncodeByLength(3, len(groupName) + len(data) + 12)...) + buffer = append(buffer, gbinary.EncodeByLength(2, Pid())...) + buffer = append(buffer, gbinary.EncodeByLength(2, pid)...) + buffer = append(buffer, gbinary.EncodeByLength(1, len(groupName))...) + buffer = append(buffer, []byte(groupName)...) buffer = append(buffer, gbinary.EncodeUint32(gtcp.Checksum(data))...) buffer = append(buffer, data...) - if conn, err := getConnByPid(pid); err == nil { - for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { - if err = gtcp.Send(conn, buffer); err != nil { - conn.Close() - if conn, err = newConnByPid(pid); err != nil { - return err + // 执行发送流程 + var err error + var buf []byte + var conn *gtcp.Conn + for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { + if conn, err = getConnByPid(pid); err == nil { + defer conn.Close() + buf, err = conn.SendReceiveWithTimeout(buffer, gPROC_COMM_SEND_TIMEOUT*time.Millisecond) + if len(buf) > 0 { + // 如果有返回值,如果不是"ok",那么表示是错误信息 + if !bytes.EqualFold(buf, []byte("ok")) { + err = errors.New(string(buf)) + break } - } else { - //glog.Printfln("%d: sent to %d, %v", Pid(), pid, buffer) + } + if err == nil { break } } - return err - } else { - return err + time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond) } + return err } // 获取指定进程的TCP通信对象 -func getConnByPid(pid int) (net.Conn, error) { - if v := commPidConnMap.Get(pid); v != nil { - return v.(net.Conn), nil - } else { - return newConnByPid(pid) - } -} - -// 创建与指定进程的TCP通信对象 -func newConnByPid(pid int) (net.Conn, error) { - if port := getPortByPid(pid); port > 0 { - if conn, err := gtcp.Conn("127.0.0.1", port); err == nil { - commPidConnMap.Set(pid, conn) +func getConnByPid(pid int) (*gtcp.Conn, error) { + port := getPortByPid(pid) + if port > 0 { + if conn, err := gtcp.NewConn("127.0.0.1", port); err == nil { return conn, nil } else { return nil, err } - } else { - return nil, errors.New(fmt.Sprintf("%d not found", pid)) } - + return nil, errors.New(fmt.Sprintf("could not find port for pid: %d" , pid)) } // 获取指定进程监听的端口号 diff --git a/g/os/gproc/gproc_comm_tcp.go b/g/os/gproc/gproc_comm_tcp.go deleted file mode 100644 index fe1ff39a6..000000000 --- a/g/os/gproc/gproc_comm_tcp.go +++ /dev/null @@ -1,96 +0,0 @@ -// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. -// "不要通过共享内存来通信,而应该通过通信来共享内存" - - -package gproc - -import ( - "fmt" - "net" - "gitee.com/johng/gf/g/os/glog" - "gitee.com/johng/gf/g/net/gtcp" - "gitee.com/johng/gf/g/os/gfile" - "gitee.com/johng/gf/g/util/gconv" - "gitee.com/johng/gf/g/encoding/gbinary" -) - -const ( - gPROC_DEFAULT_TCP_PORT = 10000 -) - -// 创建本地进程TCP通信服务 -func startTcpListening() { - var listen *net.TCPListener - for i := gPROC_DEFAULT_TCP_PORT; ; i++ { - addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i)) - if err != nil { - continue - } - listen, err = net.ListenTCP("tcp", addr) - if err != nil { - continue - } - // 将监听的端口保存到通信文件中(字符串类型存放) - gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)) - //glog.Printfln("%d: gproc listening on [%s]", Pid(), addr) - break - } - for { - if conn, err := listen.Accept(); err != nil { - glog.Error(err) - } else if conn != nil { - go tcpServiceHandler(conn) - } - } -} - -// TCP数据通信处理回调函数 -// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) -func tcpServiceHandler(conn net.Conn) { - for { - if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); len(buffer) > 0 && err == nil { - //glog.Printfln("%d: receive, %v", Pid(), buffer) - for _, v := range bufferToMsgs(buffer) { - commReceiveQueue.PushBack(v) - } - } else { - conn.Close() - return - } - } -} - -// 数据解包,防止黏包 -// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长) -func bufferToMsgs(buffer []byte) []*Msg { - s := 0 - msgs := make([]*Msg, 0) - for s < len(buffer) { - // 长度解析及校验 - length := gbinary.DecodeToInt(buffer[s : s + 4]) - if length < 16 || length > len(buffer) { - s++ - continue - } - // checksum校验(仅对参数做校验,提高校验效率) - checksum1 := gbinary.DecodeToUint32(buffer[s + 12 : s + 16]) - checksum2 := gtcp.Checksum(buffer[s + 16 : s + length]) - if checksum1 != checksum2 { - s++ - continue - } - // 接收进程PID校验 - if Pid() == gbinary.DecodeToInt(buffer[s + 8 : s + 12]) { - msgs = append(msgs, &Msg { - Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), - Data : buffer[s + 16 : s + length], - }) - } - s += length - } - return msgs -} diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index 654ca0796..7991b3c8a 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -19,7 +19,6 @@ type Process struct { exec.Cmd Manager *Manager // 所属进程管理器 PPid int // 自定义关联的父进程ID - DisableComm bool // 是否关闭TCP通信监听服务 } // 创建一个进程(不执行) @@ -61,12 +60,7 @@ func (p *Process) Start() (int, error) { if p.Process != nil { return p.Pid(), nil } - commEnabled := 1 - if p.DisableComm { - commEnabled = 0 - } p.Env = append(p.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.PPid)) - p.Env = append(p.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_COMM_KEY, commEnabled)) if err := p.Cmd.Start(); err == nil { if p.Manager != nil { p.Manager.processes.Set(p.Process.Pid, p) diff --git a/g/os/gtime/gtime_test.go b/g/os/gtime/gtime_test.go index 4014571a4..32ee6fd77 100644 --- a/g/os/gtime/gtime_test.go +++ b/g/os/gtime/gtime_test.go @@ -10,6 +10,24 @@ import ( "testing" ) +func BenchmarkSecond(b *testing.B) { + for i := 0; i < b.N; i++ { + Second() + } +} + +func BenchmarkMillisecond(b *testing.B) { + for i := 0; i < b.N; i++ { + Millisecond() + } +} + +func BenchmarkMicrosecond(b *testing.B) { + for i := 0; i < b.N; i++ { + Microsecond() + } +} + func BenchmarkNanosecond(b *testing.B) { for i := 0; i < b.N; i++ { Nanosecond() diff --git a/geg/container/garray.go b/geg/container/garray/garray.go similarity index 100% rename from geg/container/garray.go rename to geg/container/garray/garray.go diff --git a/geg/container/gpool/gpool.go b/geg/container/gpool/gpool.go new file mode 100644 index 000000000..9ed4871d9 --- /dev/null +++ b/geg/container/gpool/gpool.go @@ -0,0 +1,17 @@ +package main + +import ( + "gitee.com/johng/gf/g/container/gpool" + "fmt" + "time" +) + +func main () { + p := gpool.New(1000) + fmt.Println(p.Get()) + p.Put(1) + fmt.Println(p.Get()) + p.Put(2) + time.Sleep(time.Second) + fmt.Println(p.Get()) +} diff --git a/geg/net/gtcp/tcp_pool.go b/geg/net/gtcp/tcp_pool.go new file mode 100644 index 000000000..db19c5635 --- /dev/null +++ b/geg/net/gtcp/tcp_pool.go @@ -0,0 +1,38 @@ +package main + +import ( + "gitee.com/johng/gf/g/net/gtcp" + "time" + "net" + "gitee.com/johng/gf/g/os/gtime" + "fmt" + "gitee.com/johng/gf/g/os/glog" +) + +func main() { + go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) { + for { + buffer := make([]byte, 1024) + if length, err := conn.Read(buffer); err == nil { + conn.Write(append([]byte("> "), buffer[0 : length]...)) + } + //conn.Close() + } + }).Run() + + time.Sleep(time.Second) + + for { + if conn, err := gtcp.NewConn("127.0.0.1", 8999); err == nil { + if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil { + fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr()) + conn.Close() + } else { + glog.Error(err) + } + } else { + glog.Error(err) + } + time.Sleep(time.Second) + } +} \ No newline at end of file diff --git a/geg/net/tcp_server.go b/geg/net/gtcp/tcp_server.go similarity index 100% rename from geg/net/tcp_server.go rename to geg/net/gtcp/tcp_server.go diff --git a/geg/os/gproc/gproc_comm.go b/geg/os/gproc/gproc_comm.go index aba0c8b08..341505ab4 100644 --- a/geg/os/gproc/gproc_comm.go +++ b/geg/os/gproc/gproc_comm.go @@ -1,3 +1,6 @@ +// 多进程通信示例, +// 子进程每个1秒向父进程发送当前时间, +// 父进程监听进程消息,收到后打印到终端。 package main import ( @@ -6,13 +9,16 @@ import ( "time" "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/glog" ) func main () { fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild()) if gproc.IsChild() { gtime.SetInterval(time.Second, func() bool { - gproc.Send(gproc.PPid(), []byte(gtime.Datetime())) + if err := gproc.Send(gproc.PPid(), []byte(gtime.Datetime())); err != nil { + glog.Error(err) + } return true }) select { } @@ -22,7 +28,7 @@ func main () { p.Start() for { msg := gproc.Receive() - fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data)) + fmt.Printf("%d: receive from %d, data: %s, group: %s\n", gproc.Pid(), msg.Pid, string(msg.Data), msg.Group) } } } diff --git a/geg/os/gproc/gproc_comm_group.go b/geg/os/gproc/gproc_comm_group.go new file mode 100644 index 000000000..496d59f09 --- /dev/null +++ b/geg/os/gproc/gproc_comm_group.go @@ -0,0 +1,33 @@ +// 该示例是gproc_comm.go的改进,增加了分组消息的演示。 +package main + +import ( + "os" + "fmt" + "time" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/glog" +) + +func main () { + fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild()) + group := "test" + if gproc.IsChild() { + gtime.SetInterval(time.Second, func() bool { + if err := gproc.Send(gproc.PPid(), []byte(gtime.Datetime()), group); err != nil { + glog.Error(err) + } + return true + }) + select { } + } else { + m := gproc.NewManager() + p := m.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Start() + for { + msg := gproc.Receive(group) + fmt.Printf("receive from %d, data: %s, group: %s\n", msg.Pid, string(msg.Data), msg.Group) + } + } +} diff --git a/geg/os/gproc/gproc_comm2.go b/geg/os/gproc/gproc_comm_send.go similarity index 57% rename from geg/os/gproc/gproc_comm2.go rename to geg/os/gproc/gproc_comm_send.go index c48ddd7ec..3ebf9aae2 100644 --- a/geg/os/gproc/gproc_comm2.go +++ b/geg/os/gproc/gproc_comm_send.go @@ -1,3 +1,4 @@ +// 向指定进程发送进程消息。 package main import ( @@ -6,6 +7,6 @@ import ( ) func main () { - err := gproc.Send(23504, []byte{30}) + err := gproc.Send(22988, []byte{30}) fmt.Println(err) } diff --git a/geg/other/test.go b/geg/other/test.go index 2a09e7127..ac383ab30 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,11 +2,12 @@ package main import ( "fmt" - "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/encoding/gbinary" ) func main() { - content := `[0.00000000059, 1.598877777409]` - j, _ := gjson.LoadContent([]byte(content), "json") - fmt.Println(j.GetString("0")) + i := 65533 + b := gbinary.EncodeByLength(3, i) + fmt.Println(b) + fmt.Println(gbinary.DecodeToInt32(b)) }