mirror of
https://gitee.com/johng/gf
synced 2026-06-07 10:22:11 +08:00
ghttp.Server热重启机制开发中
This commit is contained in:
3
TODO
3
TODO
@ -8,7 +8,8 @@ ON THE WAY:
|
||||
7. 增加热编译工具,提高开发环境的开发/测试效率(媲美PHP开发效率);
|
||||
8. 增加可选择性的orm tag特性,用以数据表记录与struct对象转换的键名属性映射;
|
||||
9. orm增加更多数据库支持;
|
||||
|
||||
10. ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现;
|
||||
11. 当二进制参数为nil时,gjson.LoadContent并将gjson.Json对象ToMap时会报错;
|
||||
|
||||
|
||||
|
||||
|
||||
@ -8,20 +8,20 @@ package ghttp
|
||||
|
||||
import (
|
||||
"os"
|
||||
"net"
|
||||
"sync"
|
||||
"errors"
|
||||
"strings"
|
||||
"reflect"
|
||||
"net/http"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"gitee.com/johng/gf/g/os/gcache"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gjson"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"gitee.com/johng/gf/g/encoding/gjson"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -115,6 +115,7 @@ var doneChan = make(chan struct{}, 100000)
|
||||
// Web Server进程初始化
|
||||
func init() {
|
||||
go func() {
|
||||
// 等待run消息(Run方法调用)
|
||||
<- runChan
|
||||
// 主进程只负责创建子进程
|
||||
if !gproc.IsChild() {
|
||||
@ -122,6 +123,7 @@ func init() {
|
||||
}
|
||||
// 开启进程消息监听处理
|
||||
handleProcessMsg()
|
||||
// 服务执行完成,需要退出
|
||||
doneChan <- struct{}{}
|
||||
}()
|
||||
}
|
||||
@ -222,14 +224,16 @@ func (s *Server) Run() error {
|
||||
// 开启异步关闭队列处理循环
|
||||
s.startCloseQueueLoop()
|
||||
|
||||
// 阻塞等待服务执行完成
|
||||
<- doneChan
|
||||
|
||||
glog.Printfln("web server pid %d exit successfully", gproc.Pid())
|
||||
return nil
|
||||
}
|
||||
|
||||
// 开启底层Web Server执行
|
||||
func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
// 开始执行底层Web Server创建,端口监听
|
||||
var wg sync.WaitGroup
|
||||
var server *gracefulServer
|
||||
if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 {
|
||||
// HTTPS
|
||||
@ -250,10 +254,9 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
}
|
||||
|
||||
for _, v := range array {
|
||||
wg.Add(1)
|
||||
go func(item string) {
|
||||
if isFd {
|
||||
tArray := strings.Split(item, ":")
|
||||
tArray := strings.Split(item, "#")
|
||||
server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1]))
|
||||
} else {
|
||||
server = s.newGracefulServer(item)
|
||||
@ -265,7 +268,6 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
}(v)
|
||||
}
|
||||
@ -283,10 +285,9 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
array = strings.Split(s.config.Addr, ",")
|
||||
}
|
||||
for _, v := range array {
|
||||
wg.Add(1)
|
||||
go func(item string) {
|
||||
if isFd {
|
||||
tArray := strings.Split(item, ":")
|
||||
tArray := strings.Split(item, "#")
|
||||
server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1]))
|
||||
} else {
|
||||
server = s.newGracefulServer(item)
|
||||
@ -298,37 +299,28 @@ func (s *Server) startServer(fdMap listenerFdMap) {
|
||||
if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) {
|
||||
glog.Error(err)
|
||||
}
|
||||
wg.Done()
|
||||
}
|
||||
}(v)
|
||||
}
|
||||
|
||||
s.status = 1
|
||||
|
||||
// 阻塞执行,直到所有Web Server退出
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// 重启Web Server
|
||||
func (s *Server) Restart() {
|
||||
// 如果是主进程,那么向所有子进程发送重启信号
|
||||
if !gproc.IsChild() {
|
||||
|
||||
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
|
||||
return
|
||||
}
|
||||
//if pid, err := s.forkChildProcess(); err != nil {
|
||||
// glog.Errorf("server restart failed: %v, continue serving\n", err)
|
||||
//} else {
|
||||
// glog.Printf("server restart successfully, new pid: %d\n", pid)
|
||||
// s.Shutdown()
|
||||
//}
|
||||
sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil)
|
||||
}
|
||||
|
||||
// 关闭Web Server
|
||||
func (s *Server) Shutdown() {
|
||||
// 如果是主进程,那么向所有子进程发送关闭信号
|
||||
if !gproc.IsChild() {
|
||||
|
||||
procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil))
|
||||
return
|
||||
}
|
||||
for _, v := range s.servers {
|
||||
@ -336,17 +328,15 @@ func (s *Server) Shutdown() {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
// 获取当前监听的文件描述符信息,构造成map返回
|
||||
func (s *Server) getListenerFdMap() map[string]string {
|
||||
m := map[string]string{
|
||||
m := map[string]string {
|
||||
"http" : "",
|
||||
"https" : "",
|
||||
}
|
||||
for _, v := range s.servers {
|
||||
if f, e := v.listener.(*net.TCPListener).File(); e == nil {
|
||||
str := v.addr + ":" + gconv.String(f.Fd()) + ","
|
||||
str := v.addr + "#" + gconv.String(f.Fd()) + ","
|
||||
if v.isHttps {
|
||||
m["https"] += str
|
||||
} else {
|
||||
|
||||
@ -8,41 +8,75 @@
|
||||
package ghttp
|
||||
|
||||
import (
|
||||
"os"
|
||||
"gitee.com/johng/gf/g/os/gproc"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"strings"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/encoding/gjson"
|
||||
"os"
|
||||
)
|
||||
|
||||
const (
|
||||
gMSG_START = iota
|
||||
gMSG_RESTART
|
||||
gMSG_SHUTDOWN
|
||||
gMSG_START = 10
|
||||
gMSG_RESTART = 20
|
||||
gMSG_CORESTART = 30
|
||||
gMSG_SHUTDOWN = 40
|
||||
gMSG_NEW_FORK = 50
|
||||
)
|
||||
|
||||
|
||||
// 处理进程间消息
|
||||
// 数据格式: 操作(8bit) | 参数(变长)
|
||||
func handleProcessMsg() {
|
||||
for {
|
||||
if msg := gproc.Receive(); msg != nil {
|
||||
fmt.Println(msg)
|
||||
act := gbinary.DecodeToInt(msg.Data[0 : 1])
|
||||
fmt.Println(gproc.Pid(), gproc.IsChild(), msg)
|
||||
act := gbinary.DecodeToUint(msg.Data[0 : 1])
|
||||
data := msg.Data[1 : ]
|
||||
if gproc.IsChild() {
|
||||
// 子进程
|
||||
switch act {
|
||||
// 开启所有Web Server(根据消息启动)
|
||||
case gMSG_START:
|
||||
sfm := bufferToServerFdMap(data)
|
||||
for k, v := range sfm {
|
||||
GetServer(k).startServer(v)
|
||||
if len(data) > 0 {
|
||||
sfm := bufferToServerFdMap(data)
|
||||
for k, v := range sfm {
|
||||
GetServer(k).startServer(v)
|
||||
}
|
||||
} else {
|
||||
serverMapping.RLockFunc(func(m map[string]interface{}) {
|
||||
for _, v := range m {
|
||||
v.(*Server).startServer(nil)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度
|
||||
case gMSG_RESTART:
|
||||
b, _ := gjson.Encode(getServerFdMap())
|
||||
sendProcessMsg(gproc.Ppid(), gMSG_RESTART, b)
|
||||
// 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口
|
||||
sfm := getServerFdMap()
|
||||
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
|
||||
for name, m := range sfm {
|
||||
for fdk, fdv := range m {
|
||||
if len(fdv) > 0 {
|
||||
s := ""
|
||||
for _, item := range strings.Split(fdv, ",") {
|
||||
array := strings.Split(item, "#")
|
||||
fd := uintptr(gconv.Uint(array[1]))
|
||||
s += fmt.Sprintf("%s#%d", array[0], len(p.GetAttr().Files))
|
||||
p.GetAttr().Files = append(p.GetAttr().Files, fd)
|
||||
}
|
||||
sfm[name][fdk] = strings.TrimRight(s, ",")
|
||||
}
|
||||
}
|
||||
}
|
||||
p.SetPpid(gproc.Ppid())
|
||||
p.Run()
|
||||
fmt.Println(procManager)
|
||||
b, _ := gjson.Encode(sfm)
|
||||
sendProcessMsg(p.Pid(), gMSG_START, b)
|
||||
sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil)
|
||||
sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gconv.Bytes(p.Pid()))
|
||||
|
||||
// 友好关闭服务链接并退出
|
||||
case gMSG_SHUTDOWN:
|
||||
@ -55,6 +89,7 @@ func handleProcessMsg() {
|
||||
|
||||
}
|
||||
} else {
|
||||
// 父进程
|
||||
switch act {
|
||||
// 开启服务
|
||||
case gMSG_START:
|
||||
@ -64,12 +99,14 @@ func handleProcessMsg() {
|
||||
|
||||
// 重启服务
|
||||
case gMSG_RESTART:
|
||||
// 创建新的服务进程,使用文件描述来监听同样的端口
|
||||
p := procManager.NewProcess(os.Args[0], os.Args, os.Environ())
|
||||
p.Run()
|
||||
sendProcessMsg(p.Pid(), gMSG_START, data)
|
||||
// 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作
|
||||
procManager.Send(formatMsgBuffer(gMSG_RESTART, nil))
|
||||
|
||||
// 协调重启服务
|
||||
case gMSG_NEW_FORK:
|
||||
//sendProcessMsg(p.Pid(), gMSG_START, data)
|
||||
// 关闭旧的服务进程
|
||||
sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil)
|
||||
//sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil)
|
||||
|
||||
// 关闭服务
|
||||
case gMSG_SHUTDOWN:
|
||||
@ -88,6 +125,6 @@ func sendProcessMsg(pid int, act int, data []byte) {
|
||||
|
||||
// 生成一条满足Web Server进程通信协议的消息
|
||||
func formatMsgBuffer(act int, data []byte) []byte {
|
||||
return append(gbinary.EncodeInt8(int8(act)), data...)
|
||||
return append(gbinary.EncodeUint8(uint8(act)), data...)
|
||||
}
|
||||
|
||||
|
||||
@ -13,9 +13,7 @@ import (
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"github.com/fsnotify/fsnotify"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/os/gcache"
|
||||
"gitee.com/johng/gf/g/os/grpool"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/glist"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
@ -25,7 +23,6 @@ import (
|
||||
type Watcher struct {
|
||||
watcher *fsnotify.Watcher // 底层fsnotify对象
|
||||
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
|
||||
eventCache *gcache.Cache // 用于进行事件过滤,当同一监听文件在10ms内出现相同事件,则过滤
|
||||
closeChan chan struct{} // 关闭事件
|
||||
callbacks *gmap.StringInterfaceMap // 监听的回调函数
|
||||
}
|
||||
@ -72,7 +69,6 @@ func New() (*Watcher, error) {
|
||||
w := &Watcher {
|
||||
watcher : watch,
|
||||
events : gqueue.New(),
|
||||
eventCache : gcache.New(),
|
||||
closeChan : make(chan struct{}, 1),
|
||||
callbacks : gmap.NewStringInterfaceMap(),
|
||||
}
|
||||
@ -132,9 +128,6 @@ func (w *Watcher) startWatchLoop() {
|
||||
|
||||
// 监听事件
|
||||
case ev := <- w.watcher.Events:
|
||||
if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 10) {
|
||||
continue
|
||||
}
|
||||
w.events.PushBack(&Event{
|
||||
Path : ev.Name,
|
||||
Op : Op(ev.Op),
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"os"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"fmt"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// 进程管理器
|
||||
@ -35,10 +36,9 @@ func NewProcess(path string, args []string, environment []string) *Process {
|
||||
p := &Process {
|
||||
path : path,
|
||||
args : make([]string, 0),
|
||||
ppid : os.Getppid(),
|
||||
attr : &os.ProcAttr {
|
||||
Env : env,
|
||||
Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr },
|
||||
ppid : os.Getpid(),
|
||||
attr : &syscall.ProcAttr {
|
||||
Files: []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()},
|
||||
},
|
||||
}
|
||||
p.args = append(p.args, args[0])
|
||||
@ -64,6 +64,19 @@ func (m *Manager) GetProcess(pid int) *Process {
|
||||
return nil
|
||||
}
|
||||
|
||||
// 添加一个已存在进程到进程管理器中
|
||||
func (m *Manager) AddProcess(pid int) {
|
||||
if process, err := os.FindProcess(pid); err == nil {
|
||||
p := m.NewProcess("", nil, nil)
|
||||
p.process = process
|
||||
}
|
||||
}
|
||||
|
||||
// 移除进程管理器中的指定进程
|
||||
func (m *Manager) RemoveProcess(pid int) {
|
||||
m.processes.Remove(pid)
|
||||
}
|
||||
|
||||
// 获取所有的进程对象,构成列表返回
|
||||
func (m *Manager) Processes() []*Process {
|
||||
processes := make([]*Process, 0)
|
||||
|
||||
@ -10,6 +10,7 @@ import (
|
||||
"os"
|
||||
"errors"
|
||||
"fmt"
|
||||
"syscall"
|
||||
)
|
||||
|
||||
// 子进程
|
||||
@ -17,7 +18,8 @@ type Process struct {
|
||||
pm *Manager // 所属进程管理器
|
||||
path string // 可执行文件绝对路径
|
||||
args []string // 执行参数
|
||||
attr *os.ProcAttr // 进程属性
|
||||
//attr *os.ProcAttr // 进程属性
|
||||
attr *syscall.ProcAttr // 进程属性
|
||||
ppid int // 自定义关联的父进程ID
|
||||
process *os.Process // 底层进程对象
|
||||
}
|
||||
@ -28,12 +30,12 @@ func (p *Process) Run() (int, error) {
|
||||
return p.Pid(), nil
|
||||
}
|
||||
p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid))
|
||||
if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil {
|
||||
p.process = process
|
||||
if pid, err := syscall.ForkExec(p.path, p.args, p.attr); err == nil {
|
||||
p.process, _ = os.FindProcess(pid)
|
||||
if p.pm != nil {
|
||||
p.pm.processes.Set(process.Pid, p)
|
||||
p.pm.processes.Set(pid, p)
|
||||
}
|
||||
return process.Pid, nil
|
||||
return pid, nil
|
||||
} else {
|
||||
return 0, err
|
||||
}
|
||||
@ -68,11 +70,11 @@ func (p *Process) AddEnv(env []string) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Process) SetAttr(attr *os.ProcAttr) {
|
||||
func (p *Process) SetAttr(attr *syscall.ProcAttr) {
|
||||
p.attr = attr
|
||||
}
|
||||
|
||||
func (p *Process) GetAttr() *os.ProcAttr {
|
||||
func (p *Process) GetAttr() *syscall.ProcAttr {
|
||||
return p.attr
|
||||
}
|
||||
|
||||
|
||||
23
geg/net/ghttp/hot_restart.go
Normal file
23
geg/net/ghttp/hot_restart.go
Normal file
@ -0,0 +1,23 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"gitee.com/johng/gf/g"
|
||||
"gitee.com/johng/gf/g/net/ghttp"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
s := g.Server()
|
||||
s.BindHandler("/", func(r *ghttp.Request){
|
||||
r.Response.Writeln("hello")
|
||||
})
|
||||
s.BindHandler("/restart", func(r *ghttp.Request){
|
||||
r.Response.Writeln("restart server in 2 seconds")
|
||||
gtime.SetTimeout(2*time.Second, func() {
|
||||
r.Server.Restart()
|
||||
})
|
||||
})
|
||||
s.SetPort(8199)
|
||||
s.Run()
|
||||
}
|
||||
@ -6,6 +6,6 @@ import (
|
||||
)
|
||||
|
||||
func main () {
|
||||
err := gproc.Send(29260, "hello process!")
|
||||
err := gproc.Send(26248, []byte{40})
|
||||
fmt.Println(err)
|
||||
}
|
||||
|
||||
@ -4,10 +4,13 @@ import (
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
)
|
||||
|
||||
|
||||
func main(){
|
||||
fmt.Println(uint8(int(300)))
|
||||
return
|
||||
t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go")
|
||||
t2 := gtime.Second()
|
||||
fmt.Println(t1)
|
||||
|
||||
Reference in New Issue
Block a user