diff --git a/TODO b/TODO index 7b1f2297b..fa2fbbde9 100644 --- a/TODO +++ b/TODO @@ -10,7 +10,7 @@ ON THE WAY: 9. orm增加更多数据库支持; 10. ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现; 11. 当二进制参数为nil时,gjson.LoadContent并将gjson.Json对象ToMap时会报错; - +12. 改进控制器及执行对象注册,更友好地支持动态路由注册,例如:注册规则为 /channel/:name,现有的控制器及执行对象注册很难友好支持这种动态形式; DONE: diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 871ac58ae..2541e6eec 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -223,8 +223,6 @@ func Wait() { // 开启底层Web Server执行 func (s *Server) startServer(fdMap listenerFdMap) { - // 开始执行底层Web Server创建,端口监听 - var server *gracefulServer var httpsEnabled bool if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 { // ================ @@ -249,30 +247,21 @@ func (s *Server) startServer(fdMap listenerFdMap) { if len(v) == 0 { continue } - go func(addrItem string) { - fd := 0 - addr := addrItem - array := strings.Split(addrItem, "#") - if len(array) > 1 { - addr = array[0] - // windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启 - if runtime.GOOS != "windows" { - fd = gconv.Int(array[1]) - } + fd := 0 + addr := v + array := strings.Split(v, "#") + if len(array) > 1 { + addr = array[0] + // windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启 + if runtime.GOOS != "windows" { + fd = gconv.Int(array[1]) } - if fd > 0 { - server = s.newGracefulServer(addr, fd) - } else { - server = s.newGracefulServer(addr) - } - s.servers = append(s.servers, server) - if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { - // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 - if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { - glog.Error(err) - } - } - }(v) + } + if fd > 0 { + s.servers = append(s.servers, s.newGracefulServer(addr, fd)) + } else { + s.servers = append(s.servers, s.newGracefulServer(addr)) + } } } // ================ @@ -292,28 +281,34 @@ func (s *Server) startServer(fdMap listenerFdMap) { if len(v) == 0 { continue } - go func(addrItem string) { - fd := 0 - addr := addrItem - array := strings.Split(addrItem, "#") - if len(array) > 1 { - addr = array[0] - // windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启 - if runtime.GOOS != "windows" { - fd = gconv.Int(array[1]) - } + fd := 0 + addr := v + array := strings.Split(v, "#") + if len(array) > 1 { + addr = array[0] + // windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启 + if runtime.GOOS != "windows" { + fd = gconv.Int(array[1]) } - if fd > 0 { - server = s.newGracefulServer(addr, fd) + } + if fd > 0 { + s.servers = append(s.servers, s.newGracefulServer(addr, fd)) + } else { + s.servers = append(s.servers, s.newGracefulServer(addr)) + } + } + // 开始执行异步监听 + for _, v := range s.servers { + go func(server *gracefulServer) { + var err error + if server.isHttps { + err = server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath) } else { - server = s.newGracefulServer(addr) + err = server.ListenAndServe() } - s.servers = append(s.servers, server) - if err := server.ListenAndServe(); err != nil { - // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 - if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { - glog.Error(err) - } + // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 + if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + glog.Error(err) } }(v) } @@ -321,6 +316,11 @@ func (s *Server) startServer(fdMap listenerFdMap) { s.status = 1 } +// 热重启Web Server +func (s *Server) Reload() { + sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) +} + // 重启Web Server func (s *Server) Restart() { sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go index 3f5778e68..11c09ea9c 100644 --- a/g/net/ghttp/ghttp_server_admin.go +++ b/g/net/ghttp/ghttp_server_admin.go @@ -10,6 +10,7 @@ package ghttp import ( "strings" "gitee.com/johng/gf/g/os/gview" + "runtime" ) // 用于服务管理的对象 @@ -26,15 +27,26 @@ func (p *utilAdmin) Index(r *Request) { gf ghttp admin +

reload

restart

shutdown

- `, data) + `, data) r.Response.Write(buffer) } -// 服务重启 +// 服务热重启 +func (p *utilAdmin) Reload(r *Request) { + if runtime.GOOS == "windows" { + p.Restart(r) + } else { + r.Response.Write("reload server") + r.Server.Reload() + } +} + +// 服务完整重启 func (p *utilAdmin) Restart(r *Request) { r.Response.Write("restart server") r.Server.Restart() diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index 67af23d84..8857fb39e 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -10,32 +10,34 @@ package ghttp import ( "os" "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/encoding/gjson" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/encoding/gbinary" + "gitee.com/johng/gf/g/os/gtime" ) const ( gMSG_START = 10 - gMSG_RESTART = 20 - gMSG_SHUTDOWN = 30 - gMSG_NEW_FORK = 40 - gMSG_REMOVE_PROC = 50 - gMSG_HEARTBEAT = 60 + gMSG_RELOAD = 20 + gMSG_RESTART = 30 + gMSG_SHUTDOWN = 40 + gMSG_CLOSE = 45 + gMSG_NEW_FORK = 50 + gMSG_HEARTBEAT = 70 - gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔 - gPROC_HEARTBEAT_TIMEOUT = 30000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程 - //gPROC_MULTI_CHILD_CLEAR_INTERVAL = 1000 // (毫秒)检测间隔,当存在多个子进程时(往往是重启间隔非常短且频繁造成),需要进行清理,最终留下一个最新的子进程 - //gPROC_MULTI_CHILD_CLEAR_MIN_EXPIRE = 30000 // (毫秒)当多个子进程存在时,允许子进程进程至少运行的最小时间,超过该时间则清理 + gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔 + gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程 ) // 进程信号量监听消息队列 -var procSignalChan = make(chan os.Signal) +var procSignalChan = make(chan os.Signal) + +// 上一次进程间心跳的时间戳 +var lastUpdateTime = gtype.NewInt() // (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效 -var checkHeartbeat = gtype.NewBool() +var checkHeartbeat = gtype.NewBool() // 处理进程信号量监控以及进程间消息通信 func handleProcessMsgAndSignal() { @@ -44,7 +46,6 @@ func handleProcessMsgAndSignal() { go handleChildProcessHeartbeat() } else { go handleMainProcessHeartbeat() - //go handleMainProcessChildClear() } handleProcessMsg() } @@ -60,21 +61,20 @@ func handleProcessMsg() { //gfile.PutContentsAppend("/tmp/gproc-log", content) act := gbinary.DecodeToUint(msg.Data[0 : 1]) data := msg.Data[1 : ] + if msg.Pid != gproc.Pid() { + updateProcessUpdateTime() + } if gproc.IsChild() { // =============== // 子进程 // =============== - // 任何与父进程的通信都会更新最后通信时间 - if msg.Pid == gproc.PPid() { - updateProcessChildUpdateTime() - } switch act { case gMSG_START: onCommChildStart(msg.Pid, data) + case gMSG_RELOAD: onCommChildReload(msg.Pid, data) case gMSG_RESTART: onCommChildRestart(msg.Pid, data) + case gMSG_CLOSE: onCommChildClose(msg.Pid, data) case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data) - case gMSG_SHUTDOWN: - onCommChildShutdown(msg.Pid, data) - return + case gMSG_SHUTDOWN: onCommChildShutdown(msg.Pid, data) } } else { // =============== @@ -84,20 +84,12 @@ func handleProcessMsg() { if msg.Pid != gproc.Pid() { updateProcessCommTime(msg.Pid) } - if !procFirstTimeMap.Contains(msg.Pid) { - procFirstTimeMap.Set(msg.Pid, int(gtime.Millisecond())) - } switch act { case gMSG_START: onCommMainStart(msg.Pid, data) + case gMSG_RELOAD: onCommMainReload(msg.Pid, data) case gMSG_RESTART: onCommMainRestart(msg.Pid, data) case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data) case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data) - case gMSG_REMOVE_PROC: - onCommMainRemoveProc(msg.Pid, data) - // 如果所有子进程都退出,那么主进程也主动退出 - if procManager.Size() == 0 { - return - } case gMSG_SHUTDOWN: onCommMainShutdown(msg.Pid, data) return @@ -166,4 +158,9 @@ func closeWebServers() { } } }) +} + +// 更新上一次进程间通信的时间 +func updateProcessUpdateTime() { + lastUpdateTime.Set(int(gtime.Millisecond())) } \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child.go b/g/net/ghttp/ghttp_server_comm_child.go index f567dc7d2..2839ca9e2 100644 --- a/g/net/ghttp/ghttp_server_comm_child.go +++ b/g/net/ghttp/ghttp_server_comm_child.go @@ -18,23 +18,19 @@ import ( "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/encoding/gjson" - "gitee.com/johng/gf/g/container/gtype" ) const ( gPROC_CHILD_MAX_IDLE_TIME = 3000 // 子进程闲置时间(未开启心跳机制的时间) ) -// (子进程)上一次从主进程接收心跳的时间戳 -var lastHeartbeatTime = gtype.NewInt() - // 心跳消息 func onCommChildHeartbeat(pid int, data []byte) { - updateProcessChildUpdateTime() + } -// 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 -func onCommChildRestart(pid int, data []byte) { +// 热重启,子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 +func onCommChildReload(pid int, data []byte) { var buffer []byte = nil p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) // windows系统无法进行文件描述符操作,只能重启进程 @@ -73,18 +69,23 @@ func onCommChildRestart(pid int, data []byte) { } } -// 关闭服务链接并退出 +// 完整重启 +func onCommChildRestart(pid int, data []byte) { + sendProcessMsg(gproc.PPid(), gMSG_RESTART, nil) +} + +// 优雅关闭服务链接并退出 func onCommChildShutdown(pid int, data []byte) { - sendProcessMsg(gproc.PPid(), gMSG_REMOVE_PROC, nil) if runtime.GOOS != "windows" { shutdownWebServers() } os.Exit(0) } -// 更新上一次主进程主动与子进程通信的时间 -func updateProcessChildUpdateTime() { - lastHeartbeatTime.Set(int(gtime.Millisecond())) +// 强制性关闭服务链接并退出 +func onCommChildClose(pid int, data []byte) { + closeWebServers() + os.Exit(0) } // 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 @@ -93,17 +94,15 @@ func handleChildProcessHeartbeat() { time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil) // 超过时间没有接收到主进程心跳,自动关闭退出 - if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastHeartbeatTime.Val() > gPROC_HEARTBEAT_TIMEOUT) { - sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) + if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT) { // 子进程有时会无法退出(僵尸?),这里直接使用exit,而不是return //glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastHeartbeatTime.Val(), gPROC_HEARTBEAT_TIMEOUT) - //glog.Printfln("%d: heartbeat timeout, exit", gproc.Pid()) - glog.Printfln("%d: waiting %dms for shutdown timeout, exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT) + glog.Printfln("%d: heartbeat timeout[%dms], exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT) os.Exit(0) } // 未开启心跳检测的闲置超过一定时间则主动关闭 if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME { - glog.Printfln("%d: max idle time %dms exceeded, exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME) + glog.Printfln("%d: idle timeout[%dms], exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME) os.Exit(0) } } diff --git a/g/net/ghttp/ghttp_server_comm_child_unix.go b/g/net/ghttp/ghttp_server_comm_child_unix.go index f112dd55f..20485b93a 100644 --- a/g/net/ghttp/ghttp_server_comm_child_unix.go +++ b/g/net/ghttp/ghttp_server_comm_child_unix.go @@ -10,6 +10,7 @@ package ghttp import ( + "os" "gitee.com/johng/gf/g/os/gproc" ) @@ -31,9 +32,12 @@ func onCommChildStart(pid int, data []byte) { sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) // 如果创建自己的父进程非gproc父进程,那么表示该进程为重启创建的进程,创建成功之后需要通知父进程自行销毁 if gproc.PPidOS() != gproc.PPid() { - sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil) + //sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil) + if p, err := os.FindProcess(gproc.PPidOS()); err == nil { + p.Kill() + } } // 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间 - updateProcessChildUpdateTime() + updateProcessUpdateTime() checkHeartbeat.Set(true) } \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_main.go b/g/net/ghttp/ghttp_server_comm_main.go index ec0bb608d..37a4ecc72 100644 --- a/g/net/ghttp/ghttp_server_comm_main.go +++ b/g/net/ghttp/ghttp_server_comm_main.go @@ -13,12 +13,9 @@ import ( "time" "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/container/gmap" + "gitee.com/johng/gf/g/os/gproc" ) -// (主进程)子进程与主进程第一次通信的时间映射map -// 用以识别子进程创建时间先后顺序,当存在多个子进程时,主动销毁旧的子进程 -var procFirstTimeMap = gmap.NewIntIntMap() - // (主进程)主进程与子进程上一次活跃时间映射map var procUpdateTimeMap = gmap.NewIntIntMap() @@ -34,10 +31,23 @@ func onCommMainHeartbeat(pid int, data []byte) { updateProcessCommTime(pid) } -// 重启服务 -func onCommMainRestart(pid int, data []byte) { +// 热重启服务 +func onCommMainReload(pid int, data []byte) { // 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作 - procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) + procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil)) +} + +// 完整重启服务 +func onCommMainRestart(pid int, data []byte) { + if pid == gproc.Pid() { + procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) + return + } + if p, _ := os.FindProcess(pid); p != nil { + p.Kill() + p.Wait() + } + sendProcessMsg(gproc.Pid(), gMSG_START, nil) } // 新建子进程通知 @@ -46,14 +56,11 @@ func onCommMainNewFork(pid int, data []byte) { checkHeartbeat.Set(true) } -// 销毁子进程通知 -func onCommMainRemoveProc(pid int, data []byte) { - procManager.RemoveProcess(pid) -} - // 关闭服务,通知所有子进程退出 func onCommMainShutdown(pid int, data []byte) { - procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) + //procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) + procManager.KillAll() + procManager.WaitAll() } // 更新指定进程的通信时间记录 @@ -72,30 +79,13 @@ func handleMainProcessHeartbeat() { if int(gtime.Millisecond()) - procUpdateTimeMap.Get(pid) > gPROC_HEARTBEAT_TIMEOUT { // 这里需要手动从进程管理器中去掉该进程 procManager.RemoveProcess(pid) - sendProcessMsg(pid, gMSG_SHUTDOWN, nil) + sendProcessMsg(pid, gMSG_CLOSE, nil) } } } + // 如果所有子进程都退出,并且达到超时时间,那么主进程也没存在的必要 + if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{ + os.Exit(0) + } } } - -// 清理多余的子进程 -//func handleMainProcessChildClear() { -// for { -// time.Sleep(gPROC_MULTI_CHILD_CLEAR_INTERVAL*time.Millisecond) -// if procManager.Size() > 1 { -// minPid := 0 -// minTime := int(gtime.Millisecond()) -// for _, pid := range procManager.Pids() { -// if t := procFirstTimeMap.Get(pid); t < minTime { -// minPid = pid -// minTime = t -// } -// } -// if minPid > 0 && procUpdateTimeMap.Get(minPid) - procFirstTimeMap.Get(minPid) > gPROC_MULTI_CHILD_CLEAR_MIN_EXPIRE { -// sendProcessMsg(minPid, gMSG_SHUTDOWN, nil) -// glog.Printfln("%d: multi child occurred, shutdown %d", gproc.Pid(), minPid) -// } -// } -// } -//} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_signal_unix.go b/g/net/ghttp/ghttp_server_comm_signal_unix.go index af56d9356..760212c07 100644 --- a/g/net/ghttp/ghttp_server_comm_signal_unix.go +++ b/g/net/ghttp/ghttp_server_comm_signal_unix.go @@ -26,6 +26,7 @@ func handleProcessSignal() { syscall.SIGHUP, syscall.SIGTERM, syscall.SIGUSR1, + syscall.SIGUSR2, ) for { sig = <- procSignalChan @@ -33,12 +34,14 @@ func handleProcessSignal() { // 进程终止,停止所有子进程运行 case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) - // 强制性kill掉所有子进程 - procManager.KillAll() return - // 用户信号,重启服务 + // 用户信号,热重启服务 case syscall.SIGUSR1: + sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) + + // 用户信号,完整重启服务 + case syscall.SIGUSR2: sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) default: diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index d6d2160be..8d93af987 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -18,6 +18,10 @@ import ( "time" ) +const ( + gGRACEFUL_SHUTDOWN_TIMEOUT = 10*time.Second // 优雅关闭链接时的超时时间 +) + // 优雅的Web Server对象封装 type gracefulServer struct { fd uintptr @@ -156,12 +160,10 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { // 执行请求优雅关闭 func (s *gracefulServer) shutdown() { - ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - defer cancel() - if err := s.httpServer.Shutdown(ctx); err != nil { + if err := s.httpServer.Shutdown(context.Background()); err != nil { glog.Errorfln("%d: %s server [%s] shutdown error: %v", gproc.Pid(), s.getProto(), s.addr, err) } else { - glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr) + //glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr) s.shutdownChan <- true } } @@ -171,7 +173,7 @@ func (s *gracefulServer) close() { if err := s.httpServer.Close(); err != nil { glog.Errorfln("%d: %s server [%s] closed error: %v", gproc.Pid(), s.getProto(), s.addr, err) } else { - glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr) + //glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr) s.shutdownChan <- true } } diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 2e9a101c5..9ef920e3d 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -9,6 +9,7 @@ package gproc import ( "os" "fmt" + "time" "gitee.com/johng/gf/g/os/glog" "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/os/gflock" @@ -21,7 +22,9 @@ import ( const ( // 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置 - gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" + gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" + // 自动通信文件清理时间间隔 + gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second ) // 当前进程的文件锁 @@ -69,6 +72,23 @@ func init() { if err != nil { glog.Error(err) } + + go autoClearCommDir() +} + +// 自动清理通信目录文件 +// @todo 目前是以时间过期规则进行清理,后期可以考虑加入进程存在性判断 +func autoClearCommDir() { + dirPath := getCommDirPath() + for { + time.Sleep(gPROC_COMM_AUTO_CLEAR_INTERVAL) + for _, name := range gfile.ScanDir(dirPath) { + path := dirPath + gfile.Separator + name + if gtime.Second() - gfile.MTime(path) >= 10 { + gfile.Remove(path) + } + } + } } // 手动检查进程通信消息,如果存在消息曾推送到进程消息队列 @@ -97,6 +117,10 @@ func Receive() *Msg { // 向指定gproc进程发送数据 // 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) func Send(pid int, data interface{}) error { + // 首先检测进程存在不存在,存在才能发送消息 + if _, err := os.FindProcess(pid); err != nil { + return err + } buffer := gconv.Bytes(data) b := make([]byte, 0) b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...) @@ -112,11 +136,16 @@ func Send(pid int, data interface{}) error { // 获取指定进程的通信文件地址 func getCommFilePath(pid int) string { + return getCommDirPath() + gfile.Separator + gconv.String(pid) +} + +// 获取进程间通信目录地址 +func getCommDirPath() string { tempDir := os.Getenv("gproc.tempdir") if tempDir == "" { tempDir = gfile.TempDir() } - return tempDir + gfile.Separator + "gproc" + gfile.Separator + gconv.String(pid) + return tempDir + gfile.Separator + "gproc" } // 数据解包,防止黏包 diff --git a/geg/other/test.go b/geg/other/test.go index 19b906338..57b793a10 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -6,7 +6,8 @@ import ( "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/encoding/gbinary" "gitee.com/johng/gf/g/os/gproc" - "strings" + "os" + "syscall" ) // 数据解包,防止黏包 @@ -44,7 +45,8 @@ func checksum(buffer []byte) uint32 { } func main(){ - fmt.Println(len(strings.Split("", ","))) + p, _ := os.FindProcess(10354) + fmt.Println(p.Signal(syscall.Signal(1))) return b := gfile.GetBinContents("/tmp/gproc/30588") for _, msg := range bufferToMsgs(b) {