mirror of
https://gitee.com/johng/gf
synced 2026-06-06 16:21:40 +08:00
增加gpool对象池;增加gtcp.Conn连接池对象;改进gproc进程间通信机制,增加进程消息分组特性,并限定队列大小
This commit is contained in:
6
TODO
6
TODO
@ -11,8 +11,8 @@ Cookie&Session数据池化处理;
|
||||
ghttp.Client增加proxy特性;
|
||||
gtime增加对时区转换的封装,并简化失去转换时对类似+80500时区的支持;
|
||||
改进gf-orm的where查询功能,参考thinkphp 里的where查询语法;
|
||||
gproc进程间通信增加分组特性,不同的进程见可以通过进程ID以及分组名称发送/获取消息;
|
||||
|
||||
ORM增加获取被执行的sql语句的方法;
|
||||
gpage分页增加对自定义后缀的支持,如:2.html, 2.php等等;
|
||||
|
||||
DONE:
|
||||
1. gconv完善针对不同类型的判断,例如:尽量减少sprintf("%v", xxx)来执行string类型的转换;
|
||||
@ -41,7 +41,7 @@ DONE:
|
||||
24. 对grpool进行优化改进,包括属性原子操作封装采用gtype实现,修正设计BUG:https://github.com/johng-cn/gf/issues/6;
|
||||
25. gredis增加redis密码支持;
|
||||
26. 改进ghttp.Server平滑重启机制,当新进程接管服务后,再使用进程间通信方式通知父进程销毁;
|
||||
|
||||
27. gproc进程间通信增加分组特性,不同的进程间可以通过进程ID以及分组名称发送/获取进程消息;
|
||||
|
||||
|
||||
|
||||
|
||||
@ -17,7 +17,7 @@ var length = 10000000
|
||||
var q1 = gchan.New(length)
|
||||
var q2 = make(chan int, length)
|
||||
|
||||
func BenchmarkGqueuePushAndPop(b *testing.B) {
|
||||
func BenchmarkGchanPushAndPop(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
q1.Push(i)
|
||||
q1.Pop()
|
||||
|
||||
46
g/container/glist/glist_test.go
Normal file
46
g/container/glist/glist_test.go
Normal file
@ -0,0 +1,46 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// go test *.go -bench=".*"
|
||||
|
||||
package glist
|
||||
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
var l = New()
|
||||
|
||||
func BenchmarkPushBack(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
l.PushBack(i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPopFront(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
l.PopFront()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPushFront(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
l.PushFront(i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPopBack(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
l.PopBack()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLen(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
l.Len()
|
||||
}
|
||||
}
|
||||
|
||||
94
g/container/gpool/gpool.go
Normal file
94
g/container/gpool/gpool.go
Normal file
@ -0,0 +1,94 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// 对象复用池.
|
||||
package gpool
|
||||
|
||||
import (
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/container/glist"
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
"errors"
|
||||
)
|
||||
|
||||
// 对象池
|
||||
type Pool struct {
|
||||
list *glist.List // 可用/闲置的文件指针链表
|
||||
idle int64 // (毫秒)闲置最大时间,超过该时间则被系统回收
|
||||
closed *gtype.Bool // 连接池是否已关闭
|
||||
newFunc func()(interface{}, error) // 创建对象的方法定义
|
||||
}
|
||||
|
||||
// 对象池数据项
|
||||
type poolItem struct {
|
||||
expire int64 // (毫秒)过期时间
|
||||
value interface{} // 对象值
|
||||
}
|
||||
|
||||
// 创建一个对象池,为保证执行效率,过期时间一旦设定之后无法修改
|
||||
func New(expire int, newFunc...func() (interface{}, error)) *Pool {
|
||||
r := &Pool {
|
||||
list : glist.New(),
|
||||
idle : int64(expire),
|
||||
closed : gtype.NewBool(),
|
||||
}
|
||||
if len(newFunc) > 0 {
|
||||
r.newFunc = newFunc[0]
|
||||
}
|
||||
go r.expireCheckingLoop()
|
||||
return r
|
||||
}
|
||||
|
||||
// 放一个临时对象到池中
|
||||
func (p *Pool) Put(item interface{}) {
|
||||
p.list.PushBack(&poolItem{
|
||||
expire : gtime.Millisecond() + p.idle,
|
||||
value : item,
|
||||
})
|
||||
}
|
||||
|
||||
// 从池中获得一个临时对象
|
||||
func (p *Pool) Get() (interface{}, error) {
|
||||
for !p.closed.Val() {
|
||||
if r := p.list.PopFront(); r != nil {
|
||||
f := r.(*poolItem)
|
||||
if f.expire > gtime.Millisecond() {
|
||||
return f.value, nil
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
if p.newFunc != nil {
|
||||
return p.newFunc()
|
||||
}
|
||||
return nil, errors.New("pool is empty")
|
||||
}
|
||||
|
||||
// 查询当前池中的对象数量
|
||||
func (p *Pool) Size() int {
|
||||
return p.list.Len()
|
||||
}
|
||||
|
||||
// 关闭池
|
||||
func (p *Pool) Close() {
|
||||
p.closed.Set(true)
|
||||
}
|
||||
|
||||
// 超时检测循环
|
||||
func (p *Pool) expireCheckingLoop() {
|
||||
for !p.closed.Val() {
|
||||
if r := p.list.PopFront(); r != nil {
|
||||
f := r.(*poolItem)
|
||||
if f.expire > gtime.Millisecond() {
|
||||
p.list.PushFront(f)
|
||||
break
|
||||
}
|
||||
}
|
||||
time.Sleep(3 * time.Second)
|
||||
}
|
||||
}
|
||||
41
g/container/gpool/gpool_test.go
Normal file
41
g/container/gpool/gpool_test.go
Normal file
@ -0,0 +1,41 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// go test *.go -bench=".*"
|
||||
|
||||
package gpool
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"sync"
|
||||
)
|
||||
|
||||
var pool = New(99999999)
|
||||
var syncp = sync.Pool{}
|
||||
|
||||
func BenchmarkGPoolPut(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.Put(i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGPoolGet(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.Get()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkSyncPoolPut(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
syncp.Put(i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGpoolGet(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
syncp.Get()
|
||||
}
|
||||
}
|
||||
@ -47,6 +47,18 @@ func Encode(vs ...interface{}) []byte {
|
||||
return buf.Bytes()
|
||||
}
|
||||
|
||||
// 将变量转换为二进制[]byte,并指定固定的[]byte长度返回,长度单位为字节(byte);
|
||||
// 如果转换的二进制长度超过指定长度,那么进行截断处理
|
||||
func EncodeByLength(length int, vs ...interface{}) []byte {
|
||||
b := Encode(vs...)
|
||||
if len(b) < length {
|
||||
b = append(b, make([]byte, length - len(b))...)
|
||||
} else if len(b) > length {
|
||||
b = b[0 : length]
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
// 整形二进制解包,注意第二个及其后参数为字长确定的整形变量的指针地址,以便确定解析的[]byte长度,
|
||||
// 例如:int8/16/32/64、uint8/16/32/64、float32/64等等
|
||||
func Decode(b []byte, vs ...interface{}) error {
|
||||
|
||||
@ -220,7 +220,7 @@ func (s *Server) Start() error {
|
||||
// 如果是子进程,那么服务开启后通知父进程销毁
|
||||
if gproc.IsChild() {
|
||||
gtime.SetTimeout(2*time.Second, func() {
|
||||
gproc.Send(gproc.PPid(), []byte("exit"))
|
||||
gproc.Send(gproc.PPid(), []byte("exit"), gADMIN_GPROC_COMM_GROUP)
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -31,6 +31,7 @@ const (
|
||||
gADMIN_ACTION_SHUTINGDOWN = 2
|
||||
gADMIN_ACTION_RELOAD_ENVKEY = "GF_SERVER_RELOAD"
|
||||
gADMIN_ACTION_RESTART_ENVKEY = "GF_SERVER_RESTART"
|
||||
gADMIN_GPROC_COMM_GROUP = "GF_GPROC_HTTP_SERVER"
|
||||
)
|
||||
|
||||
// 用于服务管理的对象
|
||||
@ -282,7 +283,7 @@ func forcedlyCloseWebServers() {
|
||||
// 异步监听进程间消息
|
||||
func handleProcessMessage() {
|
||||
for {
|
||||
if msg := gproc.Receive(); msg != nil {
|
||||
if msg := gproc.Receive(gADMIN_GPROC_COMM_GROUP); msg != nil {
|
||||
if bytes.EqualFold(msg.Data, []byte("exit")) {
|
||||
gracefulShutdownWebServers()
|
||||
doneChan <- struct{}{}
|
||||
|
||||
@ -16,7 +16,7 @@ import (
|
||||
|
||||
const (
|
||||
gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔
|
||||
gDEFAULT_READ_BUFFER_SIZE = 10 // 默认数据读取缓冲区大小
|
||||
gDEFAULT_READ_BUFFER_SIZE = 1024 // 默认数据读取缓冲区大小
|
||||
|
||||
)
|
||||
|
||||
@ -40,8 +40,8 @@ func Checksum(buffer []byte) uint32 {
|
||||
return checksum
|
||||
}
|
||||
|
||||
// 创建TCP链接
|
||||
func Conn(ip string, port int, timeout...int) (net.Conn, error) {
|
||||
// 创建原生TCP链接
|
||||
func NewNetConn(ip string, port int, timeout...int) (net.Conn, error) {
|
||||
addr := fmt.Sprintf("%s:%d", ip, port)
|
||||
if len(timeout) > 0 {
|
||||
return net.DialTimeout("tcp", addr, time.Duration(timeout[0]) * time.Millisecond)
|
||||
@ -52,19 +52,23 @@ func Conn(ip string, port int, timeout...int) (net.Conn, error) {
|
||||
|
||||
// 获取数据
|
||||
func Receive(conn net.Conn, retry...Retry) ([]byte, error) {
|
||||
var err error = nil
|
||||
size := gDEFAULT_READ_BUFFER_SIZE
|
||||
data := make([]byte, 0)
|
||||
var err error
|
||||
var length int
|
||||
var buffer []byte
|
||||
size := gDEFAULT_READ_BUFFER_SIZE
|
||||
data := make([]byte, 0)
|
||||
for {
|
||||
buffer := make([]byte, size)
|
||||
length, e := conn.Read(buffer)
|
||||
if length < 1 || e != nil {
|
||||
if e == io.EOF {
|
||||
buffer = make([]byte, size)
|
||||
length, err = conn.Read(buffer)
|
||||
// 这里使用 "&&" 只要有数据不管有无错误都将先进行解析
|
||||
if length < 1 && err != nil {
|
||||
// 链接已关闭
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if len(retry) > 0 {
|
||||
// 其他错误,重试之后仍不能成功
|
||||
if retry[0].Count == 0 {
|
||||
err = e
|
||||
break
|
||||
}
|
||||
retry[0].Count--
|
||||
@ -81,7 +85,7 @@ func Receive(conn net.Conn, retry...Retry) ([]byte, error) {
|
||||
break
|
||||
}
|
||||
data = append(data, buffer[0 : length]...)
|
||||
if length < gDEFAULT_READ_BUFFER_SIZE || e == io.EOF {
|
||||
if length < gDEFAULT_READ_BUFFER_SIZE || err == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
@ -104,6 +108,11 @@ func Send(conn net.Conn, data []byte, retry...Retry) error {
|
||||
for {
|
||||
n, err := conn.Write(data)
|
||||
if err != nil {
|
||||
// 链接已关闭
|
||||
if err == io.EOF {
|
||||
return err
|
||||
}
|
||||
// 其他错误,重试之后仍不能成功
|
||||
if len(retry) == 0 || retry[0].Count == 0 {
|
||||
return err
|
||||
}
|
||||
@ -121,6 +130,7 @@ func Send(conn net.Conn, data []byte, retry...Retry) error {
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 带超时时间的数据发送
|
||||
|
||||
135
g/net/gtcp/tcp_pool.go
Normal file
135
g/net/gtcp/tcp_pool.go
Normal file
@ -0,0 +1,135 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package gtcp
|
||||
|
||||
import (
|
||||
"net"
|
||||
"fmt"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/gpool"
|
||||
)
|
||||
|
||||
// 封装的链接对象
|
||||
type Conn struct {
|
||||
net.Conn // 继承底层链接接口对象
|
||||
pool *gpool.Pool // 对应的链接池对象
|
||||
err error // 是否有错误产生
|
||||
}
|
||||
|
||||
const (
|
||||
gDEFAULT_POOL_EXPIRE = 3000 // (毫秒)默认链接对象过期时间
|
||||
)
|
||||
|
||||
var (
|
||||
// 连接池对象map,键名为地址端口,键值为对应的连接池对象
|
||||
pools = gmap.NewStringInterfaceMap()
|
||||
)
|
||||
|
||||
// 创建TCP链接
|
||||
func NewConn(ip string, port int, timeout...int) (*Conn, error) {
|
||||
var pool *gpool.Pool
|
||||
addr := fmt.Sprintf("%s:%d", ip, port)
|
||||
if v := pools.Get(addr); v == nil {
|
||||
pools.LockFunc(func(m map[string]interface{}) {
|
||||
if v, ok := m[addr]; ok {
|
||||
pool = v.(*gpool.Pool)
|
||||
} else {
|
||||
pool = gpool.New(gDEFAULT_POOL_EXPIRE, func() (interface{}, error) {
|
||||
if conn, err := NewNetConn(ip, port, timeout...); err == nil {
|
||||
return &Conn {
|
||||
Conn : conn,
|
||||
pool : pool,
|
||||
}, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
})
|
||||
m[addr] = pool
|
||||
}
|
||||
})
|
||||
} else {
|
||||
pool = v.(*gpool.Pool)
|
||||
}
|
||||
|
||||
if v, err := pool.Get(); err == nil {
|
||||
return v.(*Conn), nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 将net.Conn接口对象转换为*gtcp.Conn对象(注意递归影响,因为*gtcp.Conn本身也实现了net.Conn接口)
|
||||
func NewConnByNetConn(conn net.Conn) *Conn {
|
||||
return &Conn {
|
||||
Conn : conn,
|
||||
}
|
||||
}
|
||||
|
||||
// 覆盖底层接口对象的Close方法
|
||||
func (c *Conn) Close() error {
|
||||
if c.pool != nil && c.err == nil {
|
||||
c.pool.Put(c)
|
||||
} else {
|
||||
c.Conn.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 发送数据
|
||||
func (c *Conn) Send(data []byte, retry...Retry) error {
|
||||
err := Send(c, data, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 接收数据
|
||||
func (c *Conn) Receive(retry...Retry) ([]byte, error) {
|
||||
data, err := Receive(c, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 带超时时间的数据获取
|
||||
func (c *Conn) ReceiveWithTimeout(timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
data, err := ReceiveWithTimeout(c, timeout, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 带超时时间的数据发送
|
||||
func (c *Conn) SendWithTimeout(data []byte, timeout time.Duration, retry...Retry) error {
|
||||
err := SendWithTimeout(c, data, timeout, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据
|
||||
func (c *Conn) SendReceive(data []byte, retry...Retry) ([]byte, error) {
|
||||
data, err := SendReceive(c, data, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func (c *Conn) SendReceiveWithTimeout(data []byte, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
data, err := SendReceiveWithTimeout(c, data, timeout, retry...)
|
||||
if err != nil {
|
||||
c.err = err
|
||||
}
|
||||
return data, err
|
||||
}
|
||||
@ -62,7 +62,6 @@ func New(path string, flag int, expire int) *Pool {
|
||||
// 独立的线程执行过期清理工作
|
||||
if expire != -1 {
|
||||
go func(p *Pool) {
|
||||
// 遍历可用指针列表,判断是否过期
|
||||
for !p.closed.Val() {
|
||||
if r := p.list.PopFront(); r != nil {
|
||||
f := r.(*PoolItem)
|
||||
|
||||
@ -18,7 +18,6 @@ import (
|
||||
|
||||
const (
|
||||
gPROC_ENV_KEY_PPID_KEY = "GPROC_PPID"
|
||||
gPROC_ENV_KEY_COMM_KEY = "GPROC_COMM_ENABLED"
|
||||
gPROC_TEMP_DIR_ENV_KEY = "GPROC_TEMP_DIR"
|
||||
)
|
||||
|
||||
|
||||
@ -13,20 +13,23 @@ import (
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
)
|
||||
|
||||
// 本地进程通信发送消息队列
|
||||
var commSendQueue = gqueue.New()
|
||||
// 本地进程通信接收消息队列
|
||||
var commReceiveQueue = gqueue.New()
|
||||
// (用于发送)已建立的PID对应的Conn通信对象
|
||||
var commPidConnMap = gmap.NewIntInterfaceMap()
|
||||
const (
|
||||
gPROC_MSG_QUEUE_MAX_LENGTH = 10000 // 进程消息队列最大长度(每个分组)
|
||||
)
|
||||
|
||||
// 本地进程通信接收消息队列(按照分组进行构建的map,键值为*gqueue.Queue对象)
|
||||
var commReceiveQueues = gmap.NewStringInterfaceMap()
|
||||
|
||||
// (用于发送)已建立的PID对应的Conn通信对象,键值为一个Pool,防止并行使用同一个通信对象造成数据重叠
|
||||
var commPidConnMap = gmap.NewIntInterfaceMap()
|
||||
|
||||
// TCP通信数据结构定义
|
||||
type Msg struct {
|
||||
Pid int // PID,来源哪个进程
|
||||
Data []byte // 数据
|
||||
Pid int // PID,来源哪个进程
|
||||
Data []byte // 数据
|
||||
Group string // 分组名称
|
||||
}
|
||||
|
||||
// TCP通信数据结构定义
|
||||
@ -35,14 +38,6 @@ type sendQueueItem struct {
|
||||
Data []byte // 数据
|
||||
}
|
||||
|
||||
// 进程管理/通信初始化操作
|
||||
func init() {
|
||||
// 默认下为空("")
|
||||
if os.Getenv(gPROC_ENV_KEY_COMM_KEY) != "0" {
|
||||
go startTcpListening()
|
||||
}
|
||||
}
|
||||
|
||||
// 获取指定进程的通信文件地址
|
||||
func getCommFilePath(pid int) string {
|
||||
return getCommDirPath() + gfile.Separator + gconv.String(pid)
|
||||
|
||||
@ -3,12 +3,156 @@
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
// "不要通过共享内存来通信,而应该通过通信来共享内存"
|
||||
|
||||
|
||||
package gproc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
)
|
||||
|
||||
const (
|
||||
gPROC_DEFAULT_TCP_PORT = 10000 // 默认开始监听的TCP端口号,如果占用则递增
|
||||
)
|
||||
|
||||
var (
|
||||
// 是否已开启TCP端口监听服务(使用int而非bool,以便于使用原子操作判断是否开启)
|
||||
tcpListeningCount = gtype.NewInt()
|
||||
)
|
||||
|
||||
// 创建本地进程TCP通信服务
|
||||
func startTcpListening() {
|
||||
// 一个进程只能开启一个监听goroutine
|
||||
if tcpListeningCount.Add(1) != 1 {
|
||||
return
|
||||
}
|
||||
var listen *net.TCPListener
|
||||
for i := gPROC_DEFAULT_TCP_PORT; ; i++ {
|
||||
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
listen, err = net.ListenTCP("tcp", addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// 将监听的端口保存到通信文件中(字符串类型存放)
|
||||
gfile.PutContents(getCommFilePath(Pid()), gconv.String(i))
|
||||
//glog.Printfln("%d: gproc listening on [%s]", Pid(), addr)
|
||||
break
|
||||
}
|
||||
for {
|
||||
if conn, err := listen.Accept(); err != nil {
|
||||
glog.Error(err)
|
||||
} else if conn != nil {
|
||||
go tcpServiceHandler(gtcp.NewConnByNetConn(conn))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TCP数据通信处理回调函数
|
||||
func tcpServiceHandler(conn *gtcp.Conn) {
|
||||
var retry = gtcp.Retry{3, 10}
|
||||
for {
|
||||
var result []byte
|
||||
buffer, err := conn.Receive(retry)
|
||||
if len(buffer) > 0 {
|
||||
var msgs []*Msg
|
||||
for _, msg := range bufferToMsgs(buffer) {
|
||||
if v := commReceiveQueues.Get(msg.Group); v != nil {
|
||||
msgs = append(msgs, msg)
|
||||
} else {
|
||||
result = []byte(fmt.Sprintf("group [%s] does not exist", msg.Group))
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(result) == 0 {
|
||||
result = []byte("ok")
|
||||
for _, msg := range msgs {
|
||||
if v := commReceiveQueues.Get(msg.Group); v != nil {
|
||||
v.(*gqueue.Queue).PushBack(msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
// 产生错误(或者对方已经关闭链接)时,退出接收循环
|
||||
if err == nil {
|
||||
conn.Send(result, retry)
|
||||
} else {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 数据解包,防止黏包
|
||||
// 数据格式:总长度(24bit)|发送进程PID(16bit)|接收进程PID(16bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长)
|
||||
func bufferToMsgs(buffer []byte) []*Msg {
|
||||
s := 0
|
||||
msgs := make([]*Msg, 0)
|
||||
for s < len(buffer) {
|
||||
// 长度解析及校验
|
||||
length := gbinary.DecodeToInt(buffer[s : s + 3])
|
||||
if length < 12 || length > len(buffer) {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
// 分组信息解析
|
||||
groupLen := gbinary.DecodeToInt(buffer[s + 7 : s + 8])
|
||||
// checksum校验(仅对参数做校验,提高校验效率)
|
||||
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 + groupLen : s + 8 + groupLen + 4])
|
||||
checksum2 := gtcp.Checksum(buffer[s + 8 + groupLen + 4 : s + length])
|
||||
if checksum1 != checksum2 {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
// 接收进程PID校验
|
||||
if Pid() == gbinary.DecodeToInt(buffer[s + 5 : s + 7]) {
|
||||
msgs = append(msgs, &Msg {
|
||||
Pid : gbinary.DecodeToInt(buffer[s + 3 : s + 5]),
|
||||
Data : buffer[s + 8 + groupLen + 4 : s + length],
|
||||
Group : string(buffer[s + 8 : s + 8 + groupLen]),
|
||||
})
|
||||
}
|
||||
s += length
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
|
||||
// 获取其他进程传递到当前进程的消息包,阻塞执行
|
||||
func Receive() *Msg {
|
||||
if v := commReceiveQueue.PopFront(); v != nil {
|
||||
func Receive(group...string) *Msg {
|
||||
// 开启端口监听
|
||||
go startTcpListening()
|
||||
|
||||
var queue *gqueue.Queue
|
||||
groupName := gPROC_COMM_DEAFULT_GRUOP_NAME
|
||||
if len(group) > 0 {
|
||||
groupName = group[0]
|
||||
}
|
||||
|
||||
if v := commReceiveQueues.Get(groupName); v == nil {
|
||||
commReceiveQueues.LockFunc(func(m map[string]interface{}) {
|
||||
if v, ok := m[groupName]; ok {
|
||||
queue = v.(*gqueue.Queue)
|
||||
} else {
|
||||
queue = gqueue.New(gPROC_MSG_QUEUE_MAX_LENGTH)
|
||||
m[groupName] = queue
|
||||
}
|
||||
})
|
||||
} else {
|
||||
queue = v.(*gqueue.Queue)
|
||||
}
|
||||
|
||||
if v := queue.PopFront(); v != nil {
|
||||
return v.(*Msg)
|
||||
}
|
||||
return nil
|
||||
|
||||
@ -7,69 +7,74 @@
|
||||
package gproc
|
||||
|
||||
import (
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"fmt"
|
||||
"errors"
|
||||
"time"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
const (
|
||||
gPROC_COMM_FAILURE_RETRY_COUNT = 3 // 失败重试次数
|
||||
gPROC_COMM_FAILURE_RETRY_COUNT = 3 // 失败重试次数
|
||||
gPROC_COMM_FAILURE_RETRY_TIMEOUT = 1000 // (毫秒)失败重试间隔
|
||||
gPROC_COMM_SEND_TIMEOUT = 5000 // (毫秒)发送超时时间
|
||||
gPROC_COMM_DEAFULT_GRUOP_NAME = "" // 默认分组名称
|
||||
)
|
||||
|
||||
// 向指定gproc进程发送数据
|
||||
// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func Send(pid int, data []byte) error {
|
||||
// 数据格式:总长度(24bit)|发送进程PID(16bit)|接收进程PID(16bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长)
|
||||
func Send(pid int, data []byte, group...string) error {
|
||||
groupName := gPROC_COMM_DEAFULT_GRUOP_NAME
|
||||
if len(group) > 0 {
|
||||
groupName = group[0]
|
||||
}
|
||||
buffer := make([]byte, 0)
|
||||
buffer = append(buffer, gbinary.EncodeInt32(int32(len(data) + 16))...)
|
||||
buffer = append(buffer, gbinary.EncodeInt32(int32(Pid()))...)
|
||||
buffer = append(buffer, gbinary.EncodeInt32(int32(pid))...)
|
||||
buffer = append(buffer, gbinary.EncodeByLength(3, len(groupName) + len(data) + 12)...)
|
||||
buffer = append(buffer, gbinary.EncodeByLength(2, Pid())...)
|
||||
buffer = append(buffer, gbinary.EncodeByLength(2, pid)...)
|
||||
buffer = append(buffer, gbinary.EncodeByLength(1, len(groupName))...)
|
||||
buffer = append(buffer, []byte(groupName)...)
|
||||
buffer = append(buffer, gbinary.EncodeUint32(gtcp.Checksum(data))...)
|
||||
buffer = append(buffer, data...)
|
||||
if conn, err := getConnByPid(pid); err == nil {
|
||||
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
|
||||
if err = gtcp.Send(conn, buffer); err != nil {
|
||||
conn.Close()
|
||||
if conn, err = newConnByPid(pid); err != nil {
|
||||
return err
|
||||
// 执行发送流程
|
||||
var err error
|
||||
var buf []byte
|
||||
var conn *gtcp.Conn
|
||||
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
|
||||
if conn, err = getConnByPid(pid); err == nil {
|
||||
defer conn.Close()
|
||||
buf, err = conn.SendReceiveWithTimeout(buffer, gPROC_COMM_SEND_TIMEOUT*time.Millisecond)
|
||||
if len(buf) > 0 {
|
||||
// 如果有返回值,如果不是"ok",那么表示是错误信息
|
||||
if !bytes.EqualFold(buf, []byte("ok")) {
|
||||
err = errors.New(string(buf))
|
||||
break
|
||||
}
|
||||
} else {
|
||||
//glog.Printfln("%d: sent to %d, %v", Pid(), pid, buffer)
|
||||
}
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
} else {
|
||||
return err
|
||||
time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
||||
// 获取指定进程的TCP通信对象
|
||||
func getConnByPid(pid int) (net.Conn, error) {
|
||||
if v := commPidConnMap.Get(pid); v != nil {
|
||||
return v.(net.Conn), nil
|
||||
} else {
|
||||
return newConnByPid(pid)
|
||||
}
|
||||
}
|
||||
|
||||
// 创建与指定进程的TCP通信对象
|
||||
func newConnByPid(pid int) (net.Conn, error) {
|
||||
if port := getPortByPid(pid); port > 0 {
|
||||
if conn, err := gtcp.Conn("127.0.0.1", port); err == nil {
|
||||
commPidConnMap.Set(pid, conn)
|
||||
func getConnByPid(pid int) (*gtcp.Conn, error) {
|
||||
port := getPortByPid(pid)
|
||||
if port > 0 {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1", port); err == nil {
|
||||
return conn, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
return nil, errors.New(fmt.Sprintf("%d not found", pid))
|
||||
}
|
||||
|
||||
return nil, errors.New(fmt.Sprintf("could not find port for pid: %d" , pid))
|
||||
}
|
||||
|
||||
// 获取指定进程监听的端口号
|
||||
|
||||
@ -1,96 +0,0 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
// "不要通过共享内存来通信,而应该通过通信来共享内存"
|
||||
|
||||
|
||||
package gproc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
)
|
||||
|
||||
const (
|
||||
gPROC_DEFAULT_TCP_PORT = 10000
|
||||
)
|
||||
|
||||
// 创建本地进程TCP通信服务
|
||||
func startTcpListening() {
|
||||
var listen *net.TCPListener
|
||||
for i := gPROC_DEFAULT_TCP_PORT; ; i++ {
|
||||
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
listen, err = net.ListenTCP("tcp", addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// 将监听的端口保存到通信文件中(字符串类型存放)
|
||||
gfile.PutContents(getCommFilePath(Pid()), gconv.String(i))
|
||||
//glog.Printfln("%d: gproc listening on [%s]", Pid(), addr)
|
||||
break
|
||||
}
|
||||
for {
|
||||
if conn, err := listen.Accept(); err != nil {
|
||||
glog.Error(err)
|
||||
} else if conn != nil {
|
||||
go tcpServiceHandler(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TCP数据通信处理回调函数
|
||||
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func tcpServiceHandler(conn net.Conn) {
|
||||
for {
|
||||
if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); len(buffer) > 0 && err == nil {
|
||||
//glog.Printfln("%d: receive, %v", Pid(), buffer)
|
||||
for _, v := range bufferToMsgs(buffer) {
|
||||
commReceiveQueue.PushBack(v)
|
||||
}
|
||||
} else {
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 数据解包,防止黏包
|
||||
// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func bufferToMsgs(buffer []byte) []*Msg {
|
||||
s := 0
|
||||
msgs := make([]*Msg, 0)
|
||||
for s < len(buffer) {
|
||||
// 长度解析及校验
|
||||
length := gbinary.DecodeToInt(buffer[s : s + 4])
|
||||
if length < 16 || length > len(buffer) {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
// checksum校验(仅对参数做校验,提高校验效率)
|
||||
checksum1 := gbinary.DecodeToUint32(buffer[s + 12 : s + 16])
|
||||
checksum2 := gtcp.Checksum(buffer[s + 16 : s + length])
|
||||
if checksum1 != checksum2 {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
// 接收进程PID校验
|
||||
if Pid() == gbinary.DecodeToInt(buffer[s + 8 : s + 12]) {
|
||||
msgs = append(msgs, &Msg {
|
||||
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
|
||||
Data : buffer[s + 16 : s + length],
|
||||
})
|
||||
}
|
||||
s += length
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
@ -19,7 +19,6 @@ type Process struct {
|
||||
exec.Cmd
|
||||
Manager *Manager // 所属进程管理器
|
||||
PPid int // 自定义关联的父进程ID
|
||||
DisableComm bool // 是否关闭TCP通信监听服务
|
||||
}
|
||||
|
||||
// 创建一个进程(不执行)
|
||||
@ -61,12 +60,7 @@ func (p *Process) Start() (int, error) {
|
||||
if p.Process != nil {
|
||||
return p.Pid(), nil
|
||||
}
|
||||
commEnabled := 1
|
||||
if p.DisableComm {
|
||||
commEnabled = 0
|
||||
}
|
||||
p.Env = append(p.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.PPid))
|
||||
p.Env = append(p.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_COMM_KEY, commEnabled))
|
||||
if err := p.Cmd.Start(); err == nil {
|
||||
if p.Manager != nil {
|
||||
p.Manager.processes.Set(p.Process.Pid, p)
|
||||
|
||||
@ -10,6 +10,24 @@ import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func BenchmarkSecond(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
Second()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMillisecond(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
Millisecond()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkMicrosecond(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
Microsecond()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkNanosecond(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
Nanosecond()
|
||||
|
||||
17
geg/container/gpool/gpool.go
Normal file
17
geg/container/gpool/gpool.go
Normal file
@ -0,0 +1,17 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/g/container/gpool"
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main () {
|
||||
p := gpool.New(1000)
|
||||
fmt.Println(p.Get())
|
||||
p.Put(1)
|
||||
fmt.Println(p.Get())
|
||||
p.Put(2)
|
||||
time.Sleep(time.Second)
|
||||
fmt.Println(p.Get())
|
||||
}
|
||||
38
geg/net/gtcp/tcp_pool.go
Normal file
38
geg/net/gtcp/tcp_pool.go
Normal file
@ -0,0 +1,38 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"time"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
func main() {
|
||||
go gtcp.NewServer("127.0.0.1:8999", func(conn net.Conn) {
|
||||
for {
|
||||
buffer := make([]byte, 1024)
|
||||
if length, err := conn.Read(buffer); err == nil {
|
||||
conn.Write(append([]byte("> "), buffer[0 : length]...))
|
||||
}
|
||||
//conn.Close()
|
||||
}
|
||||
}).Run()
|
||||
|
||||
time.Sleep(time.Second)
|
||||
|
||||
for {
|
||||
if conn, err := gtcp.NewConn("127.0.0.1", 8999); err == nil {
|
||||
if b, err := conn.SendReceive([]byte(gtime.Datetime())); err == nil {
|
||||
fmt.Println(string(b), conn.LocalAddr(), conn.RemoteAddr())
|
||||
conn.Close()
|
||||
} else {
|
||||
glog.Error(err)
|
||||
}
|
||||
} else {
|
||||
glog.Error(err)
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
}
|
||||
}
|
||||
@ -1,3 +1,6 @@
|
||||
// 多进程通信示例,
|
||||
// 子进程每个1秒向父进程发送当前时间,
|
||||
// 父进程监听进程消息,收到后打印到终端。
|
||||
package main
|
||||
|
||||
import (
|
||||
@ -6,13 +9,16 @@ import (
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
func main () {
|
||||
fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild())
|
||||
if gproc.IsChild() {
|
||||
gtime.SetInterval(time.Second, func() bool {
|
||||
gproc.Send(gproc.PPid(), []byte(gtime.Datetime()))
|
||||
if err := gproc.Send(gproc.PPid(), []byte(gtime.Datetime())); err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
select { }
|
||||
@ -22,7 +28,7 @@ func main () {
|
||||
p.Start()
|
||||
for {
|
||||
msg := gproc.Receive()
|
||||
fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data))
|
||||
fmt.Printf("%d: receive from %d, data: %s, group: %s\n", gproc.Pid(), msg.Pid, string(msg.Data), msg.Group)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
33
geg/os/gproc/gproc_comm_group.go
Normal file
33
geg/os/gproc/gproc_comm_group.go
Normal file
@ -0,0 +1,33 @@
|
||||
// 该示例是gproc_comm.go的改进,增加了分组消息的演示。
|
||||
package main
|
||||
|
||||
import (
|
||||
"os"
|
||||
"fmt"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
)
|
||||
|
||||
func main () {
|
||||
fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild())
|
||||
group := "test"
|
||||
if gproc.IsChild() {
|
||||
gtime.SetInterval(time.Second, func() bool {
|
||||
if err := gproc.Send(gproc.PPid(), []byte(gtime.Datetime()), group); err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
return true
|
||||
})
|
||||
select { }
|
||||
} else {
|
||||
m := gproc.NewManager()
|
||||
p := m.NewProcess(os.Args[0], os.Args, os.Environ())
|
||||
p.Start()
|
||||
for {
|
||||
msg := gproc.Receive(group)
|
||||
fmt.Printf("receive from %d, data: %s, group: %s\n", msg.Pid, string(msg.Data), msg.Group)
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -1,3 +1,4 @@
|
||||
// 向指定进程发送进程消息。
|
||||
package main
|
||||
|
||||
import (
|
||||
@ -6,6 +7,6 @@ import (
|
||||
)
|
||||
|
||||
func main () {
|
||||
err := gproc.Send(23504, []byte{30})
|
||||
err := gproc.Send(22988, []byte{30})
|
||||
fmt.Println(err)
|
||||
}
|
||||
@ -2,11 +2,12 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/encoding/gjson"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
)
|
||||
|
||||
func main() {
|
||||
content := `[0.00000000059, 1.598877777409]`
|
||||
j, _ := gjson.LoadContent([]byte(content), "json")
|
||||
fmt.Println(j.GetString("0"))
|
||||
i := 65533
|
||||
b := gbinary.EncodeByLength(3, i)
|
||||
fmt.Println(b)
|
||||
fmt.Println(gbinary.DecodeToInt32(b))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user