diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 32c9373a0..84810816a 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -127,7 +127,7 @@ func init() { doneChan <- struct{}{} if !gproc.IsChild() { - glog.Printfln("%d: all web servers shutdown", gproc.Pid()) + glog.Printfln("%d: all servers shutdown", gproc.Pid()) } }() } diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go index d80abc7bb..fbee04ea1 100644 --- a/g/net/ghttp/ghttp_server_admin.go +++ b/g/net/ghttp/ghttp_server_admin.go @@ -21,7 +21,7 @@ import ( ) const ( - gADMIN_ACTION_INTERVAL_LIMIT = 0 // (毫秒)每一次执行管理操作的间隔限制 + gADMIN_ACTION_INTERVAL_LIMIT = 3000 // (毫秒)服务开启后允许执行管理操作的间隔限制 ) // 用于服务管理的对象 @@ -33,6 +33,12 @@ var serverActionLocker sync.Mutex // (进程级别)用于记录上一次操作的时间(毫秒) var serverActionLastTime = gtype.NewInt64(gtime.Millisecond()) +// 当前服务进程所处的互斥管理操作状态 +// 1 : reload +// 2 : restart +// 4 : shutdown +var serverProcessStatus = gtype.NewInt() + // 服务管理首页 func (p *utilAdmin) Index(r *Request) { data := map[string]interface{}{ @@ -99,6 +105,9 @@ func (s *Server) EnableAdmin(pattern...string) { func (s *Server) Reload() error { serverActionLocker.Lock() defer serverActionLocker.Unlock() + if err := s.checkActionStatus(); err != nil { + return err + } if err := s.checkActionFrequence(); err != nil { return err } @@ -111,6 +120,9 @@ func (s *Server) Reload() error { func (s *Server) Restart() error { serverActionLocker.Lock() defer serverActionLocker.Unlock() + if err := s.checkActionStatus(); err != nil { + return err + } if err := s.checkActionFrequence(); err != nil { return err } @@ -123,6 +135,9 @@ func (s *Server) Restart() error { func (s *Server) Shutdown() error { serverActionLocker.Lock() defer serverActionLocker.Unlock() + if err := s.checkActionStatus(); err != nil { + return err + } if err := s.checkActionFrequence(); err != nil { return err } @@ -139,4 +154,20 @@ func (s *Server) checkActionFrequence() error { } serverActionLastTime.Set(gtime.Millisecond()) return nil +} + +// 检查当前服务进程的状态 +func (s *Server) checkActionStatus() error { + status := serverProcessStatus.Val() + if status > 0 { + switch status { + case 1: + return errors.New("server is reloading") + case 2: + return errors.New("server is restarting") + case 4: + return errors.New("server is shutting down") + } + } + return nil } \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index 060ae499a..b8b6f292a 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -15,8 +15,6 @@ import ( "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/encoding/gbinary" "gitee.com/johng/gf/g/os/gtime" - "fmt" - "gitee.com/johng/gf/g/os/glog" ) const ( @@ -58,8 +56,8 @@ func handleProcessMsg() { for { if msg := gproc.Receive(); msg != nil { // 记录消息日志,用于调试 - content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data) - glog.Print(content) + //content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data) + //glog.Print(content) //gfile.PutContentsAppend("/tmp/gproc-log", content) act := gbinary.DecodeToUint(msg.Data[0 : 1]) data := msg.Data[1 : ] diff --git a/g/net/ghttp/ghttp_server_comm_child.go b/g/net/ghttp/ghttp_server_comm_child.go index ac40dc704..bd60638ce 100644 --- a/g/net/ghttp/ghttp_server_comm_child.go +++ b/g/net/ghttp/ghttp_server_comm_child.go @@ -21,7 +21,7 @@ import ( ) const ( - gPROC_CHILD_MAX_IDLE_TIME = 3000 // 子进程闲置时间(未开启心跳机制的时间) + gPROC_CHILD_MAX_IDLE_TIME = 10000 // 子进程闲置时间(未开启心跳机制的时间) ) // 心跳处理(方法为空,逻辑放到公共通信switch中进行处理) diff --git a/g/net/ghttp/ghttp_server_comm_child_windows.go b/g/net/ghttp/ghttp_server_comm_child_windows.go index fc1bf9463..a28027441 100644 --- a/g/net/ghttp/ghttp_server_comm_child_windows.go +++ b/g/net/ghttp/ghttp_server_comm_child_windows.go @@ -8,19 +8,12 @@ package ghttp import ( "gitee.com/johng/gf/g/os/gproc" - "os" ) // 开启所有Web Server(根据消息启动) func onCommChildStart(pid int, data []byte) { // 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制 sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) - // 如果创建自己的父进程非gproc父进程,那么表示该进程为重启创建的进程,创建成功之后需要通知父进程自行销毁 - if gproc.PPidOS() != gproc.PPid() { - if p, err := os.FindProcess(gproc.PPidOS()); err == nil { - p.Kill() - } - } // 开启Web Server服务 serverMapping.RLockFunc(func(m map[string]interface{}) { for _, v := range m { diff --git a/g/net/ghttp/ghttp_server_comm_main.go b/g/net/ghttp/ghttp_server_comm_main.go index a1bafa766..7b9b1e4a6 100644 --- a/g/net/ghttp/ghttp_server_comm_main.go +++ b/g/net/ghttp/ghttp_server_comm_main.go @@ -10,7 +10,6 @@ package ghttp import ( "os" - "fmt" "time" "gitee.com/johng/gf/g/os/glog" "gitee.com/johng/gf/g/os/gtime" @@ -23,14 +22,13 @@ var procUpdateTimeMap = gmap.NewIntIntMap() // 开启服务 func onCommMainStart(pid int, data []byte) { - p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) - if _, err := p.Start(); err != nil { - glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error()) - return + fork := forkNewProcess() + if fork == nil { + os.Exit(1) } - updateProcessCommTime(p.Pid()) + updateProcessCommTime(fork.Pid()) // 子进程创建成功之后再发送执行命令 - sendProcessMsg(p.Pid(), gMSG_START, nil) + sendProcessMsg(fork.Pid(), gMSG_START, nil) } // 心跳处理(方法为空,逻辑放到公共通信switch中进行处理) @@ -50,12 +48,19 @@ func onCommMainRestart(pid int, data []byte) { procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) return } - // 否则杀掉子进程,然后新建一个完整的子进程 + // 首先创建子进程,暂时不开始服务,否则会有端口冲突 + fork := forkNewProcess() + if fork == nil { + os.Exit(1) + } + // 然后通知旧的子进程自动关闭并退出(不需要新建的子进程来处理) + sendProcessMsg(pid, gMSG_CLOSE, nil) if p, err := os.FindProcess(pid); err == nil && p != nil { p.Kill() p.Wait() } - sendProcessMsg(gproc.Pid(), gMSG_START, nil) + // 通知新的子进程执行服务监听 + sendProcessMsg(fork.Pid(), gMSG_START, nil) } // 新建子进程通知 @@ -76,6 +81,16 @@ func updateProcessCommTime(pid int) { procUpdateTimeMap.Set(pid, int(gtime.Millisecond())) } +// 创建一个子进程,但是暂时不执行服务监听 +func forkNewProcess() *gproc.Process { + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + if _, err := p.Start(); err != nil { + glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error()) + return nil + } + return p +} + // 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 func handleMainProcessHeartbeat() { for { @@ -86,17 +101,19 @@ func handleMainProcessHeartbeat() { for _, pid := range procManager.Pids() { updatetime := procUpdateTimeMap.Get(pid) if updatetime > 0 && int(gtime.Millisecond()) - updatetime > gPROC_HEARTBEAT_TIMEOUT { - fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime) + //fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime) // 这里需要手动从进程管理器中去掉该进程 procManager.RemoveProcess(pid) sendProcessMsg(pid, gMSG_CLOSE, nil) } } + + // (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要 + if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{ + glog.Printfln("%d: all children died, exit", gproc.Pid()) + os.Exit(0) + } } - // (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要 - if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{ - //glog.Printfln("%d: all children died, exit", gproc.Pid()) - os.Exit(0) - } + } } diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index 8152721ac..cbed56312 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -6,9 +6,6 @@ // 文件监控. // 使用时需要注意的是,一旦一个文件被删除,那么对其的监控将会失效。 -// 特点: -// 1、底层使用了fsnotify机制作为异步监听插件; -// 2、(可选)文件主动自动检查作为fsnotify文件监听的辅助手段来保障监听文件如果发生改变,监控端将会及时收到提醒(解决某些业务场景下的fsnotify延迟问题); package gfsnotify import ( @@ -20,18 +17,14 @@ import ( "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gqueue" - "gitee.com/johng/gf/g/container/gtype" - "gitee.com/johng/gf/g/os/gtime" ) // 监听管理对象 type Watcher struct { - watcher *fsnotify.Watcher // 底层fsnotify对象 - events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 - closeChan chan struct{} // 关闭事件 - callbacks *gmap.StringInterfaceMap // 监听的回调函数 - watchUpdateTimeMap *gmap.StringIntMap // (毫秒)监控文件最新的通知时间 - activeCheckInterval *gtype.Int // (毫秒)主动文件检查时间间隔 + watcher *fsnotify.Watcher // 底层fsnotify对象 + events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 + closeChan chan struct{} // 关闭事件 + callbacks *gmap.StringInterfaceMap // 监听的回调函数 } // 监听事件对象 @@ -74,32 +67,19 @@ func Remove(path string) error { func New() (*Watcher, error) { if watch, err := fsnotify.NewWatcher(); err == nil { w := &Watcher { - watcher : watch, - events : gqueue.New(), - closeChan : make(chan struct{}, 1), - callbacks : gmap.NewStringInterfaceMap(), - watchUpdateTimeMap : gmap.NewStringIntMap(), - activeCheckInterval : gtype.NewInt(), + watcher : watch, + events : gqueue.New(), + closeChan : make(chan struct{}, 1), + callbacks : gmap.NewStringInterfaceMap(), } w.startWatchLoop() w.startEventLoop() - w.startActiveCheckLoop() return w, nil } else { return nil, err } } -// 启动主动文件更新检测机制 -func (w *Watcher) EnableActiveCheck(interval int) { - w.activeCheckInterval.Set(interval) -} - -// 关闭主动文件更新检测机制 -func (w *Watcher) DisableActiveCheck() { - w.activeCheckInterval.Set(0) -} - // 关闭监听管理对象 func (w *Watcher) Close() { w.watcher.Close() @@ -128,8 +108,6 @@ func (w *Watcher) Add(path string, callback func(event *Event)) error { }) // 添加底层监听 w.watcher.Add(path) - // 添加默认更新时间 - w.watchUpdateTimeMap.Set(path, int(gfile.MTimeMillisecond(path))) return nil } @@ -139,7 +117,7 @@ func (w *Watcher) Remove(path string) error { return w.watcher.Remove(path) } -// fsnotify监听循环 +// 监听循环 func (w *Watcher) startWatchLoop() { go func() { for { @@ -173,7 +151,6 @@ func (w *Watcher) startEventLoop() { w.watcher.Add(event.Path) continue } - w.watchUpdateTimeMap.Set(event.Path, int(gtime.Millisecond())) if l := w.callbacks.Get(event.Path); l != nil { grpool.Add(func() { for _, v := range l.(*glist.List).FrontAll() { diff --git a/g/os/gfsnotify/gfsnotify_check.go b/g/os/gfsnotify/gfsnotify_check.go deleted file mode 100644 index 7aae8acf1..000000000 --- a/g/os/gfsnotify/gfsnotify_check.go +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package gfsnotify - -import ( - "time" - "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/os/gfile" - "fmt" -) - -const ( - gDEFAULT_ACTIVE_CHECK_INTERVAL = 500 // (毫秒)默认主动检测时间间隔,未开启时什么也不做 -) - -// 文件定时监听器循环 -func (w *Watcher) startActiveCheckLoop() { - go func() { - for { - select { - // 关闭事件 - case <- w.closeChan: - return - - default: - if w.activeCheckInterval.Val() > 0 { - paths := w.watchUpdateTimeMap.Keys() - for _, path := range paths { - lastUpdateTime := w.watchUpdateTimeMap.Get(path) - if int(gtime.Millisecond()) - lastUpdateTime > w.activeCheckInterval.Val() { - - fileUpdateTime := int(gfile.MTimeMillisecond(path)) - fmt.Println("check:", path, fileUpdateTime, lastUpdateTime) - if fileUpdateTime > lastUpdateTime { - fmt.Println("update:", path) - w.watchUpdateTimeMap.Set(path, fileUpdateTime) - w.events.PushBack(&Event{ - Path : path, - Op : Op(WRITE), - }) - } - } - } - time.Sleep(time.Duration(w.activeCheckInterval.Val())*time.Millisecond) - } else { - time.Sleep(gDEFAULT_ACTIVE_CHECK_INTERVAL*time.Millisecond) - } - } - } - }() -} \ No newline at end of file diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 8c1d261e7..6ff56ef34 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -7,28 +7,31 @@ package gproc import ( + "io" "os" "fmt" "time" + "errors" "gitee.com/johng/gf/g/os/glog" "gitee.com/johng/gf/g/os/gfile" + "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/os/gflock" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/container/gqueue" "gitee.com/johng/gf/g/encoding/gbinary" - "gitee.com/johng/gf/g/os/gtime" - "io" - "errors" ) const ( // 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置 - gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" + gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" // 自动通信文件清理时间间隔 - gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second + gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second // 写入通信数据失败时候的重试次数 - gPROC_COMM_FAILURE_RETRY_COUNT = 3 + gPROC_COMM_FAILURE_RETRY_COUNT = 3 + // (毫秒)主动通信内容检查时间间隔 + gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500 ) // 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理) @@ -37,8 +40,8 @@ var commClearLocker = gflock.New("comm.clear.lock") var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) // 进程通信消息队列 var commQueue = gqueue.New() -// 文件监控器 -var watcher, _ = gfsnotify.New() +// 上一次进程通信内容检查的时间 +var commLastCheckTime = gtype.NewInt64() // TCP通信数据结构定义 type Msg struct { @@ -64,6 +67,7 @@ func init() { glog.Errorfln("%s is not writable for gproc", path) os.Exit(1) } + updateLastCheckTime() if gtime.Second() - gfile.MTime(path) < 10 { // 初始化时读取已有数据(文件修改时间在10秒以内) checkCommBuffer(path) @@ -73,10 +77,9 @@ func init() { os.Truncate(path, 0) commLocker.UnLock() } - watcher.EnableActiveCheck(1000) // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 - err := watcher.Add(path, func(event *gfsnotify.Event) { - glog.Printfln("%d: gfsnotify", Pid()) + err := gfsnotify.Add(path, func(event *gfsnotify.Event) { + updateLastCheckTime() checkCommBuffer(path) }) if err != nil { @@ -84,6 +87,12 @@ func init() { } go autoClearCommDir() + go autoActiveCheckComm() +} + +// 更新最后通信检查时间 +func updateLastCheckTime() { + commLastCheckTime.Set(gtime.Millisecond()) } // 自动清理通信目录文件 @@ -104,6 +113,17 @@ func autoClearCommDir() { } } +// 主动通信内容检测 +func autoActiveCheckComm() { + for { + time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL*time.Millisecond) + if gtime.Millisecond() - commLastCheckTime.Val() > gPROC_COMM_ACTIVE_CHECK_INTERVAL { + updateLastCheckTime() + checkCommBuffer(getCommFilePath(Pid())) + } + } +} + // 手动检查进程通信消息,如果存在消息曾推送到进程消息队列 func checkCommBuffer(path string) { commLocker.Lock() @@ -146,7 +166,7 @@ func Send(pid int, data interface{}) error { } } l.UnLock() - glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err) + //glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err) return err } diff --git a/geg/other/test.go b/geg/other/test.go index 63d854de7..da3806bec 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,64 +1,20 @@ package main -import ( - "fmt" - "gitee.com/johng/gf/g/os/gfile" - "gitee.com/johng/gf/g/os/gtime" - "gitee.com/johng/gf/g/encoding/gbinary" - "gitee.com/johng/gf/g/os/gproc" - "os" +import "fmt" + +const ( + CREATE = 1 << iota + WRITE + REMOVE + RENAME + CHMOD ) -// 数据解包,防止黏包 -func bufferToMsgs(buffer []byte) []*gproc.Msg { - s := 0 - msgs := make([]*gproc.Msg, 0) - for s < len(buffer) { - length := gbinary.DecodeToInt(buffer[s : s + 4]) - if length < 0 || length > len(buffer) { - s++ - continue - } - checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) - checksum2 := checksum(buffer[s + 12 : s + length]) - if checksum1 != checksum2 { - s++ - continue - } - msgs = append(msgs, &gproc.Msg { - Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), - Data : buffer[s + 12 : s + length], - }) - s += length - } - return msgs -} - -// 常见的二进制数据校验方式,生成校验结果 -func checksum(buffer []byte) uint32 { - var checksum uint32 - for _, b := range buffer { - checksum += uint32(b) - } - return checksum -} func main(){ - f, _ := os.Stat("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go") - fmt.Println(f.ModTime().Unix()) - fmt.Println(f.ModTime().Nanosecond()) - fmt.Println(gfile.MTimeMillisecond("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go")) - return - b := gfile.GetBinContents("/home/john/Documents/11248") - for _, msg := range bufferToMsgs(b) { - fmt.Println(msg.Pid) - fmt.Println(msg.Data) - } - - return - t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go") - t2 := gtime.Second() - fmt.Println(t1) - fmt.Println(t2) + fmt.Println(CREATE) + fmt.Println(WRITE) + fmt.Println(REMOVE) + fmt.Println(RENAME) } \ No newline at end of file