From 1ac6de962712365c37d71d4377cc181d6078602a Mon Sep 17 00:00:00 2001 From: John Date: Fri, 11 May 2018 23:17:25 +0800 Subject: [PATCH] =?UTF-8?q?ghttp.Server=E7=83=AD=E9=87=8D=E5=90=AF?= =?UTF-8?q?=E7=89=B9=E6=80=A7=E6=B5=8B=E8=AF=95=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/net/ghttp/ghttp_server.go | 40 ++--- g/net/ghttp/ghttp_server_comm.go | 148 ++++++++---------- g/net/ghttp/ghttp_server_comm_child.go | 103 ++++++++++++ g/net/ghttp/ghttp_server_comm_main.go | 73 +++++++++ g/net/ghttp/ghttp_server_graceful.go | 4 +- g/os/gproc/gproc_manager.go | 5 + geg/net/ghttp/hot_restart/multi_port.go | 23 +++ .../hot_restart/multi_port_and_server.go | 32 ++++ .../{hot_restart.go => hot_restart/simple.go} | 0 geg/os/gproc/gproc2.go | 2 +- geg/other/test.go | 41 ++++- 11 files changed, 352 insertions(+), 119 deletions(-) create mode 100644 g/net/ghttp/ghttp_server_comm_child.go create mode 100644 g/net/ghttp/ghttp_server_comm_main.go create mode 100644 geg/net/ghttp/hot_restart/multi_port.go create mode 100644 geg/net/ghttp/hot_restart/multi_port_and_server.go rename geg/net/ghttp/{hot_restart.go => hot_restart/simple.go} (100%) diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index d9daca7db..71108b429 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -18,7 +18,6 @@ import ( "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/util/gconv" - "gitee.com/johng/gf/g/encoding/gjson" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/container/gqueue" @@ -111,6 +110,8 @@ var procManager = gproc.NewManager() var runChan = make(chan struct{}, 100000) // 已完成的run消息队列 var doneChan = make(chan struct{}, 100000) +// 阻塞消息队列,用于ghttp.Wait +var waitChan = make(chan struct{}, 0) // Web Server进程初始化 func init() { @@ -122,41 +123,22 @@ func init() { sendProcessMsg(os.Getpid(), gMSG_START, nil) } // 开启进程消息监听处理 - handleProcessMsg() + handleProcessMsgAndSignal() // 服务执行完成,需要退出 doneChan <- struct{}{} if !gproc.IsChild() { - glog.Printfln("all web server shutdown smoothly") + glog.Printfln("%d: all web server shutdown smoothly", gproc.Pid()) } + + // 停止进程等待 + close(waitChan) }() } -// 获取所有Web Server的文件描述符map -func getServerFdMap() map[string]listenerFdMap { - sfm := make(map[string]listenerFdMap) - serverMapping.RLockFunc(func(m map[string]interface{}) { - for k, v := range m { - sfm[k] = v.(*Server).getListenerFdMap() - } - }) - return sfm -} - -// 二进制转换为FdMap -func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { - sfm := make(map[string]listenerFdMap) - if len(buffer) > 0 { - j, _ := gjson.LoadContent(buffer, "json") - for k, _ := range j.ToMap() { - m := make(map[string]string) - for k, v := range j.GetMap(k) { - m[k] = gconv.String(v) - } - sfm[k] = m - } - } - return sfm +// 阻塞等待所有Web Server停止,常用于多Web Server场景,以及需要将Web Server异步运行的场景 +func Wait() { + <- waitChan } // 获取/创建一个默认配置的HTTP Server(默认监听端口是80) @@ -267,6 +249,7 @@ func (s *Server) startServer(fdMap listenerFdMap) { if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + s.servers = s.servers[0 : len(s.servers) - 1] glog.Error(err) } } @@ -297,6 +280,7 @@ func (s *Server) startServer(fdMap listenerFdMap) { if err := server.ListenAndServe(); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + s.servers = s.servers[0 : len(s.servers) - 1] glog.Error(err) } } diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index bc0e4ebda..c4240296b 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -9,13 +9,12 @@ package ghttp import ( "os" - "fmt" - "strings" "syscall" "os/signal" "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/encoding/gbinary" ) @@ -25,108 +24,65 @@ const ( gMSG_SHUTDOWN = 30 gMSG_NEW_FORK = 40 gMSG_REMOVE_PROC = 50 + gMSG_HEARTBEAT = 60 + + gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔 + gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程 ) // 进程信号量监听消息队列 -var procSignalChan = make(chan os.Signal) +var procSignalChan = make(chan os.Signal) + +// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效 +var heartbeatStarted = gtype.NewBool() + +// 处理进程信号量监控以及进程间消息通信 +func handleProcessMsgAndSignal() { + go handleProcessSignal() + if gproc.IsChild() { + go handleChildProcessHeartbeat() + } else { + go handleMainProcessHeartbeat() + } + handleProcessMsg() +} // 处理进程间消息 // 数据格式: 操作(8bit) | 参数(变长) func handleProcessMsg() { - go handleProcessSignals() for { if msg := gproc.Receive(); msg != nil { - //fmt.Println(gproc.Pid(), gproc.IsChild(), msg) + // 记录消息日志,用于调试 + //gfile.PutContentsAppend("/tmp/gproc-log", + // gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data), + //) act := gbinary.DecodeToUint(msg.Data[0 : 1]) data := msg.Data[1 : ] if gproc.IsChild() { // 子进程 switch act { - // 开启所有Web Server(根据消息启动) - case gMSG_START: - if len(data) > 0 { - sfm := bufferToServerFdMap(data) - for k, v := range sfm { - GetServer(k).startServer(v) - } - } else { - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - v.(*Server).startServer(nil) - } - }) - } - - // 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 - case gMSG_RESTART: - // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 - sfm := getServerFdMap() - p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) - for name, m := range sfm { - for fdk, fdv := range m { - if len(fdv) > 0 { - s := "" - for _, item := range strings.Split(fdv, ",") { - array := strings.Split(item, "#") - fd := uintptr(gconv.Uint(array[1])) - s += fmt.Sprintf("%s#%d", array[0], len(p.GetAttr().Files)) - p.GetAttr().Files = append(p.GetAttr().Files, os.NewFile(fd, "")) - } - sfm[name][fdk] = strings.TrimRight(s, ",") - } - } - } - p.SetPpid(gproc.Ppid()) - p.Run() - b, _ := gjson.Encode(sfm) - sendProcessMsg(p.Pid(), gMSG_START, b) - sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gbinary.EncodeInt(p.Pid())) - sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) - - // 友好关闭服务链接并退出 + case gMSG_START: onCommChildStart(msg.Pid, data) + case gMSG_RESTART: onCommChildRestart(msg.Pid, data) + case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data) case gMSG_SHUTDOWN: - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - for _, s := range v.(*Server).servers { - s.shutdown() - } - } - }) - sendProcessMsg(gproc.Ppid(), gMSG_REMOVE_PROC, gbinary.EncodeInt(gproc.Pid())) + onCommChildShutdown(msg.Pid, data) return - } } else { // 父进程 switch act { - // 开启服务 - case gMSG_START: - p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) - p.Run() - sendProcessMsg(p.Pid(), gMSG_START, nil) - - // 重启服务 - case gMSG_RESTART: - // 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作 - procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) - - // 新建子进程通知 - case gMSG_NEW_FORK: - pid := gbinary.DecodeToInt(data) - procManager.AddProcess(pid) - - // 销毁子进程通知 + case gMSG_START: onCommMainStart(msg.Pid, data) + case gMSG_RESTART: onCommMainRestart(msg.Pid, data) + case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data) + case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data) case gMSG_REMOVE_PROC: - pid := gbinary.DecodeToInt(data) - procManager.RemoveProcess(pid) + onCommMainRemoveProc(msg.Pid, data) // 如果所有子进程都退出,那么主进程也主动退出 if procManager.Size() == 0 { return } - - // 关闭服务 case gMSG_SHUTDOWN: - procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) + onCommMainShutdown(msg.Pid, data) return } } @@ -135,7 +91,7 @@ func handleProcessMsg() { } // 信号量处理 -func handleProcessSignals() { +func handleProcessSignal() { var sig os.Signal signal.Notify( procSignalChan, @@ -163,14 +119,6 @@ func handleProcessSignals() { } } -func onMainShutDown() { - procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) -} - -func onMainRemoveProc() { - procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) -} - // 向进程发送操作消息 func sendProcessMsg(pid int, act int, data []byte) { gproc.Send(pid, formatMsgBuffer(act, data)) @@ -181,3 +129,29 @@ func formatMsgBuffer(act int, data []byte) []byte { return append(gbinary.EncodeUint8(uint8(act)), data...) } +// 获取所有Web Server的文件描述符map +func getServerFdMap() map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + serverMapping.RLockFunc(func(m map[string]interface{}) { + for k, v := range m { + sfm[k] = v.(*Server).getListenerFdMap() + } + }) + return sfm +} + +// 二进制转换为FdMap +func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + if len(buffer) > 0 { + j, _ := gjson.LoadContent(buffer, "json") + for k, _ := range j.ToMap() { + m := make(map[string]string) + for k, v := range j.GetMap(k) { + m[k] = gconv.String(v) + } + sfm[k] = m + } + } + return sfm +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child.go b/g/net/ghttp/ghttp_server_comm_child.go new file mode 100644 index 000000000..fe15f93ab --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_child.go @@ -0,0 +1,103 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 - 子进程 + +package ghttp + +import ( + "os" + "fmt" + "time" + "strings" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/encoding/gbinary" +) + +// (子进程)上一次从主进程接收心跳的时间戳 +var lastHeartbeatTime = gtype.NewInt() + +// 开启所有Web Server(根据消息启动) +func onCommChildStart(pid int, data []byte) { + if len(data) > 0 { + sfm := bufferToServerFdMap(data) + for k, v := range sfm { + GetServer(k).startServer(v) + } + } else { + fmt.Println(serverMapping) + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + v.(*Server).startServer(nil) + } + }) + } + heartbeatStarted.Set(true) +} + +// 心跳消息 +func onCommChildHeartbeat(pid int, data []byte) { + //glog.Printfln("%d: child heartbeat", gproc.Pid()) + lastHeartbeatTime.Set(int(gtime.Millisecond())) +} + +// 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 +func onCommChildRestart(pid int, data []byte) { + // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 + sfm := getServerFdMap() + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + // 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理,以便子进程获取到正确的fd + for name, m := range sfm { + for fdk, fdv := range m { + if len(fdv) > 0 { + s := "" + for _, item := range strings.Split(fdv, ",") { + array := strings.Split(item, "#") + fd := uintptr(gconv.Uint(array[1])) + s += fmt.Sprintf("%s#%d,", array[0], len(p.GetAttr().Files)) + p.GetAttr().Files = append(p.GetAttr().Files, os.NewFile(fd, "")) + } + sfm[name][fdk] = strings.TrimRight(s, ",") + } + } + } + p.SetPpid(gproc.Ppid()) + p.Run() + // 编码,通信 + b, _ := gjson.Encode(sfm) + sendProcessMsg(p.Pid(), gMSG_START, b) + sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gbinary.EncodeInt(p.Pid())) + sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) +} + +// 友好关闭服务链接并退出 +func onCommChildShutdown(pid int, data []byte) { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + for _, s := range v.(*Server).servers { + s.shutdown() + } + } + }) + sendProcessMsg(gproc.Ppid(), gMSG_REMOVE_PROC, gbinary.EncodeInt(gproc.Pid())) +} + +// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 +func handleChildProcessHeartbeat() { + for { + time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) + sendProcessMsg(gproc.Ppid(), gMSG_HEARTBEAT, nil) + // 超过时间没有接收到主进程心跳,自动关闭退出 + if heartbeatStarted.Val() && (int(gtime.Millisecond()) - lastHeartbeatTime.Val() > gPROC_HEARTBEAT_TIMEOUT) { + sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) + // 子进程有时会无法退出,这里直接使用exit,而不是return + os.Exit(0) + } + } +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_main.go b/g/net/ghttp/ghttp_server_comm_main.go new file mode 100644 index 000000000..273b10f93 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_main.go @@ -0,0 +1,73 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 - 主进程 + +package ghttp + +import ( + "os" + "time" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/container/gmap" + "gitee.com/johng/gf/g/encoding/gbinary" +) + +// (主进程)主进程与子进程上一次活跃时间映射map +var procUpdateMap = gmap.NewIntIntMap() + +// 开启服务 +func onCommMainStart(pid int, data []byte) { + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Run() + sendProcessMsg(p.Pid(), gMSG_START, nil) +} + +// 心跳处理 +func onCommMainHeartbeat(pid int, data []byte) { + //glog.Printfln("%d: main heartbeat", gproc.Pid()) + procUpdateMap.Set(pid, int(gtime.Millisecond())) +} + +// 重启服务 +func onCommMainRestart(pid int, data []byte) { + // 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作 + procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) +} + +// 新建子进程通知 +func onCommMainNewFork(pid int, data []byte) { + procManager.AddProcess(gbinary.DecodeToInt(data)) + heartbeatStarted.Set(true) +} + +// 销毁子进程通知 +func onCommMainRemoveProc(pid int, data []byte) { + procManager.RemoveProcess(gbinary.DecodeToInt(data)) +} + +// 关闭服务,通知所有子进程退出 +func onCommMainShutdown(pid int, data []byte) { + procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) +} + +// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 +func handleMainProcessHeartbeat() { + for { + time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) + procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil)) + // 清理过期进程 + if heartbeatStarted.Val() { + for _, pid := range procManager.Pids() { + if int(gtime.Millisecond()) - procUpdateMap.Get(pid) > gPROC_HEARTBEAT_TIMEOUT { + // 这里需要手动从进程管理器中去掉该进程 + procManager.RemoveProcess(pid) + sendProcessMsg(pid, gMSG_SHUTDOWN, nil) + return + } + } + } + } +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index b4d543ae3..5558c3fec 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -118,13 +118,13 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { f := os.NewFile(s.fd, "") ln, err = net.FileListener(f) if err != nil { - err = fmt.Errorf("net.FileListener error: %v", err) + err = fmt.Errorf("%d: net.FileListener error: %v", gproc.Pid(), err) return nil, err } } else { ln, err = net.Listen("tcp", addr) if err != nil { - err = fmt.Errorf("net.Listen error: %v", err) + err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err) return nil, err } } diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index de6ad1751..1e3119682 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -99,6 +99,11 @@ func (m *Manager) Processes() []*Process { return processes } +// 获取所有的进程pid,构成列表返回 +func (m *Manager) Pids() []int { + return m.processes.Keys() +} + // 等待所有子进程结束 func (m *Manager) WaitAll() { processes := m.Processes() diff --git a/geg/net/ghttp/hot_restart/multi_port.go b/geg/net/ghttp/hot_restart/multi_port.go new file mode 100644 index 000000000..359520ab8 --- /dev/null +++ b/geg/net/ghttp/hot_restart/multi_port.go @@ -0,0 +1,23 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" +) + +func main() { + s := g.Server() + s.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("hello") + }) + s.BindHandler("/restart", func(r *ghttp.Request){ + r.Response.Writeln("restart server") + r.Server.Restart() + }) + s.BindHandler("/shutdown", func(r *ghttp.Request){ + r.Response.Writeln("shutdown server") + r.Server.Shutdown() + }) + s.SetPort(8199, 8200) + s.Run() +} \ No newline at end of file diff --git a/geg/net/ghttp/hot_restart/multi_port_and_server.go b/geg/net/ghttp/hot_restart/multi_port_and_server.go new file mode 100644 index 000000000..f7dbfac35 --- /dev/null +++ b/geg/net/ghttp/hot_restart/multi_port_and_server.go @@ -0,0 +1,32 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" +) + +func main() { + s1 := g.Server("s1") + s1.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("hello s1") + }) + s1.BindHandler("/restart", func(r *ghttp.Request){ + r.Response.Writeln("restart server") + r.Server.Restart() + }) + s1.BindHandler("/shutdown", func(r *ghttp.Request){ + r.Response.Writeln("shutdown server") + r.Server.Shutdown() + }) + s1.SetPort(8199, 8200) + go s1.Run() + + s2 := g.Server("s2") + s2.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("hello s2") + }) + s2.SetPort(8300, 8080) + go s2.Run() + + ghttp.Wait() +} \ No newline at end of file diff --git a/geg/net/ghttp/hot_restart.go b/geg/net/ghttp/hot_restart/simple.go similarity index 100% rename from geg/net/ghttp/hot_restart.go rename to geg/net/ghttp/hot_restart/simple.go diff --git a/geg/os/gproc/gproc2.go b/geg/os/gproc/gproc2.go index e92aed49c..c48ddd7ec 100644 --- a/geg/os/gproc/gproc2.go +++ b/geg/os/gproc/gproc2.go @@ -6,6 +6,6 @@ import ( ) func main () { - err := gproc.Send(26248, []byte{40}) + err := gproc.Send(23504, []byte{30}) fmt.Println(err) } diff --git a/geg/other/test.go b/geg/other/test.go index 1c91035f0..ce225f777 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -5,11 +5,50 @@ import ( "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/encoding/gbinary" + "gitee.com/johng/gf/g/os/gproc" ) +// 数据解包,防止黏包 +func bufferToMsgs(buffer []byte) []*gproc.Msg { + s := 0 + msgs := make([]*gproc.Msg, 0) + for s < len(buffer) { + length := gbinary.DecodeToInt(buffer[s : s + 4]) + if length < 0 || length > len(buffer) { + s++ + continue + } + checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) + checksum2 := checksum(buffer[s + 12 : s + length]) + if checksum1 != checksum2 { + s++ + continue + } + msgs = append(msgs, &gproc.Msg { + Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), + Data : buffer[s + 12 : s + length], + }) + s += length + } + return msgs +} + +// 常见的二进制数据校验方式,生成校验结果 +func checksum(buffer []byte) uint32 { + var checksum uint32 + for _, b := range buffer { + checksum += uint32(b) + } + return checksum +} func main(){ - fmt.Println(uint8(int(300))) + b := gfile.GetBinContents("/tmp/gproc/27501") + for _, msg := range bufferToMsgs(b) { + fmt.Println(msg.Pid) + fmt.Println(msg.Data) + } + return t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go") t2 := gtime.Second()