diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index fc81460ce..b85d7dfa3 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -23,6 +23,7 @@ import ( "gitee.com/johng/gf/g/container/gqueue" "gitee.com/johng/gf/g/os/gspath" "gitee.com/johng/gf/g/os/gfile" + "gitee.com/johng/gf/g/os/genv" ) const ( @@ -116,23 +117,8 @@ var doneChan = make(chan struct{}, 100000) // Web Server进程初始化 func init() { - go func() { - // 等待ready消息(Run方法调用) - <- readyChan - // 主进程只负责创建子进程 - if !gproc.IsChild() { - sendProcessMsg(os.Getpid(), gMSG_START, nil) - } - // 开启进程消息监听处理 - handleProcessMsgAndSignal() - - // 服务执行完成,需要退出 - doneChan <- struct{}{} - - if !gproc.IsChild() { - glog.Printfln("%d: all servers shutdown", gproc.Pid()) - } - }() + // 信号量管理操作监听 + go handleProcessSignal() } // 获取/创建一个默认配置的HTTP Server(默认监听端口是80) @@ -198,11 +184,6 @@ func (s *Server) Start() error { } } - // 主进程,不执行任何业务,只负责进程管理 - if !gproc.IsChild() { - return nil - } - if s.status == 1 { return errors.New("server is already running") } @@ -210,6 +191,22 @@ func (s *Server) Start() error { if s.config.Handler == nil { s.config.Handler = http.HandlerFunc(s.defaultHttpHandle) } + + // 启动http server + fdMapStr := genv.Get(gADMIN_ACTION_RELOAD_ENVKEY) + if len(fdMapStr) > 0 { + sfm := bufferToServerFdMap([]byte(fdMapStr)) + for k, v := range sfm { + GetServer(k).startServer(v) + } + } else { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + v.(*Server).startServer(nil) + } + }) + } + // 开启异步关闭队列处理循环 s.startCloseQueueLoop() return nil @@ -220,10 +217,10 @@ func (s *Server) Run() error { if err := s.Start(); err != nil { return err } - // Web Server准备就绪,待执行 - readyChan <- struct{}{} // 阻塞等待服务执行完成 <- doneChan + + glog.Printfln("%d: all servers shutdown", gproc.Pid()) return nil } @@ -231,8 +228,10 @@ func (s *Server) Run() error { // 阻塞等待所有Web Server停止,常用于多Web Server场景,以及需要将Web Server异步运行的场景 // 这是一个与进程相关的方法 func Wait() { - readyChan <- struct{}{} + // 阻塞等待服务执行完成 <- doneChan + + glog.Printfln("%d: all servers shutdown", gproc.Pid()) } diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go index 8e06fb913..f97936902 100644 --- a/g/net/ghttp/ghttp_server_admin.go +++ b/g/net/ghttp/ghttp_server_admin.go @@ -18,6 +18,9 @@ import ( "fmt" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/os/glog" + "os" + "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/util/gconv" ) const ( @@ -25,6 +28,8 @@ const ( gADMIN_ACTION_RELOADING = 1 gADMIN_ACTION_RESTARTING = 2 gADMIN_ACTION_SHUTINGDOWN = 4 + gADMIN_ACTION_RELOAD_ENVKEY = "gf.server.reload" + gADMIN_ACTION_RESTART_ENVKEY = "gf.server.restart" ) // 用于服务管理的对象 @@ -113,7 +118,10 @@ func (s *Server) Reload() error { } serverProcessStatus.Set(gADMIN_ACTION_RELOADING) glog.Printfln("%d: server reloading", gproc.Pid()) - return sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) + forkReloadProcess() + go shutdownWebServers() + doneChan <- struct{}{} + return nil } // 完整重启Web Server @@ -128,7 +136,8 @@ func (s *Server) Restart() error { } serverProcessStatus.Set(gADMIN_ACTION_RESTARTING) glog.Printfln("%d: server restarting", gproc.Pid()) - return sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) + doneChan <- struct{}{} + return nil } // 关闭Web Server @@ -143,7 +152,9 @@ func (s *Server) Shutdown() error { } serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN) glog.Printfln("%d: server shutting down", gproc.Pid()) - return sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) + go closeWebServers() + doneChan <- struct{}{} + return nil } // 检测当前操作的频繁度 @@ -170,4 +181,86 @@ func (s *Server) checkActionStatus() error { } } return nil +} + +// 创建一个子进程,通过环境变量传参 +func forkReloadProcess() { + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 + sfm := getServerFdMap() + // 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理,以便子进程获取到正确的fd + for name, m := range sfm { + for fdk, fdv := range m { + if len(fdv) > 0 { + s := "" + for _, item := range strings.Split(fdv, ",") { + array := strings.Split(item, "#") + fd := uintptr(gconv.Uint(array[1])) + if fd > 0 { + s += fmt.Sprintf("%s#%d,", array[0], 3 + len(p.ExtraFiles)) + p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, "")) + } else { + s += fmt.Sprintf("%s#%d,", array[0], 0) + } + } + sfm[name][fdk] = strings.TrimRight(s, ",") + } + } + } + buffer, _ := gjson.Encode(sfm) + p.Env = append(p.Env, fmt.Sprintf("%s=%s", gADMIN_ACTION_RELOAD_ENVKEY, string(buffer))) + if _, err := p.Start(); err != nil { + glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer)) + } +} + +// 获取所有Web Server的文件描述符map +func getServerFdMap() map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + serverMapping.RLockFunc(func(m map[string]interface{}) { + for k, v := range m { + sfm[k] = v.(*Server).getListenerFdMap() + } + }) + return sfm +} + +// 二进制转换为FdMap +func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + if len(buffer) > 0 { + j, _ := gjson.LoadContent(buffer, "json") + for k, _ := range j.ToMap() { + m := make(map[string]string) + for k, v := range j.GetMap(k) { + m[k] = gconv.String(v) + } + sfm[k] = m + } + } + return sfm +} + +// 关优雅闭进程所有端口的Web Server服务 +// 注意,只是关闭Web Server服务,并不是退出进程 +func shutdownWebServers() { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + for _, s := range v.(*Server).servers { + s.shutdown() + } + } + }) +} + +// 强制关闭进程所有端口的Web Server服务 +// 注意,只是关闭Web Server服务,并不是退出进程 +func closeWebServers() { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + for _, s := range v.(*Server).servers { + s.close() + } + } + }) } \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_signal_unix.go b/g/net/ghttp/ghttp_server_admin_unix.go similarity index 56% rename from g/net/ghttp/ghttp_server_comm_signal_unix.go rename to g/net/ghttp/ghttp_server_admin_unix.go index 0c8316419..6c8902822 100644 --- a/g/net/ghttp/ghttp_server_comm_signal_unix.go +++ b/g/net/ghttp/ghttp_server_admin_unix.go @@ -12,9 +12,11 @@ import ( "os" "syscall" "os/signal" - "gitee.com/johng/gf/g/os/gproc" ) +// 进程信号量监听消息队列 +var procSignalChan = make(chan os.Signal) + // 信号量处理 func handleProcessSignal() { var sig os.Signal @@ -31,23 +33,20 @@ func handleProcessSignal() { for { sig = <- procSignalChan switch sig { - // 进程终止,停止所有子进程运行 - case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: - sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) - if gproc.IsChild() { - sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) - } - return + // 进程终止,停止所有子进程运行 + case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: + + return // 用户信号,热重启服务 - case syscall.SIGUSR1: - sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) + case syscall.SIGUSR1: + // 用户信号,完整重启服务 - case syscall.SIGUSR2: - sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) + case syscall.SIGUSR2: - default: + + default: } } } \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_signal_windows.go b/g/net/ghttp/ghttp_server_admin_windows.go similarity index 100% rename from g/net/ghttp/ghttp_server_comm_signal_windows.go rename to g/net/ghttp/ghttp_server_admin_windows.go diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go deleted file mode 100644 index 0f0082e06..000000000 --- a/g/net/ghttp/ghttp_server_comm.go +++ /dev/null @@ -1,178 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. -// Web Server进程间通信 - -package ghttp - -import ( - "os" - "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/util/gconv" - "gitee.com/johng/gf/g/encoding/gjson" - "gitee.com/johng/gf/g/container/gtype" - "gitee.com/johng/gf/g/encoding/gbinary" - "gitee.com/johng/gf/g/os/gtime" - "time" -) - -const ( - gMSG_START = 1 - gMSG_RELOAD = 2 - gMSG_RESTART = 3 - gMSG_SHUTDOWN = 4 - gMSG_CLOSE = 5 - gMSG_NEW_FORK = 6 - gMSG_HEARTBEAT = 7 - - gPROC_FAILURE_RETRY_COUNT = 3 // 发送消息失败重试次数 - gPROC_FAILURE_RETRY_INTERVAL = 500 // (毫秒)发送消息失败时重试间隔 - gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔 - gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程 -) - -// 进程信号量监听消息队列 -var procSignalChan = make(chan os.Signal) - -// 上一次进程间心跳的时间戳 -var lastUpdateTime = gtype.NewInt(int(gtime.Millisecond())) - -// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效 -var checkHeartbeat = gtype.NewBool() - -// 处理进程信号量监控以及进程间消息通信 -func handleProcessMsgAndSignal() { - go handleProcessSignal() - if gproc.IsChild() { - go handleChildProcessHeartbeat() - } else { - go handleMainProcessHeartbeat() - } - handleProcessMsg() -} - -// 处理进程间消息 -// 数据格式: 操作(8bit) | 参数(变长) -func handleProcessMsg() { - for { - if msg := gproc.Receive(); msg != nil { - // 记录消息日志,用于调试 - //content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data) - //glog.Print(content) - //gfile.PutContentsAppend("/tmp/gproc-log", content) - act := gbinary.DecodeToUint(msg.Data[0 : 1]) - data := msg.Data[1 : ] - if msg.Pid != gproc.Pid() { - updateProcessUpdateTime() - } - if gproc.IsChild() { - // =============== - // 子进程 - // =============== - switch act { - case gMSG_START: onCommChildStart(msg.Pid, data) - case gMSG_RELOAD: onCommChildReload(msg.Pid, data) - case gMSG_RESTART: onCommChildRestart(msg.Pid, data) - case gMSG_CLOSE: onCommChildClose(msg.Pid, data) - case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data) - case gMSG_SHUTDOWN: onCommChildShutdown(msg.Pid, data) - } - } else { - // =============== - // 父进程 - // =============== - // 任何进程消息都会自动更新最后通信时间记录 - if msg.Pid != gproc.Pid() { - updateProcessCommTime(msg.Pid) - } - switch act { - case gMSG_START: onCommMainStart(msg.Pid, data) - case gMSG_RELOAD: onCommMainReload(msg.Pid, data) - case gMSG_RESTART: onCommMainRestart(msg.Pid, data) - case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data) - case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data) - case gMSG_SHUTDOWN: - onCommMainShutdown(msg.Pid, data) - return - } - } - } - } -} - -// 向进程发送操作消息 -func sendProcessMsg(pid int, act int, data []byte) error { - var err error - for i := gPROC_FAILURE_RETRY_COUNT; i > 0; i-- { - if err = gproc.Send(pid, formatMsgBuffer(act, data)); err != nil { - time.Sleep(gPROC_FAILURE_RETRY_INTERVAL*time.Millisecond) - } else { - break - } - } - //glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err) - return err -} - -// 生成一条满足Web Server进程通信协议的消息 -func formatMsgBuffer(act int, data []byte) []byte { - return append(gbinary.EncodeUint8(uint8(act)), data...) -} - -// 获取所有Web Server的文件描述符map -func getServerFdMap() map[string]listenerFdMap { - sfm := make(map[string]listenerFdMap) - serverMapping.RLockFunc(func(m map[string]interface{}) { - for k, v := range m { - sfm[k] = v.(*Server).getListenerFdMap() - } - }) - return sfm -} - -// 二进制转换为FdMap -func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { - sfm := make(map[string]listenerFdMap) - if len(buffer) > 0 { - j, _ := gjson.LoadContent(buffer, "json") - for k, _ := range j.ToMap() { - m := make(map[string]string) - for k, v := range j.GetMap(k) { - m[k] = gconv.String(v) - } - sfm[k] = m - } - } - return sfm -} - -// 关优雅闭进程所有端口的Web Server服务 -// 注意,只是关闭Web Server服务,并不是退出进程 -func shutdownWebServers() { - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - for _, s := range v.(*Server).servers { - s.shutdown() - } - } - }) -} - -// 强制关闭进程所有端口的Web Server服务 -// 注意,只是关闭Web Server服务,并不是退出进程 -func closeWebServers() { - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - for _, s := range v.(*Server).servers { - s.close() - } - } - }) -} - -// 更新上一次进程间通信的时间 -func updateProcessUpdateTime() { - lastUpdateTime.Set(int(gtime.Millisecond())) -} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child.go b/g/net/ghttp/ghttp_server_comm_child.go deleted file mode 100644 index bd60638ce..000000000 --- a/g/net/ghttp/ghttp_server_comm_child.go +++ /dev/null @@ -1,109 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. -// Web Server进程间通信 - 子进程 - -package ghttp - -import ( - "os" - "fmt" - "time" - "strings" - "runtime" - "gitee.com/johng/gf/g/os/glog" - "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/util/gconv" - "gitee.com/johng/gf/g/encoding/gjson" -) - -const ( - gPROC_CHILD_MAX_IDLE_TIME = 10000 // 子进程闲置时间(未开启心跳机制的时间) -) - -// 心跳处理(方法为空,逻辑放到公共通信switch中进行处理) -func onCommChildHeartbeat(pid int, data []byte) { - -} - -// 平滑重启,子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 -func onCommChildReload(pid int, data []byte) { - var buffer []byte = nil - p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) - // windows系统无法进行文件描述符操作,只能重启进程 - if runtime.GOOS == "windows" { - // windows下使用shutdown会造成协程阻塞,这里直接使用close强制关闭 - closeWebServers() - } else { - // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 - sfm := getServerFdMap() - // 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理,以便子进程获取到正确的fd - for name, m := range sfm { - for fdk, fdv := range m { - if len(fdv) > 0 { - s := "" - for _, item := range strings.Split(fdv, ",") { - array := strings.Split(item, "#") - fd := uintptr(gconv.Uint(array[1])) - if fd > 0 { - s += fmt.Sprintf("%s#%d,", array[0], 3 + len(p.ExtraFiles)) - p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, "")) - } else { - s += fmt.Sprintf("%s#%d,", array[0], 0) - } - } - sfm[name][fdk] = strings.TrimRight(s, ",") - } - } - } - buffer, _ = gjson.Encode(sfm) - } - p.PPid = gproc.PPid() - if newPid, err := p.Start(); err == nil { - sendProcessMsg(newPid, gMSG_START, buffer) - } else { - glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer)) - } -} - -// 完整重启 -func onCommChildRestart(pid int, data []byte) { - sendProcessMsg(gproc.PPid(), gMSG_RESTART, nil) -} - -// 优雅关闭服务链接并退出 -func onCommChildShutdown(pid int, data []byte) { - if runtime.GOOS != "windows" { - shutdownWebServers() - } - os.Exit(0) -} - -// 强制性关闭服务链接并退出 -func onCommChildClose(pid int, data []byte) { - closeWebServers() - os.Exit(0) -} - -// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 -func handleChildProcessHeartbeat() { - for { - time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) - sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil) - // 超过时间没有接收到主进程心跳,自动关闭退出 - if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT) { - // 子进程有时会无法退出(僵尸?),这里直接使用exit,而不是return - glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastUpdateTime.Val(), gPROC_HEARTBEAT_TIMEOUT) - glog.Printfln("%d: heartbeat timeout[%dms], exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT) - os.Exit(0) - } - // 未开启心跳检测的闲置超过一定时间则主动关闭 - if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME { - glog.Printfln("%d: idle timeout[%dms], exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME) - os.Exit(0) - } - } -} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child_unix.go b/g/net/ghttp/ghttp_server_comm_child_unix.go deleted file mode 100644 index 7d8d84988..000000000 --- a/g/net/ghttp/ghttp_server_comm_child_unix.go +++ /dev/null @@ -1,43 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. -// Web Server进程间通信 - 子进程 - -// +build !windows - -package ghttp - -import ( - "os" - "gitee.com/johng/gf/g/os/gproc" -) - -// 开启所有Web Server(根据消息启动) -func onCommChildStart(pid int, data []byte) { - if len(data) > 0 { - sfm := bufferToServerFdMap(data) - for k, v := range sfm { - GetServer(k).startServer(v) - } - } else { - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - v.(*Server).startServer(nil) - } - }) - } - // 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制 - sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) - // 如果创建自己的父进程非gproc父进程,那么表示该进程为重启创建的进程,创建成功之后需要通知父进程自行销毁 - if gproc.PPidOS() != gproc.PPid() { - //如果子进程已经继承了父进程的socket文件描述符,那么父进程没有存在的必要,直接kill掉 - if p, err := os.FindProcess(gproc.PPidOS()); err == nil { - p.Kill() - } - } - // 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间 - updateProcessUpdateTime() - checkHeartbeat.Set(true) -} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child_windows.go b/g/net/ghttp/ghttp_server_comm_child_windows.go deleted file mode 100644 index a28027441..000000000 --- a/g/net/ghttp/ghttp_server_comm_child_windows.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package ghttp - -import ( - "gitee.com/johng/gf/g/os/gproc" -) - -// 开启所有Web Server(根据消息启动) -func onCommChildStart(pid int, data []byte) { - // 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制 - sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) - // 开启Web Server服务 - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - v.(*Server).startServer(nil) - } - }) - // 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间 - updateProcessUpdateTime() - checkHeartbeat.Set(true) -} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_main.go b/g/net/ghttp/ghttp_server_comm_main.go deleted file mode 100644 index 7b9b1e4a6..000000000 --- a/g/net/ghttp/ghttp_server_comm_main.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. -// Web Server进程间通信 - 主进程. -// 管理子进程按照规则听话玩,不然有一百种方法让子进程在本地混不下去. - -package ghttp - -import ( - "os" - "time" - "gitee.com/johng/gf/g/os/glog" - "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/container/gmap" -) - -// (主进程)主进程与子进程上一次活跃时间映射map -var procUpdateTimeMap = gmap.NewIntIntMap() - -// 开启服务 -func onCommMainStart(pid int, data []byte) { - fork := forkNewProcess() - if fork == nil { - os.Exit(1) - } - updateProcessCommTime(fork.Pid()) - // 子进程创建成功之后再发送执行命令 - sendProcessMsg(fork.Pid(), gMSG_START, nil) -} - -// 心跳处理(方法为空,逻辑放到公共通信switch中进行处理) -func onCommMainHeartbeat(pid int, data []byte) { - -} - -// 平滑重启服务 -func onCommMainReload(pid int, data []byte) { - procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil)) -} - -// 完整重启服务 -func onCommMainRestart(pid int, data []byte) { - // 如果是父进程自身发送的重启指令,那么通知所有子进程重启 - if pid == gproc.Pid() { - procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) - return - } - // 首先创建子进程,暂时不开始服务,否则会有端口冲突 - fork := forkNewProcess() - if fork == nil { - os.Exit(1) - } - // 然后通知旧的子进程自动关闭并退出(不需要新建的子进程来处理) - sendProcessMsg(pid, gMSG_CLOSE, nil) - if p, err := os.FindProcess(pid); err == nil && p != nil { - p.Kill() - p.Wait() - } - // 通知新的子进程执行服务监听 - sendProcessMsg(fork.Pid(), gMSG_START, nil) -} - -// 新建子进程通知 -func onCommMainNewFork(pid int, data []byte) { - procManager.AddProcess(pid) - checkHeartbeat.Set(true) -} - -// 关闭服务,通知所有子进程退出(Kill强制性退出) -func onCommMainShutdown(pid int, data []byte) { - procManager.Send(formatMsgBuffer(gMSG_CLOSE, nil)) - procManager.KillAll() - procManager.WaitAll() -} - -// 更新指定进程的通信时间记录 -func updateProcessCommTime(pid int) { - procUpdateTimeMap.Set(pid, int(gtime.Millisecond())) -} - -// 创建一个子进程,但是暂时不执行服务监听 -func forkNewProcess() *gproc.Process { - p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) - if _, err := p.Start(); err != nil { - glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error()) - return nil - } - return p -} - -// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 -func handleMainProcessHeartbeat() { - for { - time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) - procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil)) - // 清理过期进程 - if checkHeartbeat.Val() { - for _, pid := range procManager.Pids() { - updatetime := procUpdateTimeMap.Get(pid) - if updatetime > 0 && int(gtime.Millisecond()) - updatetime > gPROC_HEARTBEAT_TIMEOUT { - //fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime) - // 这里需要手动从进程管理器中去掉该进程 - procManager.RemoveProcess(pid) - sendProcessMsg(pid, gMSG_CLOSE, nil) - } - } - - // (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要 - if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{ - glog.Printfln("%d: all children died, exit", gproc.Pid()) - os.Exit(0) - } - } - - } -} diff --git a/g/os/gproc/gproc.go b/g/os/gproc/gproc.go index 0735a6438..cd223c12a 100644 --- a/g/os/gproc/gproc.go +++ b/g/os/gproc/gproc.go @@ -13,6 +13,7 @@ import ( "os" "time" "gitee.com/johng/gf/g/util/gconv" + "strings" ) const ( @@ -51,6 +52,15 @@ func IsChild() bool { return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != "" } +// 设置gproc父进程ID,当ppid为0时表示该进程为gproc主进程,否则为gproc子进程 +func SetPPid(ppid int) { + if ppid > 0 { + os.Setenv(gPROC_ENV_KEY_PPID_KEY, gconv.String(ppid)) + } else { + os.Unsetenv(gPROC_ENV_KEY_PPID_KEY) + } +} + // 进程开始执行时间 func StartTime() time.Time { return processStartTime @@ -61,3 +71,13 @@ func Uptime() int { return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6) } +// 检测环境变量中是否已经存在指定键名 +func checkEnvKey(env []string, key string) bool { + for _, v := range env { + if len(v) >= len(key) && strings.EqualFold(v[0 : len(key)], key) { + return true + } + } + return false +} + diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index 59aad5458..977c855c7 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -9,14 +9,10 @@ package gproc import ( "os" - "fmt" "strings" "os/exec" "gitee.com/johng/gf/g/container/gmap" -) - -const ( - gCHILD_ARGS_MARK_NAME = "--gproc-child" + "fmt" ) // 进程管理器 @@ -55,17 +51,6 @@ func NewProcess(path string, args []string, environment []string) *Process { if d, err := os.Getwd(); err == nil { p.Dir = d } - // 判断是否加上子进程标识 - hasChildMark := false - childMarkLen := len(gCHILD_ARGS_MARK_NAME) - for _, v := range args { - if len(v) >= childMarkLen && strings.EqualFold(v[0 : childMarkLen], gCHILD_ARGS_MARK_NAME) { - hasChildMark = true - } - } - if !hasChildMark { - p.Args = append(p.Args, gCHILD_ARGS_MARK_NAME) - } if len(args) > 0 { start := 0 if strings.EqualFold(path, args[0]) { diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index 9d3501005..4db879f50 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -87,4 +87,4 @@ func (p *Process) Kill() error { // Sending Interrupt on Windows is not implemented. func (p *Process) Signal(sig os.Signal) error { return p.Process.Signal(sig) -} \ No newline at end of file +} diff --git a/geg/os/gproc/gproc3.go b/geg/os/gproc/gproc3.go new file mode 100644 index 000000000..caf4ccfaf --- /dev/null +++ b/geg/os/gproc/gproc3.go @@ -0,0 +1,34 @@ +package main + +import ( + "os" + "time" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" +) + +// 父进程销毁后,请使用进程管理器查看存活的子进程 +func main () { + if gproc.IsChild() { + glog.Printfln("%d: I am child, waiting 10 seconds to die", gproc.Pid()) + //p, err := os.FindProcess(os.Getppid()) + //fmt.Println(err) + //p.Kill() + time.Sleep(2*time.Second) + glog.Printfln("%d: 2", gproc.Pid()) + time.Sleep(2*time.Second) + glog.Printfln("%d: 4", gproc.Pid()) + time.Sleep(2*time.Second) + glog.Printfln("%d: 6", gproc.Pid()) + time.Sleep(2*time.Second) + glog.Printfln("%d: 8", gproc.Pid()) + time.Sleep(2*time.Second) + glog.Printfln("%d: died", gproc.Pid()) + } else { + p := gproc.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Start() + glog.Printfln("%d: I am main, waiting 3 seconds to die", gproc.Pid()) + time.Sleep(3*time.Second) + glog.Printfln("%d: died", gproc.Pid()) + } +}