ghttp.Server热重启特性测试中

This commit is contained in:
John
2018-05-11 23:17:25 +08:00
parent 6f6a5debde
commit 1ac6de9627
11 changed files with 352 additions and 119 deletions

View File

@ -18,7 +18,6 @@ import (
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/container/gqueue"
@ -111,6 +110,8 @@ var procManager = gproc.NewManager()
var runChan = make(chan struct{}, 100000)
// 已完成的run消息队列
var doneChan = make(chan struct{}, 100000)
// 阻塞消息队列用于ghttp.Wait
var waitChan = make(chan struct{}, 0)
// Web Server进程初始化
func init() {
@ -122,41 +123,22 @@ func init() {
sendProcessMsg(os.Getpid(), gMSG_START, nil)
}
// 开启进程消息监听处理
handleProcessMsg()
handleProcessMsgAndSignal()
// 服务执行完成,需要退出
doneChan <- struct{}{}
if !gproc.IsChild() {
glog.Printfln("all web server shutdown smoothly")
glog.Printfln("%d: all web server shutdown smoothly", gproc.Pid())
}
// 停止进程等待
close(waitChan)
}()
}
// 获取所有Web Server的文件描述符map
func getServerFdMap() map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
serverMapping.RLockFunc(func(m map[string]interface{}) {
for k, v := range m {
sfm[k] = v.(*Server).getListenerFdMap()
}
})
return sfm
}
// 二进制转换为FdMap
func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
if len(buffer) > 0 {
j, _ := gjson.LoadContent(buffer, "json")
for k, _ := range j.ToMap() {
m := make(map[string]string)
for k, v := range j.GetMap(k) {
m[k] = gconv.String(v)
}
sfm[k] = m
}
}
return sfm
// 阻塞等待所有Web Server停止常用于多Web Server场景以及需要将Web Server异步运行的场景
func Wait() {
<- waitChan
}
// 获取/创建一个默认配置的HTTP Server(默认监听端口是80)
@ -267,6 +249,7 @@ func (s *Server) startServer(fdMap listenerFdMap) {
if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
s.servers = s.servers[0 : len(s.servers) - 1]
glog.Error(err)
}
}
@ -297,6 +280,7 @@ func (s *Server) startServer(fdMap listenerFdMap) {
if err := server.ListenAndServe(); err != nil {
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
s.servers = s.servers[0 : len(s.servers) - 1]
glog.Error(err)
}
}

View File

@ -9,13 +9,12 @@ package ghttp
import (
"os"
"fmt"
"strings"
"syscall"
"os/signal"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
)
@ -25,108 +24,65 @@ const (
gMSG_SHUTDOWN = 30
gMSG_NEW_FORK = 40
gMSG_REMOVE_PROC = 50
gMSG_HEARTBEAT = 60
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
)
// 进程信号量监听消息队列
var procSignalChan = make(chan os.Signal)
var procSignalChan = make(chan os.Signal)
// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效
var heartbeatStarted = gtype.NewBool()
// 处理进程信号量监控以及进程间消息通信
func handleProcessMsgAndSignal() {
go handleProcessSignal()
if gproc.IsChild() {
go handleChildProcessHeartbeat()
} else {
go handleMainProcessHeartbeat()
}
handleProcessMsg()
}
// 处理进程间消息
// 数据格式: 操作(8bit) | 参数(变长)
func handleProcessMsg() {
go handleProcessSignals()
for {
if msg := gproc.Receive(); msg != nil {
//fmt.Println(gproc.Pid(), gproc.IsChild(), msg)
// 记录消息日志,用于调试
//gfile.PutContentsAppend("/tmp/gproc-log",
// gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data),
//)
act := gbinary.DecodeToUint(msg.Data[0 : 1])
data := msg.Data[1 : ]
if gproc.IsChild() {
// 子进程
switch act {
// 开启所有Web Server(根据消息启动)
case gMSG_START:
if len(data) > 0 {
sfm := bufferToServerFdMap(data)
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
// 子进程收到重启消息那么将自身的ServerFdMap信息收集后发送给主进程由主进程进行统一调度
case gMSG_RESTART:
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
sfm := getServerFdMap()
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
for name, m := range sfm {
for fdk, fdv := range m {
if len(fdv) > 0 {
s := ""
for _, item := range strings.Split(fdv, ",") {
array := strings.Split(item, "#")
fd := uintptr(gconv.Uint(array[1]))
s += fmt.Sprintf("%s#%d", array[0], len(p.GetAttr().Files))
p.GetAttr().Files = append(p.GetAttr().Files, os.NewFile(fd, ""))
}
sfm[name][fdk] = strings.TrimRight(s, ",")
}
}
}
p.SetPpid(gproc.Ppid())
p.Run()
b, _ := gjson.Encode(sfm)
sendProcessMsg(p.Pid(), gMSG_START, b)
sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gbinary.EncodeInt(p.Pid()))
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
// 友好关闭服务链接并退出
case gMSG_START: onCommChildStart(msg.Pid, data)
case gMSG_RESTART: onCommChildRestart(msg.Pid, data)
case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data)
case gMSG_SHUTDOWN:
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.shutdown()
}
}
})
sendProcessMsg(gproc.Ppid(), gMSG_REMOVE_PROC, gbinary.EncodeInt(gproc.Pid()))
onCommChildShutdown(msg.Pid, data)
return
}
} else {
// 父进程
switch act {
// 开启服务
case gMSG_START:
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
p.Run()
sendProcessMsg(p.Pid(), gMSG_START, nil)
// 重启服务
case gMSG_RESTART:
// 向所有子进程发送重启命令子进程将会搜集Web Server信息发送给父进程进行协调重启工作
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
// 新建子进程通知
case gMSG_NEW_FORK:
pid := gbinary.DecodeToInt(data)
procManager.AddProcess(pid)
// 销毁子进程通知
case gMSG_START: onCommMainStart(msg.Pid, data)
case gMSG_RESTART: onCommMainRestart(msg.Pid, data)
case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data)
case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data)
case gMSG_REMOVE_PROC:
pid := gbinary.DecodeToInt(data)
procManager.RemoveProcess(pid)
onCommMainRemoveProc(msg.Pid, data)
// 如果所有子进程都退出,那么主进程也主动退出
if procManager.Size() == 0 {
return
}
// 关闭服务
case gMSG_SHUTDOWN:
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
onCommMainShutdown(msg.Pid, data)
return
}
}
@ -135,7 +91,7 @@ func handleProcessMsg() {
}
// 信号量处理
func handleProcessSignals() {
func handleProcessSignal() {
var sig os.Signal
signal.Notify(
procSignalChan,
@ -163,14 +119,6 @@ func handleProcessSignals() {
}
}
func onMainShutDown() {
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
}
func onMainRemoveProc() {
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
}
// 向进程发送操作消息
func sendProcessMsg(pid int, act int, data []byte) {
gproc.Send(pid, formatMsgBuffer(act, data))
@ -181,3 +129,29 @@ func formatMsgBuffer(act int, data []byte) []byte {
return append(gbinary.EncodeUint8(uint8(act)), data...)
}
// 获取所有Web Server的文件描述符map
func getServerFdMap() map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
serverMapping.RLockFunc(func(m map[string]interface{}) {
for k, v := range m {
sfm[k] = v.(*Server).getListenerFdMap()
}
})
return sfm
}
// 二进制转换为FdMap
func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap {
sfm := make(map[string]listenerFdMap)
if len(buffer) > 0 {
j, _ := gjson.LoadContent(buffer, "json")
for k, _ := range j.ToMap() {
m := make(map[string]string)
for k, v := range j.GetMap(k) {
m[k] = gconv.String(v)
}
sfm[k] = m
}
}
return sfm
}

View File

@ -0,0 +1,103 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 子进程
package ghttp
import (
"os"
"fmt"
"time"
"strings"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
)
// (子进程)上一次从主进程接收心跳的时间戳
var lastHeartbeatTime = gtype.NewInt()
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
if len(data) > 0 {
sfm := bufferToServerFdMap(data)
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
fmt.Println(serverMapping)
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
heartbeatStarted.Set(true)
}
// 心跳消息
func onCommChildHeartbeat(pid int, data []byte) {
//glog.Printfln("%d: child heartbeat", gproc.Pid())
lastHeartbeatTime.Set(int(gtime.Millisecond()))
}
// 子进程收到重启消息那么将自身的ServerFdMap信息收集后发送给主进程由主进程进行统一调度
func onCommChildRestart(pid int, data []byte) {
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
sfm := getServerFdMap()
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
// 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理以便子进程获取到正确的fd
for name, m := range sfm {
for fdk, fdv := range m {
if len(fdv) > 0 {
s := ""
for _, item := range strings.Split(fdv, ",") {
array := strings.Split(item, "#")
fd := uintptr(gconv.Uint(array[1]))
s += fmt.Sprintf("%s#%d,", array[0], len(p.GetAttr().Files))
p.GetAttr().Files = append(p.GetAttr().Files, os.NewFile(fd, ""))
}
sfm[name][fdk] = strings.TrimRight(s, ",")
}
}
}
p.SetPpid(gproc.Ppid())
p.Run()
// 编码,通信
b, _ := gjson.Encode(sfm)
sendProcessMsg(p.Pid(), gMSG_START, b)
sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gbinary.EncodeInt(p.Pid()))
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
}
// 友好关闭服务链接并退出
func onCommChildShutdown(pid int, data []byte) {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
for _, s := range v.(*Server).servers {
s.shutdown()
}
}
})
sendProcessMsg(gproc.Ppid(), gMSG_REMOVE_PROC, gbinary.EncodeInt(gproc.Pid()))
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleChildProcessHeartbeat() {
for {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
sendProcessMsg(gproc.Ppid(), gMSG_HEARTBEAT, nil)
// 超过时间没有接收到主进程心跳,自动关闭退出
if heartbeatStarted.Val() && (int(gtime.Millisecond()) - lastHeartbeatTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
// 子进程有时会无法退出这里直接使用exit而不是return
os.Exit(0)
}
}
}

View File

@ -0,0 +1,73 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 主进程
package ghttp
import (
"os"
"time"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/encoding/gbinary"
)
// (主进程)主进程与子进程上一次活跃时间映射map
var procUpdateMap = gmap.NewIntIntMap()
// 开启服务
func onCommMainStart(pid int, data []byte) {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
p.Run()
sendProcessMsg(p.Pid(), gMSG_START, nil)
}
// 心跳处理
func onCommMainHeartbeat(pid int, data []byte) {
//glog.Printfln("%d: main heartbeat", gproc.Pid())
procUpdateMap.Set(pid, int(gtime.Millisecond()))
}
// 重启服务
func onCommMainRestart(pid int, data []byte) {
// 向所有子进程发送重启命令子进程将会搜集Web Server信息发送给父进程进行协调重启工作
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
}
// 新建子进程通知
func onCommMainNewFork(pid int, data []byte) {
procManager.AddProcess(gbinary.DecodeToInt(data))
heartbeatStarted.Set(true)
}
// 销毁子进程通知
func onCommMainRemoveProc(pid int, data []byte) {
procManager.RemoveProcess(gbinary.DecodeToInt(data))
}
// 关闭服务,通知所有子进程退出
func onCommMainShutdown(pid int, data []byte) {
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
func handleMainProcessHeartbeat() {
for {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil))
// 清理过期进程
if heartbeatStarted.Val() {
for _, pid := range procManager.Pids() {
if int(gtime.Millisecond()) - procUpdateMap.Get(pid) > gPROC_HEARTBEAT_TIMEOUT {
// 这里需要手动从进程管理器中去掉该进程
procManager.RemoveProcess(pid)
sendProcessMsg(pid, gMSG_SHUTDOWN, nil)
return
}
}
}
}
}

View File

@ -118,13 +118,13 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
f := os.NewFile(s.fd, "")
ln, err = net.FileListener(f)
if err != nil {
err = fmt.Errorf("net.FileListener error: %v", err)
err = fmt.Errorf("%d: net.FileListener error: %v", gproc.Pid(), err)
return nil, err
}
} else {
ln, err = net.Listen("tcp", addr)
if err != nil {
err = fmt.Errorf("net.Listen error: %v", err)
err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err)
return nil, err
}
}

View File

@ -99,6 +99,11 @@ func (m *Manager) Processes() []*Process {
return processes
}
// 获取所有的进程pid构成列表返回
func (m *Manager) Pids() []int {
return m.processes.Keys()
}
// 等待所有子进程结束
func (m *Manager) WaitAll() {
processes := m.Processes()

View File

@ -0,0 +1,23 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
)
func main() {
s := g.Server()
s.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("hello")
})
s.BindHandler("/restart", func(r *ghttp.Request){
r.Response.Writeln("restart server")
r.Server.Restart()
})
s.BindHandler("/shutdown", func(r *ghttp.Request){
r.Response.Writeln("shutdown server")
r.Server.Shutdown()
})
s.SetPort(8199, 8200)
s.Run()
}

View File

@ -0,0 +1,32 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
)
func main() {
s1 := g.Server("s1")
s1.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("hello s1")
})
s1.BindHandler("/restart", func(r *ghttp.Request){
r.Response.Writeln("restart server")
r.Server.Restart()
})
s1.BindHandler("/shutdown", func(r *ghttp.Request){
r.Response.Writeln("shutdown server")
r.Server.Shutdown()
})
s1.SetPort(8199, 8200)
go s1.Run()
s2 := g.Server("s2")
s2.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("hello s2")
})
s2.SetPort(8300, 8080)
go s2.Run()
ghttp.Wait()
}

View File

@ -6,6 +6,6 @@ import (
)
func main () {
err := gproc.Send(26248, []byte{40})
err := gproc.Send(23504, []byte{30})
fmt.Println(err)
}

View File

@ -5,11 +5,50 @@ import (
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gproc"
)
// 数据解包,防止黏包
func bufferToMsgs(buffer []byte) []*gproc.Msg {
s := 0
msgs := make([]*gproc.Msg, 0)
for s < len(buffer) {
length := gbinary.DecodeToInt(buffer[s : s + 4])
if length < 0 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := checksum(buffer[s + 12 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &gproc.Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
s += length
}
return msgs
}
// 常见的二进制数据校验方式,生成校验结果
func checksum(buffer []byte) uint32 {
var checksum uint32
for _, b := range buffer {
checksum += uint32(b)
}
return checksum
}
func main(){
fmt.Println(uint8(int(300)))
b := gfile.GetBinContents("/tmp/gproc/27501")
for _, msg := range bufferToMsgs(b) {
fmt.Println(msg.Pid)
fmt.Println(msg.Data)
}
return
t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go")
t2 := gtime.Second()