From dabc7891134db729c5f557fd93ea533b6132f6e0 Mon Sep 17 00:00:00 2001 From: John Date: Fri, 11 May 2018 18:34:17 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90ghttp.Server=E7=83=AD?= =?UTF-8?q?=E9=87=8D=E5=90=AF=E7=89=B9=E6=80=A7=E5=88=9D=E6=AD=A5=E5=BC=80?= =?UTF-8?q?=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/net/ghttp/ghttp_server.go | 22 ++----- g/net/ghttp/ghttp_server_comm.go | 91 ++++++++++++++++++++++------ g/net/ghttp/ghttp_server_graceful.go | 15 ++++- g/os/gproc/gproc_comm.go | 2 +- g/os/gproc/gproc_manager.go | 30 ++++++--- g/os/gproc/gproc_proccess.go | 45 +++++++++++--- geg/net/ghttp/hot_restart.go | 12 ++-- 7 files changed, 155 insertions(+), 62 deletions(-) diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index ee17765b1..d9daca7db 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -125,6 +125,10 @@ func init() { handleProcessMsg() // 服务执行完成,需要退出 doneChan <- struct{}{} + + if !gproc.IsChild() { + glog.Printfln("all web server shutdown smoothly") + } }() } @@ -226,8 +230,6 @@ func (s *Server) Run() error { // 阻塞等待服务执行完成 <- doneChan - - glog.Printfln("web server pid %d exit successfully", gproc.Pid()) return nil } @@ -262,7 +264,6 @@ func (s *Server) startServer(fdMap listenerFdMap) { server = s.newGracefulServer(item) } s.servers = append(s.servers, server) - glog.Printfln("https server started listening on %s", server.addr) if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { @@ -293,7 +294,6 @@ func (s *Server) startServer(fdMap listenerFdMap) { server = s.newGracefulServer(item) } s.servers = append(s.servers, server) - glog.Printfln("http server started listening on %s", server.addr) if err := server.ListenAndServe(); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { @@ -308,24 +308,12 @@ func (s *Server) startServer(fdMap listenerFdMap) { // 重启Web Server func (s *Server) Restart() { - // 如果是主进程,那么向所有子进程发送重启信号 - if !gproc.IsChild() { - procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) - return - } sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) } // 关闭Web Server func (s *Server) Shutdown() { - // 如果是主进程,那么向所有子进程发送关闭信号 - if !gproc.IsChild() { - procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) - return - } - for _, v := range s.servers { - v.shutdown() - } + sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) } // 获取当前监听的文件描述符信息,构造成map返回 diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index 975d8abda..bc0e4ebda 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -9,28 +9,34 @@ package ghttp import ( "os" - "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/encoding/gbinary" - "strings" - "gitee.com/johng/gf/g/util/gconv" "fmt" + "strings" + "syscall" + "os/signal" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/encoding/gbinary" ) const ( - gMSG_START = 10 - gMSG_RESTART = 20 - gMSG_CORESTART = 30 - gMSG_SHUTDOWN = 40 - gMSG_NEW_FORK = 50 + gMSG_START = 10 + gMSG_RESTART = 20 + gMSG_SHUTDOWN = 30 + gMSG_NEW_FORK = 40 + gMSG_REMOVE_PROC = 50 ) +// 进程信号量监听消息队列 +var procSignalChan = make(chan os.Signal) + // 处理进程间消息 // 数据格式: 操作(8bit) | 参数(变长) func handleProcessMsg() { + go handleProcessSignals() for { if msg := gproc.Receive(); msg != nil { - fmt.Println(gproc.Pid(), gproc.IsChild(), msg) + //fmt.Println(gproc.Pid(), gproc.IsChild(), msg) act := gbinary.DecodeToUint(msg.Data[0 : 1]) data := msg.Data[1 : ] if gproc.IsChild() { @@ -64,7 +70,7 @@ func handleProcessMsg() { array := strings.Split(item, "#") fd := uintptr(gconv.Uint(array[1])) s += fmt.Sprintf("%s#%d", array[0], len(p.GetAttr().Files)) - p.GetAttr().Files = append(p.GetAttr().Files, fd) + p.GetAttr().Files = append(p.GetAttr().Files, os.NewFile(fd, "")) } sfm[name][fdk] = strings.TrimRight(s, ",") } @@ -72,19 +78,21 @@ func handleProcessMsg() { } p.SetPpid(gproc.Ppid()) p.Run() - fmt.Println(procManager) b, _ := gjson.Encode(sfm) sendProcessMsg(p.Pid(), gMSG_START, b) + sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gbinary.EncodeInt(p.Pid())) sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) - sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gconv.Bytes(p.Pid())) // 友好关闭服务链接并退出 case gMSG_SHUTDOWN: serverMapping.RLockFunc(func(m map[string]interface{}) { for _, v := range m { - v.(*Server).Shutdown() + for _, s := range v.(*Server).servers { + s.shutdown() + } } }) + sendProcessMsg(gproc.Ppid(), gMSG_REMOVE_PROC, gbinary.EncodeInt(gproc.Pid())) return } @@ -102,22 +110,67 @@ func handleProcessMsg() { // 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作 procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) - // 协调重启服务 + // 新建子进程通知 case gMSG_NEW_FORK: - //sendProcessMsg(p.Pid(), gMSG_START, data) - // 关闭旧的服务进程 - //sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil) + pid := gbinary.DecodeToInt(data) + procManager.AddProcess(pid) + + // 销毁子进程通知 + case gMSG_REMOVE_PROC: + pid := gbinary.DecodeToInt(data) + procManager.RemoveProcess(pid) + // 如果所有子进程都退出,那么主进程也主动退出 + if procManager.Size() == 0 { + return + } // 关闭服务 case gMSG_SHUTDOWN: procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) - + return } } } } } +// 信号量处理 +func handleProcessSignals() { + var sig os.Signal + signal.Notify( + procSignalChan, + syscall.SIGINT, + syscall.SIGQUIT, + syscall.SIGKILL, + syscall.SIGHUP, + syscall.SIGTERM, + syscall.SIGUSR1, + ) + for { + sig = <- procSignalChan + switch sig { + // 进程终止,停止所有子进程运行 + case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: + sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) + return + + // 用户信号,重启服务 + case syscall.SIGUSR1: + sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) + + default: + } + } +} + +func onMainShutDown() { + procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) +} + +func onMainRemoveProc() { + procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) +} + // 向进程发送操作消息 func sendProcessMsg(pid int, act int, data []byte) { gproc.Send(pid, formatMsgBuffer(act, data)) diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index 29688a5cc..b4d543ae3 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -14,6 +14,7 @@ import ( "net/http" "crypto/tls" "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" ) // 优雅的Web Server对象封装 @@ -92,8 +93,18 @@ func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error { return s.doServe() } +// 获取服务协议字符串 +func (s *gracefulServer) getProto() string { + proto := "http" + if s.isHttps { + proto = "https" + } + return proto +} + // 开始执行Web Server服务处理 func (s *gracefulServer) doServe() error { + glog.Printfln("%d: %s server started listening on [%s]", gproc.Pid(), s.getProto(), s.addr) err := s.httpServer.Serve(s.listener) <-s.shutdownChan return err @@ -123,9 +134,9 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { // 执行请求优雅关闭 func (s *gracefulServer) shutdown() { if err := s.httpServer.Shutdown(context.Background()); err != nil { - glog.Errorf("server %s shutdown error: %v\n", s.addr, err) + glog.Errorfln("%d: %s server [%s] shutdown error: %v", gproc.Pid(), s.getProto(), s.addr, err) } else { - glog.Printf("server %s shutdown successfully\n", s.addr) + glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr) s.shutdownChan <- true } } diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index de000c747..6f1632851 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -20,7 +20,7 @@ import ( ) const ( - // 由于子进程的temp dir有可能会和父进程不一致,影响进程间通信,这里统一使用环境变量设置 + // 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置 gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" ) diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index dc2bb65c7..de6ad1751 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -9,9 +9,13 @@ package gproc import ( "os" - "gitee.com/johng/gf/g/container/gmap" "fmt" - "syscall" + "gitee.com/johng/gf/g/container/gmap" + "strings" +) + +const ( + gCHILD_ARGS_MARK_NAME = "--gproc-child" ) // 进程管理器 @@ -37,14 +41,20 @@ func NewProcess(path string, args []string, environment []string) *Process { path : path, args : make([]string, 0), ppid : os.Getpid(), - attr : &syscall.ProcAttr { - Files: []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}, + attr : &os.ProcAttr { + Env : env, + Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, }, } - p.args = append(p.args, args[0]) - p.args = append(p.args, "--gproc-child") + if len(args) > 0 { + p.args = append(p.args, args[0]) + } + // 判断是否加上子进程标识 + if len(args) == 1 || (len(args) > 1 && !strings.EqualFold(args[1], gCHILD_ARGS_MARK_NAME)) { + p.args = append(p.args, gCHILD_ARGS_MARK_NAME) + } if len(args) > 1 { - p.args = append(p.args, args[1:]...) + p.args = append(p.args, args[1 : ]...) } return p } @@ -69,6 +79,7 @@ func (m *Manager) AddProcess(pid int) { if process, err := os.FindProcess(pid); err == nil { p := m.NewProcess("", nil, nil) p.process = process + m.processes.Set(pid, p) } } @@ -133,6 +144,11 @@ func (m *Manager) SendTo(pid int, data interface{}) error { return Send(pid, data) } +// 清空管理器 +func (m *Manager) Clear() { + m.processes.Clear() +} + // 当前进程总数 func (m *Manager) Size() int { return m.processes.Size() diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index b52459c55..1fa3e6de4 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -8,9 +8,8 @@ package gproc import ( "os" - "errors" "fmt" - "syscall" + "errors" ) // 子进程 @@ -18,8 +17,7 @@ type Process struct { pm *Manager // 所属进程管理器 path string // 可执行文件绝对路径 args []string // 执行参数 - //attr *os.ProcAttr // 进程属性 - attr *syscall.ProcAttr // 进程属性 + attr *os.ProcAttr // 进程属性 ppid int // 自定义关联的父进程ID process *os.Process // 底层进程对象 } @@ -30,17 +28,44 @@ func (p *Process) Run() (int, error) { return p.Pid(), nil } p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid)) - if pid, err := syscall.ForkExec(p.path, p.args, p.attr); err == nil { - p.process, _ = os.FindProcess(pid) + if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil { + p.process = process if p.pm != nil { - p.pm.processes.Set(pid, p) + p.pm.processes.Set(process.Pid, p) } - return pid, nil + return process.Pid, nil } else { return 0, err } } +// 运行进程,守护进程,主进程退出后不退出 +//func (p *Process) RunDaemon() (int, error) { +// if p.process != nil { +// return p.Pid(), nil +// } +// p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid)) +// procAttr := &syscall.ProcAttr { +// Dir : p.attr.Dir, +// Env : p.attr.Env, +// Sys : p.attr.Sys, +// Files : make([]uintptr, 0), +// } +// // 将os.ProcAttr.Files转换为底层的syscall.ProcAttr.Files +// for _, f := range p.attr.Files { +// procAttr.Files = append(procAttr.Files, f.Fd()) +// } +// if pid, err := syscall.ForkExec(p.path, p.args, procAttr); err == nil { +// p.process, _ = os.FindProcess(pid) +// if p.pm != nil { +// p.pm.processes.Set(pid, p) +// } +// return pid, nil +// } else { +// return 0, err +// } +//} + func (p *Process) SetManager(m *Manager) { p.pm = m } @@ -70,11 +95,11 @@ func (p *Process) AddEnv(env []string) { } } -func (p *Process) SetAttr(attr *syscall.ProcAttr) { +func (p *Process) SetAttr(attr *os.ProcAttr) { p.attr = attr } -func (p *Process) GetAttr() *syscall.ProcAttr { +func (p *Process) GetAttr() *os.ProcAttr { return p.attr } diff --git a/geg/net/ghttp/hot_restart.go b/geg/net/ghttp/hot_restart.go index 4869e139c..c227c05ec 100644 --- a/geg/net/ghttp/hot_restart.go +++ b/geg/net/ghttp/hot_restart.go @@ -3,8 +3,6 @@ package main import ( "gitee.com/johng/gf/g" "gitee.com/johng/gf/g/net/ghttp" - "gitee.com/johng/gf/g/os/gtime" - "time" ) func main() { @@ -13,10 +11,12 @@ func main() { r.Response.Writeln("hello") }) s.BindHandler("/restart", func(r *ghttp.Request){ - r.Response.Writeln("restart server in 2 seconds") - gtime.SetTimeout(2*time.Second, func() { - r.Server.Restart() - }) + r.Response.Writeln("restart server") + r.Server.Restart() + }) + s.BindHandler("/shutdown", func(r *ghttp.Request){ + r.Response.Writeln("shutdown server") + r.Server.Shutdown() }) s.SetPort(8199) s.Run()