ghttp.Server平滑重启机制改进中

This commit is contained in:
John
2018-06-01 00:11:45 +08:00
parent 0f76fc932c
commit 0eafa1edc6
13 changed files with 188 additions and 533 deletions

View File

@ -23,6 +23,7 @@ import (
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/os/gspath"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/genv"
)
const (
@ -116,23 +117,8 @@ var doneChan = make(chan struct{}, 100000)
// Web Server进程初始化
func init() {
go func() {
// 等待ready消息(Run方法调用)
<- readyChan
// 主进程只负责创建子进程
if !gproc.IsChild() {
sendProcessMsg(os.Getpid(), gMSG_START, nil)
}
// 开启进程消息监听处理
handleProcessMsgAndSignal()
// 服务执行完成,需要退出
doneChan <- struct{}{}
if !gproc.IsChild() {
glog.Printfln("%d: all servers shutdown", gproc.Pid())
}
}()
// 信号量管理操作监听
go handleProcessSignal()
}
// 获取/创建一个默认配置的HTTP Server(默认监听端口是80)
@ -198,11 +184,6 @@ func (s *Server) Start() error {
}
}
// 主进程,不执行任何业务,只负责进程管理
if !gproc.IsChild() {
return nil
}
if s.status == 1 {
return errors.New("server is already running")
}
@ -210,6 +191,22 @@ func (s *Server) Start() error {
if s.config.Handler == nil {
s.config.Handler = http.HandlerFunc(s.defaultHttpHandle)
}
// 启动http server
fdMapStr := genv.Get(gADMIN_ACTION_RELOAD_ENVKEY)
if len(fdMapStr) > 0 {
sfm := bufferToServerFdMap([]byte(fdMapStr))
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
// 开启异步关闭队列处理循环
s.startCloseQueueLoop()
return nil
@ -220,10 +217,10 @@ func (s *Server) Run() error {
if err := s.Start(); err != nil {
return err
}
// Web Server准备就绪待执行
readyChan <- struct{}{}
// 阻塞等待服务执行完成
<- doneChan
glog.Printfln("%d: all servers shutdown", gproc.Pid())
return nil
}
@ -231,8 +228,10 @@ func (s *Server) Run() error {
// 阻塞等待所有Web Server停止常用于多Web Server场景以及需要将Web Server异步运行的场景
// 这是一个与进程相关的方法
func Wait() {
readyChan <- struct{}{}
// 阻塞等待服务执行完成
<- doneChan
glog.Printfln("%d: all servers shutdown", gproc.Pid())
}

View File

@ -18,6 +18,9 @@ import (
"fmt"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/glog"
"os"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/util/gconv"
)
const (
@ -25,6 +28,8 @@ const (
gADMIN_ACTION_RELOADING = 1
gADMIN_ACTION_RESTARTING = 2
gADMIN_ACTION_SHUTINGDOWN = 4
gADMIN_ACTION_RELOAD_ENVKEY = "gf.server.reload"
gADMIN_ACTION_RESTART_ENVKEY = "gf.server.restart"
)
// 用于服务管理的对象
@ -113,7 +118,10 @@ func (s *Server) Reload() error {
}
serverProcessStatus.Set(gADMIN_ACTION_RELOADING)
glog.Printfln("%d: server reloading", gproc.Pid())
return sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
forkReloadProcess()
go shutdownWebServers()
doneChan <- struct{}{}
return nil
}
// 完整重启Web Server
@ -128,7 +136,8 @@ func (s *Server) Restart() error {
}
serverProcessStatus.Set(gADMIN_ACTION_RESTARTING)
glog.Printfln("%d: server restarting", gproc.Pid())
return sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
doneChan <- struct{}{}
return nil
}
// 关闭Web Server
@ -143,7 +152,9 @@ func (s *Server) Shutdown() error {
}
serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN)
glog.Printfln("%d: server shutting down", gproc.Pid())
return sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil)
go closeWebServers()
doneChan <- struct{}{}
return nil
}
// 检测当前操作的频繁度
@ -170,4 +181,86 @@ func (s *Server) checkActionStatus() error {
}
}
return nil
}
// 创建一个子进程,通过环境变量传参
func forkReloadProcess() {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
sfm := getServerFdMap()
// 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理以便子进程获取到正确的fd
for name, m := range sfm {
for fdk, fdv := range m {
if len(fdv) > 0 {
s := ""
for _, item := range strings.Split(fdv, ",") {
array := strings.Split(item, "#")
fd := uintptr(gconv.Uint(array[1]))
if fd > 0 {
s += fmt.Sprintf("%s#%d,", array[0], 3 + len(p.ExtraFiles))
p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, ""))
} else {
s += fmt.Sprintf("%s#%d,", array[0], 0)
}
}
sfm[name][fdk] = strings.TrimRight(s, ",")
}
}
}
buffer, _ := gjson.Encode(sfm)
p.Env = append(p.Env, fmt.Sprintf("%s=%s", gADMIN_ACTION_RELOAD_ENVKEY, string(buffer)))
if _, err := p.Start(); err != nil {
glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer))
}
}
// 获取所有Web Server的文件描述符map
func getServerFdMap() map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
serverMapping.RLockFunc(func(m map[string]interface{}) {
for k, v := range m {
sfm[k] = v.(*Server).getListenerFdMap()
}
})
return sfm
}
// 二进制转换为FdMap
func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
if len(buffer) > 0 {
j, _ := gjson.LoadContent(buffer, "json")
for k, _ := range j.ToMap() {
m := make(map[string]string)
for k, v := range j.GetMap(k) {
m[k] = gconv.String(v)
}
sfm[k] = m
}
}
return sfm
}
// 关优雅闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func shutdownWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.shutdown()
}
}
})
}
// 强制关闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func closeWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.close()
}
}
})
}

View File

@ -12,9 +12,11 @@ import (
"os"
"syscall"
"os/signal"
"gitee.com/johng/gf/g/os/gproc"
)
// 进程信号量监听消息队列
var procSignalChan = make(chan os.Signal)
// 信号量处理
func handleProcessSignal() {
var sig os.Signal
@ -31,23 +33,20 @@ func handleProcessSignal() {
for {
sig = <- procSignalChan
switch sig {
// 进程终止,停止所有子进程运行
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
if gproc.IsChild() {
sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil)
}
return
// 进程终止,停止所有子进程运行
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
return
// 用户信号,热重启服务
case syscall.SIGUSR1:
sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
case syscall.SIGUSR1:
// 用户信号,完整重启服务
case syscall.SIGUSR2:
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
case syscall.SIGUSR2:
default:
default:
}
}
}

View File

@ -1,178 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信
package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gtime"
"time"
)
const (
gMSG_START = 1
gMSG_RELOAD = 2
gMSG_RESTART = 3
gMSG_SHUTDOWN = 4
gMSG_CLOSE = 5
gMSG_NEW_FORK = 6
gMSG_HEARTBEAT = 7
gPROC_FAILURE_RETRY_COUNT = 3 // 发送消息失败重试次数
gPROC_FAILURE_RETRY_INTERVAL = 500 // (毫秒)发送消息失败时重试间隔
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
)
// 进程信号量监听消息队列
var procSignalChan = make(chan os.Signal)
// 上一次进程间心跳的时间戳
var lastUpdateTime = gtype.NewInt(int(gtime.Millisecond()))
// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效
var checkHeartbeat = gtype.NewBool()
// 处理进程信号量监控以及进程间消息通信
func handleProcessMsgAndSignal() {
go handleProcessSignal()
if gproc.IsChild() {
go handleChildProcessHeartbeat()
} else {
go handleMainProcessHeartbeat()
}
handleProcessMsg()
}
// 处理进程间消息
// 数据格式: 操作(8bit) | 参数(变长)
func handleProcessMsg() {
for {
if msg := gproc.Receive(); msg != nil {
// 记录消息日志,用于调试
//content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data)
//glog.Print(content)
//gfile.PutContentsAppend("/tmp/gproc-log", content)
act := gbinary.DecodeToUint(msg.Data[0 : 1])
data := msg.Data[1 : ]
if msg.Pid != gproc.Pid() {
updateProcessUpdateTime()
}
if gproc.IsChild() {
// ===============
// 子进程
// ===============
switch act {
case gMSG_START: onCommChildStart(msg.Pid, data)
case gMSG_RELOAD: onCommChildReload(msg.Pid, data)
case gMSG_RESTART: onCommChildRestart(msg.Pid, data)
case gMSG_CLOSE: onCommChildClose(msg.Pid, data)
case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data)
case gMSG_SHUTDOWN: onCommChildShutdown(msg.Pid, data)
}
} else {
// ===============
// 父进程
// ===============
// 任何进程消息都会自动更新最后通信时间记录
if msg.Pid != gproc.Pid() {
updateProcessCommTime(msg.Pid)
}
switch act {
case gMSG_START: onCommMainStart(msg.Pid, data)
case gMSG_RELOAD: onCommMainReload(msg.Pid, data)
case gMSG_RESTART: onCommMainRestart(msg.Pid, data)
case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data)
case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data)
case gMSG_SHUTDOWN:
onCommMainShutdown(msg.Pid, data)
return
}
}
}
}
}
// 向进程发送操作消息
func sendProcessMsg(pid int, act int, data []byte) error {
var err error
for i := gPROC_FAILURE_RETRY_COUNT; i > 0; i-- {
if err = gproc.Send(pid, formatMsgBuffer(act, data)); err != nil {
time.Sleep(gPROC_FAILURE_RETRY_INTERVAL*time.Millisecond)
} else {
break
}
}
//glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err)
return err
}
// 生成一条满足Web Server进程通信协议的消息
func formatMsgBuffer(act int, data []byte) []byte {
return append(gbinary.EncodeUint8(uint8(act)), data...)
}
// 获取所有Web Server的文件描述符map
func getServerFdMap() map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
serverMapping.RLockFunc(func(m map[string]interface{}) {
for k, v := range m {
sfm[k] = v.(*Server).getListenerFdMap()
}
})
return sfm
}
// 二进制转换为FdMap
func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
if len(buffer) > 0 {
j, _ := gjson.LoadContent(buffer, "json")
for k, _ := range j.ToMap() {
m := make(map[string]string)
for k, v := range j.GetMap(k) {
m[k] = gconv.String(v)
}
sfm[k] = m
}
}
return sfm
}
// 关优雅闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func shutdownWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.shutdown()
}
}
})
}
// 强制关闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func closeWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.close()
}
}
})
}
// 更新上一次进程间通信的时间
func updateProcessUpdateTime() {
lastUpdateTime.Set(int(gtime.Millisecond()))
}

View File

@ -1,109 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 子进程
package ghttp
import (
"os"
"fmt"
"time"
"strings"
"runtime"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
)
const (
gPROC_CHILD_MAX_IDLE_TIME = 10000 // 子进程闲置时间(未开启心跳机制的时间)
)
// 心跳处理(方法为空逻辑放到公共通信switch中进行处理)
func onCommChildHeartbeat(pid int, data []byte) {
}
// 平滑重启子进程收到重启消息那么将自身的ServerFdMap信息收集后发送给主进程由主进程进行统一调度
func onCommChildReload(pid int, data []byte) {
var buffer []byte = nil
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
// windows系统无法进行文件描述符操作只能重启进程
if runtime.GOOS == "windows" {
// windows下使用shutdown会造成协程阻塞这里直接使用close强制关闭
closeWebServers()
} else {
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
sfm := getServerFdMap()
// 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理以便子进程获取到正确的fd
for name, m := range sfm {
for fdk, fdv := range m {
if len(fdv) > 0 {
s := ""
for _, item := range strings.Split(fdv, ",") {
array := strings.Split(item, "#")
fd := uintptr(gconv.Uint(array[1]))
if fd > 0 {
s += fmt.Sprintf("%s#%d,", array[0], 3 + len(p.ExtraFiles))
p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, ""))
} else {
s += fmt.Sprintf("%s#%d,", array[0], 0)
}
}
sfm[name][fdk] = strings.TrimRight(s, ",")
}
}
}
buffer, _ = gjson.Encode(sfm)
}
p.PPid = gproc.PPid()
if newPid, err := p.Start(); err == nil {
sendProcessMsg(newPid, gMSG_START, buffer)
} else {
glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer))
}
}
// 完整重启
func onCommChildRestart(pid int, data []byte) {
sendProcessMsg(gproc.PPid(), gMSG_RESTART, nil)
}
// 优雅关闭服务链接并退出
func onCommChildShutdown(pid int, data []byte) {
if runtime.GOOS != "windows" {
shutdownWebServers()
}
os.Exit(0)
}
// 强制性关闭服务链接并退出
func onCommChildClose(pid int, data []byte) {
closeWebServers()
os.Exit(0)
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleChildProcessHeartbeat() {
for {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil)
// 超过时间没有接收到主进程心跳,自动关闭退出
if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
// 子进程有时会无法退出(僵尸?)这里直接使用exit而不是return
glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastUpdateTime.Val(), gPROC_HEARTBEAT_TIMEOUT)
glog.Printfln("%d: heartbeat timeout[%dms], exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT)
os.Exit(0)
}
// 未开启心跳检测的闲置超过一定时间则主动关闭
if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME {
glog.Printfln("%d: idle timeout[%dms], exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME)
os.Exit(0)
}
}
}

View File

@ -1,43 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 子进程
// +build !windows
package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
if len(data) > 0 {
sfm := bufferToServerFdMap(data)
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 如果创建自己的父进程非gproc父进程那么表示该进程为重启创建的进程创建成功之后需要通知父进程自行销毁
if gproc.PPidOS() != gproc.PPid() {
//如果子进程已经继承了父进程的socket文件描述符那么父进程没有存在的必要直接kill掉
if p, err := os.FindProcess(gproc.PPidOS()); err == nil {
p.Kill()
}
}
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
updateProcessUpdateTime()
checkHeartbeat.Set(true)
}

View File

@ -1,26 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package ghttp
import (
"gitee.com/johng/gf/g/os/gproc"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 开启Web Server服务
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
updateProcessUpdateTime()
checkHeartbeat.Set(true)
}

View File

@ -1,119 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 主进程.
// 管理子进程按照规则听话玩,不然有一百种方法让子进程在本地混不下去.
package ghttp
import (
"os"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/container/gmap"
)
// (主进程)主进程与子进程上一次活跃时间映射map
var procUpdateTimeMap = gmap.NewIntIntMap()
// 开启服务
func onCommMainStart(pid int, data []byte) {
fork := forkNewProcess()
if fork == nil {
os.Exit(1)
}
updateProcessCommTime(fork.Pid())
// 子进程创建成功之后再发送执行命令
sendProcessMsg(fork.Pid(), gMSG_START, nil)
}
// 心跳处理(方法为空逻辑放到公共通信switch中进行处理)
func onCommMainHeartbeat(pid int, data []byte) {
}
// 平滑重启服务
func onCommMainReload(pid int, data []byte) {
procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil))
}
// 完整重启服务
func onCommMainRestart(pid int, data []byte) {
// 如果是父进程自身发送的重启指令,那么通知所有子进程重启
if pid == gproc.Pid() {
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
return
}
// 首先创建子进程,暂时不开始服务,否则会有端口冲突
fork := forkNewProcess()
if fork == nil {
os.Exit(1)
}
// 然后通知旧的子进程自动关闭并退出(不需要新建的子进程来处理)
sendProcessMsg(pid, gMSG_CLOSE, nil)
if p, err := os.FindProcess(pid); err == nil && p != nil {
p.Kill()
p.Wait()
}
// 通知新的子进程执行服务监听
sendProcessMsg(fork.Pid(), gMSG_START, nil)
}
// 新建子进程通知
func onCommMainNewFork(pid int, data []byte) {
procManager.AddProcess(pid)
checkHeartbeat.Set(true)
}
// 关闭服务,通知所有子进程退出(Kill强制性退出)
func onCommMainShutdown(pid int, data []byte) {
procManager.Send(formatMsgBuffer(gMSG_CLOSE, nil))
procManager.KillAll()
procManager.WaitAll()
}
// 更新指定进程的通信时间记录
func updateProcessCommTime(pid int) {
procUpdateTimeMap.Set(pid, int(gtime.Millisecond()))
}
// 创建一个子进程,但是暂时不执行服务监听
func forkNewProcess() *gproc.Process {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
if _, err := p.Start(); err != nil {
glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error())
return nil
}
return p
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleMainProcessHeartbeat() {
for {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil))
// 清理过期进程
if checkHeartbeat.Val() {
for _, pid := range procManager.Pids() {
updatetime := procUpdateTimeMap.Get(pid)
if updatetime > 0 && int(gtime.Millisecond()) - updatetime > gPROC_HEARTBEAT_TIMEOUT {
//fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime)
// 这里需要手动从进程管理器中去掉该进程
procManager.RemoveProcess(pid)
sendProcessMsg(pid, gMSG_CLOSE, nil)
}
}
// (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要
if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{
glog.Printfln("%d: all children died, exit", gproc.Pid())
os.Exit(0)
}
}
}
}

View File

@ -13,6 +13,7 @@ import (
"os"
"time"
"gitee.com/johng/gf/g/util/gconv"
"strings"
)
const (
@ -51,6 +52,15 @@ func IsChild() bool {
return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != ""
}
// 设置gproc父进程ID当ppid为0时表示该进程为gproc主进程否则为gproc子进程
func SetPPid(ppid int) {
if ppid > 0 {
os.Setenv(gPROC_ENV_KEY_PPID_KEY, gconv.String(ppid))
} else {
os.Unsetenv(gPROC_ENV_KEY_PPID_KEY)
}
}
// 进程开始执行时间
func StartTime() time.Time {
return processStartTime
@ -61,3 +71,13 @@ func Uptime() int {
return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6)
}
// 检测环境变量中是否已经存在指定键名
func checkEnvKey(env []string, key string) bool {
for _, v := range env {
if len(v) >= len(key) && strings.EqualFold(v[0 : len(key)], key) {
return true
}
}
return false
}

View File

@ -9,14 +9,10 @@ package gproc
import (
"os"
"fmt"
"strings"
"os/exec"
"gitee.com/johng/gf/g/container/gmap"
)
const (
gCHILD_ARGS_MARK_NAME = "--gproc-child"
"fmt"
)
// 进程管理器
@ -55,17 +51,6 @@ func NewProcess(path string, args []string, environment []string) *Process {
if d, err := os.Getwd(); err == nil {
p.Dir = d
}
// 判断是否加上子进程标识
hasChildMark := false
childMarkLen := len(gCHILD_ARGS_MARK_NAME)
for _, v := range args {
if len(v) >= childMarkLen && strings.EqualFold(v[0 : childMarkLen], gCHILD_ARGS_MARK_NAME) {
hasChildMark = true
}
}
if !hasChildMark {
p.Args = append(p.Args, gCHILD_ARGS_MARK_NAME)
}
if len(args) > 0 {
start := 0
if strings.EqualFold(path, args[0]) {

View File

@ -87,4 +87,4 @@ func (p *Process) Kill() error {
// Sending Interrupt on Windows is not implemented.
func (p *Process) Signal(sig os.Signal) error {
return p.Process.Signal(sig)
}
}

34
geg/os/gproc/gproc3.go Normal file
View File

@ -0,0 +1,34 @@
package main
import (
"os"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
)
// 父进程销毁后,请使用进程管理器查看存活的子进程
func main () {
if gproc.IsChild() {
glog.Printfln("%d: I am child, waiting 10 seconds to die", gproc.Pid())
//p, err := os.FindProcess(os.Getppid())
//fmt.Println(err)
//p.Kill()
time.Sleep(2*time.Second)
glog.Printfln("%d: 2", gproc.Pid())
time.Sleep(2*time.Second)
glog.Printfln("%d: 4", gproc.Pid())
time.Sleep(2*time.Second)
glog.Printfln("%d: 6", gproc.Pid())
time.Sleep(2*time.Second)
glog.Printfln("%d: 8", gproc.Pid())
time.Sleep(2*time.Second)
glog.Printfln("%d: died", gproc.Pid())
} else {
p := gproc.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
glog.Printfln("%d: I am main, waiting 3 seconds to die", gproc.Pid())
time.Sleep(3*time.Second)
glog.Printfln("%d: died", gproc.Pid())
}
}