ghttp.Server热重启特性开发中

This commit is contained in:
John
2018-05-10 19:16:41 +08:00
parent d97e8c8204
commit 806d565a32
7 changed files with 190 additions and 87 deletions

View File

@ -20,13 +20,13 @@ import (
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/container/gqueue"
"fmt"
"gitee.com/johng/gf/g/os/gpm"
"net"
"os/signal"
"syscall"
"time"
"gitee.com/johng/gf/g/os/gcmd"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/encoding/gjson"
"gitee.com/johng/gf/g/os/gtime"
"time"
)
const (
@ -83,7 +83,6 @@ type Server struct {
errorLogger *glog.Logger // error log日志对象
// 多进程管理控制
manager *gproc.Manager // 多进程管理
heartbeats
}
// 域名、URI与回调函数的绑定记录表
@ -146,8 +145,7 @@ func GetServer(name...interface{}) (*Server) {
accessLogger : glog.New(),
errorLogger : glog.New(),
logHandler : gtype.NewInterface(),
manager : gproc.New(),
signalChan : make(chan os.Signal),
manager : gproc.NewManager(),
}
s.errorLogger.SetBacktraceSkip(4)
s.accessLogger.SetBacktraceSkip(4)
@ -176,25 +174,26 @@ func (s *Server) Run() error {
// 开启异步关闭队列处理循环
s.startCloseQueueLoop()
// 开启Web Server执行
s.startServer()
// 主进程只负责创建子进程
if !gproc.IsChild() {
p := s.manager.NewProcess(os.Args[0], os.Args, os.Environ())
p.Run()
gtime.SetTimeout(3*time.Second, func() {
b, _ := gjson.Encode(s.getAllListenerFdMap())
s.sendMsg(p.Pid(), gMSG_START, b)
})
}
// 开启进程消息监听处理
s.handleProcessMsg()
return nil
}
// 开启底层Web Server执行
func (s *Server) startServer() {
// 主进程只负责创建子进程
if !s.isChildProcess() {
s.forkChildProcess()
time.Sleep(10*time.Second)
time.Sleep(1000*time.Second)
return
}
func (s *Server) startServer(fdMap map[string]string) {
fmt.Println("startServer")
// 开始执行底层Web Server创建端口监听
var fd = 3
var wg sync.WaitGroup
var fcount = s.processFileCount()
var server *gracefulServer
if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 {
// HTTPS
@ -205,15 +204,23 @@ func (s *Server) startServer() {
s.config.HTTPSAddr = gDEFAULT_HTTPS_ADDR
}
}
array := strings.Split(s.config.HTTPSAddr, ",")
var array []string
var isFd bool
if v, ok := fdMap["https"]; ok && len(v) > 0 {
isFd = true
array = strings.Split(v, ",")
} else {
array = strings.Split(s.config.HTTPSAddr, ",")
}
for _, v := range array {
wg.Add(1)
go func(addr string) {
if s.isChildProcess() && fcount > 0 {
server = s.newGracefulServer(addr, fd)
fd++
go func(item string) {
if isFd {
tArray := strings.Split(item, ":")
server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1]))
} else {
server = s.newGracefulServer(addr)
server = s.newGracefulServer(item)
}
s.servers = append(s.servers, server)
if err := server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil {
@ -230,22 +237,27 @@ func (s *Server) startServer() {
if s.servedCount.Val() == 0 && len(s.config.Addr) == 0 {
s.config.Addr = gDEFAULT_HTTP_ADDR
}
array := strings.Split(s.config.Addr, ",")
var array []string
var isFd bool
if v, ok := fdMap["http"]; ok && len(v) > 0 {
isFd = true
array = strings.Split(v, ",")
} else {
array = strings.Split(s.config.Addr, ",")
}
for _, v := range array {
wg.Add(1)
go func(addr string) {
if s.isChildProcess() && fcount > 0 {
server = s.newGracefulServer(addr, fd)
fd++
go func(item string) {
if isFd {
tArray := strings.Split(item, ":")
server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1]))
} else {
server = s.newGracefulServer(addr)
server = s.newGracefulServer(item)
}
s.servers = append(s.servers, server)
if err := server.ListenAndServe(); err != nil {
// 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
glog.Println(fd)
glog.Println(os.Args)
glog.Error(err)
}
wg.Done()
@ -259,26 +271,6 @@ func (s *Server) startServer() {
wg.Wait()
}
// 异步处理信号量监控
func (s *Server) handleSignals() {
var sig os.Signal
signal.Notify(
s.signalChan,
syscall.SIGTERM,
syscall.SIGUSR2,
)
for {
sig = <- s.signalChan
switch sig {
case syscall.SIGTERM: s.Shutdown()
case syscall.SIGUSR2: s.Restart()
default:
}
}
}
// 重启Web Server
func (s *Server) Restart() {
// 如果是主进程,那么向所有子进程发送重启信号
@ -325,6 +317,43 @@ func (s *Server) getTopId() int {
return 0
}
// 获取当前监听的文件描述符信息构造成map返回
func (s *Server) getAllListenerFdMap() map[string]string {
m := map[string]string{
"http" : "",
"https" : "",
}
for _, v := range s.servers {
if f, e := v.listener.(*net.TCPListener).File(); e == nil {
str := v.addr + ":" + gconv.String(f.Fd()) + ","
if v.isHttps {
m["https"] += str
} else {
m["http"] += str
}
} else {
glog.Errorfln("failed to get listener file: %v", e)
}
}
if len(m["http"]) > 0 {
m["http"] = m["http"][0 : len(m["http"]) - 1]
}
if len(m["https"]) > 0 {
m["https"] = m["https"][0 : len(m["https"]) - 1]
}
return m
}
// 二进制转换为FdMap
func (s *Server) bufferToFdMap(buffer []byte) map[string]string {
m := make(map[string]string)
j, _ := gjson.LoadContent(buffer, "json")
for k, v := range j.ToMap() {
m[k] = gconv.String(v)
}
return m
}
// 创建子进程来监听并处理新的HTTP请求与父进程使用的是同一个socket文件描述符
func (s *Server) forkChildProcess() (int, error) {
// 获取所有http server的file

View File

@ -1,26 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package ghttp
import (
"fmt"
"gitee.com/johng/gf/g/net/gtcp"
)
// 开启命令监听端口
func (s *Server) startCmdService() {
s.BindHandler("/heartbeat", func(r *Request) {
})
s.BindHandler("/restart", func(r *Request) {
})
server := s.newGracefulServer(fmt.Sprintf("127.0.0.1:%d", s.cmdPort))
if err := server.ListenAndServe(); err != nil {
}
}

View File

@ -0,0 +1,59 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Web Server进程间通信
package ghttp
import (
"os"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/encoding/gbinary"
"fmt"
)
const (
gMSG_START = iota
gMSG_RESTART
gMSG_SHUTDOWN
gMSG_EXIT
)
// 处理进程间消息
// 数据格式: 操作(8bit) | 参数(变长)
func (s *Server) handleProcessMsg() {
for {
if msg := gproc.Receive(); msg != nil {
fmt.Println(msg)
act := gbinary.DecodeToInt(msg.Data[0 : 1])
data := msg.Data[1 : ]
if gproc.IsChild() {
switch act {
case gMSG_START:
s.startServer(s.bufferToFdMap(data))
case gMSG_RESTART:
case gMSG_SHUTDOWN: s.Shutdown()
case gMSG_EXIT: os.Exit(0)
}
} else {
switch act {
case gMSG_START:
case gMSG_RESTART:
case gMSG_SHUTDOWN:
case gMSG_EXIT: os.Exit(0)
}
}
}
}
}
// 向进程发送操作消息
func (s *Server) sendMsg(pid int, act int, data []byte) {
gproc.Send(pid, append(gbinary.EncodeInt8(int8(act)), data...))
}

View File

@ -22,6 +22,7 @@ type gracefulServer struct {
addr string
httpServer *http.Server
listener net.Listener
isHttps bool
shutdownChan chan bool
}
@ -61,6 +62,11 @@ func (s *gracefulServer) ListenAndServe() error {
return s.doServe()
}
// 设置自定义fd
func (s *gracefulServer) setFd(fd int) {
s.fd = uintptr(fd)
}
// 执行HTTPS监听
func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error {
addr := s.httpServer.Addr
@ -82,6 +88,7 @@ func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error {
return err
}
s.listener = tls.NewListener(ln, config)
s.isHttps = true
return s.doServe()
}

View File

@ -40,8 +40,10 @@ func init() {
if !gfile.Exists(path) {
gfile.Create(path)
}
fmt.Println(path)
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
err := gfsnotify.Add(path, func(event *gfsnotify.Event) {
fmt.Println(event)
commLocker.Lock()
buffer := gfile.GetBinContents(path)
os.Truncate(path, 0)

View File

@ -19,24 +19,23 @@ type Manager struct {
}
// 创建一个进程管理器
func New () *Manager {
func NewManager() *Manager {
return &Manager{
processes : gmap.NewIntInterfaceMap(),
}
}
// 创建一个进程(不执行)
func (m *Manager) NewProcess(path string, args []string, environment []string) *Process {
func NewProcess(path string, args []string, environment []string) *Process {
env := make([]string, len(environment) + 2)
for k, v := range environment {
env[k] = v
}
env[len(env) - 2] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid())
env[len(env) - 1] = fmt.Sprintf("%s=%s", gPROC_TEMP_DIR_ENV_KEY, os.TempDir())
p := &Process {
pm : m,
path : path,
args : make([]string, 0),
ppid : os.Getppid(),
attr : &os.ProcAttr {
Env : env,
Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr },
@ -50,6 +49,13 @@ func (m *Manager) NewProcess(path string, args []string, environment []string) *
return p
}
// 创建一个进程(不执行)
func (m *Manager) NewProcess(path string, args []string, environment []string) *Process {
p := NewProcess(path, args, environment)
p.SetManager(m)
return p
}
// 获取当前进程管理器中的一个进程
func (m *Manager) GetProcess(pid int) *Process {
if v := m.processes.Get(pid); v != nil {
@ -99,8 +105,18 @@ func (m *Manager) SignalAll(sig os.Signal) error {
return nil
}
// 获取当前进程管理器中的一个进程
func (m *Manager) Send(pid int, data interface{}) error {
// 向所有进程发送消息
func (m *Manager) Send(data interface{}) error {
for _, p := range m.Processes() {
if err := p.Send(data); err != nil {
return err
}
}
return nil
}
// 向指定进程发送消息
func (m *Manager) SendTo(pid int, data interface{}) error {
return Send(pid, data)
}

View File

@ -9,6 +9,7 @@ package gproc
import (
"os"
"errors"
"fmt"
)
// 子进程
@ -17,6 +18,7 @@ type Process struct {
path string // 可执行文件绝对路径
args []string // 执行参数
attr *os.ProcAttr // 进程属性
ppid int // 自定义关联的父进程ID
process *os.Process // 底层进程对象
}
@ -25,15 +27,27 @@ func (p *Process) Run() (int, error) {
if p.process != nil {
return p.Pid(), nil
}
p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid))
if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil {
p.process = process
p.pm.processes.Set(process.Pid, p)
if p.pm != nil {
p.pm.processes.Set(process.Pid, p)
}
return process.Pid, nil
} else {
return 0, err
}
}
func (p *Process) SetManager(m *Manager) {
p.pm = m
}
// 设置自定义的父进程ID
func (p *Process) SetPpid(ppid int) {
p.ppid = ppid
}
func (p *Process) SetArgs(args []string) {
p.args = args
}
@ -89,7 +103,9 @@ func (p *Process) Release() error {
// Kill causes the Process to exit immediately.
func (p *Process) Kill() error {
if err := p.process.Kill(); err == nil {
p.pm.processes.Remove(p.Pid())
if p.pm != nil {
p.pm.processes.Remove(p.Pid())
}
return nil
} else {
return err