diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 2541e6eec..13ade8479 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -316,19 +316,19 @@ func (s *Server) startServer(fdMap listenerFdMap) { s.status = 1 } -// 热重启Web Server +// 平滑重启Web Server func (s *Server) Reload() { sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) } -// 重启Web Server +// 完整重启Web Server func (s *Server) Restart() { sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) } // 关闭Web Server func (s *Server) Shutdown() { - sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) + sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) } // 获取当前监听的文件描述符信息,构造成map返回 diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index 8857fb39e..57b78439d 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -18,13 +18,13 @@ import ( ) const ( - gMSG_START = 10 - gMSG_RELOAD = 20 - gMSG_RESTART = 30 - gMSG_SHUTDOWN = 40 - gMSG_CLOSE = 45 - gMSG_NEW_FORK = 50 - gMSG_HEARTBEAT = 70 + gMSG_START = 1 + gMSG_RELOAD = 2 + gMSG_RESTART = 3 + gMSG_SHUTDOWN = 4 + gMSG_CLOSE = 5 + gMSG_NEW_FORK = 6 + gMSG_HEARTBEAT = 7 gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔 gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程 diff --git a/g/net/ghttp/ghttp_server_comm_child.go b/g/net/ghttp/ghttp_server_comm_child.go index 2839ca9e2..5938c1c1d 100644 --- a/g/net/ghttp/ghttp_server_comm_child.go +++ b/g/net/ghttp/ghttp_server_comm_child.go @@ -29,7 +29,7 @@ func onCommChildHeartbeat(pid int, data []byte) { } -// 热重启,子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 +// 平滑重启,子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 func onCommChildReload(pid int, data []byte) { var buffer []byte = nil p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) diff --git a/g/net/ghttp/ghttp_server_comm_child_unix.go b/g/net/ghttp/ghttp_server_comm_child_unix.go index 20485b93a..7d8d84988 100644 --- a/g/net/ghttp/ghttp_server_comm_child_unix.go +++ b/g/net/ghttp/ghttp_server_comm_child_unix.go @@ -32,7 +32,7 @@ func onCommChildStart(pid int, data []byte) { sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) // 如果创建自己的父进程非gproc父进程,那么表示该进程为重启创建的进程,创建成功之后需要通知父进程自行销毁 if gproc.PPidOS() != gproc.PPid() { - //sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil) + //如果子进程已经继承了父进程的socket文件描述符,那么父进程没有存在的必要,直接kill掉 if p, err := os.FindProcess(gproc.PPidOS()); err == nil { p.Kill() } diff --git a/g/net/ghttp/ghttp_server_comm_main.go b/g/net/ghttp/ghttp_server_comm_main.go index 37a4ecc72..5666fbfbf 100644 --- a/g/net/ghttp/ghttp_server_comm_main.go +++ b/g/net/ghttp/ghttp_server_comm_main.go @@ -31,19 +31,20 @@ func onCommMainHeartbeat(pid int, data []byte) { updateProcessCommTime(pid) } -// 热重启服务 +// 平滑重启服务 func onCommMainReload(pid int, data []byte) { - // 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作 procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil)) } // 完整重启服务 func onCommMainRestart(pid int, data []byte) { + // 如果是父进程接收到重启指令,那么通知所有子进程重启 if pid == gproc.Pid() { procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) return } - if p, _ := os.FindProcess(pid); p != nil { + // 否则杀掉子进程,然后新建一个完整的子进程 + if p, err := os.FindProcess(pid); err == nil && p != nil { p.Kill() p.Wait() } @@ -56,9 +57,8 @@ func onCommMainNewFork(pid int, data []byte) { checkHeartbeat.Set(true) } -// 关闭服务,通知所有子进程退出 +// 关闭服务,通知所有子进程退出(Kill强制性退出) func onCommMainShutdown(pid int, data []byte) { - //procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) procManager.KillAll() procManager.WaitAll() } @@ -83,7 +83,7 @@ func handleMainProcessHeartbeat() { } } } - // 如果所有子进程都退出,并且达到超时时间,那么主进程也没存在的必要 + // 如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要 if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{ os.Exit(0) } diff --git a/g/os/gflock/gflock.go b/g/os/gflock/gflock.go index c8d10a849..92630fac0 100644 --- a/g/os/gflock/gflock.go +++ b/g/os/gflock/gflock.go @@ -15,8 +15,9 @@ import ( // 文件锁 type Locker struct { - mu sync.RWMutex - flock *flock.Flock + mu sync.RWMutex // 用于外部接口调用的互斥锁(阻塞机制) + fmu sync.RWMutex // 用于保证方法内部操作的原子性互斥锁 + flock *flock.Flock // 底层文件锁对象 } // 创建文件锁 @@ -36,22 +37,57 @@ func (l *Locker) Path() string { return l.flock.Path() } +// 当前文件锁是否处于锁定状态(Lock) +func (l *Locker) IsLocked() bool { + return l.flock.Locked() +} + +// 尝试Lock文件,如果失败立即返回 +func (l *Locker) TryLock() bool { + l.fmu.Lock() + defer l.fmu.Unlock() + ok, _ := l.flock.TryLock() + if ok { + l.mu.Lock() + } + return ok +} + +// 尝试RLock文件,如果失败立即返回 +func (l *Locker) TryRLock() bool { + l.fmu.Lock() + defer l.fmu.Unlock() + ok, _ := l.flock.TryRLock() + if ok { + l.mu.RLock() + } + return ok +} + func (l *Locker) Lock() { + l.fmu.Lock() + defer l.fmu.Unlock() l.mu.Lock() l.flock.Lock() } func (l *Locker) UnLock() { + l.fmu.Lock() + defer l.fmu.Unlock() l.flock.Unlock() l.mu.Unlock() } func (l *Locker) RLock() { + l.fmu.Lock() + defer l.fmu.Unlock() l.mu.RLock() l.flock.RLock() } func (l *Locker) RUnlock() { + l.fmu.Lock() + defer l.fmu.Unlock() l.flock.Unlock() l.mu.RUnlock() } diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 9ef920e3d..027895574 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -27,6 +27,8 @@ const ( gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second ) +// 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理) +var commClearLocker = gflock.New("comm.clear.lock") // 当前进程的文件锁 var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) // 进程通信消息队列 @@ -82,11 +84,14 @@ func autoClearCommDir() { dirPath := getCommDirPath() for { time.Sleep(gPROC_COMM_AUTO_CLEAR_INTERVAL) - for _, name := range gfile.ScanDir(dirPath) { - path := dirPath + gfile.Separator + name - if gtime.Second() - gfile.MTime(path) >= 10 { - gfile.Remove(path) + if commClearLocker.TryLock() { + for _, name := range gfile.ScanDir(dirPath) { + path := dirPath + gfile.Separator + name + if gtime.Second() - gfile.MTime(path) >= 10 { + gfile.Remove(path) + } } + commClearLocker.UnLock() } } } diff --git a/geg/os/gflock/gflock.go b/geg/os/gflock/gflock.go index b2ad294d7..2ceac44f0 100644 --- a/geg/os/gflock/gflock.go +++ b/geg/os/gflock/gflock.go @@ -9,9 +9,9 @@ import ( func main() { l := gflock.New("1.lock") fmt.Println(l.Path()) - fmt.Println(l.Lock()) + fmt.Println(l.TryLock()) fmt.Println("lock 1") - fmt.Println(l.Lock()) + l.Lock() fmt.Println("lock 1") time.Sleep(time.Hour) }