完成ghttp.Server平滑重启机制改进

This commit is contained in:
John
2018-06-01 18:19:54 +08:00
parent 0eafa1edc6
commit c78fa84a8f
9 changed files with 135 additions and 81 deletions

2
TODO
View File

@ -9,7 +9,7 @@ orm增加更多数据库支持
ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现;
对grpool进行优化改进,包括属性原子操作封装,采用gtype实现,修正设计BUG:https://github.com/johng-cn/gf/issues/6;
平滑重启机制改进,以便于开发阶段调试;
gredis增加redis密码支持
DONE:
1. gconv完善针对不同类型的判断例如尽量减少sprintf("%v", xxx)来执行string类型的转换

View File

@ -117,6 +117,16 @@ var doneChan = make(chan struct{}, 100000)
// init prepares the web server process.
//
// When the restart environment key is set, this process was started by a
// full restart: the parent process is killed and waited on before any
// listening starts, so that the two processes do not conflict on ports.
// Afterwards the process signal handler is started in its own goroutine.
func init() {
	if genv.Get(gADMIN_ACTION_RESTART_ENVKEY) != "" {
		parent, err := os.FindProcess(gproc.PPid())
		if err != nil {
			glog.Error(err)
		} else {
			// Results of Kill/Wait are intentionally not inspected here.
			parent.Kill()
			parent.Wait()
		}
	}
	// Listen for process signals used for management operations.
	go handleProcessSignal()
}
@ -193,18 +203,17 @@ func (s *Server) Start() error {
}
// 启动http server
reloaded := false
fdMapStr := genv.Get(gADMIN_ACTION_RELOAD_ENVKEY)
if len(fdMapStr) > 0 {
sfm := bufferToServerFdMap([]byte(fdMapStr))
for k, v := range sfm {
GetServer(k).startServer(v)
if v, ok := sfm[s.name]; ok {
s.startServer(v)
reloaded = true
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
if !reloaded {
s.startServer(nil)
}
// 开启异步关闭队列处理循环

View File

@ -10,7 +10,6 @@ package ghttp
import (
"strings"
"gitee.com/johng/gf/g/os/gview"
"runtime"
"gitee.com/johng/gf/g/os/gproc"
"sync"
"gitee.com/johng/gf/g/os/gtime"
@ -21,13 +20,14 @@ import (
"os"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/util/gconv"
"time"
"runtime"
)
// Settings for server admin actions: the action interval limit, the values
// stored in serverProcessStatus while an action is running, and the names of
// the environment variables used to pass parameters to a forked child process.
const (
gADMIN_ACTION_INTERVAL_LIMIT = 3000 // (milliseconds) interval limit for performing admin actions after the server starts
gADMIN_ACTION_RELOADING = 1 // process status: server is reloading
gADMIN_ACTION_RESTARTING = 2 // process status: server is restarting
gADMIN_ACTION_SHUTINGDOWN = 4 // process status: server is shutting down
gADMIN_ACTION_INTERVAL_LIMIT = 2000 // (milliseconds) interval limit for performing admin actions after the server starts
gADMIN_ACTION_RESTARTING = 1 // process status: server is restarting
gADMIN_ACTION_SHUTINGDOWN = 2 // process status: server is shutting down
gADMIN_ACTION_RELOAD_ENVKEY = "gf.server.reload" // env key carrying the encoded server fd map to the child of a graceful reload
gADMIN_ACTION_RESTART_ENVKEY = "gf.server.restart" // env key marking a child process created by a full restart
)
@ -55,7 +55,6 @@ func (p *utilAdmin) Index(r *Request) {
<title>gf ghttp admin</title>
</head>
<body>
<p><a href="{{$.uri}}/reload">reload</a></p>
<p><a href="{{$.uri}}/restart">restart</a></p>
<p><a href="{{$.uri}}/shutdown">shutdown</a></p>
</body>
@ -64,20 +63,7 @@ func (p *utilAdmin) Index(r *Request) {
r.Response.Write(buffer)
}
// Reload handles the admin "reload" action.
//
// On Windows it delegates to Restart. On other systems it calls
// Server.Reload and writes either "server reloaded" or the returned
// error text to the response.
func (p *utilAdmin) Reload(r *Request) {
	if runtime.GOOS == "windows" {
		p.Restart(r)
		return
	}
	err := r.Server.Reload()
	if err != nil {
		r.Response.Write(err.Error())
		return
	}
	r.Response.Write("server reloaded")
}
// 服务完整重启
// 服务重启
func (p *utilAdmin) Restart(r *Request) {
if err := r.Server.Restart(); err == nil {
r.Response.Write("server restarted")
@ -106,25 +92,9 @@ func (s *Server) EnableAdmin(pattern...string) {
s.BindObject(p, &utilAdmin{})
}
// Reload gracefully reloads the web server.
//
// It holds serverActionLocker for the whole call. If either the action
// status check or the action frequency check fails, that error is returned
// and nothing else happens. Otherwise the process status is set to
// reloading, a reload child process is forked, the web servers of this
// process are shut down in a separate goroutine, and a value is sent on
// doneChan.
func (s *Server) Reload() error {
	serverActionLocker.Lock()
	defer serverActionLocker.Unlock()

	// Run the pre-checks in order; the first failure aborts the reload.
	preChecks := []func() error{
		s.checkActionStatus,
		s.checkActionFrequence,
	}
	for _, check := range preChecks {
		if err := check(); err != nil {
			return err
		}
	}

	serverProcessStatus.Set(gADMIN_ACTION_RELOADING)
	glog.Printfln("%d: server reloading", gproc.Pid())
	forkReloadProcess()
	go shutdownWebServers()
	doneChan <- struct{}{}
	return nil
}
// 完整重启Web Server
// 重启Web Server
// 针对*nix系统: 平滑重启
// 针对windows : 完整重启
func (s *Server) Restart() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
@ -134,9 +104,7 @@ func (s *Server) Restart() error {
if err := s.checkActionFrequence(); err != nil {
return err
}
serverProcessStatus.Set(gADMIN_ACTION_RESTARTING)
glog.Printfln("%d: server restarting", gproc.Pid())
doneChan <- struct{}{}
restartWebServers()
return nil
}
@ -150,10 +118,7 @@ func (s *Server) Shutdown() error {
if err := s.checkActionFrequence(); err != nil {
return err
}
serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN)
glog.Printfln("%d: server shutting down", gproc.Pid())
go closeWebServers()
doneChan <- struct{}{}
shutdownWebServers()
return nil
}
@ -172,18 +137,14 @@ func (s *Server) checkActionStatus() error {
status := serverProcessStatus.Val()
if status > 0 {
switch status {
case gADMIN_ACTION_RELOADING:
return errors.New("server is reloading")
case gADMIN_ACTION_RESTARTING:
return errors.New("server is restarting")
case gADMIN_ACTION_SHUTINGDOWN:
return errors.New("server is shutting down")
case gADMIN_ACTION_RESTARTING: return errors.New("server is restarting")
case gADMIN_ACTION_SHUTINGDOWN: return errors.New("server is shutting down")
}
}
return nil
}
// 创建一个子进程,通过环境变量传参
// 平滑重启:创建一个子进程,通过环境变量传参
func forkReloadProcess() {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
@ -208,12 +169,24 @@ func forkReloadProcess() {
}
}
buffer, _ := gjson.Encode(sfm)
p.Env = append(p.Env, fmt.Sprintf("%s=%s", gADMIN_ACTION_RELOAD_ENVKEY, string(buffer)))
p.Env = append(p.Env, gADMIN_ACTION_RELOAD_ENVKEY + "=" + string(buffer))
if _, err := p.Start(); err != nil {
glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer))
}
}
// forkRestartProcess performs the "full restart" fork: it starts a new
// child process running os.Args[0] with the current arguments.
//
// The graceful-reload environment variable is removed from this process
// first, and the restart environment key is appended (with value "1") to
// the environment handed to the child. A failure to start the child is
// logged.
func forkRestartProcess() {
	// Drop the graceful-reload parameter so the child does not see it.
	os.Unsetenv(gADMIN_ACTION_RELOAD_ENVKEY)
	childEnv := append(os.Environ(), gADMIN_ACTION_RESTART_ENVKEY+"=1")
	child := procManager.NewProcess(os.Args[0], os.Args, childEnv)
	_, err := child.Start()
	if err != nil {
		glog.Errorfln("%d: fork process failed, error:%s", gproc.Pid(), err.Error())
	}
}
// 获取所有Web Server的文件描述符map
func getServerFdMap() map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
@ -241,9 +214,37 @@ func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
return sfm
}
// restartWebServers restarts the web servers of this process.
//
// It marks the process status as restarting and logs the event. On Windows
// the servers are forcibly closed and a restart child process is forked one
// second later; on other systems a reload child process is forked, the
// servers are gracefully shut down in a goroutine, and a value is sent on
// doneChan.
func restartWebServers() {
	serverProcessStatus.Set(gADMIN_ACTION_RESTARTING)
	glog.Printfln("%d: server restarting", gproc.Pid())

	if runtime.GOOS == "windows" {
		// Delay the restart by one second so the admin endpoint can still
		// return its result; otherwise the request would fail because the
		// web server has already been closed.
		fullRestart := func() {
			forcedlyCloseWebServers()
			forkRestartProcess()
		}
		gtime.SetTimeout(time.Second, fullRestart)
		return
	}

	forkReloadProcess()
	go gracefulShutdownWebServers()
	doneChan <- struct{}{}
}
// shutdownWebServers shuts down the web servers of this process.
//
// It marks the process status as shutting down and logs the event, then
// schedules a callback one second later that forcibly closes the web
// servers and sends a value on doneChan.
func shutdownWebServers() {
	serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN)
	glog.Printfln("%d: server shutting down", gproc.Pid())

	// Delay the shutdown by one second so the admin endpoint can still
	// return its result; otherwise the request would fail because the web
	// server has already been closed.
	closeAndSignal := func() {
		forcedlyCloseWebServers()
		doneChan <- struct{}{}
	}
	gtime.SetTimeout(time.Second, closeAndSignal)
}
// 优雅关闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func shutdownWebServers() {
func gracefulShutdownWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
@ -255,7 +256,7 @@ func shutdownWebServers() {
// 强制关闭进程所有端口的Web Server服务
// 注意只是关闭Web Server服务并不是退出进程
func closeWebServers() {
func forcedlyCloseWebServers() {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {

View File

@ -33,20 +33,17 @@ func handleProcessSignal() {
for {
sig = <- procSignalChan
switch sig {
// 进程终止,停止所有子进程运行
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
// 进程终止,停止所有子进程运行
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
shutdownWebServers()
return
return
// 用户信号,重启服务
case syscall.SIGUSR1:
restartWebServers()
return
// 用户信号,热重启服务
case syscall.SIGUSR1:
// 用户信号,完整重启服务
case syscall.SIGUSR2:
default:
default:
}
}
}

View File

@ -4,6 +4,8 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// +build windows
package ghttp
// windows不处理信号量

View File

@ -15,6 +15,7 @@ import (
"crypto/tls"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"time"
)
// 优雅的Web Server对象封装
@ -141,9 +142,18 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
return nil, err
}
} else {
ln, err = net.Listen("tcp", addr)
// 如果监听失败1秒后重试最多重试3次
for i := 0; i < 3; i++ {
ln, err = net.Listen("tcp", addr)
if err != nil {
err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err)
time.Sleep(time.Second)
} else {
err = nil
break
}
}
if err != nil {
err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err)
return nil, err
}
}

17
geg/os/gproc/gproc4.go Normal file
View File

@ -0,0 +1,17 @@
package main
import (
"os"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/genv"
"gitee.com/johng/gf/g/os/gproc"
)
// main is a small demo for inspecting the environment variables of a process.
//
// It sleeps for five seconds, logs its pid together with all environment
// variables, and then starts os.Args[0] again with the same arguments and
// the current environment.
func main() {
	time.Sleep(5 * time.Second)
	glog.Printfln("%d: %v", gproc.Pid(), genv.All())
	p := gproc.NewProcess(os.Args[0], os.Args, os.Environ())
	// Report a failed start instead of silently dropping the error, so a
	// broken fork is visible in the demo output.
	if _, err := p.Start(); err != nil {
		glog.Errorfln("%d: fork process failed, error:%s", gproc.Pid(), err.Error())
	}
}

16
geg/other/sleep/sleep.go Normal file
View File

@ -0,0 +1,16 @@
package sleep
import (
"time"
"gitee.com/johng/gf/g/os/glog"
)
// init logs a message, sleeps for three seconds and logs a second message,
// making the cost of importing this package visible in the log output.
func init() {
	const importDelay = 3 * time.Second

	glog.Println("sleep package importing")
	time.Sleep(importDelay)
	glog.Println("sleep package imported")
}
// Test logs the string "Test". Calling it from another package forces this
// package to be imported, which runs its init function first.
func Test() {
	const message = "Test"
	glog.Println(message)
}

View File

@ -1,7 +1,9 @@
package main
import "gitee.com/johng/gf/g/os/glog"
import (
"gitee.com/johng/gf/geg/other/sleep"
)
func main() {
glog.Error(1)
sleep.Test()
}