完成ghttp.Server热重启特性初步开发

This commit is contained in:
John
2018-05-11 18:34:17 +08:00
parent d2491241a2
commit dabc789113
7 changed files with 155 additions and 62 deletions

View File

@ -125,6 +125,10 @@ func init() {
handleProcessMsg()
// 服务执行完成,需要退出
doneChan <- struct{}{}
if !gproc.IsChild() {
glog.Printfln("all web server shutdown smoothly")
}
}()
}
@ -226,8 +230,6 @@ func (s *Server) Run() error {
// 阻塞等待服务执行完成
<- doneChan
glog.Printfln("web server pid %d exit successfully", gproc.Pid())
return nil
}
@ -262,7 +264,6 @@ func (s *Server) startServer(fdMap listenerFdMap) {
server = s.newGracefulServer(item)
}
s.servers = append(s.servers, server)
glog.Printfln("https server started listening on %s", server.addr)
if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
@ -293,7 +294,6 @@ func (s *Server) startServer(fdMap listenerFdMap) {
server = s.newGracefulServer(item)
}
s.servers = append(s.servers, server)
glog.Printfln("http server started listening on %s", server.addr)
if err := server.ListenAndServe(); err != nil {
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
@ -308,24 +308,12 @@ func (s *Server) startServer(fdMap listenerFdMap) {
// 重启Web Server
func (s *Server) Restart() {
// 如果是主进程,那么向所有子进程发送重启信号
if !gproc.IsChild() {
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
return
}
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
}
// 关闭Web Server
func (s *Server) Shutdown() {
// 如果是主进程,那么向所有子进程发送关闭信号
if !gproc.IsChild() {
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
return
}
for _, v := range s.servers {
v.shutdown()
}
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
}
// 获取当前监听的文件描述符信息构造成map返回

View File

@ -9,28 +9,34 @@ package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/encoding/gbinary"
"strings"
"gitee.com/johng/gf/g/util/gconv"
"fmt"
"strings"
"syscall"
"os/signal"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/encoding/gbinary"
)
const (
gMSG_START = 10
gMSG_RESTART = 20
gMSG_CORESTART = 30
gMSG_SHUTDOWN = 40
gMSG_NEW_FORK = 50
gMSG_START = 10
gMSG_RESTART = 20
gMSG_SHUTDOWN = 30
gMSG_NEW_FORK = 40
gMSG_REMOVE_PROC = 50
)
// 进程信号量监听消息队列
var procSignalChan = make(chan os.Signal)
// 处理进程间消息
// 数据格式: 操作(8bit) | 参数(变长)
func handleProcessMsg() {
go handleProcessSignals()
for {
if msg := gproc.Receive(); msg != nil {
fmt.Println(gproc.Pid(), gproc.IsChild(), msg)
//fmt.Println(gproc.Pid(), gproc.IsChild(), msg)
act := gbinary.DecodeToUint(msg.Data[0 : 1])
data := msg.Data[1 : ]
if gproc.IsChild() {
@ -64,7 +70,7 @@ func handleProcessMsg() {
array := strings.Split(item, "#")
fd := uintptr(gconv.Uint(array[1]))
s += fmt.Sprintf("%s#%d", array[0], len(p.GetAttr().Files))
p.GetAttr().Files = append(p.GetAttr().Files, fd)
p.GetAttr().Files = append(p.GetAttr().Files, os.NewFile(fd, ""))
}
sfm[name][fdk] = strings.TrimRight(s, ",")
}
@ -72,19 +78,21 @@ func handleProcessMsg() {
}
p.SetPpid(gproc.Ppid())
p.Run()
fmt.Println(procManager)
b, _ := gjson.Encode(sfm)
sendProcessMsg(p.Pid(), gMSG_START, b)
sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gbinary.EncodeInt(p.Pid()))
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gconv.Bytes(p.Pid()))
// 友好关闭服务链接并退出
case gMSG_SHUTDOWN:
serverMapping.RLockFunc(func(m map[string]interface{}) {
for _, v := range m {
v.(*Server).Shutdown()
for _, s := range v.(*Server).servers {
s.shutdown()
}
}
})
sendProcessMsg(gproc.Ppid(), gMSG_REMOVE_PROC, gbinary.EncodeInt(gproc.Pid()))
return
}
@ -102,22 +110,67 @@ func handleProcessMsg() {
// 向所有子进程发送重启命令子进程将会搜集Web Server信息发送给父进程进行协调重启工作
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
// 协调重启服务
// 新建子进程通知
case gMSG_NEW_FORK:
//sendProcessMsg(p.Pid(), gMSG_START, data)
// 关闭旧的服务进程
//sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil)
pid := gbinary.DecodeToInt(data)
procManager.AddProcess(pid)
// 销毁子进程通知
case gMSG_REMOVE_PROC:
pid := gbinary.DecodeToInt(data)
procManager.RemoveProcess(pid)
// 如果所有子进程都退出,那么主进程也主动退出
if procManager.Size() == 0 {
return
}
// 关闭服务
case gMSG_SHUTDOWN:
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
return
}
}
}
}
}
// 信号量处理
func handleProcessSignals() {
var sig os.Signal
signal.Notify(
procSignalChan,
syscall.SIGINT,
syscall.SIGQUIT,
syscall.SIGKILL,
syscall.SIGHUP,
syscall.SIGTERM,
syscall.SIGUSR1,
)
for {
sig = <- procSignalChan
switch sig {
// 进程终止,停止所有子进程运行
case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM:
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
return
// 用户信号,重启服务
case syscall.SIGUSR1:
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
default:
}
}
}
func onMainShutDown() {
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
}
func onMainRemoveProc() {
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
}
// 向进程发送操作消息
func sendProcessMsg(pid int, act int, data []byte) {
gproc.Send(pid, formatMsgBuffer(act, data))

View File

@ -14,6 +14,7 @@ import (
"net/http"
"crypto/tls"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
)
// 优雅的Web Server对象封装
@ -92,8 +93,18 @@ func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error {
return s.doServe()
}
// 获取服务协议字符串
func (s *gracefulServer) getProto() string {
proto := "http"
if s.isHttps {
proto = "https"
}
return proto
}
// 开始执行Web Server服务处理
func (s *gracefulServer) doServe() error {
glog.Printfln("%d: %s server started listening on [%s]", gproc.Pid(), s.getProto(), s.addr)
err := s.httpServer.Serve(s.listener)
<-s.shutdownChan
return err
@ -123,9 +134,9 @@ func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) {
// 执行请求优雅关闭
func (s *gracefulServer) shutdown() {
if err := s.httpServer.Shutdown(context.Background()); err != nil {
glog.Errorf("server %s shutdown error: %v\n", s.addr, err)
glog.Errorfln("%d: %s server [%s] shutdown error: %v", gproc.Pid(), s.getProto(), s.addr, err)
} else {
glog.Printf("server %s shutdown successfully\n", s.addr)
glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr)
s.shutdownChan <- true
}
}

View File

@ -20,7 +20,7 @@ import (
)
const (
// 由于子进程的temp dir有可能会和父进程不一致影响进程间通信这里统一使用环境变量设置
// 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
)

View File

@ -9,9 +9,13 @@ package gproc
import (
"os"
"gitee.com/johng/gf/g/container/gmap"
"fmt"
"syscall"
"gitee.com/johng/gf/g/container/gmap"
"strings"
)
const (
gCHILD_ARGS_MARK_NAME = "--gproc-child"
)
// 进程管理器
@ -37,14 +41,20 @@ func NewProcess(path string, args []string, environment []string) *Process {
path : path,
args : make([]string, 0),
ppid : os.Getpid(),
attr : &syscall.ProcAttr {
Files: []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()},
attr : &os.ProcAttr {
Env : env,
Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr },
},
}
p.args = append(p.args, args[0])
p.args = append(p.args, "--gproc-child")
if len(args) > 0 {
p.args = append(p.args, args[0])
}
// 判断是否加上子进程标识
if len(args) == 1 || (len(args) > 1 && !strings.EqualFold(args[1], gCHILD_ARGS_MARK_NAME)) {
p.args = append(p.args, gCHILD_ARGS_MARK_NAME)
}
if len(args) > 1 {
p.args = append(p.args, args[1:]...)
p.args = append(p.args, args[1 : ]...)
}
return p
}
@ -69,6 +79,7 @@ func (m *Manager) AddProcess(pid int) {
if process, err := os.FindProcess(pid); err == nil {
p := m.NewProcess("", nil, nil)
p.process = process
m.processes.Set(pid, p)
}
}
@ -133,6 +144,11 @@ func (m *Manager) SendTo(pid int, data interface{}) error {
return Send(pid, data)
}
// 清空管理器
func (m *Manager) Clear() {
m.processes.Clear()
}
// 当前进程总数
func (m *Manager) Size() int {
return m.processes.Size()

View File

@ -8,9 +8,8 @@ package gproc
import (
"os"
"errors"
"fmt"
"syscall"
"errors"
)
// 子进程
@ -18,8 +17,7 @@ type Process struct {
pm *Manager // 所属进程管理器
path string // 可执行文件绝对路径
args []string // 执行参数
//attr *os.ProcAttr // 进程属性
attr *syscall.ProcAttr // 进程属性
attr *os.ProcAttr // 进程属性
ppid int // 自定义关联的父进程ID
process *os.Process // 底层进程对象
}
@ -30,17 +28,44 @@ func (p *Process) Run() (int, error) {
return p.Pid(), nil
}
p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid))
if pid, err := syscall.ForkExec(p.path, p.args, p.attr); err == nil {
p.process, _ = os.FindProcess(pid)
if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil {
p.process = process
if p.pm != nil {
p.pm.processes.Set(pid, p)
p.pm.processes.Set(process.Pid, p)
}
return pid, nil
return process.Pid, nil
} else {
return 0, err
}
}
// 运行进程,守护进程,主进程退出后不退出
//func (p *Process) RunDaemon() (int, error) {
// if p.process != nil {
// return p.Pid(), nil
// }
// p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid))
// procAttr := &syscall.ProcAttr {
// Dir : p.attr.Dir,
// Env : p.attr.Env,
// Sys : p.attr.Sys,
// Files : make([]uintptr, 0),
// }
// // 将os.ProcAttr.Files转换为底层的syscall.ProcAttr.Files
// for _, f := range p.attr.Files {
// procAttr.Files = append(procAttr.Files, f.Fd())
// }
// if pid, err := syscall.ForkExec(p.path, p.args, procAttr); err == nil {
// p.process, _ = os.FindProcess(pid)
// if p.pm != nil {
// p.pm.processes.Set(pid, p)
// }
// return pid, nil
// } else {
// return 0, err
// }
//}
func (p *Process) SetManager(m *Manager) {
p.pm = m
}
@ -70,11 +95,11 @@ func (p *Process) AddEnv(env []string) {
}
}
func (p *Process) SetAttr(attr *syscall.ProcAttr) {
func (p *Process) SetAttr(attr *os.ProcAttr) {
p.attr = attr
}
func (p *Process) GetAttr() *syscall.ProcAttr {
func (p *Process) GetAttr() *os.ProcAttr {
return p.attr
}

View File

@ -3,8 +3,6 @@ package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
"gitee.com/johng/gf/g/os/gtime"
"time"
)
func main() {
@ -13,10 +11,12 @@ func main() {
r.Response.Writeln("hello")
})
s.BindHandler("/restart", func(r *ghttp.Request){
r.Response.Writeln("restart server in 2 seconds")
gtime.SetTimeout(2*time.Second, func() {
r.Server.Restart()
})
r.Response.Writeln("restart server")
r.Server.Restart()
})
s.BindHandler("/shutdown", func(r *ghttp.Request){
r.Response.Writeln("shutdown server")
r.Server.Shutdown()
})
s.SetPort(8199)
s.Run()