diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go index 7720c6338..8e06fb913 100644 --- a/g/net/ghttp/ghttp_server_admin.go +++ b/g/net/ghttp/ghttp_server_admin.go @@ -113,8 +113,7 @@ func (s *Server) Reload() error { } serverProcessStatus.Set(gADMIN_ACTION_RELOADING) glog.Printfln("%d: server reloading", gproc.Pid()) - sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) - return nil + return sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) } // 完整重启Web Server @@ -129,8 +128,7 @@ func (s *Server) Restart() error { } serverProcessStatus.Set(gADMIN_ACTION_RESTARTING) glog.Printfln("%d: server restarting", gproc.Pid()) - sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) - return nil + return sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) } // 关闭Web Server @@ -145,8 +143,7 @@ func (s *Server) Shutdown() error { } serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN) glog.Printfln("%d: server shutting down", gproc.Pid()) - sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) - return nil + return sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) } // 检测当前操作的频繁度 diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index 13eec434c..0f0082e06 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -15,7 +15,6 @@ import ( "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/encoding/gbinary" "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/os/glog" "time" ) @@ -113,7 +112,7 @@ func sendProcessMsg(pid int, act int, data []byte) error { break } } - glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err) + //glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err) return err } diff --git a/g/os/gproc/gproc_comm_send.go b/g/os/gproc/gproc_comm_send.go index 61737ba11..0fcfcec20 100644 --- a/g/os/gproc/gproc_comm_send.go +++ b/g/os/gproc/gproc_comm_send.go @@ -12,31 +12,32 @@ import ( "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/encoding/gbinary" - "os" "fmt" "errors" ) const ( - gPROC_COMM_FAILURE_RETRY_COUNT = 3 + gPROC_COMM_FAILURE_RETRY_COUNT = 3 // 失败重试次数 ) // 向指定gproc进程发送数据 -// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) +// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长) func Send(pid int, data []byte) error { - b := make([]byte, 0) - b = append(b, gbinary.EncodeInt32(int32(len(data) + 12))...) - b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...) - b = append(b, gbinary.EncodeUint32(gtcp.Checksum(data))...) - b = append(b, data...) + buffer := make([]byte, 0) + buffer = append(buffer, gbinary.EncodeInt32(int32(len(data) + 16))...) + buffer = append(buffer, gbinary.EncodeInt32(int32(Pid()))...) + buffer = append(buffer, gbinary.EncodeInt32(int32(pid))...) + buffer = append(buffer, gbinary.EncodeUint32(gtcp.Checksum(data))...) + buffer = append(buffer, data...) if conn, err := getConnByPid(pid); err == nil { for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { - if err = gtcp.Send(conn, b); err != nil { + if err = gtcp.Send(conn, buffer); err != nil { conn.Close() if conn, err = newConnByPid(pid); err != nil { return err } } else { + //glog.Printfln("%d: sent to %d, %v", Pid(), pid, buffer) break } } diff --git a/g/os/gproc/gproc_comm_tcp.go b/g/os/gproc/gproc_comm_tcp.go index 798d83552..fe1ff39a6 100644 --- a/g/os/gproc/gproc_comm_tcp.go +++ b/g/os/gproc/gproc_comm_tcp.go @@ -36,6 +36,7 @@ func startTcpListening() { } // 将监听的端口保存到通信文件中(字符串类型存放) gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)) + //glog.Printfln("%d: gproc listening on [%s]", Pid(), addr) break } for { @@ -51,14 +52,12 @@ func startTcpListening() { // 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) func tcpServiceHandler(conn net.Conn) { for { - if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); err == nil { - if len(buffer) > 0 { - for _, v := range bufferToMsgs(buffer) { - commReceiveQueue.PushBack(v) - } + if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); len(buffer) > 0 && err == nil { + //glog.Printfln("%d: receive, %v", Pid(), buffer) + for _, v := range bufferToMsgs(buffer) { + commReceiveQueue.PushBack(v) } } else { - fmt.Println(err) conn.Close() return } @@ -66,25 +65,31 @@ func tcpServiceHandler(conn net.Conn) { } // 数据解包,防止黏包 +// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长) func bufferToMsgs(buffer []byte) []*Msg { s := 0 msgs := make([]*Msg, 0) for s < len(buffer) { + // 长度解析及校验 length := gbinary.DecodeToInt(buffer[s : s + 4]) - if length < 0 || length > len(buffer) { + if length < 16 || length > len(buffer) { s++ continue } - checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) - checksum2 := gtcp.Checksum(buffer[s + 12 : s + length]) + // checksum校验(仅对参数做校验,提高校验效率) + checksum1 := gbinary.DecodeToUint32(buffer[s + 12 : s + 16]) + checksum2 := gtcp.Checksum(buffer[s + 16 : s + length]) if checksum1 != checksum2 { s++ continue } - msgs = append(msgs, &Msg { - Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), - Data : buffer[s + 12 : s + length], - }) + // 接收进程PID校验 + if Pid() == gbinary.DecodeToInt(buffer[s + 8 : s + 12]) { + msgs = append(msgs, &Msg { + Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), + Data : buffer[s + 16 : s + length], + }) + } s += length } return msgs diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index 6cb7c00ba..59aad5458 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -154,13 +154,10 @@ func (m *Manager) SignalAll(sig os.Signal) error { } // 向所有进程发送消息 -func (m *Manager) Send(data []byte) error { +func (m *Manager) Send(data []byte) { for _, p := range m.Processes() { - if err := p.Send(data); err != nil { - return err - } + p.Send(data) } - return nil } // 向指定进程发送消息 diff --git a/geg/other/test.go b/geg/other/test.go index 11f6a7f9e..2b5f512d5 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,16 +1,17 @@ package main -import ( - "fmt" -) +import "fmt" -func test(data []byte) { - data = append(data, byte('A')) - fmt.Println(data) + +func test(a []byte) { + fmt.Println(a) } func main() { - a := []byte("1") + b := []byte{0,1,2,3,4,5,6,7,8,9} + a := []byte{} + a = append(a, b[0:2]...) + a = append(a, b[7:10]...) test(a) - fmt.Println(a) + fmt.Println(b) }