完成ghttp.Server热重启机制在Linux_amd64及Windows_amd64下的测试

This commit is contained in:
John
2018-05-17 22:32:50 +08:00
parent 6204936001
commit 28b6f10908
10 changed files with 121 additions and 184 deletions

View File

@ -127,7 +127,7 @@ func init() {
doneChan <- struct{}{}
if !gproc.IsChild() {
glog.Printfln("%d: all web servers shutdown", gproc.Pid())
glog.Printfln("%d: all servers shutdown", gproc.Pid())
}
}()
}

View File

@ -21,7 +21,7 @@ import (
)
const (
gADMIN_ACTION_INTERVAL_LIMIT = 0 // (毫秒)每一次执行管理操作的间隔限制
gADMIN_ACTION_INTERVAL_LIMIT = 3000 // (毫秒)服务开启后允许执行管理操作的间隔限制
)
// 用于服务管理的对象
@ -33,6 +33,12 @@ var serverActionLocker sync.Mutex
// (进程级别)用于记录上一次操作的时间(毫秒)
var serverActionLastTime = gtype.NewInt64(gtime.Millisecond())
// 当前服务进程所处的互斥管理操作状态
// 1 : reload
// 2 : restart
// 4 : shutdown
var serverProcessStatus = gtype.NewInt()
// 服务管理首页
func (p *utilAdmin) Index(r *Request) {
data := map[string]interface{}{
@ -99,6 +105,9 @@ func (s *Server) EnableAdmin(pattern...string) {
func (s *Server) Reload() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
if err := s.checkActionStatus(); err != nil {
return err
}
if err := s.checkActionFrequence(); err != nil {
return err
}
@ -111,6 +120,9 @@ func (s *Server) Reload() error {
func (s *Server) Restart() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
if err := s.checkActionStatus(); err != nil {
return err
}
if err := s.checkActionFrequence(); err != nil {
return err
}
@ -123,6 +135,9 @@ func (s *Server) Restart() error {
func (s *Server) Shutdown() error {
serverActionLocker.Lock()
defer serverActionLocker.Unlock()
if err := s.checkActionStatus(); err != nil {
return err
}
if err := s.checkActionFrequence(); err != nil {
return err
}
@ -139,4 +154,20 @@ func (s *Server) checkActionFrequence() error {
}
serverActionLastTime.Set(gtime.Millisecond())
return nil
}
// 检查当前服务进程的状态
func (s *Server) checkActionStatus() error {
status := serverProcessStatus.Val()
if status > 0 {
switch status {
case 1:
return errors.New("server is reloading")
case 2:
return errors.New("server is restarting")
case 4:
return errors.New("server is shutting down")
}
}
return nil
}

View File

@ -15,8 +15,6 @@ import (
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gtime"
"fmt"
"gitee.com/johng/gf/g/os/glog"
)
const (
@ -58,8 +56,8 @@ func handleProcessMsg() {
for {
if msg := gproc.Receive(); msg != nil {
// 记录消息日志,用于调试
content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data)
glog.Print(content)
//content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data)
//glog.Print(content)
//gfile.PutContentsAppend("/tmp/gproc-log", content)
act := gbinary.DecodeToUint(msg.Data[0 : 1])
data := msg.Data[1 : ]

View File

@ -21,7 +21,7 @@ import (
)
const (
gPROC_CHILD_MAX_IDLE_TIME = 3000 // 子进程闲置时间(未开启心跳机制的时间)
gPROC_CHILD_MAX_IDLE_TIME = 10000 // 子进程闲置时间(未开启心跳机制的时间)
)
// 心跳处理(方法为空逻辑放到公共通信switch中进行处理)

View File

@ -8,19 +8,12 @@ package ghttp
import (
"gitee.com/johng/gf/g/os/gproc"
"os"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 如果创建自己的父进程非gproc父进程那么表示该进程为重启创建的进程创建成功之后需要通知父进程自行销毁
if gproc.PPidOS() != gproc.PPid() {
if p, err := os.FindProcess(gproc.PPidOS()); err == nil {
p.Kill()
}
}
// 开启Web Server服务
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {

View File

@ -10,7 +10,6 @@ package ghttp
import (
"os"
"fmt"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gtime"
@ -23,14 +22,13 @@ var procUpdateTimeMap = gmap.NewIntIntMap()
// 开启服务
func onCommMainStart(pid int, data []byte) {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
if _, err := p.Start(); err != nil {
glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error())
return
fork := forkNewProcess()
if fork == nil {
os.Exit(1)
}
updateProcessCommTime(p.Pid())
updateProcessCommTime(fork.Pid())
// 子进程创建成功之后再发送执行命令
sendProcessMsg(p.Pid(), gMSG_START, nil)
sendProcessMsg(fork.Pid(), gMSG_START, nil)
}
// 心跳处理(方法为空逻辑放到公共通信switch中进行处理)
@ -50,12 +48,19 @@ func onCommMainRestart(pid int, data []byte) {
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
return
}
// 否则杀掉子进程,然后新建一个完整的子进程
// 首先创建子进程,暂时不开始服务,否则会有端口冲突
fork := forkNewProcess()
if fork == nil {
os.Exit(1)
}
// 然后通知旧的子进程自动关闭并退出(不需要新建的子进程来处理)
sendProcessMsg(pid, gMSG_CLOSE, nil)
if p, err := os.FindProcess(pid); err == nil && p != nil {
p.Kill()
p.Wait()
}
sendProcessMsg(gproc.Pid(), gMSG_START, nil)
// 通知新的子进程执行服务监听
sendProcessMsg(fork.Pid(), gMSG_START, nil)
}
// 新建子进程通知
@ -76,6 +81,16 @@ func updateProcessCommTime(pid int) {
procUpdateTimeMap.Set(pid, int(gtime.Millisecond()))
}
// 创建一个子进程,但是暂时不执行服务监听
func forkNewProcess() *gproc.Process {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
if _, err := p.Start(); err != nil {
glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error())
return nil
}
return p
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleMainProcessHeartbeat() {
for {
@ -86,17 +101,19 @@ func handleMainProcessHeartbeat() {
for _, pid := range procManager.Pids() {
updatetime := procUpdateTimeMap.Get(pid)
if updatetime > 0 && int(gtime.Millisecond()) - updatetime > gPROC_HEARTBEAT_TIMEOUT {
fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime)
//fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime)
// 这里需要手动从进程管理器中去掉该进程
procManager.RemoveProcess(pid)
sendProcessMsg(pid, gMSG_CLOSE, nil)
}
}
// (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要
if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{
glog.Printfln("%d: all children died, exit", gproc.Pid())
os.Exit(0)
}
}
// (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要
if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{
//glog.Printfln("%d: all children died, exit", gproc.Pid())
os.Exit(0)
}
}
}

View File

@ -6,9 +6,6 @@
// 文件监控.
// 使用时需要注意的是,一旦一个文件被删除,那么对其的监控将会失效。
// 特点:
// 1、底层使用了fsnotify机制作为异步监听插件
// 2、(可选)文件主动自动检查作为fsnotify文件监听的辅助手段来保障监听文件如果发生改变监控端将会及时收到提醒(解决某些业务场景下的fsnotify延迟问题)
package gfsnotify
import (
@ -20,18 +17,14 @@ import (
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtime"
)
// 监听管理对象
type Watcher struct {
watcher *fsnotify.Watcher // 底层fsnotify对象
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
watchUpdateTimeMap *gmap.StringIntMap // (毫秒)监控文件最新的通知时间
activeCheckInterval *gtype.Int // (毫秒)主动文件检查时间间隔
watcher *fsnotify.Watcher // 底层fsnotify对象
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
}
// 监听事件对象
@ -74,32 +67,19 @@ func Remove(path string) error {
func New() (*Watcher, error) {
if watch, err := fsnotify.NewWatcher(); err == nil {
w := &Watcher {
watcher : watch,
events : gqueue.New(),
closeChan : make(chan struct{}, 1),
callbacks : gmap.NewStringInterfaceMap(),
watchUpdateTimeMap : gmap.NewStringIntMap(),
activeCheckInterval : gtype.NewInt(),
watcher : watch,
events : gqueue.New(),
closeChan : make(chan struct{}, 1),
callbacks : gmap.NewStringInterfaceMap(),
}
w.startWatchLoop()
w.startEventLoop()
w.startActiveCheckLoop()
return w, nil
} else {
return nil, err
}
}
// 启动主动文件更新检测机制
func (w *Watcher) EnableActiveCheck(interval int) {
w.activeCheckInterval.Set(interval)
}
// 关闭主动文件更新检测机制
func (w *Watcher) DisableActiveCheck() {
w.activeCheckInterval.Set(0)
}
// 关闭监听管理对象
func (w *Watcher) Close() {
w.watcher.Close()
@ -128,8 +108,6 @@ func (w *Watcher) Add(path string, callback func(event *Event)) error {
})
// 添加底层监听
w.watcher.Add(path)
// 添加默认更新时间
w.watchUpdateTimeMap.Set(path, int(gfile.MTimeMillisecond(path)))
return nil
}
@ -139,7 +117,7 @@ func (w *Watcher) Remove(path string) error {
return w.watcher.Remove(path)
}
// fsnotify监听循环
// 监听循环
func (w *Watcher) startWatchLoop() {
go func() {
for {
@ -173,7 +151,6 @@ func (w *Watcher) startEventLoop() {
w.watcher.Add(event.Path)
continue
}
w.watchUpdateTimeMap.Set(event.Path, int(gtime.Millisecond()))
if l := w.callbacks.Get(event.Path); l != nil {
grpool.Add(func() {
for _, v := range l.(*glist.List).FrontAll() {

View File

@ -1,55 +0,0 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gfsnotify
import (
"time"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gfile"
"fmt"
)
const (
gDEFAULT_ACTIVE_CHECK_INTERVAL = 500 // (毫秒)默认主动检测时间间隔,未开启时什么也不做
)
// 文件定时监听器循环
func (w *Watcher) startActiveCheckLoop() {
go func() {
for {
select {
// 关闭事件
case <- w.closeChan:
return
default:
if w.activeCheckInterval.Val() > 0 {
paths := w.watchUpdateTimeMap.Keys()
for _, path := range paths {
lastUpdateTime := w.watchUpdateTimeMap.Get(path)
if int(gtime.Millisecond()) - lastUpdateTime > w.activeCheckInterval.Val() {
fileUpdateTime := int(gfile.MTimeMillisecond(path))
fmt.Println("check:", path, fileUpdateTime, lastUpdateTime)
if fileUpdateTime > lastUpdateTime {
fmt.Println("update:", path)
w.watchUpdateTimeMap.Set(path, fileUpdateTime)
w.events.PushBack(&Event{
Path : path,
Op : Op(WRITE),
})
}
}
}
time.Sleep(time.Duration(w.activeCheckInterval.Val())*time.Millisecond)
} else {
time.Sleep(gDEFAULT_ACTIVE_CHECK_INTERVAL*time.Millisecond)
}
}
}
}()
}

View File

@ -7,28 +7,31 @@
package gproc
import (
"io"
"os"
"fmt"
"time"
"errors"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gflock"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gtime"
"io"
"errors"
)
const (
// 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
// 自动通信文件清理时间间隔
gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second
gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second
// 写入通信数据失败时候的重试次数
gPROC_COMM_FAILURE_RETRY_COUNT = 3
gPROC_COMM_FAILURE_RETRY_COUNT = 3
// (毫秒)主动通信内容检查时间间隔
gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500
)
// 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理)
@ -37,8 +40,8 @@ var commClearLocker = gflock.New("comm.clear.lock")
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
// 进程通信消息队列
var commQueue = gqueue.New()
// 文件监控器
var watcher, _ = gfsnotify.New()
// 上一次进程通信内容检查的时间
var commLastCheckTime = gtype.NewInt64()
// TCP通信数据结构定义
type Msg struct {
@ -64,6 +67,7 @@ func init() {
glog.Errorfln("%s is not writable for gproc", path)
os.Exit(1)
}
updateLastCheckTime()
if gtime.Second() - gfile.MTime(path) < 10 {
// 初始化时读取已有数据(文件修改时间在10秒以内)
checkCommBuffer(path)
@ -73,10 +77,9 @@ func init() {
os.Truncate(path, 0)
commLocker.UnLock()
}
watcher.EnableActiveCheck(1000)
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
err := watcher.Add(path, func(event *gfsnotify.Event) {
glog.Printfln("%d: gfsnotify", Pid())
err := gfsnotify.Add(path, func(event *gfsnotify.Event) {
updateLastCheckTime()
checkCommBuffer(path)
})
if err != nil {
@ -84,6 +87,12 @@ func init() {
}
go autoClearCommDir()
go autoActiveCheckComm()
}
// 更新最后通信检查时间
func updateLastCheckTime() {
commLastCheckTime.Set(gtime.Millisecond())
}
// 自动清理通信目录文件
@ -104,6 +113,17 @@ func autoClearCommDir() {
}
}
// 主动通信内容检测
func autoActiveCheckComm() {
for {
time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL*time.Millisecond)
if gtime.Millisecond() - commLastCheckTime.Val() > gPROC_COMM_ACTIVE_CHECK_INTERVAL {
updateLastCheckTime()
checkCommBuffer(getCommFilePath(Pid()))
}
}
}
// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列
func checkCommBuffer(path string) {
commLocker.Lock()
@ -146,7 +166,7 @@ func Send(pid int, data interface{}) error {
}
}
l.UnLock()
glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err)
//glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err)
return err
}

View File

@ -1,64 +1,20 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gproc"
"os"
import "fmt"
const (
CREATE = 1 << iota
WRITE
REMOVE
RENAME
CHMOD
)
// 数据解包,防止黏包
func bufferToMsgs(buffer []byte) []*gproc.Msg {
s := 0
msgs := make([]*gproc.Msg, 0)
for s < len(buffer) {
length := gbinary.DecodeToInt(buffer[s : s + 4])
if length < 0 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := checksum(buffer[s + 12 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &gproc.Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
s += length
}
return msgs
}
// 常见的二进制数据校验方式,生成校验结果
func checksum(buffer []byte) uint32 {
var checksum uint32
for _, b := range buffer {
checksum += uint32(b)
}
return checksum
}
func main(){
f, _ := os.Stat("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go")
fmt.Println(f.ModTime().Unix())
fmt.Println(f.ModTime().Nanosecond())
fmt.Println(gfile.MTimeMillisecond("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go"))
return
b := gfile.GetBinContents("/home/john/Documents/11248")
for _, msg := range bufferToMsgs(b) {
fmt.Println(msg.Pid)
fmt.Println(msg.Data)
}
return
t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go")
t2 := gtime.Second()
fmt.Println(t1)
fmt.Println(t2)
fmt.Println(CREATE)
fmt.Println(WRITE)
fmt.Println(REMOVE)
fmt.Println(RENAME)
}