diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index dff88d624..72e742b34 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -19,14 +19,9 @@ import ( "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/container/gqueue" - "fmt" "net" - "syscall" - "gitee.com/johng/gf/g/os/gcmd" "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/encoding/gjson" - "gitee.com/johng/gf/g/os/gtime" - "time" ) const ( @@ -38,7 +33,6 @@ const ( gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 - gDEFAULT_COMMAND_PORT = 336816 // 默认本地命令控制端口 ) // ghttp.Server结构体 @@ -48,7 +42,6 @@ type Server struct { config ServerConfig // 配置对象 status int8 // 当前服务器状态(0:未启动,1:运行中) servers []*gracefulServer // 底层http.Server列表 - cmdPort int // 本地Web Server命令控制端口 methodsMap map[string]bool // 所有支持的HTTP Method(初始化时自动填充) servedCount *gtype.Int // 已经服务的请求数(4-8字节,不考虑溢出情况),同时作为请求ID closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象) @@ -81,8 +74,6 @@ type Server struct { accessLogEnabled *gtype.Bool // 是否开启access log accessLogger *glog.Logger // access log日志对象 errorLogger *glog.Logger // error log日志对象 - // 多进程管理控制 - manager *gproc.Manager // 多进程管理 } // 域名、URI与回调函数的绑定记录表 @@ -107,9 +98,61 @@ type HandlerItem struct { // http注册函数 type HandlerFunc func(r *Request) +// 文件描述符map +type listenerFdMap map[string]string + // Server表,用以存储和检索名称与Server对象之间的关联关系 var serverMapping = gmap.NewStringInterfaceMap() +// Web Server多进程管理器 +var procManager = gproc.NewManager() + +// Web Server开始执行事件通道,由于同一个进程支持多Server,因此该通道为非阻塞 +var runChan = make(chan struct{}, 100000) +// 已完成的run消息队列 +var doneChan = make(chan struct{}, 100000) + +// Web Server进程初始化 +func init() { + go func() { + <- runChan + // 主进程只负责创建子进程 + if !gproc.IsChild() { + sendProcessMsg(os.Getpid(), gMSG_START, nil) + } + // 开启进程消息监听处理 + handleProcessMsg() + doneChan <- struct{}{} + }() +} + +// 获取所有Web Server的文件描述符map +func getServerFdMap() map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + serverMapping.RLockFunc(func(m map[string]interface{}) { + for k, v := range m { + sfm[k] = v.(*Server).getListenerFdMap() + } + }) + return sfm +} + +// 二进制转换为FdMap +func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + if len(buffer) > 0 { + j, _ := gjson.LoadContent(buffer, "json") + for k, _ := range j.ToMap() { + m := make(map[string]string) + for k, v := range j.GetMap(k) { + m[k] = gconv.String(v) + } + sfm[k] = m + } + } + return sfm +} + // 获取/创建一个默认配置的HTTP Server(默认监听端口是80) // 单例模式,请保证name的唯一性 func GetServer(name...interface{}) (*Server) { @@ -123,7 +166,6 @@ func GetServer(name...interface{}) (*Server) { s := &Server { name : sname, servers : make([]*gracefulServer, 0), - cmdPort : gDEFAULT_COMMAND_PORT, methodsMap : make(map[string]bool), handlerMap : make(HandlerMap), statusHandlerMap : make(map[string]HandlerFunc), @@ -145,7 +187,6 @@ func GetServer(name...interface{}) (*Server) { accessLogger : glog.New(), errorLogger : glog.New(), logHandler : gtype.NewInterface(), - manager : gproc.NewManager(), } s.errorLogger.SetBacktraceSkip(4) s.accessLogger.SetBacktraceSkip(4) @@ -162,6 +203,13 @@ func GetServer(name...interface{}) (*Server) { // 阻塞执行监听 func (s *Server) Run() error { + runChan <- struct{}{} + + if !gproc.IsChild() { + <- doneChan + return nil + } + if s.status == 1 { return errors.New("server is already running") } @@ -174,24 +222,12 @@ func (s *Server) Run() error { // 开启异步关闭队列处理循环 s.startCloseQueueLoop() - - // 主进程只负责创建子进程 - if !gproc.IsChild() { - p := s.manager.NewProcess(os.Args[0], os.Args, os.Environ()) - p.Run() - gtime.SetTimeout(3*time.Second, func() { - b, _ := gjson.Encode(s.getAllListenerFdMap()) - s.sendMsg(p.Pid(), gMSG_START, b) - }) - } - // 开启进程消息监听处理 - s.handleProcessMsg() + <- doneChan return nil } // 开启底层Web Server执行 -func (s *Server) startServer(fdMap map[string]string) { - fmt.Println("startServer") +func (s *Server) startServer(fdMap listenerFdMap) { // 开始执行底层Web Server创建,端口监听 var wg sync.WaitGroup var server *gracefulServer @@ -223,6 +259,7 @@ func (s *Server) startServer(fdMap map[string]string) { server = s.newGracefulServer(item) } s.servers = append(s.servers, server) + glog.Printfln("https server started listening on %s", server.addr) if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { @@ -255,6 +292,7 @@ func (s *Server) startServer(fdMap map[string]string) { server = s.newGracefulServer(item) } s.servers = append(s.servers, server) + glog.Printfln("http server started listening on %s", server.addr) if err := server.ListenAndServe(); err != nil { // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { @@ -274,23 +312,23 @@ func (s *Server) startServer(fdMap map[string]string) { // 重启Web Server func (s *Server) Restart() { // 如果是主进程,那么向所有子进程发送重启信号 - if !s.isChildProcess() { - s.manager.SignalAll(syscall.SIGUSR2) + if !gproc.IsChild() { + return } - if pid, err := s.forkChildProcess(); err != nil { - glog.Errorf("server restart failed: %v, continue serving\n", err) - } else { - glog.Printf("server restart successfully, new pid: %d\n", pid) - s.Shutdown() - } + //if pid, err := s.forkChildProcess(); err != nil { + // glog.Errorf("server restart failed: %v, continue serving\n", err) + //} else { + // glog.Printf("server restart successfully, new pid: %d\n", pid) + // s.Shutdown() + //} } // 关闭Web Server func (s *Server) Shutdown() { // 如果是主进程,那么向所有子进程发送关闭信号 - if !s.isChildProcess() { - s.manager.SignalAll(syscall.SIGTERM) + if !gproc.IsChild() { + return } for _, v := range s.servers { @@ -298,27 +336,10 @@ func (s *Server) Shutdown() { } } -// 子进程获取的文件打开数 -func (s *Server) processFileCount() int { - return gconv.Int(gcmd.Option.Get("fcount")) -} -// 判断是否为子进程执行 -func (s *Server) isChildProcess() bool { - return s.getTopId() > 0 -} - -// 获取顶级进程ID(管理进程ID) -func (s *Server) getTopId() int { - id := gcmd.Option.Get("topid") - if id != "" { - return gconv.Int(id) - } - return 0 -} // 获取当前监听的文件描述符信息,构造成map返回 -func (s *Server) getAllListenerFdMap() map[string]string { +func (s *Server) getListenerFdMap() map[string]string { m := map[string]string{ "http" : "", "https" : "", @@ -344,46 +365,6 @@ func (s *Server) getAllListenerFdMap() map[string]string { return m } -// 二进制转换为FdMap -func (s *Server) bufferToFdMap(buffer []byte) map[string]string { - m := make(map[string]string) - j, _ := gjson.LoadContent(buffer, "json") - for k, v := range j.ToMap() { - m[k] = gconv.String(v) - } - return m -} - -// 创建子进程来监听并处理新的HTTP请求,与父进程使用的是同一个socket文件描述符 -func (s *Server) forkChildProcess() (int, error) { - // 获取所有http server的file - files := []*os.File{os.Stdin,os.Stdout,os.Stderr} - for _, v := range s.servers { - if f, e := v.listener.(*net.TCPListener).File(); e == nil { - files = append(files, f) - } else { - return 0, fmt.Errorf("failed to get listener file: %v", e) - } - } - // 开启子进程,并传递socket文件指针 - topId := s.getTopId() - if topId == 0 { - topId = os.Getpid() - } - args := make([]string, 4) - args[0] = os.Args[0] - args[1] = fmt.Sprintf("--name=%s", s.name) - args[2] = fmt.Sprintf("--port=%d", s.cmdPort) - args[3] = fmt.Sprintf("--fcount=%d", len(files) - 3) - p := s.manager.NewProcess(os.Args[0], args, os.Environ()) - p.GetAttr().Files = files - if pid, err := p.Run(); err != nil { - return 0, fmt.Errorf("failed to fork process: %v", err) - } else { - return pid, nil - } -} - // 清空当前的handlerCache func (s *Server) clearHandlerCache() { s.hmcmu.Lock() diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index e64c471c2..7dca87c9c 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -8,23 +8,23 @@ package ghttp import ( - "os" "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/encoding/gbinary" "fmt" + "gitee.com/johng/gf/g/encoding/gjson" + "os" ) const ( gMSG_START = iota gMSG_RESTART gMSG_SHUTDOWN - gMSG_EXIT ) // 处理进程间消息 // 数据格式: 操作(8bit) | 参数(变长) -func (s *Server) handleProcessMsg() { +func handleProcessMsg() { for { if msg := gproc.Receive(); msg != nil { fmt.Println(msg) @@ -32,19 +32,48 @@ func (s *Server) handleProcessMsg() { data := msg.Data[1 : ] if gproc.IsChild() { switch act { + // 开启所有Web Server(根据消息启动) case gMSG_START: - s.startServer(s.bufferToFdMap(data)) + sfm := bufferToServerFdMap(data) + for k, v := range sfm { + GetServer(k).startServer(v) + } + + // 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 case gMSG_RESTART: - case gMSG_SHUTDOWN: s.Shutdown() - case gMSG_EXIT: os.Exit(0) + b, _ := gjson.Encode(getServerFdMap()) + sendProcessMsg(gproc.Ppid(), gMSG_RESTART, b) + + // 友好关闭服务链接并退出 + case gMSG_SHUTDOWN: + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + v.(*Server).Shutdown() + } + }) + return } } else { switch act { + // 开启服务 case gMSG_START: + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Run() + sendProcessMsg(p.Pid(), gMSG_START, nil) + + // 重启服务 case gMSG_RESTART: + // 创建新的服务进程,使用文件描述来监听同样的端口 + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Run() + sendProcessMsg(p.Pid(), gMSG_START, data) + // 关闭旧的服务进程 + sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil) + + // 关闭服务 case gMSG_SHUTDOWN: - case gMSG_EXIT: os.Exit(0) + procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) } } @@ -53,7 +82,12 @@ func (s *Server) handleProcessMsg() { } // 向进程发送操作消息 -func (s *Server) sendMsg(pid int, act int, data []byte) { - gproc.Send(pid, append(gbinary.EncodeInt8(int8(act)), data...)) +func sendProcessMsg(pid int, act int, data []byte) { + gproc.Send(pid, formatMsgBuffer(act, data)) +} + +// 生成一条满足Web Server进程通信协议的消息 +func formatMsgBuffer(act int, data []byte) []byte { + return append(gbinary.EncodeInt8(int8(act)), data...) } diff --git a/g/os/gfile/gfile.go b/g/os/gfile/gfile.go index 1efde864e..2f4176039 100644 --- a/g/os/gfile/gfile.go +++ b/g/os/gfile/gfile.go @@ -46,6 +46,10 @@ func Mkdir(path string) error { // 给定文件的绝对路径创建文件 func Create(path string) error { + dir := Dir(path) + if !Exists(dir) { + Mkdir(dir) + } f, err := os.Create(path) if err != nil { return err diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 13dffa35d..de000c747 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -16,6 +16,7 @@ import ( "gitee.com/johng/gf/g/os/gfsnotify" "gitee.com/johng/gf/g/container/gqueue" "gitee.com/johng/gf/g/encoding/gbinary" + "gitee.com/johng/gf/g/os/gtime" ) const ( @@ -38,25 +39,39 @@ type Msg struct { func init() { path := getCommFilePath(os.Getpid()) if !gfile.Exists(path) { - gfile.Create(path) + if err := gfile.Create(path); err != nil { + glog.Error(err) + } + } else { + // 初始化时读取已有数据(文件修改时间在10秒以内) + if gtime.Second() - gfile.MTime(path) < 10 { + checkCommBuffer(path) + } } - fmt.Println(path) // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 err := gfsnotify.Add(path, func(event *gfsnotify.Event) { - fmt.Println(event) - commLocker.Lock() - buffer := gfile.GetBinContents(path) - os.Truncate(path, 0) - commLocker.UnLock() - for _, v := range bufferToMsgs(buffer) { - commQueue.PushBack(v) - } + checkCommBuffer(path) }) if err != nil { glog.Error(err) } } +// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列 +func checkCommBuffer(path string) { + commLocker.Lock() + buffer := gfile.GetBinContents(path) + if len(buffer) > 0 { + os.Truncate(path, 0) + } + commLocker.UnLock() + if len(buffer) > 0 { + for _, v := range bufferToMsgs(buffer) { + commQueue.PushBack(v) + } + } +} + // 获取其他进程传递到当前进程的消息包,阻塞执行 func Receive() *Msg { if v := commQueue.PopFront(); v != nil { diff --git a/geg/other/test.go b/geg/other/test.go index 9a6703bc3..af3b1e9cb 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,62 +2,14 @@ package main import ( "fmt" - "net/http" - "github.com/tabalt/gracehttp" - "gitee.com/johng/gf/g/encoding/gbinary" - "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gfile" + "gitee.com/johng/gf/g/os/gtime" ) -func test() { - http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - fmt.Fprintf(w, "hello world") - }) - s := gracehttp.NewServer(":8888", nil, gracehttp.DEFAULT_READ_TIMEOUT, gracehttp.DEFAULT_WRITE_TIMEOUT) - err := s.ListenAndServe() - if err != nil { - fmt.Println(err) - } -} - -// 常见的二进制数据校验方式,生成校验结果 -func checksum(buffer []byte) uint32 { - var checksum uint32 - for _, b := range buffer { - checksum += uint32(b) - } - return checksum -} - -// 数据解包,防止黏包 -func bufferToMsgs(buffer []byte) []*gproc.Msg { - s := 0 - msgs := make([]*gproc.Msg, 0) - for s < len(buffer) { - fmt.Println(s) - length := gbinary.DecodeToInt(buffer[s : s + 4]) - if length < 0 || length > len(buffer) { - s++ - continue - } - checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) - checksum2 := checksum(buffer[s + 12 : s + length]) - if checksum1 != checksum2 { - s++ - continue - } - msgs = append(msgs, &gproc.Msg { - Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), - Data : buffer[s + 12 : s + length], - }) - s += length - } - return msgs -} - - -func main() { - b := []byte{26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33} - m := bufferToMsgs(b) - fmt.Println(m) -} +func main(){ + t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go") + t2 := gtime.Second() + fmt.Println(t1) + fmt.Println(t2) +} \ No newline at end of file