diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index d0216ec09..21e96adc0 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -26,6 +26,7 @@ import ( "syscall" "time" "gitee.com/johng/gf/g/os/gcmd" + "gitee.com/johng/gf/g/os/gproc" ) const ( @@ -189,8 +190,7 @@ func (s *Server) startServer() { time.Sleep(1000*time.Second) return } - // 信号量控制监听 - go s.handleSignals() + // 开始执行底层Web Server创建,端口监听 var fd = 3 var wg sync.WaitGroup diff --git a/g/os/gflock/gflock.go b/g/os/gflock/gflock.go index b59718b4b..c8d10a849 100644 --- a/g/os/gflock/gflock.go +++ b/g/os/gflock/gflock.go @@ -21,13 +21,21 @@ type Locker struct { // 创建文件锁 func New(file string) *Locker { - path := gfile.TempDir() + gfile.Separator + "gflock" + gfile.Separator + file + dir := gfile.TempDir() + gfile.Separator + "gflock" + if !gfile.Exists(dir) { + gfile.Mkdir(dir) + } + path := dir + gfile.Separator + file lock := flock.NewFlock(path) return &Locker{ flock : lock, } } +func (l *Locker) Path() string { + return l.flock.Path() +} + func (l *Locker) Lock() { l.mu.Lock() l.flock.Lock() diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index a062376f7..c032f5e2c 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -25,7 +25,7 @@ import ( type Watcher struct { watcher *fsnotify.Watcher // 底层fsnotify对象 events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 - eventCache *gcache.Cache // 用于进行事件过滤,当同一监听文件在100ms内出现相同事件,则过滤 + eventCache *gcache.Cache // 用于进行事件过滤,当同一监听文件在10ms内出现相同事件,则过滤 closeChan chan struct{} // 关闭事件 callbacks *gmap.StringInterfaceMap // 监听的回调函数 } @@ -132,7 +132,7 @@ func (w *Watcher) startWatchLoop() { // 监听事件 case ev := <- w.watcher.Events: - if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 100) { + if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 10) { continue } w.events.PushBack(&Event{ diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index e948b4cf9..70a42cd6f 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -9,6 +9,7 @@ package gproc import ( "os" "fmt" + "gitee.com/johng/gf/g/os/glog" "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/os/gflock" "gitee.com/johng/gf/g/util/gconv" @@ -17,8 +18,11 @@ import ( "gitee.com/johng/gf/g/encoding/gbinary" ) -// gproc进程通信共享文件目录地址 -var commDirPath = gfile.TempDir() + gfile.Separator + "gproc" +const ( + // 由于子进程的temp dir有可能会和父进程不一致,影响进程间通信,这里统一使用环境变量设置 + gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir" +) + // 当前进程的文件锁 var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) // 进程通信消息队列 @@ -37,7 +41,7 @@ func init() { gfile.Create(path) } // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 - gfsnotify.Add(path, func(event *gfsnotify.Event) { + err := gfsnotify.Add(path, func(event *gfsnotify.Event) { commLocker.Lock() buffer := gfile.GetBinContents(path) os.Truncate(path, 0) @@ -46,6 +50,9 @@ func init() { commQueue.PushBack(v) } }) + if err != nil { + glog.Error(err) + } } // 获取其他进程传递到当前进程的消息包,阻塞执行 @@ -74,7 +81,11 @@ func Send(pid int, data interface{}) error { // 获取指定进程的通信文件地址 func getCommFilePath(pid int) string { - return commDirPath + gfile.Separator + gconv.String(pid) + tempDir := os.Getenv("gproc.tempdir") + if tempDir == "" { + tempDir = gfile.TempDir() + } + return tempDir + gfile.Separator + "gproc" + gfile.Separator + gconv.String(pid) } // 数据解包,防止黏包 @@ -82,7 +93,7 @@ func bufferToMsgs(buffer []byte) []*Msg { s := 0 msgs := make([]*Msg, 0) for s < len(buffer) { - length := gbinary.DecodeToInt(buffer[s : 4]) + length := gbinary.DecodeToInt(buffer[s : s + 4]) if length < 0 || length > len(buffer) { s++ continue diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index f21da9bd1..18b397b4b 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -27,11 +27,12 @@ func New () *Manager { // 创建一个进程(不执行) func (m *Manager) NewProcess(path string, args []string, environment []string) *Process { - env := make([]string, len(environment) + 1) + env := make([]string, len(environment) + 2) for k, v := range environment { env[k] = v } - env[len(env) - 1] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid()) + env[len(env) - 2] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid()) + env[len(env) - 1] = fmt.Sprintf("%s=%s", gPROC_TEMP_DIR_ENV_KEY, os.TempDir()) p := &Process { pm : m, path : path, diff --git a/geg/os/gflock/flock.go b/geg/os/gflock/flock.go new file mode 100644 index 000000000..cb02bfd7b --- /dev/null +++ b/geg/os/gflock/flock.go @@ -0,0 +1,17 @@ +package main + +import ( + "fmt" + "github.com/theckman/go-flock" + "time" +) + +func main() { + l := flock.NewFlock("/tmp/go-lock.lock") + l.Lock() + fmt.Printf("lock 1") + l.Lock() + fmt.Printf("lock 1") + + time.Sleep(time.Hour) +} diff --git a/geg/os/gflock/gflock.go b/geg/os/gflock/gflock.go index 2fc22fc97..b2ad294d7 100644 --- a/geg/os/gflock/gflock.go +++ b/geg/os/gflock/gflock.go @@ -1,25 +1,17 @@ package main import ( - "github.com/theckman/go-flock" + "gitee.com/johng/gf/g/os/gflock" "fmt" + "time" ) func main() { - fileLock := flock.NewFlock("/var/lock/go-lock.lock") - - fmt.Println(fileLock.TryLock()) - fmt.Println(fileLock.TryRLock()) - //time.Sleep(1000*time.Second) -//fmt.Println(locked) -// fmt.Println(fileLock.Locked()) -//fmt.Println(err) -// if err != nil { -// // handle locking error -// } -// -// if locked { -// // do work -// fileLock.Unlock() -// } + l := gflock.New("1.lock") + fmt.Println(l.Path()) + fmt.Println(l.Lock()) + fmt.Println("lock 1") + fmt.Println(l.Lock()) + fmt.Println("lock 1") + time.Sleep(time.Hour) } diff --git a/geg/os/gfsnotify/gfsnotify.go b/geg/os/gfsnotify/gfsnotify.go index 0ab4144e7..023411a6b 100644 --- a/geg/os/gfsnotify/gfsnotify.go +++ b/geg/os/gfsnotify/gfsnotify.go @@ -6,7 +6,7 @@ import ( ) func main() { - err := gfsnotify.Add("/home/john/Documents/temp.txt", func(event *gfsnotify.Event) { + err := gfsnotify.Add("./temp.txt", func(event *gfsnotify.Event) { if event.IsCreate() { log.Println("创建文件 : ", event.Path) } diff --git a/geg/os/gproc/gproc.go b/geg/os/gproc/gproc.go index 73c4f6896..85332d7ae 100644 --- a/geg/os/gproc/gproc.go +++ b/geg/os/gproc/gproc.go @@ -4,24 +4,26 @@ import ( "os" "fmt" "time" - "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/os/gproc" + "gitee.com/johng/gf/g/os/gtime" ) func main () { if gproc.IsChild() { - gtime.SetInterval(3*time.Second, func() bool { + fmt.Printf(" child pid is %d\n", os.Getpid()) + gtime.SetInterval(time.Second, func() bool { gproc.Send(gproc.Ppid(), gtime.Datetime()) return true }) select { } } else { + fmt.Printf("parent pid is %d\n", os.Getpid()) m := gproc.New() p := m.NewProcess(os.Args[0], os.Args, nil) p.Run() for { msg := gproc.Receive() - fmt.Printf("pid is %d, receive from %d: %s\n", os.Getpid(), msg.Pid, string(msg.Data)) + fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data)) } } } diff --git a/geg/os/gproc/gproc2.go b/geg/os/gproc/gproc2.go index 11ee52b4b..a0a3f0b56 100644 --- a/geg/os/gproc/gproc2.go +++ b/geg/os/gproc/gproc2.go @@ -1,11 +1,11 @@ package main import ( - "gitee.com/johng/gf/g/os/gproc" "fmt" + "gitee.com/johng/gf/g/os/gproc" ) func main () { - err := gproc.Send(11177, "hello process!") + err := gproc.Send(29260, "hello process!") fmt.Println(err) } diff --git a/geg/other/test.go b/geg/other/test.go index e57a6b864..9a6703bc3 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -3,11 +3,9 @@ package main import ( "fmt" "net/http" - "github.com/tabalt/gracehttp" - "os" - "gitee.com/johng/gf/g/os/gpm" - "time" + "gitee.com/johng/gf/g/encoding/gbinary" + "gitee.com/johng/gf/g/os/gproc" ) @@ -22,11 +20,44 @@ func test() { } } -func main() { - m := gproc.New() - args := os.Args - args = append(args, "--child=1") - p := m.NewProcess(args[0], args, nil) - p.Run() - time.Sleep(100*time.Second) +// 常见的二进制数据校验方式,生成校验结果 +func checksum(buffer []byte) uint32 { + var checksum uint32 + for _, b := range buffer { + checksum += uint32(b) + } + return checksum +} + +// 数据解包,防止黏包 +func bufferToMsgs(buffer []byte) []*gproc.Msg { + s := 0 + msgs := make([]*gproc.Msg, 0) + for s < len(buffer) { + fmt.Println(s) + length := gbinary.DecodeToInt(buffer[s : s + 4]) + if length < 0 || length > len(buffer) { + s++ + continue + } + checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) + checksum2 := checksum(buffer[s + 12 : s + length]) + if checksum1 != checksum2 { + s++ + continue + } + msgs = append(msgs, &gproc.Msg { + Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), + Data : buffer[s + 12 : s + length], + }) + s += length + } + return msgs +} + + +func main() { + b := []byte{26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33} + m := bufferToMsgs(b) + fmt.Println(m) }