From 721503019a82095298fa6c962cde4803d09efecd Mon Sep 17 00:00:00 2001 From: John Date: Fri, 18 May 2018 15:45:08 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84ghttp.Server=E7=83=AD?= =?UTF-8?q?=E9=87=8D=E5=90=AF=E7=89=B9=E6=80=A7=E7=A4=BA=E4=BE=8B=EF=BC=8C?= =?UTF-8?q?gflock=E7=A4=BA=E4=BE=8B=EF=BC=8Cgproc=E7=A4=BA=E4=BE=8B?= =?UTF-8?q?=EF=BC=8C=E5=B9=B6=E6=94=B9=E8=BF=9Bgproc=E9=80=9A=E4=BF=A1?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E6=9C=BA=E5=88=B6=EF=BC=8C=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E5=86=99=E5=85=A5=E6=95=B0=E6=8D=AE=E6=96=87=E4=BB=B6=E6=8C=87?= =?UTF-8?q?=E9=92=88=E6=9C=AA=E5=85=B3=E9=97=AD=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- TODO | 2 +- g/os/gproc/gproc.go | 14 ------ g/os/gproc/gproc_comm.go | 50 ++++++++++--------- g/os/gproc/gproc_manager.go | 4 +- g/os/gproc/gproc_proccess.go | 2 +- geg/net/ghttp/reload/https.go | 17 +++++++ geg/net/ghttp/reload/multi_port_and_server.go | 16 +----- geg/os/gflock/gflock.go | 30 ++++------- geg/os/gproc/gproc.go | 22 ++++---- geg/os/gproc/gproc2.go | 8 ++- geg/os/gproc/gproc_comm.go | 28 +++++++++++ geg/os/gproc/gproc_comm2.go | 11 ++++ geg/other/test.go | 21 ++++---- 13 files changed, 123 insertions(+), 102 deletions(-) create mode 100644 geg/net/ghttp/reload/https.go create mode 100644 geg/os/gproc/gproc_comm.go create mode 100644 geg/os/gproc/gproc_comm2.go diff --git a/TODO b/TODO index fa2fbbde9..912d2466d 100644 --- a/TODO +++ b/TODO @@ -11,7 +11,7 @@ ON THE WAY: 10. ghttp.Response增加输出内容后自动退出当前请求机制,不需要用户手动return,参考beego如何实现; 11. 当二进制参数为nil时,gjson.LoadContent并将gjson.Json对象ToMap时会报错; 12. 改进控制器及执行对象注册,更友好地支持动态路由注册,例如:注册规则为 /channel/:name,现有的控制器及执行对象注册很难友好支持这种动态形式; - +13. 当前gpage分页包的输出标签不支持li,大多数CSS框架都是li+a标签模式,需要提供可更加灵活的定制化功能实现; DONE: 1. gconv完善针对不同类型的判断,例如:尽量减少sprintf("%v", xxx)来执行string类型的转换; diff --git a/g/os/gproc/gproc.go b/g/os/gproc/gproc.go index 8c435a8af..e2830e0bb 100644 --- a/g/os/gproc/gproc.go +++ b/g/os/gproc/gproc.go @@ -13,7 +13,6 @@ import ( "os" "time" "gitee.com/johng/gf/g/util/gconv" - "gitee.com/johng/gf/g/container/gtype" ) const ( @@ -23,9 +22,6 @@ const ( // 进程开始执行时间 var processStartTime = time.Now() -// 优雅退出标识符号 -var isExited = gtype.NewBool() - // 获取当前进程ID func Pid() int { return os.Getpid() @@ -64,13 +60,3 @@ func Uptime() int { return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6) } -// 标识当前进程为退出状态,其他业务可以根据此标识来执行优雅退出 -func SetExited() { - isExited.Set(true) -} - -// 当前进程是否被标识为退出状态 -func Exited() bool { - return isExited.Val() -} - diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 6ff56ef34..5f0393d58 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -31,17 +31,17 @@ const ( // 写入通信数据失败时候的重试次数 gPROC_COMM_FAILURE_RETRY_COUNT = 3 // (毫秒)主动通信内容检查时间间隔 - gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500 + gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500*time.Millisecond ) // 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理) var commClearLocker = gflock.New("comm.clear.lock") // 当前进程的文件锁 -var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) +var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) // 进程通信消息队列 -var commQueue = gqueue.New() -// 上一次进程通信内容检查的时间 -var commLastCheckTime = gtype.NewInt64() +var commQueue = gqueue.New() +// 通信文件修改时间 +var commUpdateTime = gtype.NewInt64() // TCP通信数据结构定义 type Msg struct { @@ -67,8 +67,9 @@ func init() { glog.Errorfln("%s is not writable for gproc", path) os.Exit(1) } - updateLastCheckTime() - if gtime.Second() - gfile.MTime(path) < 10 { + fileMtime := gfile.MTime(path) + commUpdateTime.Set(fileMtime) + if gtime.Second() - fileMtime < 10 { // 初始化时读取已有数据(文件修改时间在10秒以内) checkCommBuffer(path) } else { @@ -79,7 +80,6 @@ func init() { } // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 err := gfsnotify.Add(path, func(event *gfsnotify.Event) { - updateLastCheckTime() checkCommBuffer(path) }) if err != nil { @@ -90,11 +90,6 @@ func init() { go autoActiveCheckComm() } -// 更新最后通信检查时间 -func updateLastCheckTime() { - commLastCheckTime.Set(gtime.Millisecond()) -} - // 自动清理通信目录文件 // @todo 目前是以时间过期规则进行清理,后期可以考虑加入进程存在性判断 func autoClearCommDir() { @@ -115,28 +110,34 @@ func autoClearCommDir() { // 主动通信内容检测 func autoActiveCheckComm() { + path := getCommFilePath(Pid()) for { - time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL*time.Millisecond) - if gtime.Millisecond() - commLastCheckTime.Val() > gPROC_COMM_ACTIVE_CHECK_INTERVAL { - updateLastCheckTime() - checkCommBuffer(getCommFilePath(Pid())) - } + time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL) + checkCommBuffer(path) } } // 手动检查进程通信消息,如果存在消息曾推送到进程消息队列 func checkCommBuffer(path string) { commLocker.Lock() + defer commLocker.UnLock() + // 检测文件修改时间 + mtime := gfile.MTimeMillisecond(path) + if mtime == commUpdateTime.Val() { + return + } + // 读取进程间通信数据 buffer := gfile.GetBinContents(path) if len(buffer) > 0 { os.Truncate(path, 0) } - commLocker.UnLock() if len(buffer) > 0 { for _, v := range bufferToMsgs(buffer) { commQueue.PushBack(v) } } + // 保存上一次检测的文件修改时间 + commUpdateTime.Set(mtime) } // 获取其他进程传递到当前进程的消息包,阻塞执行 @@ -149,14 +150,13 @@ func Receive() *Msg { // 向指定gproc进程发送数据 // 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) -func Send(pid int, data interface{}) error { +func Send(pid int, data []byte) error { var err error = nil - buffer := gconv.Bytes(data) b := make([]byte, 0) - b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...) + b = append(b, gbinary.EncodeInt32(int32(len(data) + 12))...) b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...) - b = append(b, gbinary.EncodeUint32(checksum(buffer))...) - b = append(b, buffer...) + b = append(b, gbinary.EncodeUint32(checksum(data))...) + b = append(b, data...) l := gflock.New(fmt.Sprintf("%d.lock", pid)) l.Lock() for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { @@ -176,6 +176,8 @@ func doSend(pid int, buffer []byte) error { if err != nil{ return err } + // 必须要Close才会更新文件的修改时间 + defer file.Close() // 获取原有文件内容大小 stat, err := file.Stat() if err != nil { diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index 8fb8a7ea9..a9e12105f 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -152,7 +152,7 @@ func (m *Manager) SignalAll(sig os.Signal) error { } // 向所有进程发送消息 -func (m *Manager) Send(data interface{}) error { +func (m *Manager) Send(data []byte) error { for _, p := range m.Processes() { if err := p.Send(data); err != nil { return err @@ -162,7 +162,7 @@ func (m *Manager) Send(data interface{}) error { } // 向指定进程发送消息 -func (m *Manager) SendTo(pid int, data interface{}) error { +func (m *Manager) SendTo(pid int, data []byte) error { return Send(pid, data) } diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index 3ee7a224f..c40cb7336 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -56,7 +56,7 @@ func (p *Process) Pid() int { } // 向进程发送消息 -func (p *Process) Send(data interface{}) error { +func (p *Process) Send(data []byte) error { if p.Process != nil { return Send(p.Process.Pid, data) } diff --git a/geg/net/ghttp/reload/https.go b/geg/net/ghttp/reload/https.go new file mode 100644 index 000000000..7804ab61c --- /dev/null +++ b/geg/net/ghttp/reload/https.go @@ -0,0 +1,17 @@ +package main + +import ( + "gitee.com/johng/gf/g" + "gitee.com/johng/gf/g/net/ghttp" +) + +func main() { + s := g.Server() + s.BindHandler("/", func(r *ghttp.Request){ + r.Response.Writeln("哈罗!") + }) + s.EnableHTTPS("/home/john/temp/server.crt", "/home/john/temp/server.key") + s.EnableAdmin() + s.SetPort(8200) + s.Run() +} \ No newline at end of file diff --git a/geg/net/ghttp/reload/multi_port_and_server.go b/geg/net/ghttp/reload/multi_port_and_server.go index 536d35cd1..96d054bf0 100644 --- a/geg/net/ghttp/reload/multi_port_and_server.go +++ b/geg/net/ghttp/reload/multi_port_and_server.go @@ -2,29 +2,17 @@ package main import ( "gitee.com/johng/gf/g" - "gitee.com/johng/gf/g/net/ghttp" - "time" - "gitee.com/johng/gf/g/os/gproc" ) func main() { s1 := g.Server("s1") s1.EnableAdmin() - s1.BindHandler("/", func(r *ghttp.Request) { - pid := gproc.Pid() - r.Response.Writeln("before restart, pid:", pid) - time.Sleep(10*time.Second) - r.Response.Writeln("after restart, pid:", gproc.Pid()) - }) - s1.BindHandler("/pid", func(r *ghttp.Request) { - r.Response.Write(gproc.Pid()) - }) - s1.SetPort(8199, 8200) + s1.SetPort(8100, 8200) s1.Start() s2 := g.Server("s2") s2.EnableAdmin() - s2.SetPort(8300, 8080) + s2.SetPort(8300, 8400) s2.Start() g.Wait() diff --git a/geg/os/gflock/gflock.go b/geg/os/gflock/gflock.go index 2b5e3256b..2adc33b75 100644 --- a/geg/os/gflock/gflock.go +++ b/geg/os/gflock/gflock.go @@ -1,29 +1,17 @@ package main import ( - "gitee.com/johng/gf/g/os/gflock" - "fmt" "time" + "gitee.com/johng/gf/g/os/glog" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gflock" ) -func test() { - l := gflock.New("1.lock") - fmt.Println(l.Path()) - l.Lock() - fmt.Println("lock 1") - l.Lock() - fmt.Println("lock 2") -} - -func active() { - i := 0 - for { - time.Sleep(time.Second) - i++ - } -} - func main() { - go active() - test() + l := gflock.New("demo.lock") + l.Lock() + glog.Printfln("locked by pid: %d", gproc.Pid()) + time.Sleep(3*time.Second) + l.UnLock() + glog.Printfln("unlocked by pid: %d", gproc.Pid()) } diff --git a/geg/os/gproc/gproc.go b/geg/os/gproc/gproc.go index 7761ef34a..f8b103902 100644 --- a/geg/os/gproc/gproc.go +++ b/geg/os/gproc/gproc.go @@ -2,27 +2,25 @@ package main import ( "os" - "fmt" "time" + "gitee.com/johng/gf/g/os/glog" "gitee.com/johng/gf/g/os/gproc" - "gitee.com/johng/gf/g/os/gtime" ) func main () { - fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild()) if gproc.IsChild() { - gtime.SetInterval(time.Second, func() bool { - gproc.Send(gproc.PPid(), gtime.Datetime()) - return true - }) - select { } + glog.Printfln("%d: Hi, I am child, waiting 3 seconds to die", gproc.Pid()) + time.Sleep(time.Second) + glog.Printfln("%d: 1", gproc.Pid()) + time.Sleep(time.Second) + glog.Printfln("%d: 2", gproc.Pid()) + time.Sleep(time.Second) + glog.Printfln("%d: 3", gproc.Pid()) } else { m := gproc.NewManager() p := m.NewProcess(os.Args[0], os.Args, os.Environ()) p.Start() - for { - msg := gproc.Receive() - fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data)) - } + p.Wait() + glog.Printfln("%d: child died", gproc.Pid()) } } diff --git a/geg/os/gproc/gproc2.go b/geg/os/gproc/gproc2.go index c48ddd7ec..ae9590215 100644 --- a/geg/os/gproc/gproc2.go +++ b/geg/os/gproc/gproc2.go @@ -6,6 +6,10 @@ import ( ) func main () { - err := gproc.Send(23504, []byte{30}) - fmt.Println(err) + pid := 28536 + m := gproc.NewManager() + m.AddProcess(pid) + m.KillAll() + m.WaitAll() + fmt.Printf("%d was killed\n", pid) } diff --git a/geg/os/gproc/gproc_comm.go b/geg/os/gproc/gproc_comm.go new file mode 100644 index 000000000..aba0c8b08 --- /dev/null +++ b/geg/os/gproc/gproc_comm.go @@ -0,0 +1,28 @@ +package main + +import ( + "os" + "fmt" + "time" + "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gtime" +) + +func main () { + fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild()) + if gproc.IsChild() { + gtime.SetInterval(time.Second, func() bool { + gproc.Send(gproc.PPid(), []byte(gtime.Datetime())) + return true + }) + select { } + } else { + m := gproc.NewManager() + p := m.NewProcess(os.Args[0], os.Args, os.Environ()) + p.Start() + for { + msg := gproc.Receive() + fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data)) + } + } +} diff --git a/geg/os/gproc/gproc_comm2.go b/geg/os/gproc/gproc_comm2.go new file mode 100644 index 000000000..c48ddd7ec --- /dev/null +++ b/geg/os/gproc/gproc_comm2.go @@ -0,0 +1,11 @@ +package main + +import ( + "fmt" + "gitee.com/johng/gf/g/os/gproc" +) + +func main () { + err := gproc.Send(23504, []byte{30}) + fmt.Println(err) +} diff --git a/geg/other/test.go b/geg/other/test.go index b37cf228b..085a3d00b 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,21 +2,20 @@ package main import ( "fmt" - "reflect" - "gitee.com/johng/gf/g/database/gdb" + "gitee.com/johng/gf/g/os/gtime" + "time" + "gitee.com/johng/gf/g/os/gfile" ) func main() { - var value interface{} - value = gdb.Map{"a":1} - refValue := reflect.ValueOf(value) + gtime.SetInterval(10*time.Millisecond, func() bool { + path := "./temp.txt" + gfile.PutBinContentsAppend(path, []byte("1")) + fmt.Println(gfile.MTimeMillisecond(path)) + return true + }) - if refValue.Kind() == reflect.Map { - keys := refValue.MapKeys() - for _, k := range keys { - fmt.Println(k, refValue.MapIndex(k).Interface()) - } - } + time.Sleep(time.Hour) }