diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 113808e96..a2aeba79d 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -22,19 +22,22 @@ import ( "fmt" "gitee.com/johng/gf/g/os/gpm" "net" + "os/signal" + "syscall" + "time" + "gitee.com/johng/gf/g/os/gcmd" ) const ( - gHTTP_METHODS = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE" - gDEFAULT_SERVER = "default" - gDEFAULT_DOMAIN = "default" - gDEFAULT_METHOD = "ALL" - gDEFAULT_COOKIE_PATH = "/" // 默认path - gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) - gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) - gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 - gCHILD_ENVIRON_KEY = "GF_WEB_SERVER_CHILD" // 用以子父进程识别,环境变量名称 - gCHILD_ENVIRON_STRING = gCHILD_ENVIRON_KEY + "=1" // 用以子父进程识别,环境变量键值设置 + gHTTP_METHODS = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE" + gDEFAULT_SERVER = "default" + gDEFAULT_DOMAIN = "default" + gDEFAULT_METHOD = "ALL" + gDEFAULT_COOKIE_PATH = "/" // 默认path + gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) + gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) + gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 + gDEFAULT_COMMAND_PORT = 336816 // 默认本地命令控制端口 ) // ghttp.Server结构体 @@ -44,7 +47,7 @@ type Server struct { config ServerConfig // 配置对象 status int8 // 当前服务器状态(0:未启动,1:运行中) servers []*gracefulServer // 底层http.Server列表 - pmanager *gpm.Manager // 进程管理器,用于管理子进程服务 + cmdPort int // 本地Web Server命令控制端口 methodsMap map[string]bool // 所有支持的HTTP Method(初始化时自动填充) servedCount *gtype.Int // 已经服务的请求数(4-8字节,不考虑溢出情况),同时作为请求ID closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象) @@ -77,6 +80,9 @@ type Server struct { accessLogEnabled *gtype.Bool // 是否开启access log accessLogger *glog.Logger // access log日志对象 errorLogger *glog.Logger // error log日志对象 + // 多进程管理控制 + manager *gpm.Manager // 多进程管理 + heartbeats } // 域名、URI与回调函数的绑定记录表 @@ -117,7 +123,7 @@ func GetServer(name...interface{}) (*Server) { s := &Server { name : sname, servers : make([]*gracefulServer, 0), - pmanager : gpm.New(), + cmdPort : gDEFAULT_COMMAND_PORT, methodsMap : make(map[string]bool), handlerMap : make(HandlerMap), statusHandlerMap : make(map[string]HandlerFunc), @@ -139,6 +145,8 @@ func GetServer(name...interface{}) (*Server) { accessLogger : glog.New(), errorLogger : glog.New(), logHandler : gtype.NewInterface(), + manager : gpm.New(), + signalChan : make(chan os.Signal), } s.errorLogger.SetBacktraceSkip(4) s.accessLogger.SetBacktraceSkip(4) @@ -167,9 +175,26 @@ func (s *Server) Run() error { // 开启异步关闭队列处理循环 s.startCloseQueueLoop() + // 开启Web Server执行 + s.startServer() + return nil +} + +// 开启底层Web Server执行 +func (s *Server) startServer() { + // 主进程只负责创建子进程 + if !s.isChildProcess() { + s.forkChildProcess() + time.Sleep(10*time.Second) + time.Sleep(1000*time.Second) + return + } + // 信号量控制监听 + go s.handleSignals() // 开始执行底层Web Server创建,端口监听 var fd = 3 var wg sync.WaitGroup + var fcount = s.processFileCount() var server *gracefulServer if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 { // HTTPS @@ -184,20 +209,19 @@ func (s *Server) Run() error { for _, v := range array { wg.Add(1) go func(addr string) { - if s.isChildProcess() { + if s.isChildProcess() && fcount > 0 { server = s.newGracefulServer(addr, fd) fd++ } else { server = s.newGracefulServer(addr) } + s.servers = append(s.servers, server) if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { glog.Error(err) } wg.Done() - } else { - s.servers = append(s.servers, server) } }(v) } @@ -210,20 +234,21 @@ func (s *Server) Run() error { for _, v := range array { wg.Add(1) go func(addr string) { - if s.isChildProcess() { + if s.isChildProcess() && fcount > 0 { server = s.newGracefulServer(addr, fd) fd++ } else { server = s.newGracefulServer(addr) } + s.servers = append(s.servers, server) if err := server.ListenAndServe(); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { + glog.Println(fd) + glog.Println(os.Args) glog.Error(err) } wg.Done() - } else { - s.servers = append(s.servers, server) } }(v) } @@ -232,51 +257,78 @@ func (s *Server) Run() error { // 阻塞执行,直到所有Web Server退出 wg.Wait() - return nil +} + +// 异步处理信号量监控 +func (s *Server) handleSignals() { + var sig os.Signal + + signal.Notify( + s.signalChan, + syscall.SIGTERM, + syscall.SIGUSR2, + ) + + for { + sig = <- s.signalChan + switch sig { + case syscall.SIGTERM: s.Shutdown() + case syscall.SIGUSR2: s.Restart() + default: + } + } } // 重启Web Server func (s *Server) Restart() { - if pid, err := s.startChildProcess(); err != nil { - glog.Printf("server restart failed: %v, continue serving\n", err) + // 如果是主进程,那么向所有子进程发送重启信号 + if !s.isChildProcess() { + s.manager.SignalAll(syscall.SIGUSR2) + return + } + if pid, err := s.forkChildProcess(); err != nil { + glog.Errorf("server restart failed: %v, continue serving\n", err) } else { glog.Printf("server restart successfully, new pid: %d\n", pid) - + s.Shutdown() } } // 关闭Web Server func (s *Server) Shutdown() { - for _, v := range s.servers { - v. - if f, e := v.listener.(*net.TCPListener).File(); e == nil { - files = append(files, f) - } else { - return 0, fmt.Errorf("failed to get listener file: %v", e) - } + // 如果是主进程,那么向所有子进程发送关闭信号 + if !s.isChildProcess() { + s.manager.SignalAll(syscall.SIGTERM) + return } + for _, v := range s.servers { + v.shutdown() + } +} + +// 子进程获取的文件打开数 +func (s *Server) processFileCount() int { + return gconv.Int(gcmd.Option.Get("fcount")) } // 判断是否为子进程执行 func (s *Server) isChildProcess() bool { - return os.Getenv(gCHILD_ENVIRON_KEY) != "" + return s.getTopId() > 0 +} + +// 获取顶级进程ID(管理进程ID) +func (s *Server) getTopId() int { + id := gcmd.Option.Get("topid") + if id != "" { + return gconv.Int(id) + } + return 0 } // 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符 -func (s *Server) startChildProcess() (int, error) { - if s.isChildProcess() { - return 0, errors.New("only main process can fork child process") - } - // 构造子进程的环境变量,并增加环境变量参数以标识该进程是graceful子进程 - env := make([]string, 0) - for _, value := range os.Environ() { - if value != gCHILD_ENVIRON_STRING { - env = append(env, value) - } - } - env = append(env, gCHILD_ENVIRON_STRING) +func (s *Server) forkChildProcess() (int, error) { // 获取所有http server的file - files := []*os.File{ os.Stdin,os.Stdout,os.Stderr} + files := []*os.File{os.Stdin,os.Stdout,os.Stderr} for _, v := range s.servers { if f, e := v.listener.(*net.TCPListener).File(); e == nil { files = append(files, f) @@ -284,25 +336,22 @@ func (s *Server) startChildProcess() (int, error) { return 0, fmt.Errorf("failed to get listener file: %v", e) } } - p, err := os.StartProcess(os.Args[0], os.Args, &os.ProcAttr { - Env : env, - Files : files, - }) - if err != nil { - return 0, fmt.Errorf("failed to forkexec: %v", err) + // 开启子进程,并传递socket文件指针 + topId := s.getTopId() + if topId == 0 { + topId = os.Getpid() } - return p.Pid, nil -} - -// 生成一个底层的Web Server对象 -func (s *Server) newServer(addr string) *http.Server { - return &http.Server { - Addr : addr, - Handler : s.config.Handler, - ReadTimeout : s.config.ReadTimeout, - WriteTimeout : s.config.WriteTimeout, - IdleTimeout : s.config.IdleTimeout, - MaxHeaderBytes : s.config.MaxHeaderBytes, + args := make([]string, 4) + args[0] = os.Args[0] + args[1] = fmt.Sprintf("--name=%s", s.name) + args[2] = fmt.Sprintf("--port=%d", s.cmdPort) + args[3] = fmt.Sprintf("--fcount=%d", len(files) - 3) + p := s.manager.NewProcess(os.Args[0], args, os.Environ()) + p.GetAttr().Files = files + if pid, err := p.Run(); err != nil { + return 0, fmt.Errorf("failed to fork process: %v", err) + } else { + return pid, nil } } diff --git a/g/net/ghttp/ghttp_server_cmd.go b/g/net/ghttp/ghttp_server_cmd.go new file mode 100644 index 000000000..d975fc7a2 --- /dev/null +++ b/g/net/ghttp/ghttp_server_cmd.go @@ -0,0 +1,26 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package ghttp + +import ( + "fmt" + "gitee.com/johng/gf/g/net/gtcp" +) + +// 开启命令监听端口 +func (s *Server) startCmdService() { + s.BindHandler("/heartbeat", func(r *Request) { + + }) + s.BindHandler("/restart", func(r *Request) { + + }) + server := s.newGracefulServer(fmt.Sprintf("127.0.0.1:%d", s.cmdPort)) + if err := server.ListenAndServe(); err != nil { + + } +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index 6ea8e1c30..70022128a 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -29,7 +29,7 @@ type gracefulServer struct { func (s *Server) newGracefulServer(addr string, fd...int) *gracefulServer { gs := &gracefulServer { addr : addr, - httpServer : s.newServer(addr), + httpServer : s.newHttpServer(addr), shutdownChan : make(chan bool), } if len(fd) > 0 && fd[0] > 0 { @@ -38,6 +38,18 @@ func (s *Server) newGracefulServer(addr string, fd...int) *gracefulServer { return gs } +// 生成一个底层的Web Server对象 +func (s *Server) newHttpServer(addr string) *http.Server { + return &http.Server { + Addr : addr, + Handler : s.config.Handler, + ReadTimeout : s.config.ReadTimeout, + WriteTimeout : s.config.WriteTimeout, + IdleTimeout : s.config.IdleTimeout, + MaxHeaderBytes : s.config.MaxHeaderBytes, + } +} + // 执行HTTP监听 func (s *gracefulServer) ListenAndServe() error { addr := s.httpServer.Addr diff --git a/g/os/gpm/gpm.go b/g/os/gpm/gpm.go index 1865c0d81..432988679 100644 --- a/g/os/gpm/gpm.go +++ b/g/os/gpm/gpm.go @@ -35,19 +35,18 @@ func New () *Manager { // 创建一个进程(不执行) func (m *Manager) NewProcess(path string, args []string, env []string) *Process { - attr := &os.ProcAttr { - Env : env, - Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, - } - return &Process{ + return &Process { pm : m, path : path, args : args, - attr : attr, + attr : &os.ProcAttr { + Env : env, + Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, + }, } } -// 获取一个进程 +// 获取当前进程管理器中的一个进程 func (m *Manager) GetProcess(pid int) *Process { if v := m.processes.Get(pid); v != nil { return v.(*Process) @@ -55,6 +54,22 @@ func (m *Manager) GetProcess(pid int) *Process { return nil } +// 添加一个已存在的进程到管理器中 +func (m *Manager) AddProcess(pid int) *Process { + if v := m.GetProcess(pid); v != nil { + return v + } + if process, err := os.FindProcess(pid); err == nil { + p := &Process { + pm : m, + process : process, + } + m.processes.Set(pid, p) + return p + } + return nil +} + // 获取所有的进程对象,构成列表返回 func (m *Manager) Processes() []*Process { processes := make([]*Process, 0) diff --git a/g/os/gpm/gpm_proccess.go b/g/os/gpm/gpm_proccess.go index e0b505ca5..7a9502b98 100644 --- a/g/os/gpm/gpm_proccess.go +++ b/g/os/gpm/gpm_proccess.go @@ -12,6 +12,9 @@ import ( // 运行进程 func (p *Process) Run() (int, error) { + if p.process != nil { + return p.Pid(), nil + } if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil { p.process = process p.pm.processes.Set(process.Pid, p) diff --git a/geg/other/test.go b/geg/other/test.go index 2483c275e..ee6c83601 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -6,6 +6,8 @@ import ( "github.com/tabalt/gracehttp" "os" + "gitee.com/johng/gf/g/os/gpm" + "time" ) @@ -21,7 +23,10 @@ func test() { } func main() { - fmt.Println(os.NewFile(11111, "")) - fmt.Println(os.NewFile(111111111, "")) - fmt.Println(os.NewFile(33333333333333, "")) + m := gpm.New() + args := os.Args + args = append(args, "--child=1") + p := m.NewProcess(args[0], args, nil) + p.Run() + time.Sleep(100*time.Second) }