diff --git a/TODO b/TODO index d25b15f25..fa2fbbde9 100644 --- a/TODO +++ b/TODO @@ -8,8 +8,9 @@ ON THE WAY: 7. 增加热编译工具,提高开发环境的开发/测试效率(媲美PHP开发效率); 8. 增加可选择性的orm tag特性,用以数据表记录与struct对象转换的键名属性映射; 9. orm增加更多数据库支持; - - +10. ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现; +11. 当二进制参数为nil时,gjson.LoadContent并将gjson.Json对象ToMap时会报错; +12. 改进控制器及执行对象注册,更友好地支持动态路由注册,例如:注册规则为 /channel/:name,现有的控制器及执行对象注册很难友好支持这种动态形式; DONE: diff --git a/g/g.go b/g/g.go index 961214a69..8f399bca6 100644 --- a/g/g.go +++ b/g/g.go @@ -32,6 +32,12 @@ type Map map[string]interface{} // 常用list数据结构 type List []Map + +// 阻塞等待HTTPServer执行完成(同一进程多HTTPServer情况下) +func Wait() { + ghttp.Wait() +} + // HTTPServer单例对象 func Server(name...interface{}) *ghttp.Server { return ghttp.GetServer(name...) @@ -58,7 +64,6 @@ func Config() *gcfg.Config { return gins.Config() } - // 数据库操作对象,使用了连接池 func Database(name...string) *gdb.Db { config := gins.Config() diff --git a/g/net/ghttp/http_client_request.go b/g/net/ghttp/ghttp_client_request.go similarity index 100% rename from g/net/ghttp/http_client_request.go rename to g/net/ghttp/ghttp_client_request.go diff --git a/g/net/ghttp/http_client_response.go b/g/net/ghttp/ghttp_client_response.go similarity index 100% rename from g/net/ghttp/http_client_response.go rename to g/net/ghttp/ghttp_client_response.go diff --git a/g/net/ghttp/http_controller.go b/g/net/ghttp/ghttp_controller.go similarity index 100% rename from g/net/ghttp/http_controller.go rename to g/net/ghttp/ghttp_controller.go diff --git a/g/net/ghttp/http_func.go b/g/net/ghttp/ghttp_func.go similarity index 100% rename from g/net/ghttp/http_func.go rename to g/net/ghttp/ghttp_func.go diff --git a/g/net/ghttp/http_request.go b/g/net/ghttp/ghttp_request.go similarity index 100% rename from g/net/ghttp/http_request.go rename to g/net/ghttp/ghttp_request.go diff --git a/g/net/ghttp/http_response.go b/g/net/ghttp/ghttp_response.go similarity index 100% rename from g/net/ghttp/http_response.go rename to g/net/ghttp/ghttp_response.go diff --git a/g/net/ghttp/http_server.go b/g/net/ghttp/ghttp_server.go similarity index 51% rename from g/net/ghttp/http_server.go rename to g/net/ghttp/ghttp_server.go index 1315bb00b..84810816a 100644 --- a/g/net/ghttp/http_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -7,12 +7,15 @@ package ghttp import ( + "os" "sync" "errors" "strings" "reflect" + "runtime" "net/http" "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/container/gmap" @@ -21,46 +24,55 @@ import ( ) const ( - gHTTP_METHODS = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE" - gDEFAULT_SERVER = "default" - gDEFAULT_DOMAIN = "default" - gDEFAULT_METHOD = "ALL" - gDEFAULT_COOKIE_PATH = "/" // 默认path - gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) - gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) - gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 + gHTTP_METHODS = "GET,PUT,POST,DELETE,PATCH,HEAD,CONNECT,OPTIONS,TRACE" + gDEFAULT_SERVER = "default" + gDEFAULT_DOMAIN = "default" + gDEFAULT_METHOD = "ALL" + gDEFAULT_COOKIE_PATH = "/" // 默认path + gDEFAULT_COOKIE_MAX_AGE = 86400*365 // 默认cookie有效期(一年) + gDEFAULT_SESSION_MAX_AGE = 600 // 默认session有效期(600秒) + gDEFAULT_SESSION_ID_NAME = "gfsessionid" // 默认存放Cookie中的SessionId名称 ) -// http server结构体 +// ghttp.Server结构体 type Server struct { - hmmu sync.RWMutex // handler互斥锁 - hhmu sync.RWMutex // hooks互斥锁 - hsmu sync.RWMutex // status handler互斥锁 - hmcmu sync.RWMutex // handlerCache互斥锁 - hhcmu sync.RWMutex // hooksCache互斥锁 + // 基本属性变量 name string // 服务名称,方便识别 config ServerConfig // 配置对象 status int8 // 当前服务器状态(0:未启动,1:运行中) + servers []*gracefulServer // 底层http.Server列表 methodsMap map[string]bool // 所有支持的HTTP Method(初始化时自动填充) + servedCount *gtype.Int // 已经服务的请求数(4-8字节,不考虑溢出情况),同时作为请求ID + closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象) + signalQueue chan os.Signal // 终端命令行监听队列 + // 服务注册相关 + hmmu sync.RWMutex // handler互斥锁 + hmcmu sync.RWMutex // handlerCache互斥锁 handlerMap HandlerMap // 所有注册的回调函数(静态匹配) - statusHandlerMap map[string]HandlerFunc // 不同状态码下的注册处理方法(例如404状态时的处理方法) handlerTree map[string]interface{} // 所有注册的回调函数(动态匹配,树型+链表优先级匹配) - hooksTree map[string]interface{} // 所有注册的事件回调函数(动态匹配,树型+链表优先级匹配) handlerCache *gcache.Cache // 服务注册路由内存缓存 + // 事件回调注册 + hhmu sync.RWMutex // hooks互斥锁 + hhcmu sync.RWMutex // hooksCache互斥锁 + hooksTree map[string]interface{} // 所有注册的事件回调函数(动态匹配,树型+链表优先级匹配) hooksCache *gcache.Cache // 回调事件注册路由内存缓存 - servedCount *gtype.Int // 已经服务的请求数(4-8字节,不考虑溢出情况) + // 自定义状态码回调 + hsmu sync.RWMutex // status handler互斥锁 + statusHandlerMap map[string]HandlerFunc // 不同状态码下的注册处理方法(例如404状态时的处理方法) + // COOKIE cookieMaxAge *gtype.Int // Cookie有效期 + cookies *gmap.IntInterfaceMap // 当前服务器正在服务(请求正在执行)的Cookie(每个请求一个Cookie对象) + // SESSION sessionMaxAge *gtype.Int // Session有效期 sessionIdName *gtype.String // SessionId名称 - cookies *gmap.IntInterfaceMap // 当前服务器正在服务(请求正在执行)的Cookie(每个请求一个Cookie对象) sessions *gcache.Cache // Session内存缓存 - closeQueue *gqueue.Queue // 请求结束的关闭队列(存放的是需要异步关闭处理的*Request对象) + // 日志相关属性 logPath *gtype.String // 存放日志的目录路径 + logHandler *gtype.Interface // 自定义日志处理回调方法 errorLogEnabled *gtype.Bool // 是否开启error log accessLogEnabled *gtype.Bool // 是否开启access log accessLogger *glog.Logger // access log日志对象 errorLogger *glog.Logger // error log日志对象 - logHandler *gtype.Interface // 自定义的日志处理回调方法 } // 域名、URI与回调函数的绑定记录表 @@ -85,9 +97,41 @@ type HandlerItem struct { // http注册函数 type HandlerFunc func(r *Request) +// 文件描述符map +type listenerFdMap map[string]string + // Server表,用以存储和检索名称与Server对象之间的关联关系 var serverMapping = gmap.NewStringInterfaceMap() +// Web Server多进程管理器 +var procManager = gproc.NewManager() + +// Web Server开始执行事件通道,由于同一个进程支持多Server,因此该通道为非阻塞 +var readyChan = make(chan struct{}, 100000) +// Web Server已完成服务事件通道,当有事件时表示服务完成,当前进程退出 +var doneChan = make(chan struct{}, 100000) + +// Web Server进程初始化 +func init() { + go func() { + // 等待ready消息(Run方法调用) + <- readyChan + // 主进程只负责创建子进程 + if !gproc.IsChild() { + sendProcessMsg(os.Getpid(), gMSG_START, nil) + } + // 开启进程消息监听处理 + handleProcessMsgAndSignal() + + // 服务执行完成,需要退出 + doneChan <- struct{}{} + + if !gproc.IsChild() { + glog.Printfln("%d: all servers shutdown", gproc.Pid()) + } + }() +} + // 获取/创建一个默认配置的HTTP Server(默认监听端口是80) // 单例模式,请保证name的唯一性 func GetServer(name...interface{}) (*Server) { @@ -100,6 +144,7 @@ func GetServer(name...interface{}) (*Server) { } s := &Server { name : sname, + servers : make([]*gracefulServer, 0), methodsMap : make(map[string]bool), handlerMap : make(HandlerMap), statusHandlerMap : make(map[string]HandlerFunc), @@ -114,6 +159,7 @@ func GetServer(name...interface{}) (*Server) { sessionIdName : gtype.NewString(gDEFAULT_SESSION_ID_NAME), servedCount : gtype.NewInt(), closeQueue : gqueue.New(), + signalQueue : make(chan os.Signal), logPath : gtype.NewString(), accessLogEnabled : gtype.NewBool(), errorLogEnabled : gtype.NewBool(true), @@ -134,74 +180,167 @@ func GetServer(name...interface{}) (*Server) { return s } -// 阻塞执行监听 -func (s *Server) Run() error { +// 作为守护协程异步执行(当同一进程中存在多个Web Server时,需要采用这种方式执行) +// 需要结合Wait方式一起使用 +func (s *Server) Start() error { + // 主进程,不执行任何业务,只负责进程管理 + if !gproc.IsChild() { + return nil + } + if s.status == 1 { return errors.New("server is already running") } - // 底层http server配置 if s.config.Handler == nil { s.config.Handler = http.HandlerFunc(s.defaultHttpHandle) } - - // 开启异步处理队列处理循环 + // 开启异步关闭队列处理循环 s.startCloseQueueLoop() + return nil +} - // 开始执行底层Web Server创建,端口监听 - var wg sync.WaitGroup +// 阻塞执行监听 +func (s *Server) Run() error { + if err := s.Start(); err != nil { + return err + } + // Web Server准备就绪,待执行 + readyChan <- struct{}{} + // 阻塞等待服务执行完成 + <- doneChan + return nil +} + + +// 阻塞等待所有Web Server停止,常用于多Web Server场景,以及需要将Web Server异步运行的场景 +// 这是一个与进程相关的方法 +func Wait() { + readyChan <- struct{}{} + <- doneChan +} + + +// 开启底层Web Server执行 +func (s *Server) startServer(fdMap listenerFdMap) { + var httpsEnabled bool if len(s.config.HTTPSCertPath) > 0 && len(s.config.HTTPSKeyPath) > 0 { + // ================ // HTTPS + // ================ if len(s.config.HTTPSAddr) == 0 { if len(s.config.Addr) > 0 { s.config.HTTPSAddr = s.config.Addr + s.config.Addr = "" } else { s.config.HTTPSAddr = gDEFAULT_HTTPS_ADDR } } - array := strings.Split(s.config.HTTPSAddr, ",") + httpsEnabled = len(s.config.HTTPSAddr) > 0 + var array []string + if v, ok := fdMap["https"]; ok && len(v) > 0 { + array = strings.Split(v, ",") + } else { + array = strings.Split(s.config.HTTPSAddr, ",") + } for _, v := range array { - wg.Add(1) - go func(addr string) { - if err := s.newServer(addr).ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath); err != nil { - glog.Error(err) - wg.Done() + if len(v) == 0 { + continue + } + fd := 0 + addr := v + array := strings.Split(v, "#") + if len(array) > 1 { + addr = array[0] + // windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启 + if runtime.GOOS != "windows" { + fd = gconv.Int(array[1]) } - }(v) + } + if fd > 0 { + s.servers = append(s.servers, s.newGracefulServer(addr, fd)) + } else { + s.servers = append(s.servers, s.newGracefulServer(addr)) + } + s.servers[len(s.servers) - 1].isHttps = true } } + // ================ // HTTP - if s.servedCount.Val() == 0 && len(s.config.Addr) == 0 { + // ================ + // 当HTTPS服务未启用时,默认HTTP地址才会生效 + if !httpsEnabled && len(s.config.Addr) == 0 { s.config.Addr = gDEFAULT_HTTP_ADDR } - array := strings.Split(s.config.Addr, ",") + var array []string + if v, ok := fdMap["http"]; ok && len(v) > 0 { + array = strings.Split(v, ",") + } else { + array = strings.Split(s.config.Addr, ",") + } for _, v := range array { - wg.Add(1) - go func(addr string) { - if err := s.newServer(addr).ListenAndServe(); err != nil { + if len(v) == 0 { + continue + } + fd := 0 + addr := v + array := strings.Split(v, "#") + if len(array) > 1 { + addr = array[0] + // windows系统不支持文件描述符传递socket通信平滑交接,因此只能完整重启 + if runtime.GOOS != "windows" { + fd = gconv.Int(array[1]) + } + } + if fd > 0 { + s.servers = append(s.servers, s.newGracefulServer(addr, fd)) + } else { + s.servers = append(s.servers, s.newGracefulServer(addr)) + } + } + // 开始执行异步监听 + for _, v := range s.servers { + go func(server *gracefulServer) { + var err error + if server.isHttps { + err = server.ListenAndServeTLS(s.config.HTTPSCertPath, s.config.HTTPSKeyPath) + } else { + err = server.ListenAndServe() + } + // 如果非关闭错误,那么提示报错,否则认为是正常的服务关闭操作 + if err != nil && !strings.EqualFold(http.ErrServerClosed.Error(), err.Error()) { glog.Error(err) - wg.Done() } }(v) } s.status = 1 - - // 阻塞执行,直到所有Web Server退出 - wg.Wait() - return nil } -// 生成一个底层的Web Server对象 -func (s *Server) newServer(addr string) *http.Server { - return &http.Server { - Addr : addr, - Handler : s.config.Handler, - ReadTimeout : s.config.ReadTimeout, - WriteTimeout : s.config.WriteTimeout, - IdleTimeout : s.config.IdleTimeout, - MaxHeaderBytes : s.config.MaxHeaderBytes, +// 获取当前监听的文件描述符信息,构造成map返回 +func (s *Server) getListenerFdMap() map[string]string { + m := map[string]string { + "https" : "", + "http" : "", } + // s.servers是从HTTPS到HTTP优先级遍历,解析的时候也应当按照这个顺序读取fd + for _, v := range s.servers { + str := v.addr + "#" + gconv.String(v.Fd()) + "," + if v.isHttps { + m["https"] += str + } else { + m["http"] += str + } + } + // 去掉末尾的","号 + if len(m["https"]) > 0 { + m["https"] = m["https"][0 : len(m["https"]) - 1] + } + if len(m["http"]) > 0 { + m["http"] = m["http"][0 : len(m["http"]) - 1] + } + + return m } // 清空当前的handlerCache diff --git a/g/net/ghttp/ghttp_server_admin.go b/g/net/ghttp/ghttp_server_admin.go new file mode 100644 index 000000000..fbee04ea1 --- /dev/null +++ b/g/net/ghttp/ghttp_server_admin.go @@ -0,0 +1,173 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// pprof封装. + +package ghttp + +import ( + "strings" + "gitee.com/johng/gf/g/os/gview" + "runtime" + "gitee.com/johng/gf/g/os/gproc" + "sync" + "gitee.com/johng/gf/g/os/gtime" + "errors" + "fmt" + "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/os/glog" +) + +const ( + gADMIN_ACTION_INTERVAL_LIMIT = 3000 // (毫秒)服务开启后允许执行管理操作的间隔限制 +) + +// 用于服务管理的对象 +type utilAdmin struct {} + +// (进程级别)用于Web Server管理操作的互斥锁,保证管理操作的原子性 +var serverActionLocker sync.Mutex + +// (进程级别)用于记录上一次操作的时间(毫秒) +var serverActionLastTime = gtype.NewInt64(gtime.Millisecond()) + +// 当前服务进程所处的互斥管理操作状态 +// 1 : reload +// 2 : restart +// 4 : shutdown +var serverProcessStatus = gtype.NewInt() + +// 服务管理首页 +func (p *utilAdmin) Index(r *Request) { + data := map[string]interface{}{ + "uri" : strings.TrimRight(r.URL.Path, "/"), + } + buffer, _ := gview.ParseContent(` + + + gf ghttp admin + + +

reload

+

restart

+

shutdown

+ + + `, data) + r.Response.Write(buffer) +} + +// 服务热重启 +func (p *utilAdmin) Reload(r *Request) { + if runtime.GOOS == "windows" { + p.Restart(r) + } else { + if err := r.Server.Reload(); err == nil { + r.Response.Write("server reloaded") + } else { + r.Response.Write(err.Error()) + } + } +} + +// 服务完整重启 +func (p *utilAdmin) Restart(r *Request) { + if err := r.Server.Restart(); err == nil { + r.Response.Write("server restarted") + } else { + r.Response.Write(err.Error()) + } +} + +// 服务关闭 +func (p *utilAdmin) Shutdown(r *Request) { + r.Server.Shutdown() + if err := r.Server.Shutdown(); err == nil { + r.Response.Write("server shutdown") + } else { + r.Response.Write(err.Error()) + } +} + + +// 开启服务管理支持 +func (s *Server) EnableAdmin(pattern...string) { + p := "/debug/admin" + if len(pattern) > 0 { + p = pattern[0] + } + s.BindObject(p, &utilAdmin{}) +} + +// 平滑重启Web Server +func (s *Server) Reload() error { + serverActionLocker.Lock() + defer serverActionLocker.Unlock() + if err := s.checkActionStatus(); err != nil { + return err + } + if err := s.checkActionFrequence(); err != nil { + return err + } + glog.Printfln("%d: server reloading", gproc.Pid()) + sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) + return nil +} + +// 完整重启Web Server +func (s *Server) Restart() error { + serverActionLocker.Lock() + defer serverActionLocker.Unlock() + if err := s.checkActionStatus(); err != nil { + return err + } + if err := s.checkActionFrequence(); err != nil { + return err + } + glog.Printfln("%d: server restarting", gproc.Pid()) + sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) + return nil +} + +// 关闭Web Server +func (s *Server) Shutdown() error { + serverActionLocker.Lock() + defer serverActionLocker.Unlock() + if err := s.checkActionStatus(); err != nil { + return err + } + if err := s.checkActionFrequence(); err != nil { + return err + } + glog.Printfln("%d: server shutting down", gproc.Pid()) + sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) + return nil +} + +// 检测当前操作的频繁度 +func (s *Server) checkActionFrequence() error { + interval := gtime.Millisecond() - serverActionLastTime.Val() + if interval < gADMIN_ACTION_INTERVAL_LIMIT { + return errors.New(fmt.Sprintf("too frequent action, please retry in %d ms", gADMIN_ACTION_INTERVAL_LIMIT - interval)) + } + serverActionLastTime.Set(gtime.Millisecond()) + return nil +} + +// 检查当前服务进程的状态 +func (s *Server) checkActionStatus() error { + status := serverProcessStatus.Val() + if status > 0 { + switch status { + case 1: + return errors.New("server is reloading") + case 2: + return errors.New("server is restarting") + case 4: + return errors.New("server is shutting down") + } + } + return nil +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm.go b/g/net/ghttp/ghttp_server_comm.go new file mode 100644 index 000000000..b8b6f292a --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm.go @@ -0,0 +1,166 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 + +package ghttp + +import ( + "os" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/encoding/gjson" + "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/encoding/gbinary" + "gitee.com/johng/gf/g/os/gtime" +) + +const ( + gMSG_START = 1 + gMSG_RELOAD = 2 + gMSG_RESTART = 3 + gMSG_SHUTDOWN = 4 + gMSG_CLOSE = 5 + gMSG_NEW_FORK = 6 + gMSG_HEARTBEAT = 7 + + gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔 + gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程 +) + +// 进程信号量监听消息队列 +var procSignalChan = make(chan os.Signal) + +// 上一次进程间心跳的时间戳 +var lastUpdateTime = gtype.NewInt(int(gtime.Millisecond())) + +// (主子进程)在第一次创建子进程成功之后才会开始心跳检测,同理对应超时时间才会生效 +var checkHeartbeat = gtype.NewBool() + +// 处理进程信号量监控以及进程间消息通信 +func handleProcessMsgAndSignal() { + go handleProcessSignal() + if gproc.IsChild() { + go handleChildProcessHeartbeat() + } else { + go handleMainProcessHeartbeat() + } + handleProcessMsg() +} + +// 处理进程间消息 +// 数据格式: 操作(8bit) | 参数(变长) +func handleProcessMsg() { + for { + if msg := gproc.Receive(); msg != nil { + // 记录消息日志,用于调试 + //content := gconv.String(msg.Pid) + "=>" + gconv.String(gproc.Pid()) + ":" + fmt.Sprintf("%v\n", msg.Data) + //glog.Print(content) + //gfile.PutContentsAppend("/tmp/gproc-log", content) + act := gbinary.DecodeToUint(msg.Data[0 : 1]) + data := msg.Data[1 : ] + if msg.Pid != gproc.Pid() { + updateProcessUpdateTime() + } + if gproc.IsChild() { + // =============== + // 子进程 + // =============== + switch act { + case gMSG_START: onCommChildStart(msg.Pid, data) + case gMSG_RELOAD: onCommChildReload(msg.Pid, data) + case gMSG_RESTART: onCommChildRestart(msg.Pid, data) + case gMSG_CLOSE: onCommChildClose(msg.Pid, data) + case gMSG_HEARTBEAT: onCommChildHeartbeat(msg.Pid, data) + case gMSG_SHUTDOWN: onCommChildShutdown(msg.Pid, data) + } + } else { + // =============== + // 父进程 + // =============== + // 任何进程消息都会自动更新最后通信时间记录 + if msg.Pid != gproc.Pid() { + updateProcessCommTime(msg.Pid) + } + switch act { + case gMSG_START: onCommMainStart(msg.Pid, data) + case gMSG_RELOAD: onCommMainReload(msg.Pid, data) + case gMSG_RESTART: onCommMainRestart(msg.Pid, data) + case gMSG_NEW_FORK: onCommMainNewFork(msg.Pid, data) + case gMSG_HEARTBEAT: onCommMainHeartbeat(msg.Pid, data) + case gMSG_SHUTDOWN: + onCommMainShutdown(msg.Pid, data) + return + } + } + } + } +} + +// 向进程发送操作消息 +func sendProcessMsg(pid int, act int, data []byte) error { + return gproc.Send(pid, formatMsgBuffer(act, data)) +} + +// 生成一条满足Web Server进程通信协议的消息 +func formatMsgBuffer(act int, data []byte) []byte { + return append(gbinary.EncodeUint8(uint8(act)), data...) +} + +// 获取所有Web Server的文件描述符map +func getServerFdMap() map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + serverMapping.RLockFunc(func(m map[string]interface{}) { + for k, v := range m { + sfm[k] = v.(*Server).getListenerFdMap() + } + }) + return sfm +} + +// 二进制转换为FdMap +func bufferToServerFdMap(buffer []byte) map[string]listenerFdMap { + sfm := make(map[string]listenerFdMap) + if len(buffer) > 0 { + j, _ := gjson.LoadContent(buffer, "json") + for k, _ := range j.ToMap() { + m := make(map[string]string) + for k, v := range j.GetMap(k) { + m[k] = gconv.String(v) + } + sfm[k] = m + } + } + return sfm +} + +// 关优雅闭进程所有端口的Web Server服务 +// 注意,只是关闭Web Server服务,并不是退出进程 +func shutdownWebServers() { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + for _, s := range v.(*Server).servers { + s.shutdown() + } + } + }) +} + +// 强制关闭进程所有端口的Web Server服务 +// 注意,只是关闭Web Server服务,并不是退出进程 +func closeWebServers() { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + for _, s := range v.(*Server).servers { + s.close() + } + } + }) +} + +// 更新上一次进程间通信的时间 +func updateProcessUpdateTime() { + lastUpdateTime.Set(int(gtime.Millisecond())) +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child.go b/g/net/ghttp/ghttp_server_comm_child.go new file mode 100644 index 000000000..bd60638ce --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_child.go @@ -0,0 +1,109 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 - 子进程 + +package ghttp + +import ( + "os" + "fmt" + "time" + "strings" + "runtime" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/encoding/gjson" +) + +const ( + gPROC_CHILD_MAX_IDLE_TIME = 10000 // 子进程闲置时间(未开启心跳机制的时间) +) + +// 心跳处理(方法为空,逻辑放到公共通信switch中进行处理) +func onCommChildHeartbeat(pid int, data []byte) { + +} + +// 平滑重启,子进程收到重启消息,那么将自身的ServerFdMap信息收集后发送给主进程,由主进程进行统一调度 +func onCommChildReload(pid int, data []byte) { + var buffer []byte = nil + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + // windows系统无法进行文件描述符操作,只能重启进程 + if runtime.GOOS == "windows" { + // windows下使用shutdown会造成协程阻塞,这里直接使用close强制关闭 + closeWebServers() + } else { + // 创建新的服务进程,子进程自动从父进程复制文件描述来监听同样的端口 + sfm := getServerFdMap() + // 将sfm中的fd按照子进程创建时的文件描述符顺序进行整理,以便子进程获取到正确的fd + for name, m := range sfm { + for fdk, fdv := range m { + if len(fdv) > 0 { + s := "" + for _, item := range strings.Split(fdv, ",") { + array := strings.Split(item, "#") + fd := uintptr(gconv.Uint(array[1])) + if fd > 0 { + s += fmt.Sprintf("%s#%d,", array[0], 3 + len(p.ExtraFiles)) + p.ExtraFiles = append(p.ExtraFiles, os.NewFile(fd, "")) + } else { + s += fmt.Sprintf("%s#%d,", array[0], 0) + } + } + sfm[name][fdk] = strings.TrimRight(s, ",") + } + } + } + buffer, _ = gjson.Encode(sfm) + } + p.PPid = gproc.PPid() + if newPid, err := p.Start(); err == nil { + sendProcessMsg(newPid, gMSG_START, buffer) + } else { + glog.Errorfln("%d: fork process failed, error:%s, %s", gproc.Pid(), err.Error(), string(buffer)) + } +} + +// 完整重启 +func onCommChildRestart(pid int, data []byte) { + sendProcessMsg(gproc.PPid(), gMSG_RESTART, nil) +} + +// 优雅关闭服务链接并退出 +func onCommChildShutdown(pid int, data []byte) { + if runtime.GOOS != "windows" { + shutdownWebServers() + } + os.Exit(0) +} + +// 强制性关闭服务链接并退出 +func onCommChildClose(pid int, data []byte) { + closeWebServers() + os.Exit(0) +} + +// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 +func handleChildProcessHeartbeat() { + for { + time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) + sendProcessMsg(gproc.PPid(), gMSG_HEARTBEAT, nil) + // 超过时间没有接收到主进程心跳,自动关闭退出 + if checkHeartbeat.Val() && (int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT) { + // 子进程有时会无法退出(僵尸?),这里直接使用exit,而不是return + glog.Printfln("%d: %d - %d > %d", gproc.Pid(), int(gtime.Millisecond()), lastUpdateTime.Val(), gPROC_HEARTBEAT_TIMEOUT) + glog.Printfln("%d: heartbeat timeout[%dms], exit", gproc.Pid(), gPROC_HEARTBEAT_TIMEOUT) + os.Exit(0) + } + // 未开启心跳检测的闲置超过一定时间则主动关闭 + if !checkHeartbeat.Val() && gproc.Uptime() > gPROC_CHILD_MAX_IDLE_TIME { + glog.Printfln("%d: idle timeout[%dms], exit", gproc.Pid(), gPROC_CHILD_MAX_IDLE_TIME) + os.Exit(0) + } + } +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child_unix.go b/g/net/ghttp/ghttp_server_comm_child_unix.go new file mode 100644 index 000000000..7d8d84988 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_child_unix.go @@ -0,0 +1,43 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 - 子进程 + +// +build !windows + +package ghttp + +import ( + "os" + "gitee.com/johng/gf/g/os/gproc" +) + +// 开启所有Web Server(根据消息启动) +func onCommChildStart(pid int, data []byte) { + if len(data) > 0 { + sfm := bufferToServerFdMap(data) + for k, v := range sfm { + GetServer(k).startServer(v) + } + } else { + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + v.(*Server).startServer(nil) + } + }) + } + // 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制 + sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) + // 如果创建自己的父进程非gproc父进程,那么表示该进程为重启创建的进程,创建成功之后需要通知父进程自行销毁 + if gproc.PPidOS() != gproc.PPid() { + //如果子进程已经继承了父进程的socket文件描述符,那么父进程没有存在的必要,直接kill掉 + if p, err := os.FindProcess(gproc.PPidOS()); err == nil { + p.Kill() + } + } + // 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间 + updateProcessUpdateTime() + checkHeartbeat.Set(true) +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_child_windows.go b/g/net/ghttp/ghttp_server_comm_child_windows.go new file mode 100644 index 000000000..a28027441 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_child_windows.go @@ -0,0 +1,26 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package ghttp + +import ( + "gitee.com/johng/gf/g/os/gproc" +) + +// 开启所有Web Server(根据消息启动) +func onCommChildStart(pid int, data []byte) { + // 进程创建成功之后(开始执行服务时间点为准),通知主进程自身的存在,并开始执行心跳机制 + sendProcessMsg(gproc.PPid(), gMSG_NEW_FORK, nil) + // 开启Web Server服务 + serverMapping.RLockFunc(func(m map[string]interface{}) { + for _, v := range m { + v.(*Server).startServer(nil) + } + }) + // 开始心跳时必须保证主进程时间有值,但是又不能等待主进程消息后再开始检测,因此这里自己更新一下通信时间 + updateProcessUpdateTime() + checkHeartbeat.Set(true) +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_main.go b/g/net/ghttp/ghttp_server_comm_main.go new file mode 100644 index 000000000..7b9b1e4a6 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_main.go @@ -0,0 +1,119 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. +// Web Server进程间通信 - 主进程. +// 管理子进程按照规则听话玩,不然有一百种方法让子进程在本地混不下去. + +package ghttp + +import ( + "os" + "time" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/container/gmap" +) + +// (主进程)主进程与子进程上一次活跃时间映射map +var procUpdateTimeMap = gmap.NewIntIntMap() + +// 开启服务 +func onCommMainStart(pid int, data []byte) { + fork := forkNewProcess() + if fork == nil { + os.Exit(1) + } + updateProcessCommTime(fork.Pid()) + // 子进程创建成功之后再发送执行命令 + sendProcessMsg(fork.Pid(), gMSG_START, nil) +} + +// 心跳处理(方法为空,逻辑放到公共通信switch中进行处理) +func onCommMainHeartbeat(pid int, data []byte) { + +} + +// 平滑重启服务 +func onCommMainReload(pid int, data []byte) { + procManager.Send(formatMsgBuffer(gMSG_RELOAD, nil)) +} + +// 完整重启服务 +func onCommMainRestart(pid int, data []byte) { + // 如果是父进程自身发送的重启指令,那么通知所有子进程重启 + if pid == gproc.Pid() { + procManager.Send(formatMsgBuffer(gMSG_RESTART, nil)) + return + } + // 首先创建子进程,暂时不开始服务,否则会有端口冲突 + fork := forkNewProcess() + if fork == nil { + os.Exit(1) + } + // 然后通知旧的子进程自动关闭并退出(不需要新建的子进程来处理) + sendProcessMsg(pid, gMSG_CLOSE, nil) + if p, err := os.FindProcess(pid); err == nil && p != nil { + p.Kill() + p.Wait() + } + // 通知新的子进程执行服务监听 + sendProcessMsg(fork.Pid(), gMSG_START, nil) +} + +// 新建子进程通知 +func onCommMainNewFork(pid int, data []byte) { + procManager.AddProcess(pid) + checkHeartbeat.Set(true) +} + +// 关闭服务,通知所有子进程退出(Kill强制性退出) +func onCommMainShutdown(pid int, data []byte) { + procManager.Send(formatMsgBuffer(gMSG_CLOSE, nil)) + procManager.KillAll() + procManager.WaitAll() +} + +// 更新指定进程的通信时间记录 +func updateProcessCommTime(pid int) { + procUpdateTimeMap.Set(pid, int(gtime.Millisecond())) +} + +// 创建一个子进程,但是暂时不执行服务监听 +func forkNewProcess() *gproc.Process { + p := procManager.NewProcess(os.Args[0], os.Args, os.Environ()) + if _, err := p.Start(); err != nil { + glog.Errorfln("%d: fork new process error:%s", gproc.Pid(), err.Error()) + return nil + } + return p +} + +// 主进程与子进程相互异步方式发送心跳信息,保持活跃状态 +func handleMainProcessHeartbeat() { + for { + time.Sleep(gPROC_HEARTBEAT_INTERVAL*time.Millisecond) + procManager.Send(formatMsgBuffer(gMSG_HEARTBEAT, nil)) + // 清理过期进程 + if checkHeartbeat.Val() { + for _, pid := range procManager.Pids() { + updatetime := procUpdateTimeMap.Get(pid) + if updatetime > 0 && int(gtime.Millisecond()) - updatetime > gPROC_HEARTBEAT_TIMEOUT { + //fmt.Println("remove pid", pid, int(gtime.Millisecond()), updatetime) + // 这里需要手动从进程管理器中去掉该进程 + procManager.RemoveProcess(pid) + sendProcessMsg(pid, gMSG_CLOSE, nil) + } + } + + // (双保险)如果所有子进程都退出,并且主进程未活动达到超时时间,那么主进程也没存在的必要 + if procManager.Size() == 0 && int(gtime.Millisecond()) - lastUpdateTime.Val() > gPROC_HEARTBEAT_TIMEOUT{ + glog.Printfln("%d: all children died, exit", gproc.Pid()) + os.Exit(0) + } + } + + } +} diff --git a/g/net/ghttp/ghttp_server_comm_signal_unix.go b/g/net/ghttp/ghttp_server_comm_signal_unix.go new file mode 100644 index 000000000..0c8316419 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_signal_unix.go @@ -0,0 +1,53 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// +build !windows + +package ghttp + +import ( + "os" + "syscall" + "os/signal" + "gitee.com/johng/gf/g/os/gproc" +) + +// 信号量处理 +func handleProcessSignal() { + var sig os.Signal + signal.Notify( + procSignalChan, + syscall.SIGINT, + syscall.SIGQUIT, + syscall.SIGKILL, + syscall.SIGHUP, + syscall.SIGTERM, + syscall.SIGUSR1, + syscall.SIGUSR2, + ) + for { + sig = <- procSignalChan + switch sig { + // 进程终止,停止所有子进程运行 + case syscall.SIGINT, syscall.SIGQUIT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGTERM: + sendProcessMsg(gproc.Pid(), gMSG_SHUTDOWN, nil) + if gproc.IsChild() { + sendProcessMsg(gproc.PPid(), gMSG_SHUTDOWN, nil) + } + return + + // 用户信号,热重启服务 + case syscall.SIGUSR1: + sendProcessMsg(gproc.Pid(), gMSG_RELOAD, nil) + + // 用户信号,完整重启服务 + case syscall.SIGUSR2: + sendProcessMsg(gproc.Pid(), gMSG_RESTART, nil) + + default: + } + } +} \ No newline at end of file diff --git a/g/net/ghttp/ghttp_server_comm_signal_windows.go b/g/net/ghttp/ghttp_server_comm_signal_windows.go new file mode 100644 index 000000000..148375961 --- /dev/null +++ b/g/net/ghttp/ghttp_server_comm_signal_windows.go @@ -0,0 +1,12 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package ghttp + +// windows不处理信号量 +func handleProcessSignal() { + +} \ No newline at end of file diff --git a/g/net/ghttp/http_server_config.go b/g/net/ghttp/ghttp_server_config.go similarity index 100% rename from g/net/ghttp/http_server_config.go rename to g/net/ghttp/ghttp_server_config.go diff --git a/g/net/ghttp/http_server_cookie.go b/g/net/ghttp/ghttp_server_cookie.go similarity index 100% rename from g/net/ghttp/http_server_cookie.go rename to g/net/ghttp/ghttp_server_cookie.go diff --git a/g/net/ghttp/http_server_domain.go b/g/net/ghttp/ghttp_server_domain.go similarity index 100% rename from g/net/ghttp/http_server_domain.go rename to g/net/ghttp/ghttp_server_domain.go diff --git a/g/net/ghttp/ghttp_server_graceful.go b/g/net/ghttp/ghttp_server_graceful.go new file mode 100644 index 000000000..13a17bdf2 --- /dev/null +++ b/g/net/ghttp/ghttp_server_graceful.go @@ -0,0 +1,172 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package ghttp + +import ( + "os" + "fmt" + "net" + "context" + "net/http" + "crypto/tls" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" +) + +// 优雅的Web Server对象封装 +type gracefulServer struct { + fd uintptr + addr string + httpServer *http.Server + rawListener net.Listener // 原始listener + listener net.Listener // 接口化封装的listener + isHttps bool + shutdownChan chan bool +} + +// 创建一个优雅的Http Server +func (s *Server) newGracefulServer(addr string, fd...int) *gracefulServer { + gs := &gracefulServer { + addr : addr, + httpServer : s.newHttpServer(addr), + shutdownChan : make(chan bool), + } + // 是否有继承的文件描述符 + if len(fd) > 0 && fd[0] > 0 { + gs.fd = uintptr(fd[0]) + } + return gs +} + +// 生成一个底层的Web Server对象 +func (s *Server) newHttpServer(addr string) *http.Server { + return &http.Server { + Addr : addr, + Handler : s.config.Handler, + ReadTimeout : s.config.ReadTimeout, + WriteTimeout : s.config.WriteTimeout, + IdleTimeout : s.config.IdleTimeout, + MaxHeaderBytes : s.config.MaxHeaderBytes, + } +} + +// 执行HTTP监听 +func (s *gracefulServer) ListenAndServe() error { + addr := s.httpServer.Addr + ln, err := s.getNetListener(addr) + if err != nil { + return err + } + s.listener = ln + s.rawListener = ln + return s.doServe() +} + +// 获得文件描述符 +func (s *gracefulServer) Fd() uintptr { + if s.rawListener != nil { + file, err := s.rawListener.(*net.TCPListener).File() + if err == nil { + return file.Fd() + } + } + return 0 +} + +// 设置自定义fd +func (s *gracefulServer) setFd(fd int) { + s.fd = uintptr(fd) +} + +// 执行HTTPS监听 +func (s *gracefulServer) ListenAndServeTLS(certFile, keyFile string) error { + addr := s.httpServer.Addr + config := &tls.Config{} + if s.httpServer.TLSConfig != nil { + *config = *s.httpServer.TLSConfig + } + if config.NextProtos == nil { + config.NextProtos = []string{"http/1.1"} + } + var err error + config.Certificates = make([]tls.Certificate, 1) + config.Certificates[0], err = tls.LoadX509KeyPair(certFile, keyFile) + if err != nil { + return err + } + ln, err := s.getNetListener(addr) + if err != nil { + return err + } + + s.listener = tls.NewListener(ln, config) + s.rawListener = ln + return s.doServe() +} + +// 获取服务协议字符串 +func (s *gracefulServer) getProto() string { + proto := "http" + if s.isHttps { + proto = "https" + } + return proto +} + +// 开始执行Web Server服务处理 +func (s *gracefulServer) doServe() error { + action := "started" + if s.fd != 0 { + action = "reloaded" + } + glog.Printfln("%d: %s server %s listening on [%s]", gproc.Pid(), s.getProto(), action, s.addr) + err := s.httpServer.Serve(s.listener) + <-s.shutdownChan + return err +} + +// 自定义的net.Listener +func (s *gracefulServer) getNetListener(addr string) (net.Listener, error) { + var ln net.Listener + var err error + if s.fd > 0 { + f := os.NewFile(s.fd, "") + ln, err = net.FileListener(f) + if err != nil { + err = fmt.Errorf("%d: net.FileListener error: %v", gproc.Pid(), err) + return nil, err + } + } else { + ln, err = net.Listen("tcp", addr) + if err != nil { + err = fmt.Errorf("%d: net.Listen error: %v", gproc.Pid(), err) + return nil, err + } + } + return ln, nil +} + +// 执行请求优雅关闭 +func (s *gracefulServer) shutdown() { + if err := s.httpServer.Shutdown(context.Background()); err != nil { + glog.Errorfln("%d: %s server [%s] shutdown error: %v", gproc.Pid(), s.getProto(), s.addr, err) + } else { + //glog.Printfln("%d: %s server [%s] shutdown smoothly", gproc.Pid(), s.getProto(), s.addr) + s.shutdownChan <- true + } +} + +// 执行请求强制关闭 +func (s *gracefulServer) close() { + if err := s.httpServer.Close(); err != nil { + glog.Errorfln("%d: %s server [%s] closed error: %v", gproc.Pid(), s.getProto(), s.addr, err) + } else { + //glog.Printfln("%d: %s server [%s] closed smoothly", gproc.Pid(), s.getProto(), s.addr) + s.shutdownChan <- true + } +} + diff --git a/g/net/ghttp/http_server_handler.go b/g/net/ghttp/ghttp_server_handler.go similarity index 90% rename from g/net/ghttp/http_server_handler.go rename to g/net/ghttp/ghttp_server_handler.go index 9550fc9cb..1d10478cc 100644 --- a/g/net/ghttp/http_server_handler.go +++ b/g/net/ghttp/ghttp_server_handler.go @@ -16,8 +16,8 @@ import ( "net/url" "net/http" "gitee.com/johng/gf/g/os/gfile" - "gitee.com/johng/gf/g/encoding/ghtml" "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/encoding/ghtml" ) // 默认HTTP Server处理入口,http包底层默认使用了gorutine异步处理请求,所以这里不再异步执行 @@ -169,3 +169,20 @@ func (s *Server)listDir(r *Request, f http.File) { } r.Response.Write("\n") } + +// 开启异步队列处理循环,该异步线程与Server同生命周期 +func (s *Server) startCloseQueueLoop() { + go func() { + for { + if v := s.closeQueue.PopFront(); v != nil { + r := v.(*Request) + s.callHookHandler(r, "BeforeClose") + // 关闭当前会话的Cookie + r.Cookie.Close() + // 更新Session会话超时时间 + r.Session.UpdateExpire() + s.callHookHandler(r, "AfterClose") + } + } + }() +} \ No newline at end of file diff --git a/g/net/ghttp/http_server_hooks.go b/g/net/ghttp/ghttp_server_hooks.go similarity index 100% rename from g/net/ghttp/http_server_hooks.go rename to g/net/ghttp/ghttp_server_hooks.go diff --git a/g/net/ghttp/http_server_log.go b/g/net/ghttp/ghttp_server_log.go similarity index 100% rename from g/net/ghttp/http_server_log.go rename to g/net/ghttp/ghttp_server_log.go diff --git a/g/net/ghttp/http_server_options.go b/g/net/ghttp/ghttp_server_options.go similarity index 100% rename from g/net/ghttp/http_server_options.go rename to g/net/ghttp/ghttp_server_options.go diff --git a/g/net/ghttp/http_server_pprof.go b/g/net/ghttp/ghttp_server_pprof.go similarity index 84% rename from g/net/ghttp/http_server_pprof.go rename to g/net/ghttp/ghttp_server_pprof.go index b5043394e..0205a0b3a 100644 --- a/g/net/ghttp/http_server_pprof.go +++ b/g/net/ghttp/ghttp_server_pprof.go @@ -11,13 +11,13 @@ import ( "strings" runpprof "runtime/pprof" netpprof "net/http/pprof" - "gitee.com/johng/gf/g/frame/gins" + "gitee.com/johng/gf/g/os/gview" ) // 用于pprof的对象 -type utilpprof struct {} +type utilPprof struct {} -func (p *utilpprof) Index(r *Request) { +func (p *utilPprof) Index(r *Request) { profiles := runpprof.Profiles() action := r.Get("action") data := map[string]interface{}{ @@ -25,8 +25,7 @@ func (p *utilpprof) Index(r *Request) { "profiles" : profiles, } if len(action) == 0 { - view := gins.View() - buffer, _ := view.ParseContent(` + buffer, _ := gview.ParseContent(` gf ghttp pprof @@ -52,19 +51,19 @@ func (p *utilpprof) Index(r *Request) { } } -func (p *utilpprof) Cmdline(r *Request) { +func (p *utilPprof) Cmdline(r *Request) { netpprof.Cmdline(r.Response.Writer, &r.Request) } -func (p *utilpprof) Profile(r *Request) { +func (p *utilPprof) Profile(r *Request) { netpprof.Profile(r.Response.Writer, &r.Request) } -func (p *utilpprof) Symbol(r *Request) { +func (p *utilPprof) Symbol(r *Request) { netpprof.Symbol(r.Response.Writer, &r.Request) } -func (p *utilpprof) Trace(r *Request) { +func (p *utilPprof) Trace(r *Request) { netpprof.Trace(r.Response.Writer, &r.Request) } @@ -74,7 +73,7 @@ func (s *Server) EnablePprof(pattern...string) { if len(pattern) > 0 { p = pattern[0] } - up := &utilpprof{} + up := &utilPprof{} _, _, uri, _ := s.parsePattern(p) uri = strings.TrimRight(uri, "/") s.BindHandler(uri + "/*action", up.Index) diff --git a/g/net/ghttp/http_server_router.go b/g/net/ghttp/ghttp_server_router.go similarity index 100% rename from g/net/ghttp/http_server_router.go rename to g/net/ghttp/ghttp_server_router.go diff --git a/g/net/ghttp/http_server_service.go b/g/net/ghttp/ghttp_server_service.go similarity index 90% rename from g/net/ghttp/http_server_service.go rename to g/net/ghttp/ghttp_server_service.go index 54a907b36..678d3e652 100644 --- a/g/net/ghttp/http_server_service.go +++ b/g/net/ghttp/ghttp_server_service.go @@ -34,23 +34,29 @@ func (s *Server)bindHandlerByMap(m HandlerMap) error { return nil } -// 将方法名称按照设定的规则转换为URI并附加到指定的URI后面 -func (s *Server)appendMethodNameToUriWithPattern(pattern string, name string) string { +// 将方法名称按照设定的规则合并到pattern中. +// 规则1:pattern中的URI包含{method}关键字,则替换该关键字为方法名称 +// 规则2:如果不满足规则1,那么直接将防发明附加到pattern中的URI后面 +func (s *Server)mergeMethodNameToPattern(pattern string, name string) string { + // 方法名中间存在大写字母,转换为小写URI地址以“-”号链接每个单词 + method := "" + for i := 0; i < len(name); i++ { + if i > 0 && gutil.IsLetterUpper(name[i]) { + method += "-" + } + method += strings.ToLower(string(name[i])) + } + if strings.Index(pattern, "{method}") != -1 { + return strings.Replace(pattern, "{method}", method, -1) + } // 检测域名后缀 array := strings.Split(pattern, "@") // 分离URI(其实可能包含HTTP Method) uri := array[0] uri = strings.TrimRight(uri, "/") + "/" - // 方法名中间存在大写字母,转换为小写URI地址以“-”号链接每个单词 - for i := 0; i < len(name); i++ { - if i > 0 && gutil.IsLetterUpper(name[i]) { - uri += "-" - } - uri += strings.ToLower(string(name[i])) - } // 加上指定域名后缀 if len(array) > 1 { - uri += "@" + array[1] + return uri + "@" + array[1] } return uri } @@ -72,7 +78,7 @@ func (s *Server)BindObject(pattern string, obj interface{}) error { t := v.Type() for i := 0; i < v.NumMethod(); i++ { name := t.Method(i).Name - key := s.appendMethodNameToUriWithPattern(pattern, name) + key := s.mergeMethodNameToPattern(pattern, name) m[key] = &HandlerItem { ctype : nil, fname : "", @@ -100,7 +106,7 @@ func (s *Server)BindObjectMethod(pattern string, obj interface{}, methods string if !fval.IsValid() { return errors.New("invalid method name:" + name) } - key := s.appendMethodNameToUriWithPattern(pattern, name) + key := s.mergeMethodNameToPattern(pattern, name) m[key] = &HandlerItem{ ctype : nil, fname : "", @@ -152,7 +158,7 @@ func (s *Server)BindController(pattern string, c Controller) error { if name == "Init" || name == "Shut" || name == "Exit" { continue } - key := s.appendMethodNameToUriWithPattern(pattern, name) + key := s.mergeMethodNameToPattern(pattern, name) m[key] = &HandlerItem { ctype : v.Elem().Type(), fname : name, @@ -181,7 +187,7 @@ func (s *Server)BindControllerMethod(pattern string, c Controller, methods strin if !cval.MethodByName(name).IsValid() { return errors.New("invalid method name:" + name) } - key := s.appendMethodNameToUriWithPattern(pattern, name) + key := s.mergeMethodNameToPattern(pattern, name) m[key] = &HandlerItem { ctype : ctype, fname : name, diff --git a/g/net/ghttp/http_server_session.go b/g/net/ghttp/ghttp_server_session.go similarity index 100% rename from g/net/ghttp/http_server_session.go rename to g/net/ghttp/ghttp_server_session.go diff --git a/g/net/ghttp/http_server_status.go b/g/net/ghttp/ghttp_server_status.go similarity index 100% rename from g/net/ghttp/http_server_status.go rename to g/net/ghttp/ghttp_server_status.go diff --git a/g/net/ghttp/http_server_auto.go b/g/net/ghttp/http_server_auto.go deleted file mode 100644 index f889b1ed3..000000000 --- a/g/net/ghttp/http_server_auto.go +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. -// 异步工作协程. - -package ghttp - -// 开启异步队列处理循环,该异步线程与Server同生命周期 -func (s *Server) startCloseQueueLoop() { - go func() { - for { - if v := s.closeQueue.PopFront(); v != nil { - r := v.(*Request) - s.callHookHandler(r, "BeforeClose") - // 关闭当前会话的Cookie - r.Cookie.Close() - // 更新Session会话超时时间 - r.Session.UpdateExpire() - s.callHookHandler(r, "AfterClose") - } - } - }() -} \ No newline at end of file diff --git a/g/net/gtcp/tcp_func.go b/g/net/gtcp/tcp_func.go new file mode 100644 index 000000000..d25db129d --- /dev/null +++ b/g/net/gtcp/tcp_func.go @@ -0,0 +1,92 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gtcp + +import ( + "io" + "net" + "time" +) + +const ( + gDEFAULT_RETRY_INTERVAL = 100 // 默认重试时间间隔 +) + +type Retry struct { + Count int // 重试次数 + Interval int // 重试间隔(毫秒) +} + +// 常见的二进制数据校验方式,生成校验结果 +func Checksum(buffer []byte) uint32 { + var checksum uint32 + for _, b := range buffer { + checksum += uint32(b) + } + return checksum +} + +// 获取数据 +func Receive(conn net.Conn, retry...Retry) []byte { + size := 1024 + data := make([]byte, 0) + for { + buffer := make([]byte, size) + length, err := conn.Read(buffer) + if length < 1 && err != nil { + if err == io.EOF || len(retry) == 0 || retry[0].Count == 0 { + break + } + if len(retry) > 0 { + retry[0].Count-- + if retry[0].Interval == 0 { + retry[0].Interval = gDEFAULT_RETRY_INTERVAL + } + time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) + } + } else { + data = append(data, buffer[0:length]...) + if err == io.EOF { + break + } + } + } + return data +} + +// 带超时时间的数据获取 +func ReceiveWithTimeout(conn net.Conn, timeout time.Duration, retry...Retry) []byte { + conn.SetReadDeadline(time.Now().Add(timeout)) + return Receive(conn, retry...) +} + +// 发送数据 +func Send(conn net.Conn, data []byte, retry...Retry) error { + for { + _, err := conn.Write(data) + if err != nil { + if len(retry) == 0 || retry[0].Count == 0 { + return err + } + if len(retry) > 0 { + retry[0].Count-- + if retry[0].Interval == 0 { + retry[0].Interval = gDEFAULT_RETRY_INTERVAL + } + time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond) + } + } else { + return nil + } + } +} + +// 带超时时间的数据发送 +func SendWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) error { + conn.SetWriteDeadline(time.Now().Add(timeout)) + return Send(conn, data, retry...) +} \ No newline at end of file diff --git a/g/os/gfile/gfile.go b/g/os/gfile/gfile.go index 2ab52d11c..286571bdc 100644 --- a/g/os/gfile/gfile.go +++ b/g/os/gfile/gfile.go @@ -46,6 +46,10 @@ func Mkdir(path string) error { // 给定文件的绝对路径创建文件 func Create(path string) error { + dir := Dir(path) + if !Exists(dir) { + Mkdir(dir) + } f, err := os.Create(path) if err != nil { return err @@ -107,7 +111,7 @@ func Info(path string) *os.FileInfo { return &info } -// 修改时间 +// 修改时间(秒) func MTime(path string) int64 { f, e := os.Stat(path) if e != nil { @@ -116,6 +120,17 @@ func MTime(path string) int64 { return f.ModTime().Unix() } +// 修改时间(毫秒) +func MTimeMillisecond(path string) int64 { + f, e := os.Stat(path) + if e != nil { + return 0 + } + seconds := f.ModTime().Unix() + nanoSeconds := f.ModTime().Nanosecond() + return seconds*1000 + int64(nanoSeconds/1000000) +} + // 文件大小(bytes) func Size(path string) int64 { f, e := os.Stat(path) @@ -320,6 +335,11 @@ func putContents(path string, data []byte, flag int, perm os.FileMode) error { return nil } +// Truncate +func Truncate(path string, size int) error { + return os.Truncate(path, int64(size)) +} + // (文本)写入文件内容 func PutContents(path string, content string) error { return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) @@ -462,4 +482,9 @@ func MainPkgPath() string { return p } return "" +} + +// 系统临时目录 +func TempDir() string { + return os.TempDir() } \ No newline at end of file diff --git a/g/os/gflock/gflock.go b/g/os/gflock/gflock.go new file mode 100644 index 000000000..70835a3b4 --- /dev/null +++ b/g/os/gflock/gflock.go @@ -0,0 +1,80 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 文件锁. +package gflock + +import ( + "sync" + "github.com/theckman/go-flock" + "gitee.com/johng/gf/g/os/gfile" +) + +// 文件锁 +type Locker struct { + mu sync.RWMutex // 用于外部接口调用的互斥锁(阻塞机制) + flock *flock.Flock // 底层文件锁对象 +} + +// 创建文件锁 +func New(file string) *Locker { + dir := gfile.TempDir() + gfile.Separator + "gflock" + if !gfile.Exists(dir) { + gfile.Mkdir(dir) + } + path := dir + gfile.Separator + file + lock := flock.NewFlock(path) + return &Locker{ + flock : lock, + } +} + +func (l *Locker) Path() string { + return l.flock.Path() +} + +// 当前文件锁是否处于锁定状态(Lock) +func (l *Locker) IsLocked() bool { + return l.flock.Locked() +} + +// 尝试Lock文件,如果失败立即返回 +func (l *Locker) TryLock() bool { + ok, _ := l.flock.TryLock() + if ok { + l.mu.Lock() + } + return ok +} + +// 尝试RLock文件,如果失败立即返回 +func (l *Locker) TryRLock() bool { + ok, _ := l.flock.TryRLock() + if ok { + l.mu.RLock() + } + return ok +} + +func (l *Locker) Lock() { + l.mu.Lock() + l.flock.Lock() +} + +func (l *Locker) UnLock() { + l.flock.Unlock() + l.mu.Unlock() +} + +func (l *Locker) RLock() { + l.mu.RLock() + l.flock.RLock() +} + +func (l *Locker) RUnlock() { + l.flock.Unlock() + l.mu.RUnlock() +} diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index 2bd5398d8..cbed56312 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -13,9 +13,7 @@ import ( "gitee.com/johng/gf/g/os/glog" "github.com/fsnotify/fsnotify" "gitee.com/johng/gf/g/os/gfile" - "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/os/grpool" - "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gqueue" @@ -25,7 +23,6 @@ import ( type Watcher struct { watcher *fsnotify.Watcher // 底层fsnotify对象 events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 - eventCache *gcache.Cache // 用于进行事件过滤,当同一监听文件在100ms内出现相同事件,则过滤 closeChan chan struct{} // 关闭事件 callbacks *gmap.StringInterfaceMap // 监听的回调函数 } @@ -66,15 +63,12 @@ func Remove(path string) error { return watcher.Remove(path) } - - // 创建监听管理对象 func New() (*Watcher, error) { if watch, err := fsnotify.NewWatcher(); err == nil { w := &Watcher { watcher : watch, events : gqueue.New(), - eventCache : gcache.New(), closeChan : make(chan struct{}, 1), callbacks : gmap.NewStringInterfaceMap(), } @@ -134,9 +128,6 @@ func (w *Watcher) startWatchLoop() { // 监听事件 case ev := <- w.watcher.Events: - if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 100) { - continue - } w.events.PushBack(&Event{ Path : ev.Name, Op : Op(ev.Op), @@ -155,7 +146,7 @@ func (w *Watcher) startEventLoop() { for { if v := w.events.PopFront(); v != nil { event := v.(*Event) - // 如果是文件删除时间,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控 + // 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控 if event.IsRemove() && gfile.Exists(event.Path){ w.watcher.Add(event.Path) continue diff --git a/g/os/gproc/gproc.go b/g/os/gproc/gproc.go new file mode 100644 index 000000000..8c435a8af --- /dev/null +++ b/g/os/gproc/gproc.go @@ -0,0 +1,76 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 进程管理/通信. +// 本进程管理从syscall, os.StartProcess, exec.Cmd都使用过, +// 最后采用了exec.Cmd来实现多进程管理,这是一个顶层的跨平台封装,兼容性更好,另外两个是偏底层的接口。 +package gproc + +import ( + "os" + "time" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/container/gtype" +) + +const ( + gPROC_ENV_KEY_PPID_KEY = "gproc.ppid" +) + +// 进程开始执行时间 +var processStartTime = time.Now() + +// 优雅退出标识符号 +var isExited = gtype.NewBool() + +// 获取当前进程ID +func Pid() int { + return os.Getpid() +} + +// 获取父进程ID(gproc父进程,如果当前进程本身就是父进程,那么返回自身的pid,不存在时则使用系统父进程) +func PPid() int { + if !IsChild() { + return Pid() + } + // gPROC_ENV_KEY_PPID_KEY为gproc包自定义的父进程 + ppidValue := os.Getenv(gPROC_ENV_KEY_PPID_KEY) + if ppidValue != "" { + return gconv.Int(ppidValue) + } + return PPidOS() +} + +// 获取父进程ID(系统父进程) +func PPidOS() int { + return os.Getppid() +} + +// 判断当前进程是否为gproc创建的子进程 +func IsChild() bool { + return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != "" +} + +// 进程开始执行时间 +func StartTime() time.Time { + return processStartTime +} + +// 进程已经运行的时间(毫秒) +func Uptime() int { + return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6) +} + +// 标识当前进程为退出状态,其他业务可以根据此标识来执行优雅退出 +func SetExited() { + isExited.Set(true) +} + +// 当前进程是否被标识为退出状态 +func Exited() bool { + return isExited.Val() +} + diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go new file mode 100644 index 000000000..6ff56ef34 --- /dev/null +++ b/g/os/gproc/gproc_comm.go @@ -0,0 +1,252 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gproc + +import ( + "io" + "os" + "fmt" + "time" + "errors" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gfile" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/gflock" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/container/gqueue" + "gitee.com/johng/gf/g/encoding/gbinary" +) + +const ( + // 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置 + gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" + // 自动通信文件清理时间间隔 + gPROC_COMM_AUTO_CLEAR_INTERVAL = time.Second + // 写入通信数据失败时候的重试次数 + gPROC_COMM_FAILURE_RETRY_COUNT = 3 + // (毫秒)主动通信内容检查时间间隔 + gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500 +) + +// 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理) +var commClearLocker = gflock.New("comm.clear.lock") +// 当前进程的文件锁 +var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) +// 进程通信消息队列 +var commQueue = gqueue.New() +// 上一次进程通信内容检查的时间 +var commLastCheckTime = gtype.NewInt64() + +// TCP通信数据结构定义 +type Msg struct { + Pid int // PID,哪个进程发送的消息 + Data []byte // 参数,消息附带的参数 +} + +// 进程管理/通信初始化操作 +func init() { + path := getCommFilePath(os.Getpid()) + if !gfile.Exists(path) { + // 判断是否需要创建通信文件 + commLocker.Lock() + err := gfile.Create(path) + commLocker.UnLock() + if err != nil { + glog.Error(err) + os.Exit(1) + } + } + // 检测写入权限 + if !gfile.IsWritable(path) { + glog.Errorfln("%s is not writable for gproc", path) + os.Exit(1) + } + updateLastCheckTime() + if gtime.Second() - gfile.MTime(path) < 10 { + // 初始化时读取已有数据(文件修改时间在10秒以内) + checkCommBuffer(path) + } else { + // 否则清空旧的数据内容 + commLocker.Lock() + os.Truncate(path, 0) + commLocker.UnLock() + } + // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 + err := gfsnotify.Add(path, func(event *gfsnotify.Event) { + updateLastCheckTime() + checkCommBuffer(path) + }) + if err != nil { + glog.Error(err) + } + + go autoClearCommDir() + go autoActiveCheckComm() +} + +// 更新最后通信检查时间 +func updateLastCheckTime() { + commLastCheckTime.Set(gtime.Millisecond()) +} + +// 自动清理通信目录文件 +// @todo 目前是以时间过期规则进行清理,后期可以考虑加入进程存在性判断 +func autoClearCommDir() { + dirPath := getCommDirPath() + for { + time.Sleep(gPROC_COMM_AUTO_CLEAR_INTERVAL) + if commClearLocker.TryLock() { + for _, name := range gfile.ScanDir(dirPath) { + path := dirPath + gfile.Separator + name + if gtime.Second() - gfile.MTime(path) >= 10 { + gfile.Remove(path) + } + } + commClearLocker.UnLock() + } + } +} + +// 主动通信内容检测 +func autoActiveCheckComm() { + for { + time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL*time.Millisecond) + if gtime.Millisecond() - commLastCheckTime.Val() > gPROC_COMM_ACTIVE_CHECK_INTERVAL { + updateLastCheckTime() + checkCommBuffer(getCommFilePath(Pid())) + } + } +} + +// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列 +func checkCommBuffer(path string) { + commLocker.Lock() + buffer := gfile.GetBinContents(path) + if len(buffer) > 0 { + os.Truncate(path, 0) + } + commLocker.UnLock() + if len(buffer) > 0 { + for _, v := range bufferToMsgs(buffer) { + commQueue.PushBack(v) + } + } +} + +// 获取其他进程传递到当前进程的消息包,阻塞执行 +func Receive() *Msg { + if v := commQueue.PopFront(); v != nil { + return v.(*Msg) + } + return nil +} + +// 向指定gproc进程发送数据 +// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) +func Send(pid int, data interface{}) error { + var err error = nil + buffer := gconv.Bytes(data) + b := make([]byte, 0) + b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...) + b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...) + b = append(b, gbinary.EncodeUint32(checksum(buffer))...) + b = append(b, buffer...) + l := gflock.New(fmt.Sprintf("%d.lock", pid)) + l.Lock() + for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { + err = doSend(pid, b) + if err == nil { + break + } + } + l.UnLock() + //glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err) + return err +} + +// 执行进程间通信数据写入 +func doSend(pid int, buffer []byte) error { + file, err := gfile.OpenWithFlag(getCommFilePath(pid), os.O_RDWR|os.O_CREATE|os.O_APPEND) + if err != nil{ + return err + } + // 获取原有文件内容大小 + stat, err := file.Stat() + if err != nil { + return err + } + oldSize := stat.Size() + // 执行数据写入 + writeSize, err := file.Write(buffer) + if err != nil { + return err + } + if writeSize < len(buffer) { + return io.ErrShortWrite + } + // 写入成功之后获取最新文件内容大小,执行对比 + if stat, err := file.Stat(); err != nil { + return err + } else { + // 由于文件锁机制的保证,同一时刻只会有一个进程(&协程)在执行写入,不会出现数据粘包情况 + // 这里从严谨性考虑增加大小判断,更进一步避免粘包,或者丢包情况 + if stat.Size() - int64(writeSize) != oldSize { + return errors.New("error writing data") + } + } + return nil +} + +// 获取指定进程的通信文件地址 +func getCommFilePath(pid int) string { + return getCommDirPath() + gfile.Separator + gconv.String(pid) +} + +// 获取进程间通信目录地址 +func getCommDirPath() string { + tempDir := os.Getenv("gproc.tempdir") + if tempDir == "" { + tempDir = gfile.TempDir() + } + return tempDir + gfile.Separator + "gproc" +} + +// 数据解包,防止黏包 +func bufferToMsgs(buffer []byte) []*Msg { + s := 0 + msgs := make([]*Msg, 0) + for s < len(buffer) { + length := gbinary.DecodeToInt(buffer[s : s + 4]) + if length < 0 || length > len(buffer) { + s++ + continue + } + checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) + checksum2 := checksum(buffer[s + 12 : s + length]) + if checksum1 != checksum2 { + s++ + continue + } + msgs = append(msgs, &Msg { + Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), + Data : buffer[s + 12 : s + length], + }) + s += length + } + return msgs +} + +// 常见的二进制数据校验方式,生成校验结果 +func checksum(buffer []byte) uint32 { + var checksum uint32 + for _, b := range buffer { + checksum += uint32(b) + } + return checksum +} \ No newline at end of file diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go new file mode 100644 index 000000000..8fb8a7ea9 --- /dev/null +++ b/g/os/gproc/gproc_manager.go @@ -0,0 +1,177 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 进程管理. +package gproc + +import ( + "os" + "fmt" + "strings" + "os/exec" + "gitee.com/johng/gf/g/container/gmap" +) + +const ( + gCHILD_ARGS_MARK_NAME = "--gproc-child" +) + +// 进程管理器 +type Manager struct { + processes *gmap.IntInterfaceMap // 所管理的子进程map +} + +// 创建一个进程管理器 +func NewManager() *Manager { + return &Manager{ + processes : gmap.NewIntInterfaceMap(), + } +} + +// 创建一个进程(不执行) +func NewProcess(path string, args []string, environment []string) *Process { + env := make([]string, len(environment) + 1) + for k, v := range environment { + env[k] = v + } + env[len(env) - 1] = fmt.Sprintf("%s=%s", gPROC_TEMP_DIR_ENV_KEY, os.TempDir()) + p := &Process { + Manager : nil, + PPid : os.Getpid(), + Cmd : exec.Cmd { + Args : []string{path}, + Path : path, + Stdin : os.Stdin, + Stdout : os.Stdout, + Stderr : os.Stderr, + Env : env, + ExtraFiles : make([]*os.File, 0), + }, + } + // 当前工作目录 + if d, err := os.Getwd(); err == nil { + p.Dir = d + } + // 判断是否加上子进程标识 + hasChildMark := false + childMarkLen := len(gCHILD_ARGS_MARK_NAME) + for _, v := range args { + if len(v) >= childMarkLen && strings.EqualFold(v[0 : childMarkLen], gCHILD_ARGS_MARK_NAME) { + hasChildMark = true + } + } + if !hasChildMark { + p.Args = append(p.Args, gCHILD_ARGS_MARK_NAME) + } + if len(args) > 0 { + start := 0 + if strings.EqualFold(path, args[0]) { + start = 1 + } + p.Args = append(p.Args, args[start : ]...) + } + return p +} + +// 创建一个进程(不执行) +func (m *Manager) NewProcess(path string, args []string, environment []string) *Process { + p := NewProcess(path, args, environment) + p.Manager = m + return p +} + +// 获取当前进程管理器中的一个进程 +func (m *Manager) GetProcess(pid int) *Process { + if v := m.processes.Get(pid); v != nil { + return v.(*Process) + } + return nil +} + +// 添加一个已存在进程到进程管理器中 +func (m *Manager) AddProcess(pid int) { + if process, err := os.FindProcess(pid); err == nil { + p := m.NewProcess("", nil, nil) + p.Process = process + m.processes.Set(pid, p) + } +} + +// 移除进程管理器中的指定进程 +func (m *Manager) RemoveProcess(pid int) { + m.processes.Remove(pid) +} + +// 获取所有的进程对象,构成列表返回 +func (m *Manager) Processes() []*Process { + processes := make([]*Process, 0) + m.processes.RLockFunc(func(m map[int]interface{}) { + for _, v := range m { + processes = append(processes, v.(*Process)) + } + }) + return processes +} + +// 获取所有的进程pid,构成列表返回 +func (m *Manager) Pids() []int { + return m.processes.Keys() +} + +// 等待所有子进程结束 +func (m *Manager) WaitAll() { + processes := m.Processes() + if len(processes) > 0 { + for _, p := range processes { + p.Wait() + } + } +} + +// 关闭所有的进程 +func (m *Manager) KillAll() error { + for _, p := range m.Processes() { + if err := p.Kill(); err != nil { + return err + } + } + return nil +} + +// 向所有进程发送信号量 +func (m *Manager) SignalAll(sig os.Signal) error { + for _, p := range m.Processes() { + if err := p.Signal(sig); err != nil { + return err + } + } + return nil +} + +// 向所有进程发送消息 +func (m *Manager) Send(data interface{}) error { + for _, p := range m.Processes() { + if err := p.Send(data); err != nil { + return err + } + } + return nil +} + +// 向指定进程发送消息 +func (m *Manager) SendTo(pid int, data interface{}) error { + return Send(pid, data) +} + +// 清空管理器 +func (m *Manager) Clear() { + m.processes.Clear() +} + +// 当前进程总数 +func (m *Manager) Size() int { + return m.processes.Size() +} \ No newline at end of file diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go new file mode 100644 index 000000000..3ee7a224f --- /dev/null +++ b/g/os/gproc/gproc_proccess.go @@ -0,0 +1,90 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gproc + +import ( + "os" + "fmt" + "errors" + "os/exec" +) + +// 子进程 +type Process struct { + exec.Cmd + Manager *Manager // 所属进程管理器 + PPid int // 自定义关联的父进程ID +} + +// 开始执行(非阻塞) +func (p *Process) Start() (int, error) { + if p.Process != nil { + return p.Pid(), nil + } + if p.PPid > 0 { + p.Env = append(p.Env, fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, p.PPid)) + } + if err := p.Cmd.Start(); err == nil { + if p.Manager != nil { + p.Manager.processes.Set(p.Process.Pid, p) + } + return p.Process.Pid, nil + } else { + return 0, err + } +} + +// 运行进程(阻塞等待执行完毕) +func (p *Process) Run() error { + if _, err := p.Start(); err == nil { + return p.Wait() + } else { + return err + } +} + +// PID +func (p *Process) Pid() int { + if p.Process != nil { + return p.Process.Pid + } + return 0 +} + +// 向进程发送消息 +func (p *Process) Send(data interface{}) error { + if p.Process != nil { + return Send(p.Process.Pid, data) + } + return errors.New("process not running") +} + + +// Release releases any resources associated with the Process p, +// rendering it unusable in the future. +// Release only needs to be called if Wait is not. +func (p *Process) Release() error { + return p.Process.Release() +} + +// Kill causes the Process to exit immediately. +func (p *Process) Kill() error { + if err := p.Process.Kill(); err == nil { + if p.Manager != nil { + p.Manager.processes.Remove(p.Pid()) + } + return nil + } else { + return err + } +} + +// Signal sends a signal to the Process. +// Sending Interrupt on Windows is not implemented. +func (p *Process) Signal(sig os.Signal) error { + return p.Process.Signal(sig) +} \ No newline at end of file diff --git a/geg/encoding/gbase64.go b/geg/encoding/gbase64.go new file mode 100644 index 000000000..416d35d1a --- /dev/null +++ b/geg/encoding/gbase64.go @@ -0,0 +1,18 @@ +package main + +import ( + "fmt" + "gitee.com/johng/gf/g/encoding/gbase64" +) + +func main() { + s := "john" + b := gbase64.Encode(s) + c, e := gbase64.Decode(b) + fmt.Println(b) + fmt.Println(c) + fmt.Println(e) +} + + + diff --git a/geg/frame/mvc/controller/demo/rule.go b/geg/frame/mvc/controller/demo/rule.go new file mode 100644 index 000000000..224f9a885 --- /dev/null +++ b/geg/frame/mvc/controller/demo/rule.go @@ -0,0 +1,19 @@ +package demo + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/frame/gmvc" +) + +type ControllerRule struct { + gmvc.Controller +} + +func init() { + g.Server().BindController("/rule/{method}/:name", &ControllerRule{}) +} + +func (c *ControllerRule) Show() { + c.Response.Write(c.Request.Get("name")) +} + diff --git a/geg/frame/mvc/main.go b/geg/frame/mvc/main.go index e4161b36b..0544de505 100644 --- a/geg/frame/mvc/main.go +++ b/geg/frame/mvc/main.go @@ -1,11 +1,11 @@ package main import ( - "gitee.com/johng/gf/g/net/ghttp" + "gitee.com/johng/gf/g" _ "gitee.com/johng/gf/geg/frame/mvc/controller/demo" ) func main() { - ghttp.GetServer().SetPort(8199) - ghttp.GetServer().Run() + g.Server().SetPort(8199) + g.Server().Run() } diff --git a/geg/net/ghttp/https/https_http.go b/geg/net/ghttp/https/https_http.go index 2ab74b08b..3a42b01a7 100644 --- a/geg/net/ghttp/https/https_http.go +++ b/geg/net/ghttp/https/https_http.go @@ -6,11 +6,13 @@ import ( func main() { s := ghttp.GetServer() + s.EnableAdmin() s.BindHandler("/", func(r *ghttp.Request){ r.Response.Writeln("您可以同时通过HTTP和HTTPS方式看到该内容!") }) s.EnableHTTPS("/home/john/temp/server.crt", "/home/john/temp/server.key") - s.SetHTTPSPort(443) - s.SetPort(80) + s.SetHTTPSPort(8198, 8199) + s.SetPort(8200, 8300) + s.EnableAdmin() s.Run() } \ No newline at end of file diff --git a/geg/net/ghttp/reload/admin.go b/geg/net/ghttp/reload/admin.go new file mode 100644 index 000000000..55438e229 --- /dev/null +++ b/geg/net/ghttp/reload/admin.go @@ -0,0 +1,12 @@ +package main + +import ( + "gitee.com/johng/gf/g" +) + +func main() { + s := g.Server() + s.EnableAdmin() + s.SetPort(8199) + s.Run() +} \ No newline at end of file diff --git a/geg/net/ghttp/reload/https_http.go b/geg/net/ghttp/reload/https_http.go new file mode 100644 index 000000000..3a42b01a7 --- /dev/null +++ b/geg/net/ghttp/reload/https_http.go @@ -0,0 +1,18 @@ +package main + +import ( + "gitee.com/johng/gf/g/net/ghttp" +) + +func main() { + s := ghttp.GetServer() + s.EnableAdmin() + s.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("您可以同时通过HTTP和HTTPS方式看到该内容!") + }) + s.EnableHTTPS("/home/john/temp/server.crt", "/home/john/temp/server.key") + s.SetHTTPSPort(8198, 8199) + s.SetPort(8200, 8300) + s.EnableAdmin() + s.Run() +} \ No newline at end of file diff --git a/geg/net/ghttp/reload/multi_port.go b/geg/net/ghttp/reload/multi_port.go new file mode 100644 index 000000000..359520ab8 --- /dev/null +++ b/geg/net/ghttp/reload/multi_port.go @@ -0,0 +1,23 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" +) + +func main() { + s := g.Server() + s.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("hello") + }) + s.BindHandler("/restart", func(r *ghttp.Request){ + r.Response.Writeln("restart server") + r.Server.Restart() + }) + s.BindHandler("/shutdown", func(r *ghttp.Request){ + r.Response.Writeln("shutdown server") + r.Server.Shutdown() + }) + s.SetPort(8199, 8200) + s.Run() +} \ No newline at end of file diff --git a/geg/net/ghttp/reload/multi_port_and_server.go b/geg/net/ghttp/reload/multi_port_and_server.go new file mode 100644 index 000000000..536d35cd1 --- /dev/null +++ b/geg/net/ghttp/reload/multi_port_and_server.go @@ -0,0 +1,31 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" + "time" + "gitee.com/johng/gf/g/os/gproc" +) + +func main() { + s1 := g.Server("s1") + s1.EnableAdmin() + s1.BindHandler("/", func(r *ghttp.Request) { + pid := gproc.Pid() + r.Response.Writeln("before restart, pid:", pid) + time.Sleep(10*time.Second) + r.Response.Writeln("after restart, pid:", gproc.Pid()) + }) + s1.BindHandler("/pid", func(r *ghttp.Request) { + r.Response.Write(gproc.Pid()) + }) + s1.SetPort(8199, 8200) + s1.Start() + + s2 := g.Server("s2") + s2.EnableAdmin() + s2.SetPort(8300, 8080) + s2.Start() + + g.Wait() +} \ No newline at end of file diff --git a/geg/net/ghttp/reload/simple.go b/geg/net/ghttp/reload/simple.go new file mode 100644 index 000000000..0129446a8 --- /dev/null +++ b/geg/net/ghttp/reload/simple.go @@ -0,0 +1,23 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" + "gitee.com/johng/gf/g/os/gproc" + "time" +) + +func main() { + s := g.Server() + s.BindHandler("/sleep", func(r *ghttp.Request){ + r.Response.Writeln(gproc.Pid()) + time.Sleep(10*time.Second) + r.Response.Writeln(gproc.Pid()) + }) + s.BindHandler("/pid", func(r *ghttp.Request){ + r.Response.Writeln(gproc.Pid()) + }) + s.EnableAdmin() + s.SetPort(8199) + s.Run() +} \ No newline at end of file diff --git a/geg/os/gflock/flock.go b/geg/os/gflock/flock.go new file mode 100644 index 000000000..cb02bfd7b --- /dev/null +++ b/geg/os/gflock/flock.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + "github.com/theckman/go-flock" + "time" +) + +func main() { + l := flock.NewFlock("/tmp/go-lock.lock") + l.Lock() + fmt.Printf("lock 1") + l.Lock() + fmt.Printf("lock 1") + + time.Sleep(time.Hour) +} diff --git a/geg/os/gflock/gflock.go b/geg/os/gflock/gflock.go new file mode 100644 index 000000000..2b5e3256b --- /dev/null +++ b/geg/os/gflock/gflock.go @@ -0,0 +1,29 @@ +package main + +import ( + "gitee.com/johng/gf/g/os/gflock" + "fmt" + "time" +) + +func test() { + l := gflock.New("1.lock") + fmt.Println(l.Path()) + l.Lock() + fmt.Println("lock 1") + l.Lock() + fmt.Println("lock 2") +} + +func active() { + i := 0 + for { + time.Sleep(time.Second) + i++ + } +} + +func main() { + go active() + test() +} diff --git a/geg/os/gfsnotify/gfsnotify.go b/geg/os/gfsnotify/gfsnotify.go index 0ab4144e7..023411a6b 100644 --- a/geg/os/gfsnotify/gfsnotify.go +++ b/geg/os/gfsnotify/gfsnotify.go @@ -6,7 +6,7 @@ import ( ) func main() { - err := gfsnotify.Add("/home/john/Documents/temp.txt", func(event *gfsnotify.Event) { + err := gfsnotify.Add("./temp.txt", func(event *gfsnotify.Event) { if event.IsCreate() { log.Println("创建文件 : ", event.Path) } diff --git a/geg/os/gproc/cmd.go b/geg/os/gproc/cmd.go new file mode 100644 index 000000000..0cd84127f --- /dev/null +++ b/geg/os/gproc/cmd.go @@ -0,0 +1,15 @@ +package main + +import ( + "os" + "fmt" + "time" + "os/exec" +) + +func main () { + cmd := exec.Command(os.Args[0], "1") + time.Sleep(3*time.Second) + fmt.Println(cmd.Start()) + time.Sleep(time.Hour) +} diff --git a/geg/os/gproc/gproc.go b/geg/os/gproc/gproc.go new file mode 100644 index 000000000..7761ef34a --- /dev/null +++ b/geg/os/gproc/gproc.go @@ -0,0 +1,28 @@ +package main + +import ( + "os" + "fmt" + "time" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gtime" +) + +func main () { + fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild()) + if gproc.IsChild() { + gtime.SetInterval(time.Second, func() bool { + gproc.Send(gproc.PPid(), gtime.Datetime()) + return true + }) + select { } + } else { + m := gproc.NewManager() + p := m.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Start() + for { + msg := gproc.Receive() + fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data)) + } + } +} diff --git a/geg/os/gproc/gproc2.go b/geg/os/gproc/gproc2.go new file mode 100644 index 000000000..c48ddd7ec --- /dev/null +++ b/geg/os/gproc/gproc2.go @@ -0,0 +1,11 @@ +package main + +import ( + "fmt" + "gitee.com/johng/gf/g/os/gproc" +) + +func main () { + err := gproc.Send(23504, []byte{30}) + fmt.Println(err) +} diff --git a/geg/other/graceful.go b/geg/other/graceful.go new file mode 100644 index 000000000..dbe5b5168 --- /dev/null +++ b/geg/other/graceful.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" + "net/http" + + "github.com/tabalt/gracehttp" +) + +func main() { + http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "hello world") + }) + + err := gracehttp.ListenAndServe(":8888", nil) + if err != nil { + fmt.Println(err) + } +} \ No newline at end of file diff --git a/geg/other/test2.go b/geg/other/test2.go index a3aceda50..dfeab4a61 100644 --- a/geg/other/test2.go +++ b/geg/other/test2.go @@ -2,16 +2,24 @@ package main import ( "fmt" + "gitee.com/johng/gf/g/os/gpm" + "os" + "time" + "gitee.com/johng/gf/g/os/glog" ) func main() { - //var v interface{} - m := map[string]int { - "age" : 18, + m := gproc.New() + env := os.Environ() + env = append(env, "child=1") + p := m.NewProcess(os.Args[0], os.Args, env) + if os.Getenv("child") != "" { + time.Sleep(3*time.Second) + glog.Error("error") + } else { + pid, err := p.Run() + fmt.Println(pid) + fmt.Println(err) + fmt.Println(p.Wait()) } - //v = m - p := &m - (*p)["age"] = 16 - //fmt.Println(v) - fmt.Println(m) } \ No newline at end of file diff --git a/version.go b/version.go index a4d2a08c0..7227f4322 100644 --- a/version.go +++ b/version.go @@ -1,5 +1,5 @@ package gf -const VERSION = "0.97 beta" +const VERSION = "0.98 beta" const AUTHORS = "john"