完成gproc包的测试

This commit is contained in:
John
2018-05-10 17:48:47 +08:00
parent dc20caeb27
commit d97e8c8204
11 changed files with 108 additions and 46 deletions

View File

@ -26,6 +26,7 @@ import (
"syscall"
"time"
"gitee.com/johng/gf/g/os/gcmd"
"gitee.com/johng/gf/g/os/gproc"
)
const (
@ -189,8 +190,7 @@ func (s *Server) startServer() {
time.Sleep(1000*time.Second)
return
}
// 信号量控制监听
go s.handleSignals()
// 开始执行底层Web Server创建端口监听
var fd = 3
var wg sync.WaitGroup

View File

@ -21,13 +21,21 @@ type Locker struct {
// 创建文件锁
func New(file string) *Locker {
path := gfile.TempDir() + gfile.Separator + "gflock" + gfile.Separator + file
dir := gfile.TempDir() + gfile.Separator + "gflock"
if !gfile.Exists(dir) {
gfile.Mkdir(dir)
}
path := dir + gfile.Separator + file
lock := flock.NewFlock(path)
return &Locker{
flock : lock,
}
}
func (l *Locker) Path() string {
return l.flock.Path()
}
func (l *Locker) Lock() {
l.mu.Lock()
l.flock.Lock()

View File

@ -25,7 +25,7 @@ import (
type Watcher struct {
watcher *fsnotify.Watcher // 底层fsnotify对象
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
eventCache *gcache.Cache // 用于进行事件过滤当同一监听文件在100ms内出现相同事件则过滤
eventCache *gcache.Cache // 用于进行事件过滤当同一监听文件在10ms内出现相同事件则过滤
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
}
@ -132,7 +132,7 @@ func (w *Watcher) startWatchLoop() {
// 监听事件
case ev := <- w.watcher.Events:
if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 100) {
if !w.eventCache.Lock(ev.Name + ":" + gconv.String(ev.Op), 10) {
continue
}
w.events.PushBack(&Event{

View File

@ -9,6 +9,7 @@ package gproc
import (
"os"
"fmt"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gflock"
"gitee.com/johng/gf/g/util/gconv"
@ -17,8 +18,11 @@ import (
"gitee.com/johng/gf/g/encoding/gbinary"
)
// gproc进程通信共享文件目录地址
var commDirPath = gfile.TempDir() + gfile.Separator + "gproc"
const (
// 由于子进程的temp dir有可能会和父进程不一致影响进程间通信这里统一使用环境变量设置
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
)
// 当前进程的文件锁
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
// 进程通信消息队列
@ -37,7 +41,7 @@ func init() {
gfile.Create(path)
}
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
gfsnotify.Add(path, func(event *gfsnotify.Event) {
err := gfsnotify.Add(path, func(event *gfsnotify.Event) {
commLocker.Lock()
buffer := gfile.GetBinContents(path)
os.Truncate(path, 0)
@ -46,6 +50,9 @@ func init() {
commQueue.PushBack(v)
}
})
if err != nil {
glog.Error(err)
}
}
// 获取其他进程传递到当前进程的消息包,阻塞执行
@ -74,7 +81,11 @@ func Send(pid int, data interface{}) error {
// 获取指定进程的通信文件地址
func getCommFilePath(pid int) string {
return commDirPath + gfile.Separator + gconv.String(pid)
tempDir := os.Getenv("gproc.tempdir")
if tempDir == "" {
tempDir = gfile.TempDir()
}
return tempDir + gfile.Separator + "gproc" + gfile.Separator + gconv.String(pid)
}
// 数据解包,防止黏包
@ -82,7 +93,7 @@ func bufferToMsgs(buffer []byte) []*Msg {
s := 0
msgs := make([]*Msg, 0)
for s < len(buffer) {
length := gbinary.DecodeToInt(buffer[s : 4])
length := gbinary.DecodeToInt(buffer[s : s + 4])
if length < 0 || length > len(buffer) {
s++
continue

View File

@ -27,11 +27,12 @@ func New () *Manager {
// 创建一个进程(不执行)
func (m *Manager) NewProcess(path string, args []string, environment []string) *Process {
env := make([]string, len(environment) + 1)
env := make([]string, len(environment) + 2)
for k, v := range environment {
env[k] = v
}
env[len(env) - 1] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid())
env[len(env) - 2] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid())
env[len(env) - 1] = fmt.Sprintf("%s=%s", gPROC_TEMP_DIR_ENV_KEY, os.TempDir())
p := &Process {
pm : m,
path : path,

17
geg/os/gflock/flock.go Normal file
View File

@ -0,0 +1,17 @@
package main
import (
"fmt"
"github.com/theckman/go-flock"
"time"
)
func main() {
l := flock.NewFlock("/tmp/go-lock.lock")
l.Lock()
fmt.Printf("lock 1")
l.Lock()
fmt.Printf("lock 1")
time.Sleep(time.Hour)
}

View File

@ -1,25 +1,17 @@
package main
import (
"github.com/theckman/go-flock"
"gitee.com/johng/gf/g/os/gflock"
"fmt"
"time"
)
func main() {
fileLock := flock.NewFlock("/var/lock/go-lock.lock")
fmt.Println(fileLock.TryLock())
fmt.Println(fileLock.TryRLock())
//time.Sleep(1000*time.Second)
//fmt.Println(locked)
// fmt.Println(fileLock.Locked())
//fmt.Println(err)
// if err != nil {
// // handle locking error
// }
//
// if locked {
// // do work
// fileLock.Unlock()
// }
l := gflock.New("1.lock")
fmt.Println(l.Path())
fmt.Println(l.Lock())
fmt.Println("lock 1")
fmt.Println(l.Lock())
fmt.Println("lock 1")
time.Sleep(time.Hour)
}

View File

@ -6,7 +6,7 @@ import (
)
func main() {
err := gfsnotify.Add("/home/john/Documents/temp.txt", func(event *gfsnotify.Event) {
err := gfsnotify.Add("./temp.txt", func(event *gfsnotify.Event) {
if event.IsCreate() {
log.Println("创建文件 : ", event.Path)
}

View File

@ -4,24 +4,26 @@ import (
"os"
"fmt"
"time"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gproc"
"gitee.com/johng/gf/g/os/gtime"
)
func main () {
if gproc.IsChild() {
gtime.SetInterval(3*time.Second, func() bool {
fmt.Printf(" child pid is %d\n", os.Getpid())
gtime.SetInterval(time.Second, func() bool {
gproc.Send(gproc.Ppid(), gtime.Datetime())
return true
})
select { }
} else {
fmt.Printf("parent pid is %d\n", os.Getpid())
m := gproc.New()
p := m.NewProcess(os.Args[0], os.Args, nil)
p.Run()
for {
msg := gproc.Receive()
fmt.Printf("pid is %d, receive from %d: %s\n", os.Getpid(), msg.Pid, string(msg.Data))
fmt.Printf("receive from %d, data: %s\n", msg.Pid, string(msg.Data))
}
}
}

View File

@ -1,11 +1,11 @@
package main
import (
"gitee.com/johng/gf/g/os/gproc"
"fmt"
"gitee.com/johng/gf/g/os/gproc"
)
func main () {
err := gproc.Send(11177, "hello process!")
err := gproc.Send(29260, "hello process!")
fmt.Println(err)
}

View File

@ -3,11 +3,9 @@ package main
import (
"fmt"
"net/http"
"github.com/tabalt/gracehttp"
"os"
"gitee.com/johng/gf/g/os/gpm"
"time"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/os/gproc"
)
@ -22,11 +20,44 @@ func test() {
}
}
func main() {
m := gproc.New()
args := os.Args
args = append(args, "--child=1")
p := m.NewProcess(args[0], args, nil)
p.Run()
time.Sleep(100*time.Second)
// 常见的二进制数据校验方式,生成校验结果
func checksum(buffer []byte) uint32 {
var checksum uint32
for _, b := range buffer {
checksum += uint32(b)
}
return checksum
}
// 数据解包,防止黏包
func bufferToMsgs(buffer []byte) []*gproc.Msg {
s := 0
msgs := make([]*gproc.Msg, 0)
for s < len(buffer) {
fmt.Println(s)
length := gbinary.DecodeToInt(buffer[s : s + 4])
if length < 0 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := checksum(buffer[s + 12 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &gproc.Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
s += length
}
return msgs
}
func main() {
b := []byte{26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33, 26, 0, 0, 0, 60, 109, 0, 0, 84, 5, 0, 0, 104, 101, 108, 108, 111, 32, 112, 114, 111, 99, 101, 115, 115, 33}
m := bufferToMsgs(b)
fmt.Println(m)
}