diff --git a/TODO b/TODO index d25b15f25..7b1f2297b 100644 --- a/TODO +++ b/TODO @@ -8,7 +8,8 @@ ON THE WAY: 7. 增加热编译工具,提高开发环境的开发/测试效率(媲美PHP开发效率); 8. 增加可选择性的orm tag特性,用以数据表记录与struct对象转换的键名属性映射; 9. orm增加更多数据库支持; - +10. ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现; +11. 当二进制参数为nil时,gjson.LoadContent并将gjson.Json对象ToMap时会报错; diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 72e742b34..ee17765b1 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -8,20 +8,20 @@ package ghttp import ( "os" + "net" "sync" "errors" "strings" "reflect" "net/http" "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/encoding/gjson" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/container/gqueue" - "net" - "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/encoding/gjson" ) const ( @@ -115,6 +115,7 @@ var doneChan = make(chan struct{}, 100000) // Web Server进程初始化 func init() { go func() { + // 等待run消息(Run方法调用) <- runChan // 主进程只负责创建子进程 if !gproc.IsChild() { @@ -122,6 +123,7 @@ func init() { } // 开启进程消息监听处理 handleProcessMsg() + // 服务执行完成,需要退出 doneChan <- struct{}{} }() } @@ -222,14 +224,16 @@ func (s *Server) Run() error { // 开启异步关闭队列处理循环 s.startCloseQueueLoop() + // 阻塞等待服务执行完成 <- doneChan + + glog.Printfln("web server pid %d exit successfully", gproc.Pid()) return nil } // 开启底层Web Server执行 func (s *Server) startServer(fdMap listenerFdMap) { // 开始执行底层Web Server创建,端口监听 - var wg sync.WaitGroup var server *gracefulServer if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 { // HTTPS @@ -250,10 +254,9 @@ func (s *Server) startServer(fdMap listenerFdMap) { } for _, v := range array { - wg.Add(1) go func(item string) { if isFd { - tArray := strings.Split(item, ":") + tArray := strings.Split(item, "#") server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1])) } else { server = s.newGracefulServer(item) @@ -265,7 +268,6 @@ func (s *Server) startServer(fdMap listenerFdMap) { if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { glog.Error(err) } - wg.Done() } }(v) } @@ -283,10 +285,9 @@ func (s *Server) startServer(fdMap listenerFdMap) { array = strings.Split(s.config.Addr, ",") } for _, v := range array { - wg.Add(1) go func(item string) { if isFd { - tArray := strings.Split(item, ":") + tArray := strings.Split(item, "#") server = s.newGracefulServer(tArray[0], gconv.Int(tArray[1])) } else { server = s.newGracefulServer(item) @@ -298,37 +299,28 @@ func (s *Server) startServer(fdMap listenerFdMap) { if !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { glog.Error(err) } - wg.Done() } }(v) } s.status = 1 - - // 阻塞执行,直到所有Web Server退出 - wg.Wait() } // 重启Web Server func (s *Server) Restart() { // 如果是主进程,那么向所有子进程发送重启信号 if !gproc.IsChild() { - + procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) return } - //if pid, err := s.forkChildProcess(); err != nil { - // glog.Errorf("server restart failed: %v, continue serving\n", err) - //} else { - // glog.Printf("server restart successfully, new pid: %d\n", pid) - // s.Shutdown() - //} + sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) } // 关闭Web Server func (s *Server) Shutdown() { // 如果是主进程,那么向所有子进程发送关闭信号 if !gproc.IsChild() { - + procManager.Send(formatMsgBuffer(gMSG_SHUTDOWN, nil)) return } for _, v := range s.servers { @@ -336,17 +328,15 @@ func (s *Server) Shutdown() { } } - - // 获取当前监听的文件描述符信息,构造成map返回 func (s *Server) getListenerFdMap() map[string]string { - m := map[string]string{ + m := map[string]string { "http" : "", "https" : "", } for _, v := range s.servers { if f, e := v.listener.(*net.TCPListener).File(); e == nil { - str := v.addr + ":" + gconv.String(f.Fd()) + "," + str := v.addr + "#" + gconv.String(f.Fd()) + "," if v.isHttps { m["https"] += str } else { diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go index 7dca87c9c..975d8abda 100644 --- a/g/net/ghttp/ghttp_server_comm.go +++ b/g/net/ghttp/ghttp_server_comm.go @@ -8,41 +8,75 @@ package ghttp import ( + "os" "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/encoding/gbinary" + "strings" + "gitee.com/johng/gf/g/util/gconv" "fmt" "gitee.com/johng/gf/g/encoding/gjson" - "os" ) const ( - gMSG_START = iota - gMSG_RESTART - gMSG_SHUTDOWN + gMSG_START = 10 + gMSG_RESTART = 20 + gMSG_CORESTART = 30 + gMSG_SHUTDOWN = 40 + gMSG_NEW_FORK = 50 ) - // 处理进程间消息 // 数据格式: 操作(8bit) | 参数(变长) func handleProcessMsg() { for { if msg := gproc.Receive(); msg != nil { - fmt.Println(msg) - act := gbinary.DecodeToInt(msg.Data[0 : 1]) + fmt.Println(gproc.Pid(), gproc.IsChild(), msg) + act := gbinary.DecodeToUint(msg.Data[0 : 1]) data := msg.Data[1 : ] if gproc.IsChild() { + // 子进程 switch act { // 开启所有Web Server(根据消息启动) case gMSG_START: - sfm := bufferToServerFdMap(data) - for k, v := range sfm { - GetServer(k).startServer(v) + if len(data) > 0 { + sfm := bufferToServerFdMap(data) + for k, v := range sfm { + GetServer(k).startServer(v) + } + } else { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + v.(*Server).startServer(nil) + } + }) } // 子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 case gMSG_RESTART: - b, _ := gjson.Encode(getServerFdMap()) - sendProcessMsg(gproc.Ppid(), gMSG_RESTART, b) + // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 + sfm := getServerFdMap() + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + for name, m := range sfm { + for fdk, fdv := range m { + if len(fdv) > 0 { + s := "" + for _, item := range strings.Split(fdv, ",") { + array := strings.Split(item, "#") + fd := uintptr(gconv.Uint(array[1])) + s += fmt.Sprintf("%s#%d", array[0], len(p.GetAttr().Files)) + p.GetAttr().Files = append(p.GetAttr().Files, fd) + } + sfm[name][fdk] = strings.TrimRight(s, ",") + } + } + } + p.SetPpid(gproc.Ppid()) + p.Run() + fmt.Println(procManager) + b, _ := gjson.Encode(sfm) + sendProcessMsg(p.Pid(), gMSG_START, b) + sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) + sendProcessMsg(gproc.Ppid(), gMSG_NEW_FORK, gconv.Bytes(p.Pid())) // 友好关闭服务链接并退出 case gMSG_SHUTDOWN: @@ -55,6 +89,7 @@ func handleProcessMsg() { } } else { + // 父进程 switch act { // 开启服务 case gMSG_START: @@ -64,12 +99,14 @@ func handleProcessMsg() { // 重启服务 case gMSG_RESTART: - // 创建新的服务进程,使用文件描述来监听同样的端口 - p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) - p.Run() - sendProcessMsg(p.Pid(), gMSG_START, data) + // 向所有子进程发送重启命令,子进程将会搜集Web Server信息发送给父进程进行协调重启工作 + procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) + + // 协调重启服务 + case gMSG_NEW_FORK: + //sendProcessMsg(p.Pid(), gMSG_START, data) // 关闭旧的服务进程 - sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil) + //sendProcessMsg(msg.Pid, gMSG_SHUTDOWN, nil) // 关闭服务 case gMSG_SHUTDOWN: @@ -88,6 +125,6 @@ func sendProcessMsg(pid int, act int, data []byte) { // 生成一条满足Web Server进程通信协议的消息 func formatMsgBuffer(act int, data []byte) []byte { - return append(gbinary.EncodeInt8(int8(act)), data...) + return append(gbinary.EncodeUint8(uint8(act)), data...) } diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index c032f5e2c..cbed56312 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -13,9 +13,7 @@ import ( "gitee.com/johng/gf/g/os/glog" "github.com/fsnotify/fsnotify" "gitee.com/johng/gf/g/os/gfile" - "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/os/grpool" - "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gqueue" @@ -25,7 +23,6 @@ import ( type Watcher struct { watcher *fsnotify.Watcher // 底层fsnotify对象 events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 - eventCache *gcache.Cache // 用于进行事件过滤,当同一监听文件在10ms内出现相同事件,则过滤 closeChan chan struct{} // 关闭事件 callbacks *gmap.StringInterfaceMap // 监听的回调函数 } @@ -72,7 +69,6 @@ func New() (*Watcher, error) { w := &Watcher { watcher : watch, events : gqueue.New(), - eventCache : gcache.New(), closeChan : make(chan struct{}, 1), callbacks : gmap.NewStringInterfaceMap(), } @@ -132,9 +128,6 @@ func (w *Watcher) startWatchLoop() { // 监听事件 case ev := <- w.watcher.Events: - if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 10) { - continue - } w.events.PushBack(&Event{ Path : ev.Name, Op : Op(ev.Op), diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index 336c2a931..dc2bb65c7 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -11,6 +11,7 @@ import ( "os" "gitee.com/johng/gf/g/container/gmap" "fmt" + "syscall" ) // 进程管理器 @@ -35,10 +36,9 @@ func NewProcess(path string, args []string, environment []string) *Process { p := &Process { path : path, args : make([]string, 0), - ppid : os.Getppid(), - attr : &os.ProcAttr { - Env : env, - Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, + ppid : os.Getpid(), + attr : &syscall.ProcAttr { + Files: []uintptr{os.Stdin.Fd(), os.Stdout.Fd(), os.Stderr.Fd()}, }, } p.args = append(p.args, args[0]) @@ -64,6 +64,19 @@ func (m *Manager) GetProcess(pid int) *Process { return nil } +// 添加一个已存在进程到进程管理器中 +func (m *Manager) AddProcess(pid int) { + if process, err := os.FindProcess(pid); err == nil { + p := m.NewProcess("", nil, nil) + p.process = process + } +} + +// 移除进程管理器中的指定进程 +func (m *Manager) RemoveProcess(pid int) { + m.processes.Remove(pid) +} + // 获取所有的进程对象,构成列表返回 func (m *Manager) Processes() []*Process { processes := make([]*Process, 0) diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index 0420d1a17..b52459c55 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -10,6 +10,7 @@ import ( "os" "errors" "fmt" + "syscall" ) // 子进程 @@ -17,7 +18,8 @@ type Process struct { pm *Manager // 所属进程管理器 path string // 可执行文件绝对路径 args []string // 执行参数 - attr *os.ProcAttr // 进程属性 + //attr *os.ProcAttr // 进程属性 + attr *syscall.ProcAttr // 进程属性 ppid int // 自定义关联的父进程ID process *os.Process // 底层进程对象 } @@ -28,12 +30,12 @@ func (p *Process) Run() (int, error) { return p.Pid(), nil } p.attr.Env = append(p.attr.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.ppid)) - if process, err := os.StartProcess(p.path, p.args, p.attr); err == nil { - p.process = process + if pid, err := syscall.ForkExec(p.path, p.args, p.attr); err == nil { + p.process, _ = os.FindProcess(pid) if p.pm != nil { - p.pm.processes.Set(process.Pid, p) + p.pm.processes.Set(pid, p) } - return process.Pid, nil + return pid, nil } else { return 0, err } @@ -68,11 +70,11 @@ func (p *Process) AddEnv(env []string) { } } -func (p *Process) SetAttr(attr *os.ProcAttr) { +func (p *Process) SetAttr(attr *syscall.ProcAttr) { p.attr = attr } -func (p *Process) GetAttr() *os.ProcAttr { +func (p *Process) GetAttr() *syscall.ProcAttr { return p.attr } diff --git a/geg/net/ghttp/hot_restart.go b/geg/net/ghttp/hot_restart.go new file mode 100644 index 000000000..4869e139c --- /dev/null +++ b/geg/net/ghttp/hot_restart.go @@ -0,0 +1,23 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" + "gitee.com/johng/gf/g/os/gtime" + "time" +) + +func main() { + s := g.Server() + s.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("hello") + }) + s.BindHandler("/restart", func(r *ghttp.Request){ + r.Response.Writeln("restart server in 2 seconds") + gtime.SetTimeout(2*time.Second, func() { + r.Server.Restart() + }) + }) + s.SetPort(8199) + s.Run() +} \ No newline at end of file diff --git a/geg/os/gproc/gproc2.go b/geg/os/gproc/gproc2.go index a0a3f0b56..e92aed49c 100644 --- a/geg/os/gproc/gproc2.go +++ b/geg/os/gproc/gproc2.go @@ -6,6 +6,6 @@ import ( ) func main () { - err := gproc.Send(29260, "hello process!") + err := gproc.Send(26248, []byte{40}) fmt.Println(err) } diff --git a/geg/other/test.go b/geg/other/test.go index af3b1e9cb..1c91035f0 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -4,10 +4,13 @@ import ( "fmt" "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/encoding/gbinary" ) func main(){ + fmt.Println(uint8(int(300))) + return t1 := gfile.MTime("/home/john/Workspace/Go/GOPATH/src/gitee.com/johng/gf/geg/other/test.go") t2 := gtime.Second() fmt.Println(t1)