From 7ebf6825b6d7c271891f86f2f441375b7f11b91f Mon Sep 17 00:00:00 2001 From: John Date: Tue, 8 May 2018 18:41:29 +0800 Subject: [PATCH] =?UTF-8?q?=E7=83=AD=E9=87=8D=E5=90=AF=E7=89=B9=E6=80=A7?= =?UTF-8?q?=E5=BC=80=E5=8F=91=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/net/ghttp/ghttp_server.go | 110 +++++++++++++++++++++++---- g/net/ghttp/ghttp_server_graceful.go | 103 ++++--------------------- g/os/gpm/gpm.go | 102 +++++++++++++++++++++++++ g/os/gpm/gpm_proccess.go | 90 ++++++++++++++++++++++ geg/other/test.go | 7 +- geg/other/test2.go | 24 ++++-- 6 files changed, 323 insertions(+), 113 deletions(-) create mode 100644 g/os/gpm/gpm.go create mode 100644 g/os/gpm/gpm_proccess.go diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index d8e6fcb8e..113808e96 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -19,6 +19,9 @@ import ( "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/container/gqueue" + "fmt" + "gitee.com/johng/gf/g/os/gpm" + "net" ) const ( @@ -26,10 +29,12 @@ const ( gDEFAULT_SERVER = "default" gDEFAULT_DOMAIN = "default" gDEFAULT_METHOD = "ALL" - gDEFAULT_COOKIE_PATH = "/" // 默认path - gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) - gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) - gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 + gDEFAULT_COOKIE_PATH = "/" // 默认path + gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) + gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) + gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 + gCHILD_ENVIRON_KEY = "GF_WEB_SERVER_CHILD" // 用以子父进程识别,环境变量名称 + gCHILD_ENVIRON_STRING = gCHILD_ENVIRON_KEY + "=1" // 用以子父进程识别,环境变量键值设置 ) // ghttp.Server结构体 @@ -38,7 +43,8 @@ type Server struct { name string // 服务名称,方便识别 config ServerConfig // 配置对象 status int8 // 当前服务器状态(0:未启动,1:运行中) - servers []*http.Server // 底层http.Server列表 + servers []*gracefulServer // 底层http.Server列表 + pmanager *gpm.Manager // 进程管理器,用于管理子进程服务 methodsMap map[string]bool // 所有支持的HTTP Method(初始化时自动填充) servedCount *gtype.Int // 已经服务的请求数(4-8字节,不考虑溢出情况),同时作为请求ID closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象) @@ -110,7 +116,8 @@ func GetServer(name...interface{}) (*Server) { } s := &Server { name : sname, - servers : make([]*http.Server, 0), + servers : make([]*gracefulServer, 0), + pmanager : gpm.New(), methodsMap : make(map[string]bool), handlerMap : make(HandlerMap), statusHandlerMap : make(map[string]HandlerFunc), @@ -161,7 +168,9 @@ func (s *Server) Run() error { s.startCloseQueueLoop() // 开始执行底层Web Server创建,端口监听 - var wg sync.WaitGroup + var fd = 3 + var wg sync.WaitGroup + var server *gracefulServer if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 { // HTTPS if len(s.config.HTTPSAddr) == 0 { @@ -175,12 +184,20 @@ func (s *Server) Run() error { for _, v := range array { wg.Add(1) go func(addr string) { - server := s.newGracefulServer(addr) + if s.isChildProcess() { + server = s.newGracefulServer(addr, fd) + fd++ + } else { + server = s.newGracefulServer(addr) + } if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { - glog.Error(err) + // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 + if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + glog.Error(err) + } wg.Done() } else { - //s.servers = append(s.servers, server) + s.servers = append(s.servers, server) } }(v) } @@ -193,12 +210,20 @@ func (s *Server) Run() error { for _, v := range array { wg.Add(1) go func(addr string) { - server := s.newGracefulServer(addr) + if s.isChildProcess() { + server = s.newGracefulServer(addr, fd) + fd++ + } else { + server = s.newGracefulServer(addr) + } if err := server.ListenAndServe(); err != nil { - glog.Error(err) + // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 + if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + glog.Error(err) + } wg.Done() } else { - //s.servers = append(s.servers, server) + s.servers = append(s.servers, server) } }(v) } @@ -210,6 +235,65 @@ func (s *Server) Run() error { return nil } +// 重启Web Server +func (s *Server) Restart() { + if pid, err := s.startChildProcess(); err != nil { + glog.Printf("server restart failed: %v, continue serving\n", err) + } else { + glog.Printf("server restart successfully, new pid: %d\n", pid) + + } +} + +// 关闭Web Server +func (s *Server) Shutdown() { + for _, v := range s.servers { + v. + if f, e := v.listener.(*net.TCPListener).File(); e == nil { + files = append(files, f) + } else { + return 0, fmt.Errorf("failed to get listener file: %v", e) + } + } +} + +// 判断是否为子进程执行 +func (s *Server) isChildProcess() bool { + return os.Getenv(gCHILD_ENVIRON_KEY) != "" +} + +// 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符 +func (s *Server) startChildProcess() (int, error) { + if s.isChildProcess() { + return 0, errors.New("only main process can fork child process") + } + // 构造子进程的环境变量,并增加环境变量参数以标识该进程是graceful子进程 + env := make([]string, 0) + for _, value := range os.Environ() { + if value != gCHILD_ENVIRON_STRING { + env = append(env, value) + } + } + env = append(env, gCHILD_ENVIRON_STRING) + // 获取所有http server的file + files := []*os.File{ os.Stdin,os.Stdout,os.Stderr} + for _, v := range s.servers { + if f, e := v.listener.(*net.TCPListener).File(); e == nil { + files = append(files, f) + } else { + return 0, fmt.Errorf("failed to get listener file: %v", e) + } + } + p, err := os.StartProcess(os.Args[0], os.Args, &os.ProcAttr { + Env : env, + Files : files, + }) + if err != nil { + return 0, fmt.Errorf("failed to forkexec: %v", err) + } + return p.Pid, nil +} + // 生成一个底层的Web Server对象 func (s *Server) newServer(addr string) *http.Server { return &http.Server { diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index a72d8dcd3..6ea8e1c30 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -10,41 +10,32 @@ import ( "fmt" "net" "os" - "syscall" "context" "net/http" - "os/signal" "crypto/tls" "gitee.com/johng/gf/g/os/glog" ) -const ( - gGF_WEB_SERVER_GRACEFUL_ENVIRON_KEY = "GF_WEB_SERVER_GRACEFUL" - gGF_WEB_SERVER_GRACEFUL_ENVIRON_STRING = gGF_WEB_SERVER_GRACEFUL_ENVIRON_KEY + "=1" - gGF_WEB_SERVER_GRACEFUL_LISTENER_FD = 3 -) - // 优雅的Web Server对象封装 type gracefulServer struct { + fd uintptr + addr string httpServer *http.Server listener net.Listener - isGraceful bool - signalChan chan os.Signal shutdownChan chan bool } // 创建一个优雅的Http Server -func (s *Server) newGracefulServer(addr string) *gracefulServer { - isGraceful := false - if os.Getenv(gGF_WEB_SERVER_GRACEFUL_ENVIRON_KEY) != "" { - isGraceful = true - } - return &gracefulServer { +func (s *Server) newGracefulServer(addr string, fd...int) *gracefulServer { + gs := &gracefulServer { + addr : addr, httpServer : s.newServer(addr), - isGraceful : isGraceful, - signalChan : make(chan os.Signal), shutdownChan : make(chan bool), } + if len(fd) > 0 && fd[0] > 0 { + gs.fd = uintptr(fd[0]) + } + return gs } // 执行HTTP监听 @@ -84,7 +75,6 @@ func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error { // 开始执行Web Server服务处理 func (s *gracefulServer) doServe() error { - go s.handleSignals() err := s.httpServer.Serve(s.listener) <-s.shutdownChan return err @@ -94,13 +84,8 @@ func (s *gracefulServer) doServe() error { func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { var ln net.Listener var err error - if s.isGraceful { - //path := fmt.Sprintf("%s%sgf.web.server.fd.%d", gfile.TempDir(), gfile.Separator,gtime.Nanosecond()) - //f, err := gfile.Open(path) - //if err != nil { - // return nil, err - //} - f := os.NewFile(gGF_WEB_SERVER_GRACEFUL_LISTENER_FD, "") + if s.fd > 0 { + f := os.NewFile(s.fd, "") ln, err = net.FileListener(f) if err != nil { err = fmt.Errorf("net.FileListener error: %v", err) @@ -116,75 +101,13 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { return ln, nil } -// 处理终端信号指令监听 -func (s *gracefulServer) handleSignals() { - var sig os.Signal - - signal.Notify( - s.signalChan, - syscall.SIGTERM, - syscall.SIGUSR2, - ) - - for { - sig = <-s.signalChan - switch sig { - case syscall.SIGTERM: - glog.Println("received SIGTERM, graceful shutting down HTTP server") - s.shutdown() - - case syscall.SIGUSR2: - glog.Println("received SIGUSR2, graceful restarting HTTP server") - if pid, err := s.startNewProcess(); err != nil { - glog.Printf("start new process failed: %v, continue serving\n", err) - } else { - glog.Printf("start new process successfully, the new pid is %d\n", pid) - s.shutdown() - } - default: - } - } -} - // 执行请求优雅关闭 func (s *gracefulServer) shutdown() { if err := s.httpServer.Shutdown(context.Background()); err != nil { - glog.Errorf("server shutdown error: %v\n", err) + glog.Errorf("server %s shutdown error: %v\n", s.addr, err) } else { - glog.Println("server shutdown success") + glog.Printf("server %s shutdown successfully\n", s.addr) s.shutdownChan <- true } } -// 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符 -func (s *gracefulServer) startNewProcess() (uintptr, error) { - listenerFd, err := s.getTCPListenerFd() - if err != nil { - return 0, fmt.Errorf("failed to get socket file descriptor: %v", err) - } - // 构造子进程的环境变量,并增加环境变量参数以标识该进程是graceful子进程 - env := make([]string, 0) - for _, value := range os.Environ() { - if value != gGF_WEB_SERVER_GRACEFUL_ENVIRON_STRING { - env = append(env, value) - } - } - env = append(env, gGF_WEB_SERVER_GRACEFUL_ENVIRON_STRING) - fork, err := syscall.ForkExec(os.Args[0], os.Args, &syscall.ProcAttr { - Env : env, - Files : []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd(), listenerFd}, - }) - if err != nil { - return 0, fmt.Errorf("failed to forkexec: %v", err) - } - return uintptr(fork), nil -} - -// 获得对应net.TCPListener的文件描述符文件ID -func (s *gracefulServer) getTCPListenerFd() (uintptr, error) { - file, err := s.listener.(*net.TCPListener).File() - if err != nil { - return 0, err - } - return file.Fd(), nil -} diff --git a/g/os/gpm/gpm.go b/g/os/gpm/gpm.go new file mode 100644 index 000000000..1865c0d81 --- /dev/null +++ b/g/os/gpm/gpm.go @@ -0,0 +1,102 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 进程管理. +package gpm + +import ( + "os" + "gitee.com/johng/gf/g/container/gmap" +) + +// 进程管理器 +type Manager struct { + processes *gmap.IntInterfaceMap // 所管理的子进程map +} + +// 子进程 +type Process struct { + pm *Manager // 所属进程管理器 + path string // 可执行文件绝对路径 + args []string // 执行参数 + attr *os.ProcAttr // 进程属性 + process *os.Process // 底层进程对象 +} + +// 创建一个进程管理器 +func New () *Manager { + return &Manager{ + processes : gmap.NewIntInterfaceMap(), + } +} + +// 创建一个进程(不执行) +func (m *Manager) NewProcess(path string, args []string, env []string) *Process { + attr := &os.ProcAttr { + Env : env, + Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, + } + return &Process{ + pm : m, + path : path, + args : args, + attr : attr, + } +} + +// 获取一个进程 +func (m *Manager) GetProcess(pid int) *Process { + if v := m.processes.Get(pid); v != nil { + return v.(*Process) + } + return nil +} + +// 获取所有的进程对象,构成列表返回 +func (m *Manager) Processes() []*Process { + processes := make([]*Process, 0) + m.processes.RLockFunc(func(m map[int]interface{}) { + for _, v := range m { + processes = append(processes, v.(*Process)) + } + }) + return processes +} + +// 等待所有子进程结束 +func (m *Manager) WaitAll() { + processes := m.Processes() + if len(processes) > 0 { + for _, p := range processes { + p.Wait() + } + } +} + +// 关闭所有的进程 +func (m *Manager) KillAll() error { + for _, p := range m.Processes() { + if err := p.Kill(); err != nil { + return err + } + } + return nil +} + +// 向所有进程发送信号量 +func (m *Manager) SignalAll(sig os.Signal) error { + for _, p := range m.Processes() { + if err := p.Signal(sig); err != nil { + return err + } + } + return nil +} + +// 当前进程总数 +func (m *Manager) Size() int { + return m.processes.Size() +} \ No newline at end of file diff --git a/g/os/gpm/gpm_proccess.go b/g/os/gpm/gpm_proccess.go new file mode 100644 index 000000000..e0b505ca5 --- /dev/null +++ b/g/os/gpm/gpm_proccess.go @@ -0,0 +1,90 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gpm + +import ( + "os" +) + +// 运行进程 +func (p *Process) Run() (int, error) { + if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil { + p.process = process + p.pm.processes.Set(process.Pid, p) + return process.Pid, nil + } else { + return 0, err + } +} + +func (p *Process) SetArgs(args []string) { + p.args = args +} + +func (p *Process) AddArgs(args []string) { + for _, v := range args { + p.args = append(p.args, v) + } +} + +func (p *Process) SetEnv(env []string) { + p.attr.Env = env +} + +func (p *Process) AddEnv(env []string) { + for _, v := range env { + p.attr.Env = append(p.attr.Env, v) + } +} + +func (p *Process) SetAttr(attr *os.ProcAttr) { + p.attr = attr +} + +func (p *Process) GetAttr() *os.ProcAttr { + return p.attr +} + +// PID +func (p *Process) Pid() int { + if p.process != nil { + return p.process.Pid + } + return 0 +} + +// Release releases any resources associated with the Process p, +// rendering it unusable in the future. +// Release only needs to be called if Wait is not. +func (p *Process) Release() error { + return p.process.Release() +} + +// Kill causes the Process to exit immediately. +func (p *Process) Kill() error { + if err := p.process.Kill(); err == nil { + p.pm.processes.Remove(p.Pid()) + return nil + } else { + return err + } +} + +// Wait waits for the Process to exit, and then returns a +// ProcessState describing its status and an error, if any. +// Wait releases any resources associated with the Process. +// On most operating systems, the Process must be a child +// of the current process or an error will be returned. +func (p *Process) Wait() (*os.ProcessState, error) { + return p.process.Wait() +} + +// Signal sends a signal to the Process. +// Sending Interrupt on Windows is not implemented. +func (p *Process) Signal(sig os.Signal) error { + return p.process.Signal(sig) +} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index 5a46ce481..2483c275e 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -5,6 +5,7 @@ import ( "net/http" "github.com/tabalt/gracehttp" + "os" ) @@ -20,5 +21,7 @@ func test() { } func main() { - test() -} \ No newline at end of file + fmt.Println(os.NewFile(11111, "")) + fmt.Println(os.NewFile(111111111, "")) + fmt.Println(os.NewFile(33333333333333, "")) +} diff --git a/geg/other/test2.go b/geg/other/test2.go index a3aceda50..6080a18d9 100644 --- a/geg/other/test2.go +++ b/geg/other/test2.go @@ -2,16 +2,24 @@ package main import ( "fmt" + "gitee.com/johng/gf/g/os/gpm" + "os" + "time" + "gitee.com/johng/gf/g/os/glog" ) func main() { - //var v interface{} - m := map[string]int { - "age" : 18, + m := gpm.New() + env := os.Environ() + env = append(env, "child=1") + p := m.NewProcess(os.Args[0], os.Args, env) + if os.Getenv("child") != "" { + time.Sleep(3*time.Second) + glog.Error("error") + } else { + pid, err := p.Run() + fmt.Println(pid) + fmt.Println(err) + fmt.Println(p.Wait()) } - //v = m - p := &m - (*p)["age"] = 16 - //fmt.Println(v) - fmt.Println(m) } \ No newline at end of file