diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 21e96adc0..dff88d624 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -20,13 +20,13 @@ import ( "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/container/gqueue" "fmt" - "gitee.com/johng/gf/g/os/gpm" "net" - "os/signal" "syscall" - "time" "gitee.com/johng/gf/g/os/gcmd" "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/os/gtime" + "time" ) const ( @@ -83,7 +83,6 @@ type Server struct { errorLogger *glog.Logger // error log日志对象 // 多进程管理控制 manager *gproc.Manager // 多进程管理 - heartbeats } // 域名、URI与回调函数的绑定记录表 @@ -146,8 +145,7 @@ func GetServer(name...interface{}) (*Server) { accessLogger : glog.New(), errorLogger : glog.New(), logHandler : gtype.NewInterface(), - manager : gproc.New(), - signalChan : make(chan os.Signal), + manager : gproc.NewManager(), } s.errorLogger.SetBacktraceSkip(4) s.accessLogger.SetBacktraceSkip(4) @@ -176,25 +174,26 @@ func (s *Server) Run() error { // 开启异步关闭队列处理循环 s.startCloseQueueLoop() - // 开启Web Server执行 - s.startServer() + + // 主进程只负责创建子进程 + if !gproc.IsChild() { + p := s.manager.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Run() + gtime.SetTimeout(3*time.Second, func() { + b, _ := gjson.Encode(s.getAllListenerFdMap()) + s.sendMsg(p.Pid(), gMSG_START, b) + }) + } + // 开启进程消息监听处理 + s.handleProcessMsg() return nil } // 开启底层Web Server执行 -func (s *Server) startServer() { - // 主进程只负责创建子进程 - if !s.isChildProcess() { - s.forkChildProcess() - time.Sleep(10*time.Second) - time.Sleep(1000*time.Second) - return - } - +func (s *Server) startServer(fdMap map[string]string) { + fmt.Println("startServer") // 开始执行底层Web Server创建,端口监听 - var fd = 3 var wg sync.WaitGroup - var fcount = s.processFileCount() var server *gracefulServer if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 { // HTTPS @@ -205,15 +204,23 @@ func (s *Server) startServer() { s.config.HTTPSAddr = gDEFAULT_HTTPS_ADDR } } - array := strings.Split(s.config.HTTPSAddr, ",") + var array []string + var isFd bool + if v, ok := fdMap["https"]; ok && len(v) > 0 { + isFd = true + array = strings.Split(v, ",") + } else { + array = strings.Split(s.config.HTTPSAddr, ",") + } + for _, v := range array { wg.Add(1) - go func(addr string) { - if s.isChildProcess() && fcount > 0 { - server = s.newGracefulServer(addr, fd) - fd++ + go func(item string) { + if isFd { + tArray := strings.Split(item, ":") + server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1])) } else { - server = s.newGracefulServer(addr) + server = s.newGracefulServer(item) } s.servers = append(s.servers, server) if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { @@ -230,22 +237,27 @@ func (s *Server) startServer() { if s.servedCount.Val() == 0 && len(s.config.Addr) == 0 { s.config.Addr = gDEFAULT_HTTP_ADDR } - array := strings.Split(s.config.Addr, ",") + var array []string + var isFd bool + if v, ok := fdMap["http"]; ok && len(v) > 0 { + isFd = true + array = strings.Split(v, ",") + } else { + array = strings.Split(s.config.Addr, ",") + } for _, v := range array { wg.Add(1) - go func(addr string) { - if s.isChildProcess() && fcount > 0 { - server = s.newGracefulServer(addr, fd) - fd++ + go func(item string) { + if isFd { + tArray := strings.Split(item, ":") + server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1])) } else { - server = s.newGracefulServer(addr) + server = s.newGracefulServer(item) } s.servers = append(s.servers, server) if err := server.ListenAndServe(); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { - glog.Println(fd) - glog.Println(os.Args) glog.Error(err) } wg.Done() @@ -259,26 +271,6 @@ func (s *Server) startServer() { wg.Wait() } -// 异步处理信号量监控 -func (s *Server) handleSignals() { - var sig os.Signal - - signal.Notify( - s.signalChan, - syscall.SIGTERM, - syscall.SIGUSR2, - ) - - for { - sig = <- s.signalChan - switch sig { - case syscall.SIGTERM: s.Shutdown() - case syscall.SIGUSR2: s.Restart() - default: - } - } -} - // 重启Web Server func (s *Server) Restart() { // 如果是主进程,那么向所有子进程发送重启信号 @@ -325,6 +317,43 @@ func (s *Server) getTopId() int { return 0 } +// 获取当前监听的文件描述符信息,构造成map返回 +func (s *Server) getAllListenerFdMap() map[string]string { + m := map[string]string{ + "http" : "", + "https" : "", + } + for _, v := range s.servers { + if f, e := v.listener.(*net.TCPListener).File(); e == nil { + str := v.addr + ":" + gconv.String(f.Fd()) + "," + if v.isHttps { + m["https"] += str + } else { + m["http"] += str + } + } else { + glog.Errorfln("failed to get listener file: %v", e) + } + } + if len(m["http"]) > 0 { + m["http"] = m["http"][0 : len(m["http"]) - 1] + } + if len(m["https"]) > 0 { + m["https"] = m["https"][0 : len(m["https"]) - 1] + } + return m +} + +// 二进制转换为FdMap +func (s *Server) bufferToFdMap(buffer []byte) map[string]string { + m := make(map[string]string) + j, _ := gjson.LoadContent(buffer, "json") + for k, v := range j.ToMap() { + m[k] = gconv.String(v) + } + return m +} + // 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符 func (s *Server) forkChildProcess() (int, error) { // 获取所有http server的file diff --git a/g/net/ghttp/ghttp_server_cmd.go b/g/net/ghttp/ghttp_server_cmd.go deleted file mode 100644 index d975fc7a2..000000000 --- a/g/net/ghttp/ghttp_server_cmd.go +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package ghttp - -import ( - "fmt" - "gitee.com/johng/gf/g/net/gtcp" -) - -// 开启命令监听端口 -func (s *Server) startCmdService() { - s.BindHandler("/heartbeat", func(r *Request) { - - }) - s.BindHandler("/restart", func(r *Request) { - - }) - server := s.newGracefulServer(fmt.Sprintf("127.0.0.1:%d", s.cmdPort)) - if err := server.ListenAndServe(); err != nil { - - } -} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go new file mode 100644 index 000000000..e64c471c2 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm.go @@ -0,0 +1,59 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 + +package ghttp + +import ( + "os" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/encoding/gbinary" + "fmt" +) + +const ( + gMSG_START = iota + gMSG_RESTART + gMSG_SHUTDOWN + gMSG_EXIT +) + + +// 处理进程间消息 +// 数据格式: 操作(8bit) | 参数(变长) +func (s *Server) handleProcessMsg() { + for { + if msg := gproc.Receive(); msg != nil { + fmt.Println(msg) + act := gbinary.DecodeToInt(msg.Data[0 : 1]) + data := msg.Data[1 : ] + if gproc.IsChild() { + switch act { + case gMSG_START: + s.startServer(s.bufferToFdMap(data)) + case gMSG_RESTART: + case gMSG_SHUTDOWN: s.Shutdown() + case gMSG_EXIT: os.Exit(0) + + } + } else { + switch act { + case gMSG_START: + case gMSG_RESTART: + case gMSG_SHUTDOWN: + case gMSG_EXIT: os.Exit(0) + + } + } + } + } +} + +// 向进程发送操作消息 +func (s *Server) sendMsg(pid int, act int, data []byte) { + gproc.Send(pid, append(gbinary.EncodeInt8(int8(act)), data...)) +} + diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go index 70022128a..29688a5cc 100644 --- a/g/net/ghttp/ghttp_server_graceful.go +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -22,6 +22,7 @@ type gracefulServer struct { addr string httpServer *http.Server listener net.Listener + isHttps bool shutdownChan chan bool } @@ -61,6 +62,11 @@ func (s *gracefulServer) ListenAndServe() error { return s.doServe() } +// 设置自定义fd +func (s *gracefulServer) setFd(fd int) { + s.fd = uintptr(fd) +} + // 执行HTTPS监听 func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error { addr := s.httpServer.Addr @@ -82,6 +88,7 @@ func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error { return err } s.listener = tls.NewListener(ln, config) + s.isHttps = true return s.doServe() } diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 70a42cd6f..13dffa35d 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -40,8 +40,10 @@ func init() { if !gfile.Exists(path) { gfile.Create(path) } + fmt.Println(path) // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 err := gfsnotify.Add(path, func(event *gfsnotify.Event) { + fmt.Println(event) commLocker.Lock() buffer := gfile.GetBinContents(path) os.Truncate(path, 0) diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index 18b397b4b..336c2a931 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -19,24 +19,23 @@ type Manager struct { } // 创建一个进程管理器 -func New () *Manager { +func NewManager() *Manager { return &Manager{ processes : gmap.NewIntInterfaceMap(), } } // 创建一个进程(不执行) -func (m *Manager) NewProcess(path string, args []string, environment []string) *Process { +func NewProcess(path string, args []string, environment []string) *Process { env := make([]string, len(environment) + 2) for k, v := range environment { env[k] = v } - env[len(env) - 2] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid()) env[len(env) - 1] = fmt.Sprintf("%s=%s", gPROC_TEMP_DIR_ENV_KEY, os.TempDir()) p := &Process { - pm : m, path : path, args : make([]string, 0), + ppid : os.Getppid(), attr : &os.ProcAttr { Env : env, Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, @@ -50,6 +49,13 @@ func (m *Manager) NewProcess(path string, args []string, environment []string) * return p } +// 创建一个进程(不执行) +func (m *Manager) NewProcess(path string, args []string, environment []string) *Process { + p := NewProcess(path, args, environment) + p.SetManager(m) + return p +} + // 获取当前进程管理器中的一个进程 func (m *Manager) GetProcess(pid int) *Process { if v := m.processes.Get(pid); v != nil { @@ -99,8 +105,18 @@ func (m *Manager) SignalAll(sig os.Signal) error { return nil } -// 获取当前进程管理器中的一个进程 -func (m *Manager) Send(pid int, data interface{}) error { +// 向所有进程发送消息 +func (m *Manager) Send(data interface{}) error { + for _, p := range m.Processes() { + if err := p.Send(data); err != nil { + return err + } + } + return nil +} + +// 向指定进程发送消息 +func (m *Manager) SendTo(pid int, data interface{}) error { return Send(pid, data) } diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index 239623dfc..0420d1a17 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -9,6 +9,7 @@ package gproc import ( "os" "errors" + "fmt" ) // 子进程 @@ -17,6 +18,7 @@ type Process struct { path string // 可执行文件绝对路径 args []string // 执行参数 attr *os.ProcAttr // 进程属性 + ppid int // 自定义关联的父进程ID process *os.Process // 底层进程对象 } @@ -25,15 +27,27 @@ func (p *Process) Run() (int, error) { if p.process != nil { return p.Pid(), nil } + p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid)) if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil { p.process = process - p.pm.processes.Set(process.Pid, p) + if p.pm != nil { + p.pm.processes.Set(process.Pid, p) + } return process.Pid, nil } else { return 0, err } } +func (p *Process) SetManager(m *Manager) { + p.pm = m +} + +// 设置自定义的父进程ID +func (p *Process) SetPpid(ppid int) { + p.ppid = ppid +} + func (p *Process) SetArgs(args []string) { p.args = args } @@ -89,7 +103,9 @@ func (p *Process) Release() error { // Kill causes the Process to exit immediately. func (p *Process) Kill() error { if err := p.process.Kill(); err == nil { - p.pm.processes.Remove(p.Pid()) + if p.pm != nil { + p.pm.processes.Remove(p.Pid()) + } return nil } else { return err