初步完成ghttp.Server在Linux及Windows系统下的功能及兼容性测试

This commit is contained in:
John
2018-05-13 14:04:37 +08:00
parent 71fb05ec44
commit 85020dfcc2
11 changed files with 217 additions and 80 deletions

7
g/g.go
View File

@ -32,6 +32,12 @@ type Map map[string]interface{}
// 常用list数据结构
type List []Map
// 阻塞等待HTTPServer执行完成(同一进程多HTTPServer情况下)
func Wait() {
ghttp.Wait()
}
// HTTPServer单例对象
func Server(name...interface{}) *ghttp.Server {
return ghttp.GetServer(name...)
@ -58,7 +64,6 @@ func Config() *gcfg.Config {
return gins.Config()
}
// 数据库操作对象,使用了连接池
func Database(name...string) *gdb.Db {
config := gins.Config()

View File

@ -13,6 +13,7 @@ import (
"errors"
"strings"
"reflect"
"runtime"
"net/http"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
@ -21,7 +22,6 @@ import (
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/container/gqueue"
"runtime"
)
const (
@ -108,40 +108,31 @@ var serverMapping = gmap.NewStringInterfaceMap()
var procManager = gproc.NewManager()
// Web Server开始执行事件通道由于同一个进程支持多Server因此该通道为非阻塞
var runChan = make(chan struct{}, 100000)
// 已完成的run消息队列
var readyChan = make(chan struct{}, 100000)
// Web Server已完成服务事件通道当有事件时表示服务完成当前进程退出
var doneChan = make(chan struct{}, 100000)
// 阻塞消息队列用于ghttp.Wait
var waitChan = make(chan struct{}, 0)
// Web Server进程初始化
func init() {
go func() {
// 等待run消息(Run方法调用)
<- runChan
// 等待ready消息(Run方法调用)
<- readyChan
// 主进程只负责创建子进程
if !gproc.IsChild() {
sendProcessMsg(os.Getpid(), gMSG_START, nil)
}
// 开启进程消息监听处理
handleProcessMsgAndSignal()
// 服务执行完成,需要退出
doneChan <- struct{}{}
if !gproc.IsChild() {
glog.Printfln("%d: all web server shutdown smoothly", gproc.Pid())
}
// 停止进程等待
close(waitChan)
}()
}
// 阻塞等待所有Web Server停止常用于多Web Server场景以及需要将Web Server异步运行的场景
func Wait() {
<- waitChan
}
// 获取/创建一个默认配置的HTTP Server(默认监听端口是80)
// 单例模式请保证name的唯一性
func GetServer(name...interface{}) (*Server) {
@ -190,32 +181,47 @@ func GetServer(name...interface{}) (*Server) {
return s
}
// 阻塞执行监听
func (s *Server) Run() error {
runChan <- struct{}{}
// 作为守护协程异步执行(当同一进程中存在多个Web Server时需要采用这种方式执行)
// 需要结合Wait方式一起使用
func (s *Server) Start() error {
// 主进程,不执行任何业务,只负责进程管理
if !gproc.IsChild() {
<- doneChan
return nil
}
if s.status == 1 {
return errors.New("server is already running")
}
// 底层http server配置
if s.config.Handler == nil {
s.config.Handler = http.HandlerFunc(s.defaultHttpHandle)
}
// 开启异步关闭队列处理循环
s.startCloseQueueLoop()
return nil
}
// 阻塞执行监听
func (s *Server) Run() error {
if err := s.Start(); err != nil {
return err
}
// Web Server准备就绪待执行
readyChan <- struct{}{}
// 阻塞等待服务执行完成
<- doneChan
return nil
}
// 阻塞等待所有Web Server停止常用于多Web Server场景以及需要将Web Server异步运行的场景
// 这是一个与进程相关的方法
func Wait() {
readyChan <- struct{}{}
<- doneChan
}
// 开启底层Web Server执行
func (s *Server) startServer(fdMap listenerFdMap) {
// 开始执行底层Web Server创建端口监听
@ -239,13 +245,13 @@ func (s *Server) startServer(fdMap listenerFdMap) {
}
for _, v := range array {
go func(item string) {
go func(addrItem string) {
// windows系统不支持文件描述符传递socket通信平滑交接因此只能完整重启
if isFd && runtime.GOOS != "windows" {
tArray := strings.Split(item, "#")
tArray := strings.Split(addrItem, "#")
server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1]))
} else {
server = s.newGracefulServer(item)
server = s.newGracefulServer(addrItem)
}
s.servers = append(s.servers, server)
if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
@ -271,13 +277,13 @@ func (s *Server) startServer(fdMap listenerFdMap) {
array = strings.Split(s.config.Addr, ",")
}
for _, v := range array {
go func(item string) {
go func(addrItem string) {
// windows系统不支持文件描述符传递socket通信平滑交接因此只能完整重启
if isFd && runtime.GOOS != "windows" {
tArray := strings.Split(item, "#")
tArray := strings.Split(addrItem, "#")
server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1]))
} else {
server = s.newGracefulServer(item)
server = s.newGracefulServer(addrItem)
}
s.servers = append(s.servers, server)
if err := server.ListenAndServe(); err != nil {

View File

@ -15,7 +15,6 @@ import (
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/encoding/gbinary"
"fmt"
)
const (
@ -26,15 +25,17 @@ const (
gMSG_REMOVE_PROC = 50
gMSG_HEARTBEAT = 60
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
gPROC_HEARTBEAT_TIMEOUT = 5000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
gPROC_MULTI_CHILD_CLEAR_INTERVAL = 1000 // (毫秒)检测间隔,当存在多个子进程时(往往是重启间隔非常短且频繁造成),需要进行清理,最终留下一个最新的子进程
gPROC_MULTI_CHILD_CLEAR_MIN_EXPIRE = 5000 // (毫秒)当多个子进程存在时,允许子进程进程至少运行的最小时间,超过该时间则清理
)
// 进程信号量监听消息队列
var procSignalChan = make(chan os.Signal)
var procSignalChan = make(chan os.Signal)
// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效
var heartbeatStarted = gtype.NewBool()
var checkHeartbeat = gtype.NewBool()
// 处理进程信号量监控以及进程间消息通信
func handleProcessMsgAndSignal() {
@ -43,6 +44,7 @@ func handleProcessMsgAndSignal() {
go handleChildProcessHeartbeat()
} else {
go handleMainProcessHeartbeat()
go handleMainProcessChildClear()
}
handleProcessMsg()
}
@ -53,8 +55,8 @@ func handleProcessMsg() {
for {
if msg := gproc.Receive(); msg != nil {
// 记录消息日志,用于调试
content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data)
fmt.Print(content)
//content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data)
//fmt.Print(content)
//gfile.PutContentsAppend("/tmp/gproc-log", content)
act := gbinary.DecodeToUint(msg.Data[0 : 1])
data := msg.Data[1 : ]
@ -64,7 +66,7 @@ func handleProcessMsg() {
// ===============
// 任何与父进程的通信都会更新最后通信时间
if msg.Pid == gproc.PPid() {
lastHeartbeatTime.Set(int(gtime.Millisecond()))
updateProcessChildUpdateTime()
}
switch act {
case gMSG_START: onCommChildStart(msg.Pid, data)
@ -82,6 +84,9 @@ func handleProcessMsg() {
if msg.Pid != gproc.Pid() {
updateProcessCommTime(msg.Pid)
}
if !procFirstTimeMap.Contains(msg.Pid) {
procFirstTimeMap.Set(msg.Pid, int(gtime.Millisecond()))
}
switch act {
case gMSG_START: onCommMainStart(msg.Pid, data)
case gMSG_RESTART: onCommMainRestart(msg.Pid, data)

View File

@ -21,37 +21,16 @@ import (
"gitee.com/johng/gf/g/container/gtype"
)
const (
gPROC_CHILD_MAX_IDLE_TIME = 3000 // 子进程闲置时间(未开启心跳机制的时间)
)
// (子进程)上一次从主进程接收心跳的时间戳
var lastHeartbeatTime = gtype.NewInt()
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
if len(data) > 0 {
sfm := bufferToServerFdMap(data)
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 如果创建自己的父进程非gproc父进程那么表示该进程为重启创建的进程创建成功之后需要通知父进程销毁
if os.Getppid() != gproc.PPid() {
//glog.Printfln("%d: ask os.ppid %d to exit, proc.ppid:%d", gproc.Pid(), os.Getppid(), gproc.PPid())
sendProcessMsg(os.Getppid(), gMSG_SHUTDOWN, nil)
}
heartbeatStarted.Set(true)
}
// 心跳消息
func onCommChildHeartbeat(pid int, data []byte) {
//glog.Printfln("%d: update heartbeat", gproc.Pid())
lastHeartbeatTime.Set(int(gtime.Millisecond()))
updateProcessChildUpdateTime()
}
// 子进程收到重启消息那么将自身的ServerFdMap信息收集后发送给主进程由主进程进行统一调度
@ -60,6 +39,7 @@ func onCommChildRestart(pid int, data []byte) {
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
// windows系统无法进行文件描述符操作只能重启进程
if runtime.GOOS == "windows" {
// windows下使用shutdownWebServers会造成协程阻塞这里直接使用close强制关闭
closeWebServers()
} else {
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
@ -89,12 +69,17 @@ func onCommChildRestart(pid int, data []byte) {
}
}
// 友好关闭服务链接并退出
// 关闭服务链接并退出
func onCommChildShutdown(pid int, data []byte) {
sendProcessMsg(gproc.PPid(), gMSG_REMOVE_PROC, nil)
if runtime.GOOS != "windows" {
shutdownWebServers()
}
sendProcessMsg(gproc.PPid(), gMSG_REMOVE_PROC, nil)
}
// 更新上一次主进程主动与子进程通信的时间
func updateProcessChildUpdateTime() {
lastHeartbeatTime.Set(int(gtime.Millisecond()))
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
@ -103,10 +88,16 @@ func handleChildProcessHeartbeat() {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil)
// 超过时间没有接收到主进程心跳,自动关闭退出
if heartbeatStarted.Val() && (int(gtime.Millisecond()) - lastHeartbeatTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastHeartbeatTime.Val() > gPROC_HEARTBEAT_TIMEOUT) {
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
// 子进程有时会无法退出(僵尸?)这里直接使用exit而不是return
glog.Printfln("%d: heartbeat timeout, exit", gproc.Pid())
//glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastHeartbeatTime.Val(), gPROC_HEARTBEAT_TIMEOUT)
//glog.Printfln("%d: heartbeat timeout, exit", gproc.Pid())
os.Exit(0)
}
// 未开启心跳检测的闲置超过一定时间则主动关闭
if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME {
//glog.Printfln("%d: max idle time exceeded, exit", gproc.Pid())
os.Exit(0)
}
}

View File

@ -0,0 +1,39 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 子进程
// +build !windows
package ghttp
import (
"gitee.com/johng/gf/g/os/gproc"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
if len(data) > 0 {
sfm := bufferToServerFdMap(data)
for k, v := range sfm {
GetServer(k).startServer(v)
}
} else {
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
}
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 如果创建自己的父进程非gproc父进程那么表示该进程为重启创建的进程创建成功之后需要通知父进程自行销毁
if gproc.PPidOS() != gproc.PPid() {
sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil)
}
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
updateProcessChildUpdateTime()
checkHeartbeat.Set(true)
}

View File

@ -0,0 +1,35 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
)
// 开启所有Web Server(根据消息启动)
func onCommChildStart(pid int, data []byte) {
// 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制
sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil)
// 如果创建自己的父进程非gproc父进程那么表示该进程为重启创建的进程创建成功之后需要通知父进程自行销毁
if gproc.PPidOS() != gproc.PPid() {
sendProcessMsg(gproc.PPidOS(), gMSG_SHUTDOWN, nil)
// 在windows下必须等待父进程销毁后才能表明Server资源已被释放才能开始端口监听否则会端口资源冲突
if p, err := os.FindProcess(gproc.PPidOS()); err == nil {
p.Wait()
}
}
// 开启Web Server服务
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).startServer(nil)
}
})
// 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间
updateProcessChildUpdateTime()
checkHeartbeat.Set(true)
}

View File

@ -3,19 +3,26 @@
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信 - 主进程
// Web Server进程间通信 - 主进程.
// 管理子进程按照规则听话玩,不听话有一百种方法让子进程在本地混不下去.
package ghttp
import (
"os"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/container/gmap"
)
// (主进程)子进程与主进程第一次通信的时间映射map
// 用以识别子进程创建时间先后顺序,当存在多个子进程时,主动销毁旧的子进程
var procFirstTimeMap = gmap.NewIntIntMap()
// (主进程)主进程与子进程上一次活跃时间映射map
var procUpdateMap = gmap.NewIntIntMap()
var procUpdateTimeMap = gmap.NewIntIntMap()
// 开启服务
func onCommMainStart(pid int, data []byte) {
@ -38,7 +45,7 @@ func onCommMainRestart(pid int, data []byte) {
// 新建子进程通知
func onCommMainNewFork(pid int, data []byte) {
procManager.AddProcess(pid)
heartbeatStarted.Set(true)
checkHeartbeat.Set(true)
}
// 销毁子进程通知
@ -53,7 +60,7 @@ func onCommMainShutdown(pid int, data []byte) {
// 更新指定进程的通信时间记录
func updateProcessCommTime(pid int) {
procUpdateMap.Set(pid, int(gtime.Millisecond()))
procUpdateTimeMap.Set(pid, int(gtime.Millisecond()))
}
// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态
@ -62,9 +69,9 @@ func handleMainProcessHeartbeat() {
time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond)
procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil))
// 清理过期进程
if heartbeatStarted.Val() {
if checkHeartbeat.Val() {
for _, pid := range procManager.Pids() {
if int(gtime.Millisecond()) - procUpdateMap.Get(pid) > gPROC_HEARTBEAT_TIMEOUT {
if int(gtime.Millisecond()) - procUpdateTimeMap.Get(pid) > gPROC_HEARTBEAT_TIMEOUT {
// 这里需要手动从进程管理器中去掉该进程
procManager.RemoveProcess(pid)
sendProcessMsg(pid, gMSG_SHUTDOWN, nil)
@ -72,4 +79,25 @@ func handleMainProcessHeartbeat() {
}
}
}
}
// 清理多余的子进程
func handleMainProcessChildClear() {
for {
time.Sleep(gPROC_MULTI_CHILD_CLEAR_INTERVAL*time.Millisecond)
if procManager.Size() > 1 {
minPid := 0
minTime := int(gtime.Millisecond())
for _, pid := range procManager.Pids() {
if t := procFirstTimeMap.Get(pid); t < minTime {
minPid = pid
minTime = t
}
}
if minPid > 0 && procUpdateTimeMap.Get(minPid) - procFirstTimeMap.Get(minPid) > gPROC_MULTI_CHILD_CLEAR_MIN_EXPIRE {
sendProcessMsg(minPid, gMSG_SHUTDOWN, nil)
glog.Printfln("%d: multi child occurred, shutdown %d", gproc.Pid(), minPid)
}
}
}
}

View File

@ -144,9 +144,9 @@ func (s *gracefulServer) shutdown() {
// 执行请求强制关闭
func (s *gracefulServer) close() {
if err := s.httpServer.Close(); err != nil {
glog.Errorfln("%d: %s server [%s] close error: %v", gproc.Pid(), s.getProto(), s.addr, err)
glog.Errorfln("%d: %s server [%s] closed error: %v", gproc.Pid(), s.getProto(), s.addr, err)
} else {
glog.Printfln("%d: %s server [%s] close smoothly", gproc.Pid(), s.getProto(), s.addr)
glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr)
s.shutdownChan <- true
}
}

View File

@ -11,13 +11,21 @@ package gproc
import (
"os"
"time"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/container/gtype"
)
const (
gPROC_ENV_KEY_PPID_KEY = "gproc.ppid"
)
// 进程开始执行时间
var processStartTime = time.Now()
// 优雅退出标识符号
var isExited = gtype.NewBool()
// 获取当前进程ID
func Pid() int {
return os.Getpid()
@ -34,7 +42,7 @@ func PPid() int {
}
// 获取父进程ID(系统父进程)
func PpidOfOs() int {
func PPidOS() int {
return os.Getppid()
}
@ -43,3 +51,23 @@ func IsChild() bool {
return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != ""
}
// 进程开始执行时间
func StartTime() time.Time {
return processStartTime
}
// 进程已经运行的时间(毫秒)
func Uptime() int {
return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6)
}
// 标识当前进程为退出状态,其他业务可以根据此标识来执行优雅退出
func SetExited() {
isExited.Set(true)
}
// 当前进程是否被标识为退出状态
func Exited() bool {
return isExited.Val()
}

View File

@ -19,14 +19,14 @@ func main() {
r.Server.Shutdown()
})
s1.SetPort(8199, 8200)
go s1.Run()
s1.Start()
s2 := g.Server("s2")
s2.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("hello s2")
})
s2.SetPort(8300, 8080)
go s2.Run()
s2.Start()
ghttp.Wait()
g.Wait()
}

View File

@ -43,7 +43,7 @@ func checksum(buffer []byte) uint32 {
}
func main(){
b := gfile.GetBinContents("/tmp/gproc/27501")
b := gfile.GetBinContents("/tmp/gproc/30588")
for _, msg := range bufferToMsgs(b) {
fmt.Println(msg.Pid)
fmt.Println(msg.Data)