From c78fa84a8f1e59df53911825349ae4d309b87da8 Mon Sep 17 00:00:00 2001 From: John Date: Fri, 1 Jun 2018 18:19:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90ghttp.Server=E5=B9=B3?= =?UTF-8?q?=E6=BB=91=E9=87=8D=E5=90=AF=E6=9C=BA=E5=88=B6=E6=94=B9=E8=BF=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO | 2 +- g/net/ghttp/ghttp_server.go | 25 +++-- g/net/ghttp/ghttp_server_admin.go | 113 +++++++++++----------- g/net/ghttp/ghttp_server_admin_unix.go | 21 ++-- g/net/ghttp/ghttp_server_admin_windows.go | 2 + g/net/ghttp/ghttp_server_graceful.go | 14 ++- geg/os/gproc/gproc4.go | 17 ++++ geg/other/sleep/sleep.go | 16 +++ geg/other/test.go | 6 +- 9 files changed, 135 insertions(+), 81 deletions(-) create mode 100644 geg/os/gproc/gproc4.go create mode 100644 geg/other/sleep/sleep.go diff --git a/TODO b/TODO index 97370232c..c49661786 100644 --- a/TODO +++ b/TODO @@ -9,7 +9,7 @@ orm增加更多数据库支持; ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现; 对grpool进行优化改进,包括属性原子操作封装采用gtype实现,修正设计BUG:https://github.com/johng-cn/gf/issues/6; 平滑重启机制改进,以便于开发阶段调试; - +gredis增加redis密码支持; DONE: 1. gconv完善针对不同类型的判断,例如:尽量减少sprintf("%v", xxx)来执行string类型的转换; diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index b85d7dfa3..9aaf931b2 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -117,6 +117,16 @@ var doneChan = make(chan struct{}, 100000) // Web Server进程初始化 func init() { + // 如果是完整重启,那么需要等待主进程销毁后,才开始执行监听,防止端口冲突 + if genv.Get(gADMIN_ACTION_RESTART_ENVKEY) != "" { + if p, e := os.FindProcess(gproc.PPid()); e == nil { + p.Kill() + p.Wait() + } else { + glog.Error(e) + } + } + // 信号量管理操作监听 go handleProcessSignal() } @@ -193,18 +203,17 @@ func (s *Server) Start() error { } // 启动http server + reloaded := false fdMapStr := genv.Get(gADMIN_ACTION_RELOAD_ENVKEY) if len(fdMapStr) > 0 { sfm := bufferToServerFdMap([]byte(fdMapStr)) - for k, v := range sfm { - GetServer(k).startServer(v) + if v, ok := sfm[s.name]; ok { + s.startServer(v) + reloaded = true } - } else { - serverMapping.RLockFunc(func(m map[string]interface{}) { - for _, v := range m { - v.(*Server).startServer(nil) - } - }) + } + if !reloaded { + s.startServer(nil) } // 开启异步关闭队列处理循环 diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go index f97936902..a2d9dc26a 100644 --- a/g/net/ghttp/ghttp_server_admin.go +++ b/g/net/ghttp/ghttp_server_admin.go @@ -10,7 +10,6 @@ package ghttp import ( "strings" "gitee.com/johng/gf/g/os/gview" - "runtime" "gitee.com/johng/gf/g/os/gproc" "sync" "gitee.com/johng/gf/g/os/gtime" @@ -21,13 +20,14 @@ import ( "os" "gitee.com/johng/gf/g/encoding/gjson" "gitee.com/johng/gf/g/util/gconv" + "time" + "runtime" ) const ( - gADMIN_ACTION_INTERVAL_LIMIT = 3000 // (毫秒)服务开启后允许执行管理操作的间隔限制 - gADMIN_ACTION_RELOADING = 1 - gADMIN_ACTION_RESTARTING = 2 - gADMIN_ACTION_SHUTINGDOWN = 4 + gADMIN_ACTION_INTERVAL_LIMIT = 2000 // (毫秒)服务开启后允许执行管理操作的间隔限制 + gADMIN_ACTION_RESTARTING = 1 + gADMIN_ACTION_SHUTINGDOWN = 2 gADMIN_ACTION_RELOAD_ENVKEY = "gf.server.reload" gADMIN_ACTION_RESTART_ENVKEY = "gf.server.restart" ) @@ -55,7 +55,6 @@ func (p *utilAdmin) Index(r *Request) { gf ghttp admin -

reload

restart

shutdown

@@ -64,20 +63,7 @@ func (p *utilAdmin) Index(r *Request) { r.Response.Write(buffer) } -// 服务热重启 -func (p *utilAdmin) Reload(r *Request) { - if runtime.GOOS == "windows" { - p.Restart(r) - } else { - if err := r.Server.Reload(); err == nil { - r.Response.Write("server reloaded") - } else { - r.Response.Write(err.Error()) - } - } -} - -// 服务完整重启 +// 服务重启 func (p *utilAdmin) Restart(r *Request) { if err := r.Server.Restart(); err == nil { r.Response.Write("server restarted") @@ -106,25 +92,9 @@ func (s *Server) EnableAdmin(pattern...string) { s.BindObject(p, &utilAdmin{}) } -// 平滑重启Web Server -func (s *Server) Reload() error { - serverActionLocker.Lock() - defer serverActionLocker.Unlock() - if err := s.checkActionStatus(); err != nil { - return err - } - if err := s.checkActionFrequence(); err != nil { - return err - } - serverProcessStatus.Set(gADMIN_ACTION_RELOADING) - glog.Printfln("%d: server reloading", gproc.Pid()) - forkReloadProcess() - go shutdownWebServers() - doneChan <- struct{}{} - return nil -} - -// 完整重启Web Server +// 重启Web Server +// 针对*niux系统: 平滑重启 +// 针对windows : 完整重启 func (s *Server) Restart() error { serverActionLocker.Lock() defer serverActionLocker.Unlock() @@ -134,9 +104,7 @@ func (s *Server) Restart() error { if err := s.checkActionFrequence(); err != nil { return err } - serverProcessStatus.Set(gADMIN_ACTION_RESTARTING) - glog.Printfln("%d: server restarting", gproc.Pid()) - doneChan <- struct{}{} + restartWebServers() return nil } @@ -150,10 +118,7 @@ func (s *Server) Shutdown() error { if err := s.checkActionFrequence(); err != nil { return err } - serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN) - glog.Printfln("%d: server shutting down", gproc.Pid()) - go closeWebServers() - doneChan <- struct{}{} + shutdownWebServers() return nil } @@ -172,18 +137,14 @@ func (s *Server) checkActionStatus() error { status := serverProcessStatus.Val() if status > 0 { switch status { - case gADMIN_ACTION_RELOADING: - return errors.New("server is reloading") - case gADMIN_ACTION_RESTARTING: - return errors.New("server is restarting") - case gADMIN_ACTION_SHUTINGDOWN: - return errors.New("server is shutting down") + case gADMIN_ACTION_RESTARTING: return errors.New("server is restarting") + case gADMIN_ACTION_SHUTINGDOWN: return errors.New("server is shutting down") } } return nil } -// 创建一个子进程,通过环境变量传参 +// 平滑重启:创建一个子进程,通过环境变量传参 func forkReloadProcess() { p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 @@ -208,12 +169,24 @@ func forkReloadProcess() { } } buffer, _ := gjson.Encode(sfm) - p.Env = append(p.Env, fmt.Sprintf("%s=%s", gADMIN_ACTION_RELOAD_ENVKEY, string(buffer))) + p.Env = append(p.Env, gADMIN_ACTION_RELOAD_ENVKEY + "=" + string(buffer)) if _, err := p.Start(); err != nil { glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer)) } } +// 完整重启:创建一个新的子进程 +func forkRestartProcess() { + // 去掉平滑重启的环境变量参数 + os.Unsetenv(gADMIN_ACTION_RELOAD_ENVKEY) + env := os.Environ() + env = append(env, gADMIN_ACTION_RESTART_ENVKEY + "=1") + p := procManager.NewProcess(os.Args[0], os.Args, env) + if _, err := p.Start(); err != nil { + glog.Errorfln("%d: fork process failed, error:%s", gproc.Pid(), err.Error()) + } +} + // 获取所有Web Server的文件描述符map func getServerFdMap() map[string]listenerFdMap { sfm := make(map[string]listenerFdMap) @@ -241,9 +214,37 @@ func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { return sfm } +// Web Server重启 +func restartWebServers() { + serverProcessStatus.Set(gADMIN_ACTION_RESTARTING) + glog.Printfln("%d: server restarting", gproc.Pid()) + if runtime.GOOS == "windows" { + // 异步1秒后再执行重启,目的是让接口能够正确返回结果,否则接口会报错(因为web server关闭了) + gtime.SetTimeout(time.Second, func() { + forcedlyCloseWebServers() + forkRestartProcess() + }) + } else { + forkReloadProcess() + go gracefulShutdownWebServers() + doneChan <- struct{}{} + } +} + +// Web Server关闭服务 +func shutdownWebServers() { + serverProcessStatus.Set(gADMIN_ACTION_SHUTINGDOWN) + glog.Printfln("%d: server shutting down", gproc.Pid()) + // 异步1秒后再执行重启,目的是让接口能够正确返回结果,否则接口会报错(因为web server关闭了) + gtime.SetTimeout(time.Second, func() { + forcedlyCloseWebServers() + doneChan <- struct{}{} + }) +} + // 关优雅闭进程所有端口的Web Server服务 // 注意,只是关闭Web Server服务,并不是退出进程 -func shutdownWebServers() { +func gracefulShutdownWebServers() { serverMapping.RLockFunc(func(m map[string]interface{}) { for _, v := range m { for _, s := range v.(*Server).servers { @@ -255,7 +256,7 @@ func shutdownWebServers() { // 强制关闭进程所有端口的Web Server服务 // 注意,只是关闭Web Server服务,并不是退出进程 -func closeWebServers() { +func forcedlyCloseWebServers() { serverMapping.RLockFunc(func(m map[string]interface{}) { for _, v := range m { for _, s := range v.(*Server).servers { diff --git a/g/net/ghttp/ghttp_server_admin_unix.go b/g/net/ghttp/ghttp_server_admin_unix.go index 6c8902822..647374f73 100644 --- a/g/net/ghttp/ghttp_server_admin_unix.go +++ b/g/net/ghttp/ghttp_server_admin_unix.go @@ -33,20 +33,17 @@ func handleProcessSignal() { for { sig = <- procSignalChan switch sig { - // 进程终止,停止所有子进程运行 - case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: + // 进程终止,停止所有子进程运行 + case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: + shutdownWebServers() + return - return + // 用户信号,重启服务 + case syscall.SIGUSR1: + restartWebServers() + return - // 用户信号,热重启服务 - case syscall.SIGUSR1: - - - // 用户信号,完整重启服务 - case syscall.SIGUSR2: - - - default: + default: } } } \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_admin_windows.go b/g/net/ghttp/ghttp_server_admin_windows.go index 148375961..d763818c9 100644 --- a/g/net/ghttp/ghttp_server_admin_windows.go +++ b/g/net/ghttp/ghttp_server_admin_windows.go @@ -4,6 +4,8 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. +// +build windows + package ghttp // windows不处理信号量 diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index 13a17bdf2..f406e24fe 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -15,6 +15,7 @@ import ( "crypto/tls" "gitee.com/johng/gf/g/os/glog" "gitee.com/johng/gf/g/os/gproc" + "time" ) // 优雅的Web Server对象封装 @@ -141,9 +142,18 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { return nil, err } } else { - ln, err = net.Listen("tcp", addr) + // 如果监听失败,1秒后重试,最多重试3次 + for i := 0; i < 3; i++ { + ln, err = net.Listen("tcp", addr) + if err != nil { + err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err) + time.Sleep(time.Second) + } else { + err = nil + break + } + } if err != nil { - err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err) return nil, err } } diff --git a/geg/os/gproc/gproc4.go b/geg/os/gproc/gproc4.go new file mode 100644 index 000000000..db5eddf12 --- /dev/null +++ b/geg/os/gproc/gproc4.go @@ -0,0 +1,17 @@ +package main + +import ( + "os" + "time" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/genv" + "gitee.com/johng/gf/g/os/gproc" +) + +// 查看进程的环境变量 +func main () { + time.Sleep(5*time.Second) + glog.Printfln("%d: %v", gproc.Pid(), genv.All()) + p := gproc.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Start() +} diff --git a/geg/other/sleep/sleep.go b/geg/other/sleep/sleep.go new file mode 100644 index 000000000..ea4573aa3 --- /dev/null +++ b/geg/other/sleep/sleep.go @@ -0,0 +1,16 @@ +package sleep + +import ( + "time" + "gitee.com/johng/gf/g/os/glog" +) + +func init () { + glog.Println("sleep package importing") + time.Sleep(3*time.Second) + glog.Println("sleep package imported") +} + +func Test() { + glog.Println("Test") +} diff --git a/geg/other/test.go b/geg/other/test.go index 6e4f24752..19cbc6b79 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,7 +1,9 @@ package main -import "gitee.com/johng/gf/g/os/glog" +import ( + "gitee.com/johng/gf/geg/other/sleep" +) func main() { - glog.Error(1) + sleep.Test() } \ No newline at end of file