完成gproc进程间通信由共享文件改进为TCP/IP方式,并对应改进ghttp.Server热重启机制,完成在Linux及Windows下的功能测试

This commit is contained in:
John
2018-05-21 10:11:24 +08:00
parent 729b632531
commit d093f4093c
6 changed files with 43 additions and 43 deletions

View File

@ -113,8 +113,7 @@ func (s *Server) Reload() error {
}
serverProcessStatus.Set(gADMIN_ACTION_RELOADING)
glog.Printfln("%d: server reloading", gproc.Pid())
sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
return nil
return sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
}
// 完整重启Web Server
@ -129,8 +128,7 @@ func (s *Server) Restart() error {
}
serverProcessStatus.Set(gADMIN_ACTION_RESTARTING)
glog.Printfln("%d: server restarting", gproc.Pid())
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
return nil
return sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
}
// 关闭Web Server
@ -145,8 +143,7 @@ func (s *Server) Shutdown() error {
}
serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN)
glog.Printfln("%d: server shutting down", gproc.Pid())
sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil)
return nil
return sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil)
}
// 检测当前操作的频繁度

View File

@ -15,7 +15,6 @@ import (
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/glog"
"time"
)
@ -113,7 +112,7 @@ func sendProcessMsg(pid int, act int, data []byte) error {
break
}
}
glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err)
//glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err)
return err
}

View File

@ -12,31 +12,32 @@ import (
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gbinary"
"os"
"fmt"
"errors"
)
const (
gPROC_COMM_FAILURE_RETRY_COUNT = 3
gPROC_COMM_FAILURE_RETRY_COUNT = 3 // 失败重试次数
)
// 向指定gproc进程发送数据
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长)
func Send(pid int, data []byte) error {
b := make([]byte, 0)
b = append(b, gbinary.EncodeInt32(int32(len(data) + 12))...)
b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...)
b = append(b, gbinary.EncodeUint32(gtcp.Checksum(data))...)
b = append(b, data...)
buffer := make([]byte, 0)
buffer = append(buffer, gbinary.EncodeInt32(int32(len(data) + 16))...)
buffer = append(buffer, gbinary.EncodeInt32(int32(Pid()))...)
buffer = append(buffer, gbinary.EncodeInt32(int32(pid))...)
buffer = append(buffer, gbinary.EncodeUint32(gtcp.Checksum(data))...)
buffer = append(buffer, data...)
if conn, err := getConnByPid(pid); err == nil {
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
if err = gtcp.Send(conn, b); err != nil {
if err = gtcp.Send(conn, buffer); err != nil {
conn.Close()
if conn, err = newConnByPid(pid); err != nil {
return err
}
} else {
//glog.Printfln("%d: sent to %d, %v", Pid(), pid, buffer)
break
}
}

View File

@ -36,6 +36,7 @@ func startTcpListening() {
}
// 将监听的端口保存到通信文件中(字符串类型存放)
gfile.PutContents(getCommFilePath(Pid()), gconv.String(i))
//glog.Printfln("%d: gproc listening on [%s]", Pid(), addr)
break
}
for {
@ -51,14 +52,12 @@ func startTcpListening() {
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
func tcpServiceHandler(conn net.Conn) {
for {
if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); err == nil {
if len(buffer) > 0 {
for _, v := range bufferToMsgs(buffer) {
commReceiveQueue.PushBack(v)
}
if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); len(buffer) > 0 && err == nil {
//glog.Printfln("%d: receive, %v", Pid(), buffer)
for _, v := range bufferToMsgs(buffer) {
commReceiveQueue.PushBack(v)
}
} else {
fmt.Println(err)
conn.Close()
return
}
@ -66,25 +65,31 @@ func tcpServiceHandler(conn net.Conn) {
}
// 数据解包,防止黏包
// 数据格式:总长度(32bit) | 发送进程PID(32bit) | 接收进程PID(32bit) | 校验(32bit) | 参数(变长)
func bufferToMsgs(buffer []byte) []*Msg {
s := 0
msgs := make([]*Msg, 0)
for s < len(buffer) {
// 长度解析及校验
length := gbinary.DecodeToInt(buffer[s : s + 4])
if length < 0 || length > len(buffer) {
if length < 16 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := gtcp.Checksum(buffer[s + 12 : s + length])
// checksum校验(仅对参数做校验,提高校验效率)
checksum1 := gbinary.DecodeToUint32(buffer[s + 12 : s + 16])
checksum2 := gtcp.Checksum(buffer[s + 16 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
// 接收进程PID校验
if Pid() == gbinary.DecodeToInt(buffer[s + 8 : s + 12]) {
msgs = append(msgs, &Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 16 : s + length],
})
}
s += length
}
return msgs

View File

@ -154,13 +154,10 @@ func (m *Manager) SignalAll(sig os.Signal) error {
}
// 向所有进程发送消息
func (m *Manager) Send(data []byte) error {
func (m *Manager) Send(data []byte) {
for _, p := range m.Processes() {
if err := p.Send(data); err != nil {
return err
}
p.Send(data)
}
return nil
}
// 向指定进程发送消息

View File

@ -1,16 +1,17 @@
package main
import (
"fmt"
)
import "fmt"
func test(data []byte) {
data = append(data, byte('A'))
fmt.Println(data)
func test(a []byte) {
fmt.Println(a)
}
func main() {
a := []byte("1")
b := []byte{0,1,2,3,4,5,6,7,8,9}
a := []byte{}
a = append(a, b[0:2]...)
a = append(a, b[7:10]...)
test(a)
fmt.Println(a)
fmt.Println(b)
}