diff --git a/g/os/gfile/gfile.go b/g/os/gfile/gfile.go index 4993dba69..1efde864e 100644 --- a/g/os/gfile/gfile.go +++ b/g/os/gfile/gfile.go @@ -320,6 +320,11 @@ func putContents(path string, data []byte, flag int, perm os.FileMode) error { return nil } +// Truncate +func Truncate(path string, size int) error { + return os.Truncate(path, int64(size)) +} + // (文本)写入文件内容 func PutContents(path string, content string) error { return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666) diff --git a/g/os/gflock/gflock.go b/g/os/gflock/gflock.go index 08e5b2c8a..b59718b4b 100644 --- a/g/os/gflock/gflock.go +++ b/g/os/gflock/gflock.go @@ -8,22 +8,42 @@ package gflock import ( + "sync" "github.com/theckman/go-flock" "gitee.com/johng/gf/g/os/gfile" ) // 文件锁 type Locker struct { + mu sync.RWMutex flock *flock.Flock } // 创建文件锁 func New(file string) *Locker { - path := gfile.TempDir() + gfile.Separator + file + path := gfile.TempDir() + gfile.Separator + "gflock" + gfile.Separator + file lock := flock.NewFlock(path) return &Locker{ - lock, + flock : lock, } } +func (l *Locker) Lock() { + l.mu.Lock() + l.flock.Lock() +} +func (l *Locker) UnLock() { + l.flock.Unlock() + l.mu.Unlock() +} + +func (l *Locker) RLock() { + l.mu.RLock() + l.flock.RLock() +} + +func (l *Locker) RUnlock() { + l.flock.Unlock() + l.mu.RUnlock() +} diff --git a/g/os/gflock/internals/flock/LICENSE b/g/os/gflock/internals/flock/LICENSE deleted file mode 100644 index aff7d358e..000000000 --- a/g/os/gflock/internals/flock/LICENSE +++ /dev/null @@ -1,27 +0,0 @@ -Copyright (c) 2015, Tim Heckman -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of linode-netint nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff --git a/g/os/gflock/internals/flock/flock.go b/g/os/gflock/internals/flock/flock.go deleted file mode 100644 index d04059748..000000000 --- a/g/os/gflock/internals/flock/flock.go +++ /dev/null @@ -1,109 +0,0 @@ -// From & Thanks: https://github.com/theckman/go-flock -// Copyright 2015 Tim Heckman. All rights reserved. -// Use of this source code is governed by the BSD 3-Clause -// license that can be found in the LICENSE file. - -// Package flock implements a thread-safe sync.Locker interface for file locking. -// It also includes a non-blocking TryLock() function to allow locking -// without blocking execution. -// -// Package flock is released under the BSD 3-Clause License. See the LICENSE file -// for more details. -// -// While using this library, remember that the locking behaviors are not -// guaranteed to be the same on each platform. For example, some UNIX-like -// operating systems will transparently convert a shared lock to an exclusive -// lock. If you Unlock() the flock from a location where you believe that you -// have the shared lock, you may accidently drop the exclusive lock. -package flock - -import ( - "context" - "os" - "sync" - "time" -) - -// Flock is the struct type to handle file locking. All fields are unexported, -// with access to some of the fields provided by getter methods (Path() and Locked()). -type Flock struct { - mu sync.RWMutex // 用于接口层的阻塞互斥锁 - path string - m sync.RWMutex - fh *os.File - l bool - r bool -} - -// NewFlock is a function to return a new instance of *Flock. The only parameter -// it takes is the path to the desired lockfile. -func NewFlock(path string) *Flock { - return &Flock{path: path} -} - -// Path is a function to return the path as provided in NewFlock(). -func (f *Flock) Path() string { - return f.path -} - -// Locked is a function to return the current lock state (locked: true, unlocked: false). -func (f *Flock) Locked() bool { - f.m.RLock() - defer f.m.RUnlock() - return f.l -} - -// RLocked is a function to return the current read lock state (locked: true, unlocked: false). -func (f *Flock) RLocked() bool { - f.m.RLock() - defer f.m.RUnlock() - return f.r -} - -func (f *Flock) String() string { - return f.path -} - -// TryLockContext repeatedly tries to take an exclusive lock until one of the -// conditions is met: TryLock succeeds, TryLock fails with error, or Context -// Done channel is closed. -func (f *Flock) TryLockContext(ctx context.Context, retryDelay time.Duration) (bool, error) { - return tryCtx(f.TryLock, ctx, retryDelay) -} - -// TryRLockContext repeatedly tries to take a shared lock until one of the -// conditions is met: TryRLock succeeds, TryRLock fails with error, or Context -// Done channel is closed. -func (f *Flock) TryRLockContext(ctx context.Context, retryDelay time.Duration) (bool, error) { - return tryCtx(f.TryRLock, ctx, retryDelay) -} - -func tryCtx(fn func() (bool, error), ctx context.Context, retryDelay time.Duration) (bool, error) { - if ctx.Err() != nil { - return false, ctx.Err() - } - for { - if ok, err := fn(); ok || err != nil { - return ok, err - } - select { - case <-ctx.Done(): - return false, ctx.Err() - case <-time.After(retryDelay): - // try again - } - } -} - -func (f *Flock) setFh() error { - // open a new os.File instance - // create it if it doesn't exist, and open the file read-only. - fh, err := os.OpenFile(f.path, os.O_CREATE|os.O_RDONLY, os.FileMode(0600)) - if err != nil { - return err - } - - // set the filehandle on the struct - f.fh = fh - return nil -} diff --git a/g/os/gflock/internals/flock/flock_unix.go b/g/os/gflock/internals/flock/flock_unix.go deleted file mode 100644 index 8aa896f80..000000000 --- a/g/os/gflock/internals/flock/flock_unix.go +++ /dev/null @@ -1,159 +0,0 @@ -// Copyright 2015 Tim Heckman. All rights reserved. -// Use of this source code is governed by the BSD 3-Clause -// license that can be found in the LICENSE file. - -// +build !windows - -package flock - -import ( - "syscall" -) - -// Lock is a blocking call to try and take an exclusive file lock. It will wait -// until it is able to obtain the exclusive file lock. It's recommended that -// TryLock() be used over this function. This function may block the ability to -// query the current Locked() or RLocked() status due to a RW-mutex lock. -// -// If we are already exclusive-locked, this function short-circuits and returns -// immediately assuming it can take the mutex lock. -// -// If the *Flock has a shared lock (RLock), this may transparently replace the -// shared lock with an exclusive lock on some UNIX-like operating systems. Be -// careful when using exclusive locks in conjunction with shared locks -// (RLock()), because calling Unlock() may accidentally release the exclusive -// lock that was once a shared lock. -func (f *Flock) Lock() error { - f.mu.Lock() - err := f.lock(&f.l, syscall.LOCK_EX) - if err != nil { - f.mu.Unlock() - return err - } - return nil -} - -// RLock is a blocking call to try and take a ahred file lock. It will wait -// until it is able to obtain the shared file lock. It's recommended that -// TryRLock() be used over this function. This function may block the ability to -// query the current Locked() or RLocked() status due to a RW-mutex lock. -// -// If we are already shared-locked, this function short-circuits and returns -// immediately assuming it can take the mutex lock. -func (f *Flock) RLock() error { - f.mu.RLock() - err := f.lock(&f.r, syscall.LOCK_SH) - if err != nil { - f.mu.RUnlock() - return err - } - return nil -} - -func (f *Flock) lock(locked *bool, flag int) error { - f.m.Lock() - defer f.m.Unlock() - - if *locked { - return nil - } - - if f.fh == nil { - if err := f.setFh(); err != nil { - return err - } - } - - if err := syscall.Flock(int(f.fh.Fd()), flag); err != nil { - return err - } - - *locked = true - return nil -} - -// Unlock is a function to unlock the file. This file takes a RW-mutex lock, so -// while it is running the Locked() and RLocked() functions will be blocked. -// -// This function short-circuits if we are unlocked already. If not, it calls -// syscall.LOCK_UN on the file and closes the file descriptor. It does not -// remove the file from disk. It's up to your application to do. -// -// Please note, if your shared lock became an exclusive lock this may -// unintentionally drop the exclusive lock if called by the consumer that -// believes they have a shared lock. Please see Lock() for more details. -func (f *Flock) Unlock() error { - f.m.Lock() - defer f.m.Unlock() - - // if we aren't locked or if the lockfile instance is nil - // just return a nil error because we are unlocked - if (!f.l && !f.r) || f.fh == nil { - return nil - } - - // mark the file as unlocked - if err := syscall.Flock(int(f.fh.Fd()), syscall.LOCK_UN); err != nil { - return err - } - - f.fh.Close() - - f.l = false - f.r = false - f.fh = nil - - f.mu.Unlock() - return nil -} - -// TryLock is the preferred function for taking an exclusive file lock. This -// function takes an RW-mutex lock before it tries to lock the file, so there is -// the possibility that this function may block for a short time if another -// goroutine is trying to take any action. -// -// The actual file lock is non-blocking. If we are unable to get the exclusive -// file lock, the function will return false instead of waiting for the lock. If -// we get the lock, we also set the *Flock instance as being exclusive-locked. -func (f *Flock) TryLock() (bool, error) { - return f.try(&f.l, syscall.LOCK_EX) -} - -// TryRLock is the preferred function for taking a shared file lock. This -// function takes an RW-mutex lock before it tries to lock the file, so there is -// the possibility that this function may block for a short time if another -// goroutine is trying to take any action. -// -// The actual file lock is non-blocking. If we are unable to get the shared file -// lock, the function will return false instead of waiting for the lock. If we -// get the lock, we also set the *Flock instance as being share-locked. -func (f *Flock) TryRLock() (bool, error) { - return f.try(&f.r, syscall.LOCK_SH) -} - -func (f *Flock) try(locked *bool, flag int) (bool, error) { - f.m.Lock() - defer f.m.Unlock() - - if *locked { - return true, nil - } - - if f.fh == nil { - if err := f.setFh(); err != nil { - return false, err - } - } - - err := syscall.Flock(int(f.fh.Fd()), flag|syscall.LOCK_NB) - - switch err { - case syscall.EWOULDBLOCK: - return false, nil - case nil: - *locked = true - return true, nil - } - - return false, err -} diff --git a/g/os/gflock/internals/flock/flock_winapi.go b/g/os/gflock/internals/flock/flock_winapi.go deleted file mode 100644 index fe405a255..000000000 --- a/g/os/gflock/internals/flock/flock_winapi.go +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2015 Tim Heckman. All rights reserved. -// Use of this source code is governed by the BSD 3-Clause -// license that can be found in the LICENSE file. - -// +build windows - -package flock - -import ( - "syscall" - "unsafe" -) - -var ( - kernel32, _ = syscall.LoadLibrary("kernel32.dll") - procLockFileEx, _ = syscall.GetProcAddress(kernel32, "LockFileEx") - procUnlockFileEx, _ = syscall.GetProcAddress(kernel32, "UnlockFileEx") -) - -const ( - winLockfileFailImmediately = 0x00000001 - winLockfileExclusiveLock = 0x00000002 - winLockfileSharedLock = 0x00000000 -) - -// Use of 0x00000000 for the shared lock is a guess based on some the MS Windows -// `LockFileEX` docs, which document the `LOCKFILE_EXCLUSIVE_LOCK` flag as: -// -// > The function requests an exclusive lock. Otherwise, it requests a shared -// > lock. -// -// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx - -func lockFileEx(handle syscall.Handle, flags uint32, reserved uint32, numberOfBytesToLockLow uint32, numberOfBytesToLockHigh uint32, offset *syscall.Overlapped) (bool, syscall.Errno) { - r1, _, errNo := syscall.Syscall6( - uintptr(procLockFileEx), - 6, - uintptr(handle), - uintptr(flags), - uintptr(reserved), - uintptr(numberOfBytesToLockLow), - uintptr(numberOfBytesToLockHigh), - uintptr(unsafe.Pointer(offset))) - - if r1 != 1 { - if errNo == 0 { - return false, syscall.EINVAL - } - - return false, errNo - } - - return true, 0 -} - -func unlockFileEx(handle syscall.Handle, reserved uint32, numberOfBytesToLockLow uint32, numberOfBytesToLockHigh uint32, offset *syscall.Overlapped) (bool, syscall.Errno) { - r1, _, errNo := syscall.Syscall6( - uintptr(procUnlockFileEx), - 5, - uintptr(handle), - uintptr(reserved), - uintptr(numberOfBytesToLockLow), - uintptr(numberOfBytesToLockHigh), - uintptr(unsafe.Pointer(offset)), - 0) - - if r1 != 1 { - if errNo == 0 { - return false, syscall.EINVAL - } - - return false, errNo - } - - return true, 0 -} diff --git a/g/os/gflock/internals/flock/flock_windows.go b/g/os/gflock/internals/flock/flock_windows.go deleted file mode 100644 index a0103f6da..000000000 --- a/g/os/gflock/internals/flock/flock_windows.go +++ /dev/null @@ -1,140 +0,0 @@ -// Copyright 2015 Tim Heckman. All rights reserved. -// Use of this source code is governed by the BSD 3-Clause -// license that can be found in the LICENSE file. - -package flock - -import ( - "syscall" -) - -// ErrorLockViolation is the error code returned from the Windows syscall when a -// lock would block and you ask to fail immediately. -const ErrorLockViolation syscall.Errno = 0x21 // 33 - -// Lock is a blocking call to try and take an exclusive file lock. It will wait -// until it is able to obtain the exclusive file lock. It's recommended that -// TryLock() be used over this function. This function may block the ability to -// query the current Locked() or RLocked() status due to a RW-mutex lock. -// -// If we are already locked, this function short-circuits and returns -// immediately assuming it can take the mutex lock. -func (f *Flock) Lock() error { - return f.lock(&f.l, winLockfileExclusiveLock) -} - -// RLock is a blocking call to try and take a sahred file lock. It will wait -// until it is able to obtain the shared file lock. It's recommended that -// TryRLock() be used over this function. This function may block the ability to -// query the current Locked() or RLocked() status due to a RW-mutex lock. -// -// If we are already locked, this function short-circuits and returns -// immediately assuming it can take the mutex lock. -func (f *Flock) RLock() error { - return f.lock(&f.r, winLockfileSharedLock) -} - -func (f *Flock) lock(locked *bool, flag uint32) error { - f.m.Lock() - defer f.m.Unlock() - - if *locked { - return nil - } - - if f.fh == nil { - if err := f.setFh(); err != nil { - return err - } - } - - if _, errNo := lockFileEx(syscall.Handle(f.fh.Fd()), flag, 0, 1, 0, &syscall.Overlapped{}); errNo > 0 { - return errNo - } - - *locked = true - return nil -} - -// Unlock is a function to unlock the file. This file takes a RW-mutex lock, so -// while it is running the Locked() and RLocked() functions will be blocked. -// -// This function short-circuits if we are unlocked already. If not, it calls -// UnlockFileEx() on the file and closes the file descriptor. It does not remove -// the file from disk. It's up to your application to do. -func (f *Flock) Unlock() error { - f.m.Lock() - defer f.m.Unlock() - - // if we aren't locked or if the lockfile instance is nil - // just return a nil error because we are unlocked - if (!f.l && !f.r) || f.fh == nil { - return nil - } - - // mark the file as unlocked - if _, errNo := unlockFileEx(syscall.Handle(f.fh.Fd()), 0, 1, 0, &syscall.Overlapped{}); errNo > 0 { - return errNo - } - - f.fh.Close() - - f.l = false - f.r = false - f.fh = nil - - return nil -} - -// TryLock is the preferred function for taking an exlusive file lock. This -// function does take a RW-mutex lock before it tries to lock the file, so there -// is the possibility that this function may block for a short time if another -// goroutine is trying to take any action. -// -// The actual file lock is non-blocking. If we are unable to get the exclusive -// file lock, the function will return false instead of waiting for the lock. If -// we get the lock, we also set the *Flock instance as being exclusive-locked. -func (f *Flock) TryLock() (bool, error) { - return f.try(&f.l, winLockfileExclusiveLock) -} - -// TryRLock is the preferred function for taking a shared file lock. This -// function does take a RW-mutex lock before it tries to lock the file, so there -// is the possibility that this function may block for a short time if another -// goroutine is trying to take any action. -// -// The actual file lock is non-blocking. If we are unable to get the shared file -// lock, the function will return false instead of waiting for the lock. If we -// get the lock, we also set the *Flock instance as being shared-locked. -func (f *Flock) TryRLock() (bool, error) { - return f.try(&f.r, winLockfileSharedLock) -} - -func (f *Flock) try(locked *bool, flag uint32) (bool, error) { - f.m.Lock() - defer f.m.Unlock() - - if *locked { - return true, nil - } - - if f.fh == nil { - if err := f.setFh(); err != nil { - return false, err - } - } - - _, errNo := lockFileEx(syscall.Handle(f.fh.Fd()), flag|winLockfileFailImmediately, 0, 1, 0, &syscall.Overlapped{}) - - if errNo > 0 { - if errNo == ErrorLockViolation || errNo == syscall.ERROR_IO_PENDING { - return false, nil - } - - return false, errNo - } - - *locked = true - - return true, nil -} diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index 2bd5398d8..a062376f7 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -66,8 +66,6 @@ func Remove(path string) error { return watcher.Remove(path) } - - // 创建监听管理对象 func New() (*Watcher, error) { if watch, err := fsnotify.NewWatcher(); err == nil { @@ -155,7 +153,7 @@ func (w *Watcher) startEventLoop() { for { if v := w.events.PopFront(); v != nil { event := v.(*Event) - // 如果是文件删除时间,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控 + // 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控 if event.IsRemove() && gfile.Exists(event.Path){ w.watcher.Add(event.Path) continue diff --git a/g/os/gproc/gproc.go b/g/os/gproc/gproc.go index 74735533d..1131b6533 100644 --- a/g/os/gproc/gproc.go +++ b/g/os/gproc/gproc.go @@ -9,74 +9,25 @@ package gproc import ( "os" - "gitee.com/johng/gf/g/container/gmap" - "gitee.com/johng/gf/g/encoding/gbinary" - "gitee.com/johng/gf/g/net/gtcp" - "net" + "gitee.com/johng/gf/g/util/gconv" ) const ( - gCOMMUNICATION_MAIN_PORT = 30000 - gCOMMUNICATION_CHILD_PORT = 40000 - gCHILD_PROCESS_ENV_KEY = "gf.process.manager.child" - gCHILD_PROCESS_ENV_STRING = gCHILD_PROCESS_ENV_KEY + "=1" + gPROC_ENV_KEY_PPID_KEY = "gproc.ppid" ) -// TCP通信数据结构定义 -type Msg struct { - Pid int // PID,哪个进程发送的消息 - Data []byte // 参数,消息附带的参数 +// 获取当前进程ID +func Pid() int { + return os.Getpid() } -// 获取其他进程传递到当前进程的消息包,阻塞执行 -func GetMsg() *Msg { - if v := msgQueue.PopFront(); v != nil { - return v.(*Msg) - } - return nil +// 获取父进程ID +func Ppid() int { + return gconv.Int(os.Getenv(gPROC_ENV_KEY_PPID_KEY)) } // 判断当前进程是否为gproc创建的子进程 func IsChild() bool { - return os.Getenv(gCHILD_PROCESS_ENV_KEY) != "" + return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != "" } - -// TCP数据通信处理回调函数 -// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) -func tcpServiceHandler(conn net.Conn) { - buffer := gtcp.Receive(conn, gtcp.Retry{3, 100}) - msgs := bufferToMsgs(buffer) - if len(msgs) == 0 { - conn.Close() - return - } - for _, msg := range msgs { - msgQueue.PushBack(msg) - } -} - -// 数据解包,防止黏包 -func bufferToMsgs(buffer []byte) []*Msg { - s := 0 - msgs := make([]*Msg, 0) - for s < len(buffer) { - length := gbinary.DecodeToInt(buffer[s : 4]) - if length < 0 || length > len(buffer) { - s++ - continue - } - checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) - checksum2 := gtcp.Checksum(buffer[s + 12 : s + length]) - if checksum1 != checksum2 { - s++ - continue - } - msgs = append(msgs, &Msg { - Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), - Data : buffer[s + 12 : s + length], - }) - s += length - } - return msgs -} diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go new file mode 100644 index 000000000..e948b4cf9 --- /dev/null +++ b/g/os/gproc/gproc_comm.go @@ -0,0 +1,112 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gproc + +import ( + "os" + "fmt" + "gitee.com/johng/gf/g/os/gfile" + "gitee.com/johng/gf/g/os/gflock" + "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/container/gqueue" + "gitee.com/johng/gf/g/encoding/gbinary" +) + +// gproc进程通信共享文件目录地址 +var commDirPath = gfile.TempDir() + gfile.Separator + "gproc" +// 当前进程的文件锁 +var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid())) +// 进程通信消息队列 +var commQueue = gqueue.New() + +// TCP通信数据结构定义 +type Msg struct { + Pid int // PID,哪个进程发送的消息 + Data []byte // 参数,消息附带的参数 +} + +// 进程管理/通信初始化操作 +func init() { + path := getCommFilePath(os.Getpid()) + if !gfile.Exists(path) { + gfile.Create(path) + } + // 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列 + gfsnotify.Add(path, func(event *gfsnotify.Event) { + commLocker.Lock() + buffer := gfile.GetBinContents(path) + os.Truncate(path, 0) + commLocker.UnLock() + for _, v := range bufferToMsgs(buffer) { + commQueue.PushBack(v) + } + }) +} + +// 获取其他进程传递到当前进程的消息包,阻塞执行 +func Receive() *Msg { + if v := commQueue.PopFront(); v != nil { + return v.(*Msg) + } + return nil +} + +// 向指定gproc进程发送数据 +// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长) +func Send(pid int, data interface{}) error { + buffer := gconv.Bytes(data) + b := make([]byte, 0) + b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...) + b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...) + b = append(b, gbinary.EncodeUint32(checksum(buffer))...) + b = append(b, buffer...) + l := gflock.New(fmt.Sprintf("%d.lock", pid)) + l.Lock() + err := gfile.PutBinContentsAppend(getCommFilePath(pid), b) + l.UnLock() + return err +} + +// 获取指定进程的通信文件地址 +func getCommFilePath(pid int) string { + return commDirPath + gfile.Separator + gconv.String(pid) +} + +// 数据解包,防止黏包 +func bufferToMsgs(buffer []byte) []*Msg { + s := 0 + msgs := make([]*Msg, 0) + for s < len(buffer) { + length := gbinary.DecodeToInt(buffer[s : 4]) + if length < 0 || length > len(buffer) { + s++ + continue + } + checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12]) + checksum2 := checksum(buffer[s + 12 : s + length]) + if checksum1 != checksum2 { + s++ + continue + } + msgs = append(msgs, &Msg { + Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]), + Data : buffer[s + 12 : s + length], + }) + s += length + } + return msgs +} + +// 常见的二进制数据校验方式,生成校验结果 +func checksum(buffer []byte) uint32 { + var checksum uint32 + for _, b := range buffer { + checksum += uint32(b) + } + return checksum +} \ No newline at end of file diff --git a/g/os/gproc/gproc_manager.go b/g/os/gproc/gproc_manager.go index c8ab8b235..f21da9bd1 100644 --- a/g/os/gproc/gproc_manager.go +++ b/g/os/gproc/gproc_manager.go @@ -9,13 +9,8 @@ package gproc import ( "os" - "net" - "fmt" - "gitee.com/johng/gf/g/os/glog" - "gitee.com/johng/gf/g/net/gtcp" "gitee.com/johng/gf/g/container/gmap" - "gitee.com/johng/gf/g/encoding/gbinary" - "gitee.com/johng/gf/g/container/gqueue" + "fmt" ) // 进程管理器 @@ -23,9 +18,6 @@ type Manager struct { processes *gmap.IntInterfaceMap // 所管理的子进程map } -// 进程通信消息队列 -var msgQueue = gqueue.New() - // 创建一个进程管理器 func New () *Manager { return &Manager{ @@ -33,46 +25,28 @@ func New () *Manager { } } -// 创建主进程与子进程的TCP通信监听服务 -func (m *Manager) startTcpService() { - go func() { - var listen *net.TCPListener - for i := gCOMMUNICATION_MAIN_PORT; i < gCOMMUNICATION_MAIN_PORT + 10000; i++ { - addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i)) - if err != nil { - continue - } - listen, err = net.ListenTCP("tcp", addr) - if err != nil { - continue - } - } - for { - if conn, err := listen.Accept(); err != nil { - glog.Error(err) - } else if conn != nil { - go tcpServiceHandler(conn) - } - } - }() -} - // 创建一个进程(不执行) func (m *Manager) NewProcess(path string, args []string, environment []string) *Process { env := make([]string, len(environment) + 1) for k, v := range environment { env[k] = v } - env[len(env)] = gCHILD_PROCESS_ENV_STRING - return &Process { + env[len(env) - 1] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid()) + p := &Process { pm : m, path : path, - args : args, + args : make([]string, 0), attr : &os.ProcAttr { Env : env, Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr }, }, } + p.args = append(p.args, args[0]) + p.args = append(p.args, "--gproc-child") + if len(args) > 1 { + p.args = append(p.args, args[1:]...) + } + return p } // 获取当前进程管理器中的一个进程 @@ -124,6 +98,11 @@ func (m *Manager) SignalAll(sig os.Signal) error { return nil } +// 获取当前进程管理器中的一个进程 +func (m *Manager) Send(pid int, data interface{}) error { + return Send(pid, data) +} + // 当前进程总数 func (m *Manager) Size() int { return m.processes.Size() diff --git a/g/os/gproc/gproc_proccess.go b/g/os/gproc/gproc_proccess.go index b6a85b523..239623dfc 100644 --- a/g/os/gproc/gproc_proccess.go +++ b/g/os/gproc/gproc_proccess.go @@ -8,10 +8,7 @@ package gproc import ( "os" - "fmt" - "gitee.com/johng/gf/g/os/glog" - "net" - "gitee.com/johng/gf/g/net/gtcp" + "errors" ) // 子进程 @@ -37,30 +34,6 @@ func (p *Process) Run() (int, error) { } } -// 创建主进程与子进程的TCP通信监听服务 -func (p *Process) startTcpService() { - go func() { - var listen *net.TCPListener - for i := gCOMMUNICATION_CHILD_PORT; i < gCOMMUNICATION_CHILD_PORT + 10000; i++ { - addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i)) - if err != nil { - continue - } - listen, err = net.ListenTCP("tcp", addr) - if err != nil { - continue - } - } - for { - if conn, err := listen.Accept(); err != nil { - glog.Error(err) - } else if conn != nil { - go tcpServiceHandler(conn) - } - } - }() -} - func (p *Process) SetArgs(args []string) { p.args = args } @@ -97,6 +70,15 @@ func (p *Process) Pid() int { return 0 } +// 向进程发送消息 +func (p *Process) Send(data interface{}) error { + if p.process != nil { + return Send(p.process.Pid, data) + } + return errors.New("process not running") +} + + // Release releases any resources associated with the Process p, // rendering it unusable in the future. // Release only needs to be called if Wait is not. diff --git a/geg/os/gproc/gproc.go b/geg/os/gproc/gproc.go new file mode 100644 index 000000000..73c4f6896 --- /dev/null +++ b/geg/os/gproc/gproc.go @@ -0,0 +1,27 @@ +package main + +import ( + "os" + "fmt" + "time" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/gproc" +) + +func main () { + if gproc.IsChild() { + gtime.SetInterval(3*time.Second, func() bool { + gproc.Send(gproc.Ppid(), gtime.Datetime()) + return true + }) + select { } + } else { + m := gproc.New() + p := m.NewProcess(os.Args[0], os.Args, nil) + p.Run() + for { + msg := gproc.Receive() + fmt.Printf("pid is %d, receive from %d: %s\n", os.Getpid(), msg.Pid, string(msg.Data)) + } + } +} diff --git a/geg/os/gproc/gproc2.go b/geg/os/gproc/gproc2.go new file mode 100644 index 000000000..11ee52b4b --- /dev/null +++ b/geg/os/gproc/gproc2.go @@ -0,0 +1,11 @@ +package main + +import ( + "gitee.com/johng/gf/g/os/gproc" + "fmt" +) + +func main () { + err := gproc.Send(11177, "hello process!") + fmt.Println(err) +}