完成gproc进程管理及通信包 & gflock文件锁包开发

This commit is contained in:
John
2018-05-10 16:07:14 +08:00
parent 44b9cb77ec
commit dc20caeb27
14 changed files with 212 additions and 638 deletions

View File

@ -320,6 +320,11 @@ func putContents(path string, data []byte, flag int, perm os.FileMode) error {
return nil
}
// Truncate
func Truncate(path string, size int) error {
return os.Truncate(path, int64(size))
}
// (文本)写入文件内容
func PutContents(path string, content string) error {
return putContents(path, []byte(content), os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0666)

View File

@ -8,22 +8,42 @@
package gflock
import (
"sync"
"github.com/theckman/go-flock"
"gitee.com/johng/gf/g/os/gfile"
)
// 文件锁
type Locker struct {
mu sync.RWMutex
flock *flock.Flock
}
// 创建文件锁
func New(file string) *Locker {
path := gfile.TempDir() + gfile.Separator + file
path := gfile.TempDir() + gfile.Separator + "gflock" + gfile.Separator + file
lock := flock.NewFlock(path)
return &Locker{
lock,
flock : lock,
}
}
func (l *Locker) Lock() {
l.mu.Lock()
l.flock.Lock()
}
func (l *Locker) UnLock() {
l.flock.Unlock()
l.mu.Unlock()
}
func (l *Locker) RLock() {
l.mu.RLock()
l.flock.RLock()
}
func (l *Locker) RUnlock() {
l.flock.Unlock()
l.mu.RUnlock()
}

View File

@ -1,27 +0,0 @@
Copyright (c) 2015, Tim Heckman
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright notice, this
list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright notice,
this list of conditions and the following disclaimer in the documentation
and/or other materials provided with the distribution.
* Neither the name of linode-netint nor the names of its
contributors may be used to endorse or promote products derived from
this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

View File

@ -1,109 +0,0 @@
// From & Thanks: https://github.com/theckman/go-flock
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
// Package flock implements a thread-safe sync.Locker interface for file locking.
// It also includes a non-blocking TryLock() function to allow locking
// without blocking execution.
//
// Package flock is released under the BSD 3-Clause License. See the LICENSE file
// for more details.
//
// While using this library, remember that the locking behaviors are not
// guaranteed to be the same on each platform. For example, some UNIX-like
// operating systems will transparently convert a shared lock to an exclusive
// lock. If you Unlock() the flock from a location where you believe that you
// have the shared lock, you may accidently drop the exclusive lock.
package flock
import (
"context"
"os"
"sync"
"time"
)
// Flock is the struct type to handle file locking. All fields are unexported,
// with access to some of the fields provided by getter methods (Path() and Locked()).
type Flock struct {
mu sync.RWMutex // 用于接口层的阻塞互斥锁
path string
m sync.RWMutex
fh *os.File
l bool
r bool
}
// NewFlock is a function to return a new instance of *Flock. The only parameter
// it takes is the path to the desired lockfile.
func NewFlock(path string) *Flock {
return &Flock{path: path}
}
// Path is a function to return the path as provided in NewFlock().
func (f *Flock) Path() string {
return f.path
}
// Locked is a function to return the current lock state (locked: true, unlocked: false).
func (f *Flock) Locked() bool {
f.m.RLock()
defer f.m.RUnlock()
return f.l
}
// RLocked is a function to return the current read lock state (locked: true, unlocked: false).
func (f *Flock) RLocked() bool {
f.m.RLock()
defer f.m.RUnlock()
return f.r
}
func (f *Flock) String() string {
return f.path
}
// TryLockContext repeatedly tries to take an exclusive lock until one of the
// conditions is met: TryLock succeeds, TryLock fails with error, or Context
// Done channel is closed.
func (f *Flock) TryLockContext(ctx context.Context, retryDelay time.Duration) (bool, error) {
return tryCtx(f.TryLock, ctx, retryDelay)
}
// TryRLockContext repeatedly tries to take a shared lock until one of the
// conditions is met: TryRLock succeeds, TryRLock fails with error, or Context
// Done channel is closed.
func (f *Flock) TryRLockContext(ctx context.Context, retryDelay time.Duration) (bool, error) {
return tryCtx(f.TryRLock, ctx, retryDelay)
}
func tryCtx(fn func() (bool, error), ctx context.Context, retryDelay time.Duration) (bool, error) {
if ctx.Err() != nil {
return false, ctx.Err()
}
for {
if ok, err := fn(); ok || err != nil {
return ok, err
}
select {
case <-ctx.Done():
return false, ctx.Err()
case <-time.After(retryDelay):
// try again
}
}
}
func (f *Flock) setFh() error {
// open a new os.File instance
// create it if it doesn't exist, and open the file read-only.
fh, err := os.OpenFile(f.path, os.O_CREATE|os.O_RDONLY, os.FileMode(0600))
if err != nil {
return err
}
// set the filehandle on the struct
f.fh = fh
return nil
}

View File

@ -1,159 +0,0 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
// +build !windows
package flock
import (
"syscall"
)
// Lock is a blocking call to try and take an exclusive file lock. It will wait
// until it is able to obtain the exclusive file lock. It's recommended that
// TryLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already exclusive-locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
//
// If the *Flock has a shared lock (RLock), this may transparently replace the
// shared lock with an exclusive lock on some UNIX-like operating systems. Be
// careful when using exclusive locks in conjunction with shared locks
// (RLock()), because calling Unlock() may accidentally release the exclusive
// lock that was once a shared lock.
func (f *Flock) Lock() error {
f.mu.Lock()
err := f.lock(&f.l, syscall.LOCK_EX)
if err != nil {
f.mu.Unlock()
return err
}
return nil
}
// RLock is a blocking call to try and take a ahred file lock. It will wait
// until it is able to obtain the shared file lock. It's recommended that
// TryRLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already shared-locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
func (f *Flock) RLock() error {
f.mu.RLock()
err := f.lock(&f.r, syscall.LOCK_SH)
if err != nil {
f.mu.RUnlock()
return err
}
return nil
}
func (f *Flock) lock(locked *bool, flag int) error {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return err
}
}
if err := syscall.Flock(int(f.fh.Fd()), flag); err != nil {
return err
}
*locked = true
return nil
}
// Unlock is a function to unlock the file. This file takes a RW-mutex lock, so
// while it is running the Locked() and RLocked() functions will be blocked.
//
// This function short-circuits if we are unlocked already. If not, it calls
// syscall.LOCK_UN on the file and closes the file descriptor. It does not
// remove the file from disk. It's up to your application to do.
//
// Please note, if your shared lock became an exclusive lock this may
// unintentionally drop the exclusive lock if called by the consumer that
// believes they have a shared lock. Please see Lock() for more details.
func (f *Flock) Unlock() error {
f.m.Lock()
defer f.m.Unlock()
// if we aren't locked or if the lockfile instance is nil
// just return a nil error because we are unlocked
if (!f.l && !f.r) || f.fh == nil {
return nil
}
// mark the file as unlocked
if err := syscall.Flock(int(f.fh.Fd()), syscall.LOCK_UN); err != nil {
return err
}
f.fh.Close()
f.l = false
f.r = false
f.fh = nil
f.mu.Unlock()
return nil
}
// TryLock is the preferred function for taking an exclusive file lock. This
// function takes an RW-mutex lock before it tries to lock the file, so there is
// the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the exclusive
// file lock, the function will return false instead of waiting for the lock. If
// we get the lock, we also set the *Flock instance as being exclusive-locked.
func (f *Flock) TryLock() (bool, error) {
return f.try(&f.l, syscall.LOCK_EX)
}
// TryRLock is the preferred function for taking a shared file lock. This
// function takes an RW-mutex lock before it tries to lock the file, so there is
// the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the shared file
// lock, the function will return false instead of waiting for the lock. If we
// get the lock, we also set the *Flock instance as being share-locked.
func (f *Flock) TryRLock() (bool, error) {
return f.try(&f.r, syscall.LOCK_SH)
}
func (f *Flock) try(locked *bool, flag int) (bool, error) {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return true, nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return false, err
}
}
err := syscall.Flock(int(f.fh.Fd()), flag|syscall.LOCK_NB)
switch err {
case syscall.EWOULDBLOCK:
return false, nil
case nil:
*locked = true
return true, nil
}
return false, err
}

View File

@ -1,76 +0,0 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
// +build windows
package flock
import (
"syscall"
"unsafe"
)
var (
kernel32, _ = syscall.LoadLibrary("kernel32.dll")
procLockFileEx, _ = syscall.GetProcAddress(kernel32, "LockFileEx")
procUnlockFileEx, _ = syscall.GetProcAddress(kernel32, "UnlockFileEx")
)
const (
winLockfileFailImmediately = 0x00000001
winLockfileExclusiveLock = 0x00000002
winLockfileSharedLock = 0x00000000
)
// Use of 0x00000000 for the shared lock is a guess based on some the MS Windows
// `LockFileEX` docs, which document the `LOCKFILE_EXCLUSIVE_LOCK` flag as:
//
// > The function requests an exclusive lock. Otherwise, it requests a shared
// > lock.
//
// https://msdn.microsoft.com/en-us/library/windows/desktop/aa365203(v=vs.85).aspx
func lockFileEx(handle syscall.Handle, flags uint32, reserved uint32, numberOfBytesToLockLow uint32, numberOfBytesToLockHigh uint32, offset *syscall.Overlapped) (bool, syscall.Errno) {
r1, _, errNo := syscall.Syscall6(
uintptr(procLockFileEx),
6,
uintptr(handle),
uintptr(flags),
uintptr(reserved),
uintptr(numberOfBytesToLockLow),
uintptr(numberOfBytesToLockHigh),
uintptr(unsafe.Pointer(offset)))
if r1 != 1 {
if errNo == 0 {
return false, syscall.EINVAL
}
return false, errNo
}
return true, 0
}
func unlockFileEx(handle syscall.Handle, reserved uint32, numberOfBytesToLockLow uint32, numberOfBytesToLockHigh uint32, offset *syscall.Overlapped) (bool, syscall.Errno) {
r1, _, errNo := syscall.Syscall6(
uintptr(procUnlockFileEx),
5,
uintptr(handle),
uintptr(reserved),
uintptr(numberOfBytesToLockLow),
uintptr(numberOfBytesToLockHigh),
uintptr(unsafe.Pointer(offset)),
0)
if r1 != 1 {
if errNo == 0 {
return false, syscall.EINVAL
}
return false, errNo
}
return true, 0
}

View File

@ -1,140 +0,0 @@
// Copyright 2015 Tim Heckman. All rights reserved.
// Use of this source code is governed by the BSD 3-Clause
// license that can be found in the LICENSE file.
package flock
import (
"syscall"
)
// ErrorLockViolation is the error code returned from the Windows syscall when a
// lock would block and you ask to fail immediately.
const ErrorLockViolation syscall.Errno = 0x21 // 33
// Lock is a blocking call to try and take an exclusive file lock. It will wait
// until it is able to obtain the exclusive file lock. It's recommended that
// TryLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
func (f *Flock) Lock() error {
return f.lock(&f.l, winLockfileExclusiveLock)
}
// RLock is a blocking call to try and take a sahred file lock. It will wait
// until it is able to obtain the shared file lock. It's recommended that
// TryRLock() be used over this function. This function may block the ability to
// query the current Locked() or RLocked() status due to a RW-mutex lock.
//
// If we are already locked, this function short-circuits and returns
// immediately assuming it can take the mutex lock.
func (f *Flock) RLock() error {
return f.lock(&f.r, winLockfileSharedLock)
}
func (f *Flock) lock(locked *bool, flag uint32) error {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return err
}
}
if _, errNo := lockFileEx(syscall.Handle(f.fh.Fd()), flag, 0, 1, 0, &syscall.Overlapped{}); errNo > 0 {
return errNo
}
*locked = true
return nil
}
// Unlock is a function to unlock the file. This file takes a RW-mutex lock, so
// while it is running the Locked() and RLocked() functions will be blocked.
//
// This function short-circuits if we are unlocked already. If not, it calls
// UnlockFileEx() on the file and closes the file descriptor. It does not remove
// the file from disk. It's up to your application to do.
func (f *Flock) Unlock() error {
f.m.Lock()
defer f.m.Unlock()
// if we aren't locked or if the lockfile instance is nil
// just return a nil error because we are unlocked
if (!f.l && !f.r) || f.fh == nil {
return nil
}
// mark the file as unlocked
if _, errNo := unlockFileEx(syscall.Handle(f.fh.Fd()), 0, 1, 0, &syscall.Overlapped{}); errNo > 0 {
return errNo
}
f.fh.Close()
f.l = false
f.r = false
f.fh = nil
return nil
}
// TryLock is the preferred function for taking an exlusive file lock. This
// function does take a RW-mutex lock before it tries to lock the file, so there
// is the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the exclusive
// file lock, the function will return false instead of waiting for the lock. If
// we get the lock, we also set the *Flock instance as being exclusive-locked.
func (f *Flock) TryLock() (bool, error) {
return f.try(&f.l, winLockfileExclusiveLock)
}
// TryRLock is the preferred function for taking a shared file lock. This
// function does take a RW-mutex lock before it tries to lock the file, so there
// is the possibility that this function may block for a short time if another
// goroutine is trying to take any action.
//
// The actual file lock is non-blocking. If we are unable to get the shared file
// lock, the function will return false instead of waiting for the lock. If we
// get the lock, we also set the *Flock instance as being shared-locked.
func (f *Flock) TryRLock() (bool, error) {
return f.try(&f.r, winLockfileSharedLock)
}
func (f *Flock) try(locked *bool, flag uint32) (bool, error) {
f.m.Lock()
defer f.m.Unlock()
if *locked {
return true, nil
}
if f.fh == nil {
if err := f.setFh(); err != nil {
return false, err
}
}
_, errNo := lockFileEx(syscall.Handle(f.fh.Fd()), flag|winLockfileFailImmediately, 0, 1, 0, &syscall.Overlapped{})
if errNo > 0 {
if errNo == ErrorLockViolation || errNo == syscall.ERROR_IO_PENDING {
return false, nil
}
return false, errNo
}
*locked = true
return true, nil
}

View File

@ -66,8 +66,6 @@ func Remove(path string) error {
return watcher.Remove(path)
}
// 创建监听管理对象
func New() (*Watcher, error) {
if watch, err := fsnotify.NewWatcher(); err == nil {
@ -155,7 +153,7 @@ func (w *Watcher) startEventLoop() {
for {
if v := w.events.PopFront(); v != nil {
event := v.(*Event)
// 如果是文件删除时间,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控
// 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,并重新添加监控
if event.IsRemove() && gfile.Exists(event.Path){
w.watcher.Add(event.Path)
continue

View File

@ -9,74 +9,25 @@ package gproc
import (
"os"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/net/gtcp"
"net"
"gitee.com/johng/gf/g/util/gconv"
)
const (
gCOMMUNICATION_MAIN_PORT = 30000
gCOMMUNICATION_CHILD_PORT = 40000
gCHILD_PROCESS_ENV_KEY = "gf.process.manager.child"
gCHILD_PROCESS_ENV_STRING = gCHILD_PROCESS_ENV_KEY + "=1"
gPROC_ENV_KEY_PPID_KEY = "gproc.ppid"
)
// TCP通信数据结构定义
type Msg struct {
Pid int // PID哪个进程发送的消息
Data []byte // 参数,消息附带的参数
// 获取当前进程ID
func Pid() int {
return os.Getpid()
}
// 获取其他进程传递到当前进程的消息包,阻塞执行
func GetMsg() *Msg {
if v := msgQueue.PopFront(); v != nil {
return v.(*Msg)
}
return nil
// 获取父进程ID
func Ppid() int {
return gconv.Int(os.Getenv(gPROC_ENV_KEY_PPID_KEY))
}
// 判断当前进程是否为gproc创建的子进程
func IsChild() bool {
return os.Getenv(gCHILD_PROCESS_ENV_KEY) != ""
return os.Getenv(gPROC_ENV_KEY_PPID_KEY) != ""
}
// TCP数据通信处理回调函数
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
func tcpServiceHandler(conn net.Conn) {
buffer := gtcp.Receive(conn, gtcp.Retry{3, 100})
msgs := bufferToMsgs(buffer)
if len(msgs) == 0 {
conn.Close()
return
}
for _, msg := range msgs {
msgQueue.PushBack(msg)
}
}
// 数据解包,防止黏包
func bufferToMsgs(buffer []byte) []*Msg {
s := 0
msgs := make([]*Msg, 0)
for s < len(buffer) {
length := gbinary.DecodeToInt(buffer[s : 4])
if length < 0 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := gtcp.Checksum(buffer[s + 12 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
s += length
}
return msgs
}

112
g/os/gproc/gproc_comm.go Normal file
View File

@ -0,0 +1,112 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gproc
import (
"os"
"fmt"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/gflock"
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/encoding/gbinary"
)
// gproc进程通信共享文件目录地址
var commDirPath = gfile.TempDir() + gfile.Separator + "gproc"
// 当前进程的文件锁
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
// 进程通信消息队列
var commQueue = gqueue.New()
// TCP通信数据结构定义
type Msg struct {
Pid int // PID哪个进程发送的消息
Data []byte // 参数,消息附带的参数
}
// 进程管理/通信初始化操作
func init() {
path := getCommFilePath(os.Getpid())
if !gfile.Exists(path) {
gfile.Create(path)
}
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
gfsnotify.Add(path, func(event *gfsnotify.Event) {
commLocker.Lock()
buffer := gfile.GetBinContents(path)
os.Truncate(path, 0)
commLocker.UnLock()
for _, v := range bufferToMsgs(buffer) {
commQueue.PushBack(v)
}
})
}
// 获取其他进程传递到当前进程的消息包,阻塞执行
func Receive() *Msg {
if v := commQueue.PopFront(); v != nil {
return v.(*Msg)
}
return nil
}
// 向指定gproc进程发送数据
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
func Send(pid int, data interface{}) error {
buffer := gconv.Bytes(data)
b := make([]byte, 0)
b = append(b, gbinary.EncodeInt32(int32(len(buffer) + 12))...)
b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...)
b = append(b, gbinary.EncodeUint32(checksum(buffer))...)
b = append(b, buffer...)
l := gflock.New(fmt.Sprintf("%d.lock", pid))
l.Lock()
err := gfile.PutBinContentsAppend(getCommFilePath(pid), b)
l.UnLock()
return err
}
// 获取指定进程的通信文件地址
func getCommFilePath(pid int) string {
return commDirPath + gfile.Separator + gconv.String(pid)
}
// 数据解包,防止黏包
func bufferToMsgs(buffer []byte) []*Msg {
s := 0
msgs := make([]*Msg, 0)
for s < len(buffer) {
length := gbinary.DecodeToInt(buffer[s : 4])
if length < 0 || length > len(buffer) {
s++
continue
}
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
checksum2 := checksum(buffer[s + 12 : s + length])
if checksum1 != checksum2 {
s++
continue
}
msgs = append(msgs, &Msg {
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
Data : buffer[s + 12 : s + length],
})
s += length
}
return msgs
}
// 常见的二进制数据校验方式,生成校验结果
func checksum(buffer []byte) uint32 {
var checksum uint32
for _, b := range buffer {
checksum += uint32(b)
}
return checksum
}

View File

@ -9,13 +9,8 @@ package gproc
import (
"os"
"net"
"fmt"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/net/gtcp"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/encoding/gbinary"
"gitee.com/johng/gf/g/container/gqueue"
"fmt"
)
// 进程管理器
@ -23,9 +18,6 @@ type Manager struct {
processes *gmap.IntInterfaceMap // 所管理的子进程map
}
// 进程通信消息队列
var msgQueue = gqueue.New()
// 创建一个进程管理器
func New () *Manager {
return &Manager{
@ -33,46 +25,28 @@ func New () *Manager {
}
}
// 创建主进程与子进程的TCP通信监听服务
func (m *Manager) startTcpService() {
go func() {
var listen *net.TCPListener
for i := gCOMMUNICATION_MAIN_PORT; i < gCOMMUNICATION_MAIN_PORT + 10000; i++ {
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i))
if err != nil {
continue
}
listen, err = net.ListenTCP("tcp", addr)
if err != nil {
continue
}
}
for {
if conn, err := listen.Accept(); err != nil {
glog.Error(err)
} else if conn != nil {
go tcpServiceHandler(conn)
}
}
}()
}
// 创建一个进程(不执行)
func (m *Manager) NewProcess(path string, args []string, environment []string) *Process {
env := make([]string, len(environment) + 1)
for k, v := range environment {
env[k] = v
}
env[len(env)] = gCHILD_PROCESS_ENV_STRING
return &Process {
env[len(env) - 1] = fmt.Sprintf("%s=%d", gPROC_ENV_KEY_PPID_KEY, os.Getpid())
p := &Process {
pm : m,
path : path,
args : args,
args : make([]string, 0),
attr : &os.ProcAttr {
Env : env,
Files : []*os.File{ os.Stdin,os.Stdout,os.Stderr },
},
}
p.args = append(p.args, args[0])
p.args = append(p.args, "--gproc-child")
if len(args) > 1 {
p.args = append(p.args, args[1:]...)
}
return p
}
// 获取当前进程管理器中的一个进程
@ -124,6 +98,11 @@ func (m *Manager) SignalAll(sig os.Signal) error {
return nil
}
// 获取当前进程管理器中的一个进程
func (m *Manager) Send(pid int, data interface{}) error {
return Send(pid, data)
}
// 当前进程总数
func (m *Manager) Size() int {
return m.processes.Size()

View File

@ -8,10 +8,7 @@ package gproc
import (
"os"
"fmt"
"gitee.com/johng/gf/g/os/glog"
"net"
"gitee.com/johng/gf/g/net/gtcp"
"errors"
)
// 子进程
@ -37,30 +34,6 @@ func (p *Process) Run() (int, error) {
}
}
// 创建主进程与子进程的TCP通信监听服务
func (p *Process) startTcpService() {
go func() {
var listen *net.TCPListener
for i := gCOMMUNICATION_CHILD_PORT; i < gCOMMUNICATION_CHILD_PORT + 10000; i++ {
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i))
if err != nil {
continue
}
listen, err = net.ListenTCP("tcp", addr)
if err != nil {
continue
}
}
for {
if conn, err := listen.Accept(); err != nil {
glog.Error(err)
} else if conn != nil {
go tcpServiceHandler(conn)
}
}
}()
}
func (p *Process) SetArgs(args []string) {
p.args = args
}
@ -97,6 +70,15 @@ func (p *Process) Pid() int {
return 0
}
// 向进程发送消息
func (p *Process) Send(data interface{}) error {
if p.process != nil {
return Send(p.process.Pid, data)
}
return errors.New("process not running")
}
// Release releases any resources associated with the Process p,
// rendering it unusable in the future.
// Release only needs to be called if Wait is not.

27
geg/os/gproc/gproc.go Normal file
View File

@ -0,0 +1,27 @@
package main
import (
"os"
"fmt"
"time"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gproc"
)
func main () {
if gproc.IsChild() {
gtime.SetInterval(3*time.Second, func() bool {
gproc.Send(gproc.Ppid(), gtime.Datetime())
return true
})
select { }
} else {
m := gproc.New()
p := m.NewProcess(os.Args[0], os.Args, nil)
p.Run()
for {
msg := gproc.Receive()
fmt.Printf("pid is %d, receive from %d: %s\n", os.Getpid(), msg.Pid, string(msg.Data))
}
}
}

11
geg/os/gproc/gproc2.go Normal file
View File

@ -0,0 +1,11 @@
package main
import (
"gitee.com/johng/gf/g/os/gproc"
"fmt"
)
func main () {
err := gproc.Send(11177, "hello process!")
fmt.Println(err)
}