完善ghttp.Server热重启特性示例,gflock示例,gproc示例,并改进gproc通信检测机制,修复写入数据文件指针未关闭问题

This commit is contained in:
John
2018-05-18 15:45:08 +08:00
parent 37e48f6f71
commit 721503019a
13 changed files with 123 additions and 102 deletions

2
TODO
View File

@ -11,7 +11,7 @@ ON THE WAY:
10. ghttp.Response增加输出内容后自动退出当前请求机制不需要用户手动return参考beego如何实现
11. 当二进制参数为nil时gjson.LoadContent并将gjson.Json对象ToMap时会报错
12. 改进控制器及执行对象注册,更友好地支持动态路由注册,例如:注册规则为 /channel/:name现有的控制器及执行对象注册很难友好支持这种动态形式
13. 当前gpage分页包的输出标签不支持li大多数CSS框架都是li+a标签模式需要提供可更加灵活的定制化功能实现
DONE:
1. gconv完善针对不同类型的判断例如尽量减少sprintf("%v", xxx)来执行string类型的转换

View File

@ -13,7 +13,6 @@ import (
"os"
"time"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/container/gtype"
)
const (
@ -23,9 +22,6 @@ const (
// 进程开始执行时间
var processStartTime = time.Now()
// 优雅退出标识符号
var isExited = gtype.NewBool()
// 获取当前进程ID
func Pid() int {
return os.Getpid()
@ -64,13 +60,3 @@ func Uptime() int {
return int(time.Now().UnixNano()/1e6 - processStartTime.UnixNano()/1e6)
}
// 标识当前进程为退出状态,其他业务可以根据此标识来执行优雅退出
func SetExited() {
isExited.Set(true)
}
// 当前进程是否被标识为退出状态
func Exited() bool {
return isExited.Val()
}

View File

@ -31,17 +31,17 @@ const (
// 写入通信数据失败时候的重试次数
gPROC_COMM_FAILURE_RETRY_COUNT = 3
// (毫秒)主动通信内容检查时间间隔
gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500
gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500*time.Millisecond
)
// 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理)
var commClearLocker = gflock.New("comm.clear.lock")
// 当前进程的文件锁
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
// 进程通信消息队列
var commQueue = gqueue.New()
// 上一次进程通信内容检查的时间
var commLastCheckTime = gtype.NewInt64()
var commQueue = gqueue.New()
// 通信文件修改时间
var commUpdateTime = gtype.NewInt64()
// TCP通信数据结构定义
type Msg struct {
@ -67,8 +67,9 @@ func init() {
glog.Errorfln("%s is not writable for gproc", path)
os.Exit(1)
}
updateLastCheckTime()
if gtime.Second() - gfile.MTime(path) < 10 {
fileMtime := gfile.MTime(path)
commUpdateTime.Set(fileMtime)
if gtime.Second() - fileMtime < 10 {
// 初始化时读取已有数据(文件修改时间在10秒以内)
checkCommBuffer(path)
} else {
@ -79,7 +80,6 @@ func init() {
}
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
err := gfsnotify.Add(path, func(event *gfsnotify.Event) {
updateLastCheckTime()
checkCommBuffer(path)
})
if err != nil {
@ -90,11 +90,6 @@ func init() {
go autoActiveCheckComm()
}
// 更新最后通信检查时间
func updateLastCheckTime() {
commLastCheckTime.Set(gtime.Millisecond())
}
// 自动清理通信目录文件
// @todo 目前是以时间过期规则进行清理,后期可以考虑加入进程存在性判断
func autoClearCommDir() {
@ -115,28 +110,34 @@ func autoClearCommDir() {
// 主动通信内容检测
func autoActiveCheckComm() {
path := getCommFilePath(Pid())
for {
time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL*time.Millisecond)
if gtime.Millisecond() - commLastCheckTime.Val() > gPROC_COMM_ACTIVE_CHECK_INTERVAL {
updateLastCheckTime()
checkCommBuffer(getCommFilePath(Pid()))
}
time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL)
checkCommBuffer(path)
}
}
// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列
func checkCommBuffer(path string) {
commLocker.Lock()
defer commLocker.UnLock()
// 检测文件修改时间
mtime := gfile.MTimeMillisecond(path)
if mtime == commUpdateTime.Val() {
return
}
// 读取进程间通信数据
buffer := gfile.GetBinContents(path)
if len(buffer) > 0 {
os.Truncate(path, 0)
}
commLocker.UnLock()
if len(buffer) > 0 {
for _, v := range bufferToMsgs(buffer) {
commQueue.PushBack(v)
}
}
// 保存上一次检测的文件修改时间
commUpdateTime.Set(mtime)
}
// 获取其他进程传递到当前进程的消息包,阻塞执行
@ -149,14 +150,13 @@ func Receive() *Msg {
// 向指定gproc进程发送数据
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
func Send(pid int, data interface{}) error {
func Send(pid int, data []byte) error {
var err error = nil
buffer := gconv.Bytes(data)
b := make([]byte, 0)
b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...)
b = append(b, gbinary.EncodeInt32(int32(len(data) + 12))...)
b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...)
b = append(b, gbinary.EncodeUint32(checksum(buffer))...)
b = append(b, buffer...)
b = append(b, gbinary.EncodeUint32(checksum(data))...)
b = append(b, data...)
l := gflock.New(fmt.Sprintf("%d.lock", pid))
l.Lock()
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
@ -176,6 +176,8 @@ func doSend(pid int, buffer []byte) error {
if err != nil{
return err
}
// 必须要Close才会更新文件的修改时间
defer file.Close()
// 获取原有文件内容大小
stat, err := file.Stat()
if err != nil {

View File

@ -152,7 +152,7 @@ func (m *Manager) SignalAll(sig os.Signal) error {
}
// 向所有进程发送消息
func (m *Manager) Send(data interface{}) error {
func (m *Manager) Send(data []byte) error {
for _, p := range m.Processes() {
if err := p.Send(data); err != nil {
return err
@ -162,7 +162,7 @@ func (m *Manager) Send(data interface{}) error {
}
// 向指定进程发送消息
func (m *Manager) SendTo(pid int, data interface{}) error {
func (m *Manager) SendTo(pid int, data []byte) error {
return Send(pid, data)
}

View File

@ -56,7 +56,7 @@ func (p *Process) Pid() int {
}
// 向进程发送消息
func (p *Process) Send(data interface{}) error {
func (p *Process) Send(data []byte) error {
if p.Process != nil {
return Send(p.Process.Pid, data)
}

View File

@ -0,0 +1,17 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
)
func main() {
s := g.Server()
s.BindHandler("/", func(r *ghttp.Request){
r.Response.Writeln("哈罗!")
})
s.EnableHTTPS("/home/john/temp/server.crt", "/home/john/temp/server.key")
s.EnableAdmin()
s.SetPort(8200)
s.Run()
}

View File

@ -2,29 +2,17 @@ package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
"time"
"gitee.com/johng/gf/g/os/gproc"
)
func main() {
s1 := g.Server("s1")
s1.EnableAdmin()
s1.BindHandler("/", func(r *ghttp.Request) {
pid := gproc.Pid()
r.Response.Writeln("before restart, pid:", pid)
time.Sleep(10*time.Second)
r.Response.Writeln("after restart, pid:", gproc.Pid())
})
s1.BindHandler("/pid", func(r *ghttp.Request) {
r.Response.Write(gproc.Pid())
})
s1.SetPort(8199, 8200)
s1.SetPort(8100, 8200)
s1.Start()
s2 := g.Server("s2")
s2.EnableAdmin()
s2.SetPort(8300, 8080)
s2.SetPort(8300, 8400)
s2.Start()
g.Wait()

View File

@ -1,29 +1,17 @@
package main
import (
"gitee.com/johng/gf/g/os/gflock"
"fmt"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gflock"
)
func test() {
l := gflock.New("1.lock")
fmt.Println(l.Path())
l.Lock()
fmt.Println("lock 1")
l.Lock()
fmt.Println("lock 2")
}
func active() {
i := 0
for {
time.Sleep(time.Second)
i++
}
}
func main() {
go active()
test()
l := gflock.New("demo.lock")
l.Lock()
glog.Printfln("locked by pid: %d", gproc.Pid())
time.Sleep(3*time.Second)
l.UnLock()
glog.Printfln("unlocked by pid: %d", gproc.Pid())
}

View File

@ -2,27 +2,25 @@ package main
import (
"os"
"fmt"
"time"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
)
func main () {
fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild())
if gproc.IsChild() {
gtime.SetInterval(time.Second, func() bool {
gproc.Send(gproc.PPid(), gtime.Datetime())
return true
})
select { }
glog.Printfln("%d: Hi, I am child, waiting 3 seconds to die", gproc.Pid())
time.Sleep(time.Second)
glog.Printfln("%d: 1", gproc.Pid())
time.Sleep(time.Second)
glog.Printfln("%d: 2", gproc.Pid())
time.Sleep(time.Second)
glog.Printfln("%d: 3", gproc.Pid())
} else {
m := gproc.NewManager()
p := m.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
for {
msg := gproc.Receive()
fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data))
}
p.Wait()
glog.Printfln("%d: child died", gproc.Pid())
}
}

View File

@ -6,6 +6,10 @@ import (
)
func main () {
err := gproc.Send(23504, []byte{30})
fmt.Println(err)
pid := 28536
m := gproc.NewManager()
m.AddProcess(pid)
m.KillAll()
m.WaitAll()
fmt.Printf("%d was killed\n", pid)
}

View File

@ -0,0 +1,28 @@
package main
import (
"os"
"fmt"
"time"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
)
func main () {
fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild())
if gproc.IsChild() {
gtime.SetInterval(time.Second, func() bool {
gproc.Send(gproc.PPid(), []byte(gtime.Datetime()))
return true
})
select { }
} else {
m := gproc.NewManager()
p := m.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
for {
msg := gproc.Receive()
fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data))
}
}
}

View File

@ -0,0 +1,11 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gproc"
)
func main () {
err := gproc.Send(23504, []byte{30})
fmt.Println(err)
}

View File

@ -2,21 +2,20 @@ package main
import (
"fmt"
"reflect"
"gitee.com/johng/gf/g/database/gdb"
"gitee.com/johng/gf/g/os/gtime"
"time"
"gitee.com/johng/gf/g/os/gfile"
)
func main() {
var value interface{}
value = gdb.Map{"a":1}
refValue := reflect.ValueOf(value)
gtime.SetInterval(10*time.Millisecond, func() bool {
path := "./temp.txt"
gfile.PutBinContentsAppend(path, []byte("1"))
fmt.Println(gfile.MTimeMillisecond(path))
return true
})
if refValue.Kind() == reflect.Map {
keys := refValue.MapKeys()
for _, k := range keys {
fmt.Println(k, refValue.MapIndex(k).Interface())
}
}
time.Sleep(time.Hour)
}