mirror of
https://gitee.com/johng/gf
synced 2026-06-29 18:41:50 +08:00
ghttp.Server热重启稳定性测试,功能改进,TODO++
This commit is contained in:
2
TODO
2
TODO
@ -10,7 +10,7 @@ ON THE WAY:
|
||||
9. orm增加更多数据库支持;
|
||||
10. ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现;
|
||||
11. 当二进制参数为nil时,gjson.LoadContent并将gjson.Json对象ToMap时会报错;
|
||||
|
||||
12. 改进控制器及执行对象注册,更友好地支持动态路由注册,例如:注册规则为 /channel/:name,现有的控制器及执行对象注册很难友好支持这种动态形式;
|
||||
|
||||
|
||||
DONE:
|
||||
|
||||
@ -223,8 +223,6 @@ func Wait() {
|
||||
|
||||
// 开启底层Web Server执行
|
||||
func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
// 开始执行底层Web Server创建,端口监听
|
||||
var server *gracefulServer
|
||||
var httpsEnabled bool
|
||||
if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 {
|
||||
// ================
|
||||
@ -249,30 +247,21 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
if len(v) == 0 {
|
||||
continue
|
||||
}
|
||||
go func(addrItem string) {
|
||||
fd := 0
|
||||
addr := addrItem
|
||||
array := strings.Split(addrItem, "#")
|
||||
if len(array) > 1 {
|
||||
addr = array[0]
|
||||
// windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启
|
||||
if runtime.GOOS != "windows" {
|
||||
fd = gconv.Int(array[1])
|
||||
}
|
||||
fd := 0
|
||||
addr := v
|
||||
array := strings.Split(v, "#")
|
||||
if len(array) > 1 {
|
||||
addr = array[0]
|
||||
// windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启
|
||||
if runtime.GOOS != "windows" {
|
||||
fd = gconv.Int(array[1])
|
||||
}
|
||||
if fd > 0 {
|
||||
server = s.newGracefulServer(addr, fd)
|
||||
} else {
|
||||
server = s.newGracefulServer(addr)
|
||||
}
|
||||
s.servers = append(s.servers, server)
|
||||
if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
|
||||
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
|
||||
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
}
|
||||
}(v)
|
||||
}
|
||||
if fd > 0 {
|
||||
s.servers = append(s.servers, s.newGracefulServer(addr, fd))
|
||||
} else {
|
||||
s.servers = append(s.servers, s.newGracefulServer(addr))
|
||||
}
|
||||
}
|
||||
}
|
||||
// ================
|
||||
@ -292,28 +281,34 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
if len(v) == 0 {
|
||||
continue
|
||||
}
|
||||
go func(addrItem string) {
|
||||
fd := 0
|
||||
addr := addrItem
|
||||
array := strings.Split(addrItem, "#")
|
||||
if len(array) > 1 {
|
||||
addr = array[0]
|
||||
// windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启
|
||||
if runtime.GOOS != "windows" {
|
||||
fd = gconv.Int(array[1])
|
||||
}
|
||||
fd := 0
|
||||
addr := v
|
||||
array := strings.Split(v, "#")
|
||||
if len(array) > 1 {
|
||||
addr = array[0]
|
||||
// windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启
|
||||
if runtime.GOOS != "windows" {
|
||||
fd = gconv.Int(array[1])
|
||||
}
|
||||
if fd > 0 {
|
||||
server = s.newGracefulServer(addr, fd)
|
||||
}
|
||||
if fd > 0 {
|
||||
s.servers = append(s.servers, s.newGracefulServer(addr, fd))
|
||||
} else {
|
||||
s.servers = append(s.servers, s.newGracefulServer(addr))
|
||||
}
|
||||
}
|
||||
// 开始执行异步监听
|
||||
for _, v := range s.servers {
|
||||
go func(server *gracefulServer) {
|
||||
var err error
|
||||
if server.isHttps {
|
||||
err = server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath)
|
||||
} else {
|
||||
server = s.newGracefulServer(addr)
|
||||
err = server.ListenAndServe()
|
||||
}
|
||||
s.servers = append(s.servers, server)
|
||||
if err := server.ListenAndServe(); err != nil {
|
||||
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
|
||||
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
|
||||
if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
}(v)
|
||||
}
|
||||
@ -321,6 +316,11 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
s.status = 1
|
||||
}
|
||||
|
||||
// 热重启Web Server
|
||||
func (s *Server) Reload() {
|
||||
sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
|
||||
}
|
||||
|
||||
// 重启Web Server
|
||||
func (s *Server) Restart() {
|
||||
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
|
||||
|
||||
@ -10,6 +10,7 @@ package ghttp
|
||||
import (
|
||||
"strings"
|
||||
"gitee.com/johng/gf/g/os/gview"
|
||||
"runtime"
|
||||
)
|
||||
|
||||
// 用于服务管理的对象
|
||||
@ -26,15 +27,26 @@ func (p *utilAdmin) Index(r *Request) {
|
||||
<title>gf ghttp admin</title>
|
||||
</head>
|
||||
<body>
|
||||
<p><a href="{{$.uri}}/reload">reload</a></p>
|
||||
<p><a href="{{$.uri}}/restart">restart</a></p>
|
||||
<p><a href="{{$.uri}}/shutdown">shutdown</a></p>
|
||||
</body>
|
||||
</html>
|
||||
`, data)
|
||||
`, data)
|
||||
r.Response.Write(buffer)
|
||||
}
|
||||
|
||||
// 服务重启
|
||||
// 服务热重启
|
||||
func (p *utilAdmin) Reload(r *Request) {
|
||||
if runtime.GOOS == "windows" {
|
||||
p.Restart(r)
|
||||
} else {
|
||||
r.Response.Write("reload server")
|
||||
r.Server.Reload()
|
||||
}
|
||||
}
|
||||
|
||||
// 服务完整重启
|
||||
func (p *utilAdmin) Restart(r *Request) {
|
||||
r.Response.Write("restart server")
|
||||
r.Server.Restart()
|
||||
|
||||
@ -10,32 +10,34 @@ package ghttp
|
||||
import (
|
||||
"os"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gjson"
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
)
|
||||
|
||||
const (
|
||||
gMSG_START = 10
|
||||
gMSG_RESTART = 20
|
||||
gMSG_SHUTDOWN = 30
|
||||
gMSG_NEW_FORK = 40
|
||||
gMSG_REMOVE_PROC = 50
|
||||
gMSG_HEARTBEAT = 60
|
||||
gMSG_RELOAD = 20
|
||||
gMSG_RESTART = 30
|
||||
gMSG_SHUTDOWN = 40
|
||||
gMSG_CLOSE = 45
|
||||
gMSG_NEW_FORK = 50
|
||||
gMSG_HEARTBEAT = 70
|
||||
|
||||
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
|
||||
gPROC_HEARTBEAT_TIMEOUT = 30000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
|
||||
//gPROC_MULTI_CHILD_CLEAR_INTERVAL = 1000 // (毫秒)检测间隔,当存在多个子进程时(往往是重启间隔非常短且频繁造成),需要进行清理,最终留下一个最新的子进程
|
||||
//gPROC_MULTI_CHILD_CLEAR_MIN_EXPIRE = 30000 // (毫秒)当多个子进程存在时,允许子进程进程至少运行的最小时间,超过该时间则清理
|
||||
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
|
||||
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
|
||||
)
|
||||
|
||||
// 进程信号量监听消息队列
|
||||
var procSignalChan = make(chan os.Signal)
|
||||
var procSignalChan = make(chan os.Signal)
|
||||
|
||||
// 上一次进程间心跳的时间戳
|
||||
var lastUpdateTime = gtype.NewInt()
|
||||
|
||||
// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效
|
||||
var checkHeartbeat = gtype.NewBool()
|
||||
var checkHeartbeat = gtype.NewBool()
|
||||
|
||||
// 处理进程信号量监控以及进程间消息通信
|
||||
func handleProcessMsgAndSignal() {
|
||||
@ -44,7 +46,6 @@ func handleProcessMsgAndSignal() {
|
||||
go handleChildProcessHeartbeat()
|
||||
} else {
|
||||
go handleMainProcessHeartbeat()
|
||||
//go handleMainProcessChildClear()
|
||||
}
|
||||
handleProcessMsg()
|
||||
}
|
||||
@ -60,21 +61,20 @@ func handleProcessMsg() {
|
||||
//gfile.PutContentsAppend("/tmp/gproc-log", content)
|
||||
act := gbinary.DecodeToUint(msg.Data[0 : 1])
|
||||
data := msg.Data[1 : ]
|
||||
if msg.Pid != gproc.Pid() {
|
||||
updateProcessUpdateTime()
|
||||
}
|
||||
if gproc.IsChild() {
|
||||
// ===============
|
||||
// 子进程
|
||||
// ===============
|
||||
// 任何与父进程的通信都会更新最后通信时间
|
||||
if msg.Pid == gproc.PPid() {
|
||||
updateProcessChildUpdateTime()
|
||||
}
|
||||
switch act {
|
||||
case gMSG_START: onCommChildStart(msg.Pid, data)
|
||||
case gMSG_RELOAD: onCommChildReload(msg.Pid, data)
|
||||
case gMSG_RESTART: onCommChildRestart(msg.Pid, data)
|
||||
case gMSG_CLOSE: onCommChildClose(msg.Pid, data)
|
||||
case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data)
|
||||
case gMSG_SHUTDOWN:
|
||||
onCommChildShutdown(msg.Pid, data)
|
||||
return
|
||||
case gMSG_SHUTDOWN: onCommChildShutdown(msg.Pid, data)
|
||||
}
|
||||
} else {
|
||||
// ===============
|
||||
@ -84,20 +84,12 @@ func handleProcessMsg() {
|
||||
if msg.Pid != gproc.Pid() {
|
||||
updateProcessCommTime(msg.Pid)
|
||||
}
|
||||
if !procFirstTimeMap.Contains(msg.Pid) {
|
||||
procFirstTimeMap.Set(msg.Pid, int(gtime.Millisecond()))
|
||||
}
|
||||
switch act {
|
||||
case gMSG_START: onCommMainStart(msg.Pid, data)
|
||||
case gMSG_RELOAD: onCommMainReload(msg.Pid, data)
|
||||
case gMSG_RESTART: onCommMainRestart(msg.Pid, data)
|
||||
case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data)
|
||||
case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data)
|
||||
case gMSG_REMOVE_PROC:
|
||||
onCommMainRemoveProc(msg.Pid, data)
|
||||
// 如果所有子进程都退出,那么主进程也主动退出
|
||||
if procManager.Size() == 0 {
|
||||
return
|
||||
}
|
||||
case gMSG_SHUTDOWN:
|
||||
onCommMainShutdown(msg.Pid, data)
|
||||
return
|
||||
@ -166,4 +158,9 @@ func closeWebServers() {
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// 更新上一次进程间通信的时间
|
||||
func updateProcessUpdateTime() {
|
||||
lastUpdateTime.Set(int(gtime.Millisecond()))
|
||||
}
|
||||
@ -18,23 +18,19 @@ import (
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gjson"
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
)
|
||||
|
||||
const (
|
||||
gPROC_CHILD_MAX_IDLE_TIME = 3000 // 子进程闲置时间(未开启心跳机制的时间)
|
||||
)
|
||||
|
||||
// (子进程)上一次从主进程接收心跳的时间戳
|
||||
var lastHeartbeatTime = gtype.NewInt()
|
||||
|
||||
// 心跳消息
|
||||
func onCommChildHeartbeat(pid int, data []byte) {
|
||||
updateProcessChildUpdateTime()
|
||||
|
||||
}
|
||||
|
||||
// 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度
|
||||
func onCommChildRestart(pid int, data []byte) {
|
||||
// 热重启,子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度
|
||||
func onCommChildReload(pid int, data []byte) {
|
||||
var buffer []byte = nil
|
||||
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
|
||||
// windows系统无法进行文件描述符操作,只能重启进程
|
||||
@ -73,18 +69,23 @@ func onCommChildRestart(pid int, data []byte) {
|
||||
}
|
||||
}
|
||||
|
||||
// 关闭服务链接并退出
|
||||
// 完整重启
|
||||
func onCommChildRestart(pid int, data []byte) {
|
||||
sendProcessMsg(gproc.PPid(), gMSG_RESTART, nil)
|
||||
}
|
||||
|
||||
// 优雅关闭服务链接并退出
|
||||
func onCommChildShutdown(pid int, data []byte) {
|
||||
sendProcessMsg(gproc.PPid(), gMSG_REMOVE_PROC, nil)
|
||||
if runtime.GOOS != "windows" {
|
||||
shutdownWebServers()
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// 更新上一次主进程主动与子进程通信的时间
|
||||
func updateProcessChildUpdateTime() {
|
||||
lastHeartbeatTime.Set(int(gtime.Millisecond()))
|
||||
// 强制性关闭服务链接并退出
|
||||
func onCommChildClose(pid int, data []byte) {
|
||||
closeWebServers()
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
|
||||
@ -93,17 +94,15 @@ func handleChildProcessHeartbeat() {
|
||||
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
|
||||
sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil)
|
||||
// 超过时间没有接收到主进程心跳,自动关闭退出
|
||||
if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastHeartbeatTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
|
||||
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
|
||||
if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
|
||||
// 子进程有时会无法退出(僵尸?),这里直接使用exit,而不是return
|
||||
//glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastHeartbeatTime.Val(), gPROC_HEARTBEAT_TIMEOUT)
|
||||
//glog.Printfln("%d: heartbeat timeout, exit", gproc.Pid())
|
||||
glog.Printfln("%d: waiting %dms for shutdown timeout, exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT)
|
||||
glog.Printfln("%d: heartbeat timeout[%dms], exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT)
|
||||
os.Exit(0)
|
||||
}
|
||||
// 未开启心跳检测的闲置超过一定时间则主动关闭
|
||||
if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME {
|
||||
glog.Printfln("%d: max idle time %dms exceeded, exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME)
|
||||
glog.Printfln("%d: idle timeout[%dms], exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME)
|
||||
os.Exit(0)
|
||||
}
|
||||
}
|
||||
|
||||
@ -10,6 +10,7 @@
|
||||
package ghttp
|
||||
|
||||
import (
|
||||
"os"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
)
|
||||
|
||||
@ -31,9 +32,12 @@ func onCommChildStart(pid int, data []byte) {
|
||||
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
|
||||
// 如果创建自己的父进程非gproc父进程,那么表示该进程为重启创建的进程,创建成功之后需要通知父进程自行销毁
|
||||
if gproc.PPidOS() != gproc.PPid() {
|
||||
sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil)
|
||||
//sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil)
|
||||
if p, err := os.FindProcess(gproc.PPidOS()); err == nil {
|
||||
p.Kill()
|
||||
}
|
||||
}
|
||||
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
|
||||
updateProcessChildUpdateTime()
|
||||
updateProcessUpdateTime()
|
||||
checkHeartbeat.Set(true)
|
||||
}
|
||||
@ -13,12 +13,9 @@ import (
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
)
|
||||
|
||||
// (主进程)子进程与主进程第一次通信的时间映射map
|
||||
// 用以识别子进程创建时间先后顺序,当存在多个子进程时,主动销毁旧的子进程
|
||||
var procFirstTimeMap = gmap.NewIntIntMap()
|
||||
|
||||
// (主进程)主进程与子进程上一次活跃时间映射map
|
||||
var procUpdateTimeMap = gmap.NewIntIntMap()
|
||||
|
||||
@ -34,10 +31,23 @@ func onCommMainHeartbeat(pid int, data []byte) {
|
||||
updateProcessCommTime(pid)
|
||||
}
|
||||
|
||||
// 重启服务
|
||||
func onCommMainRestart(pid int, data []byte) {
|
||||
// 热重启服务
|
||||
func onCommMainReload(pid int, data []byte) {
|
||||
// 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作
|
||||
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
|
||||
procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil))
|
||||
}
|
||||
|
||||
// 完整重启服务
|
||||
func onCommMainRestart(pid int, data []byte) {
|
||||
if pid == gproc.Pid() {
|
||||
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
|
||||
return
|
||||
}
|
||||
if p, _ := os.FindProcess(pid); p != nil {
|
||||
p.Kill()
|
||||
p.Wait()
|
||||
}
|
||||
sendProcessMsg(gproc.Pid(), gMSG_START, nil)
|
||||
}
|
||||
|
||||
// 新建子进程通知
|
||||
@ -46,14 +56,11 @@ func onCommMainNewFork(pid int, data []byte) {
|
||||
checkHeartbeat.Set(true)
|
||||
}
|
||||
|
||||
// 销毁子进程通知
|
||||
func onCommMainRemoveProc(pid int, data []byte) {
|
||||
procManager.RemoveProcess(pid)
|
||||
}
|
||||
|
||||
// 关闭服务,通知所有子进程退出
|
||||
func onCommMainShutdown(pid int, data []byte) {
|
||||
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
|
||||
//procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
|
||||
procManager.KillAll()
|
||||
procManager.WaitAll()
|
||||
}
|
||||
|
||||
// 更新指定进程的通信时间记录
|
||||
@ -72,30 +79,13 @@ func handleMainProcessHeartbeat() {
|
||||
if int(gtime.Millisecond()) - procUpdateTimeMap.Get(pid) > gPROC_HEARTBEAT_TIMEOUT {
|
||||
// 这里需要手动从进程管理器中去掉该进程
|
||||
procManager.RemoveProcess(pid)
|
||||
sendProcessMsg(pid, gMSG_SHUTDOWN, nil)
|
||||
sendProcessMsg(pid, gMSG_CLOSE, nil)
|
||||
}
|
||||
}
|
||||
}
|
||||
// 如果所有子进程都退出,并且达到超时时间,那么主进程也没存在的必要
|
||||
if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{
|
||||
os.Exit(0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 清理多余的子进程
|
||||
//func handleMainProcessChildClear() {
|
||||
// for {
|
||||
// time.Sleep(gPROC_MULTI_CHILD_CLEAR_INTERVAL*time.Millisecond)
|
||||
// if procManager.Size() > 1 {
|
||||
// minPid := 0
|
||||
// minTime := int(gtime.Millisecond())
|
||||
// for _, pid := range procManager.Pids() {
|
||||
// if t := procFirstTimeMap.Get(pid); t < minTime {
|
||||
// minPid = pid
|
||||
// minTime = t
|
||||
// }
|
||||
// }
|
||||
// if minPid > 0 && procUpdateTimeMap.Get(minPid) - procFirstTimeMap.Get(minPid) > gPROC_MULTI_CHILD_CLEAR_MIN_EXPIRE {
|
||||
// sendProcessMsg(minPid, gMSG_SHUTDOWN, nil)
|
||||
// glog.Printfln("%d: multi child occurred, shutdown %d", gproc.Pid(), minPid)
|
||||
// }
|
||||
// }
|
||||
// }
|
||||
//}
|
||||
@ -26,6 +26,7 @@ func handleProcessSignal() {
|
||||
syscall.SIGHUP,
|
||||
syscall.SIGTERM,
|
||||
syscall.SIGUSR1,
|
||||
syscall.SIGUSR2,
|
||||
)
|
||||
for {
|
||||
sig = <- procSignalChan
|
||||
@ -33,12 +34,14 @@ func handleProcessSignal() {
|
||||
// 进程终止,停止所有子进程运行
|
||||
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
|
||||
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
|
||||
// 强制性kill掉所有子进程
|
||||
procManager.KillAll()
|
||||
return
|
||||
|
||||
// 用户信号,重启服务
|
||||
// 用户信号,热重启服务
|
||||
case syscall.SIGUSR1:
|
||||
sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil)
|
||||
|
||||
// 用户信号,完整重启服务
|
||||
case syscall.SIGUSR2:
|
||||
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
|
||||
|
||||
default:
|
||||
|
||||
@ -18,6 +18,10 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
gGRACEFUL_SHUTDOWN_TIMEOUT = 10*time.Second // 优雅关闭链接时的超时时间
|
||||
)
|
||||
|
||||
// 优雅的Web Server对象封装
|
||||
type gracefulServer struct {
|
||||
fd uintptr
|
||||
@ -156,12 +160,10 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
|
||||
|
||||
// 执行请求优雅关闭
|
||||
func (s *gracefulServer) shutdown() {
|
||||
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
|
||||
defer cancel()
|
||||
if err := s.httpServer.Shutdown(ctx); err != nil {
|
||||
if err := s.httpServer.Shutdown(context.Background()); err != nil {
|
||||
glog.Errorfln("%d: %s server [%s] shutdown error: %v", gproc.Pid(), s.getProto(), s.addr, err)
|
||||
} else {
|
||||
glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr)
|
||||
//glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr)
|
||||
s.shutdownChan <- true
|
||||
}
|
||||
}
|
||||
@ -171,7 +173,7 @@ func (s *gracefulServer) close() {
|
||||
if err := s.httpServer.Close(); err != nil {
|
||||
glog.Errorfln("%d: %s server [%s] closed error: %v", gproc.Pid(), s.getProto(), s.addr, err)
|
||||
} else {
|
||||
glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr)
|
||||
//glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr)
|
||||
s.shutdownChan <- true
|
||||
}
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ package gproc
|
||||
import (
|
||||
"os"
|
||||
"fmt"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/os/gflock"
|
||||
@ -21,7 +22,9 @@ import (
|
||||
|
||||
const (
|
||||
// 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置
|
||||
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
|
||||
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
|
||||
// 自动通信文件清理时间间隔
|
||||
gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second
|
||||
)
|
||||
|
||||
// 当前进程的文件锁
|
||||
@ -69,6 +72,23 @@ func init() {
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
|
||||
go autoClearCommDir()
|
||||
}
|
||||
|
||||
// 自动清理通信目录文件
|
||||
// @todo 目前是以时间过期规则进行清理,后期可以考虑加入进程存在性判断
|
||||
func autoClearCommDir() {
|
||||
dirPath := getCommDirPath()
|
||||
for {
|
||||
time.Sleep(gPROC_COMM_AUTO_CLEAR_INTERVAL)
|
||||
for _, name := range gfile.ScanDir(dirPath) {
|
||||
path := dirPath + gfile.Separator + name
|
||||
if gtime.Second() - gfile.MTime(path) >= 10 {
|
||||
gfile.Remove(path)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列
|
||||
@ -97,6 +117,10 @@ func Receive() *Msg {
|
||||
// 向指定gproc进程发送数据
|
||||
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func Send(pid int, data interface{}) error {
|
||||
// 首先检测进程存在不存在,存在才能发送消息
|
||||
if _, err := os.FindProcess(pid); err != nil {
|
||||
return err
|
||||
}
|
||||
buffer := gconv.Bytes(data)
|
||||
b := make([]byte, 0)
|
||||
b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...)
|
||||
@ -112,11 +136,16 @@ func Send(pid int, data interface{}) error {
|
||||
|
||||
// 获取指定进程的通信文件地址
|
||||
func getCommFilePath(pid int) string {
|
||||
return getCommDirPath() + gfile.Separator + gconv.String(pid)
|
||||
}
|
||||
|
||||
// 获取进程间通信目录地址
|
||||
func getCommDirPath() string {
|
||||
tempDir := os.Getenv("gproc.tempdir")
|
||||
if tempDir == "" {
|
||||
tempDir = gfile.TempDir()
|
||||
}
|
||||
return tempDir + gfile.Separator + "gproc" + gfile.Separator + gconv.String(pid)
|
||||
return tempDir + gfile.Separator + "gproc"
|
||||
}
|
||||
|
||||
// 数据解包,防止黏包
|
||||
|
||||
@ -6,7 +6,8 @@ import (
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"strings"
|
||||
"os"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// 数据解包,防止黏包
|
||||
@ -44,7 +45,8 @@ func checksum(buffer []byte) uint32 {
|
||||
}
|
||||
|
||||
func main(){
|
||||
fmt.Println(len(strings.Split("", ",")))
|
||||
p, _ := os.FindProcess(10354)
|
||||
fmt.Println(p.Signal(syscall.Signal(1)))
|
||||
return
|
||||
b := gfile.GetBinContents("/tmp/gproc/30588")
|
||||
for _, msg := range bufferToMsgs(b) {
|
||||
|
||||
Reference in New Issue
Block a user