mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
改进gproc包进程通信方式为TCP/IP
This commit is contained in:
@ -15,6 +15,8 @@ import (
|
||||
"gitee.com/johng/gf/g/container/gtype"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"time"
|
||||
)
|
||||
|
||||
const (
|
||||
@ -26,8 +28,10 @@ const (
|
||||
gMSG_NEW_FORK = 6
|
||||
gMSG_HEARTBEAT = 7
|
||||
|
||||
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
|
||||
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
|
||||
gPROC_FAILURE_RETRY_COUNT = 3 // 发送消息失败重试次数
|
||||
gPROC_FAILURE_RETRY_INTERVAL = 500 // (毫秒)发送消息失败时重试间隔
|
||||
gPROC_HEARTBEAT_INTERVAL = 1000 // (毫秒)进程间心跳间隔
|
||||
gPROC_HEARTBEAT_TIMEOUT = 3000 // (毫秒)进程间心跳超时时间,如果子进程在这段内没有接收到任何心跳,那么自动退出,防止可能出现的僵尸子进程
|
||||
)
|
||||
|
||||
// 进程信号量监听消息队列
|
||||
@ -101,7 +105,16 @@ func handleProcessMsg() {
|
||||
|
||||
// 向进程发送操作消息
|
||||
func sendProcessMsg(pid int, act int, data []byte) error {
|
||||
return gproc.Send(pid, formatMsgBuffer(act, data))
|
||||
var err error
|
||||
for i := gPROC_FAILURE_RETRY_COUNT; i > 0; i-- {
|
||||
if err = gproc.Send(pid, formatMsgBuffer(act, data)); err != nil {
|
||||
time.Sleep(gPROC_FAILURE_RETRY_INTERVAL*time.Millisecond)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
glog.Printfln("%d=>%d, %d, %v", gproc.Pid(), pid, act, err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 生成一条满足Web Server进程通信协议的消息
|
||||
|
||||
@ -10,10 +10,14 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
"fmt"
|
||||
"bytes"
|
||||
)
|
||||
|
||||
const (
|
||||
gDEFAULT_RETRY_INTERVAL = 100 // 默认重试时间间隔
|
||||
gDEFAULT_RETRY_INTERVAL = 100 // (毫秒)默认重试时间间隔
|
||||
gDEFAULT_READ_BUFFER_SIZE = 10 // 默认数据读取缓冲区大小
|
||||
|
||||
)
|
||||
|
||||
type Retry struct {
|
||||
@ -21,6 +25,12 @@ type Retry struct {
|
||||
Interval int // 重试间隔(毫秒)
|
||||
}
|
||||
|
||||
// 自定义的包分割符号,用于标识包是否读取结束
|
||||
// 注意:
|
||||
// 1. 必须使用gtcp包来发送和接收tcp数据才有效;
|
||||
// 2. 只有在发送的字节数为buffer size倍数时才有效;
|
||||
var pkgSplitMark = []byte{0, 'E', 'O', 'P', 0}
|
||||
|
||||
// 常见的二进制数据校验方式,生成校验结果
|
||||
func Checksum(buffer []byte) uint32 {
|
||||
var checksum uint32
|
||||
@ -30,44 +40,69 @@ func Checksum(buffer []byte) uint32 {
|
||||
return checksum
|
||||
}
|
||||
|
||||
// 创建TCP链接
|
||||
func Conn(ip string, port int, timeout...int) (net.Conn, error) {
|
||||
addr := fmt.Sprintf("%s:%d", ip, port)
|
||||
if len(timeout) > 0 {
|
||||
return net.DialTimeout("tcp", addr, time.Duration(timeout[0]) * time.Millisecond)
|
||||
} else {
|
||||
return net.Dial("tcp", addr)
|
||||
}
|
||||
}
|
||||
|
||||
// 获取数据
|
||||
func Receive(conn net.Conn, retry...Retry) []byte {
|
||||
size := 1024
|
||||
func Receive(conn net.Conn, retry...Retry) ([]byte, error) {
|
||||
var err error = nil
|
||||
size := gDEFAULT_READ_BUFFER_SIZE
|
||||
data := make([]byte, 0)
|
||||
for {
|
||||
buffer := make([]byte, size)
|
||||
length, err := conn.Read(buffer)
|
||||
if length < 1 && err != nil {
|
||||
if err == io.EOF || len(retry) == 0 || retry[0].Count == 0 {
|
||||
buffer := make([]byte, size)
|
||||
length, e := conn.Read(buffer)
|
||||
if length < 1 || e != nil {
|
||||
if e == io.EOF {
|
||||
break
|
||||
}
|
||||
if len(retry) > 0 {
|
||||
if retry[0].Count == 0 {
|
||||
err = e
|
||||
break
|
||||
}
|
||||
retry[0].Count--
|
||||
if retry[0].Interval == 0 {
|
||||
retry[0].Interval = gDEFAULT_RETRY_INTERVAL
|
||||
}
|
||||
time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
break
|
||||
} else {
|
||||
data = append(data, buffer[0:length]...)
|
||||
if err == io.EOF {
|
||||
// 自定义结束标识符判断
|
||||
if length == len(pkgSplitMark) && bytes.Compare(pkgSplitMark, buffer[0 : length]) == 0 {
|
||||
break
|
||||
}
|
||||
data = append(data, buffer[0 : length]...)
|
||||
if length < gDEFAULT_READ_BUFFER_SIZE || e == io.EOF {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
return data
|
||||
return data, err
|
||||
}
|
||||
|
||||
// 带超时时间的数据获取
|
||||
func ReceiveWithTimeout(conn net.Conn, timeout time.Duration, retry...Retry) []byte {
|
||||
func ReceiveWithTimeout(conn net.Conn, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
conn.SetReadDeadline(time.Now().Add(timeout))
|
||||
return Receive(conn, retry...)
|
||||
}
|
||||
|
||||
// 发送数据
|
||||
func Send(conn net.Conn, data []byte, retry...Retry) error {
|
||||
if len(data) % gDEFAULT_READ_BUFFER_SIZE == 0 {
|
||||
data = append(data, pkgSplitMark...)
|
||||
}
|
||||
length := 0
|
||||
for {
|
||||
_, err := conn.Write(data)
|
||||
n, err := conn.Write(data)
|
||||
if err != nil {
|
||||
if len(retry) == 0 || retry[0].Count == 0 {
|
||||
return err
|
||||
@ -80,7 +115,10 @@ func Send(conn net.Conn, data []byte, retry...Retry) error {
|
||||
time.Sleep(time.Duration(retry[0].Interval) * time.Millisecond)
|
||||
}
|
||||
} else {
|
||||
return nil
|
||||
length += n
|
||||
if length == len(data) {
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -89,4 +127,22 @@ func Send(conn net.Conn, data []byte, retry...Retry) error {
|
||||
func SendWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) error {
|
||||
conn.SetWriteDeadline(time.Now().Add(timeout))
|
||||
return Send(conn, data, retry...)
|
||||
}
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据
|
||||
func SendReceive(conn net.Conn, data []byte, retry...Retry) ([]byte, error) {
|
||||
if err := Send(conn, data, retry...); err == nil {
|
||||
return Receive(conn)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// 发送数据并等待接收返回数据(带返回超时等待时间)
|
||||
func SendReceiveWithTimeout(conn net.Conn, data []byte, timeout time.Duration, retry...Retry) ([]byte, error) {
|
||||
if err := Send(conn, data, retry...); err == nil {
|
||||
return ReceiveWithTimeout(conn, timeout)
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
@ -17,6 +17,7 @@ import (
|
||||
|
||||
const (
|
||||
gPROC_ENV_KEY_PPID_KEY = "gproc.ppid"
|
||||
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
|
||||
)
|
||||
|
||||
// 进程开始执行时间
|
||||
|
||||
@ -3,182 +3,41 @@
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
// @todo 后期改为tcp进程通信形式
|
||||
// "不要通过共享内存来通信,而应该通过通信来共享内存"
|
||||
|
||||
|
||||
package gproc
|
||||
|
||||
import (
|
||||
"io"
|
||||
"os"
|
||||
"fmt"
|
||||
"time"
|
||||
"errors"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/os/gflock"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/os/gfsnotify"
|
||||
"gitee.com/johng/gf/g/container/gmap"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
)
|
||||
|
||||
const (
|
||||
// 由于子进程的temp dir有可能会和父进程不一致(特别是windows下),影响进程间通信,这里统一使用环境变量设置
|
||||
gPROC_TEMP_DIR_ENV_KEY = "gproc.tempdir"
|
||||
// 写入通信数据失败时候的重试次数
|
||||
gPROC_COMM_FAILURE_RETRY_COUNT = 3
|
||||
// (毫秒)主动通信内容检查时间间隔
|
||||
gPROC_COMM_ACTIVE_CHECK_INTERVAL = 500*time.Millisecond
|
||||
)
|
||||
|
||||
// 全局通信文件清理文件锁(同一时刻只能存在一个进程进行通信文件清理)
|
||||
var commClearLocker = gflock.New("comm.clear.lock")
|
||||
// 当前进程的文件锁
|
||||
var commLocker = gflock.New(fmt.Sprintf("%d.lock", os.Getpid()))
|
||||
// 进程通信消息队列
|
||||
var commQueue = gqueue.New()
|
||||
// 本地进程通信发送消息队列
|
||||
var commSendQueue = gqueue.New()
|
||||
// 本地进程通信接收消息队列
|
||||
var commReceiveQueue = gqueue.New()
|
||||
// (用于发送)已建立的PID对应的Conn通信对象
|
||||
var commPidConnMap = gmap.NewIntInterfaceMap()
|
||||
|
||||
// TCP通信数据结构定义
|
||||
type Msg struct {
|
||||
Pid int // PID,哪个进程发送的消息
|
||||
Data []byte // 参数,消息附带的参数
|
||||
Pid int // PID,来源哪个进程
|
||||
Data []byte // 数据
|
||||
}
|
||||
|
||||
// TCP通信数据结构定义
|
||||
type sendQueueItem struct {
|
||||
Pid int // PID,发向哪个进程
|
||||
Data []byte // 数据
|
||||
}
|
||||
|
||||
// 进程管理/通信初始化操作
|
||||
func init() {
|
||||
path := getCommFilePath(os.Getpid())
|
||||
checkAndInitCommFile(path)
|
||||
commLocker.Lock()
|
||||
fileMtime := gfile.MTime(path)
|
||||
commLocker.UnLock()
|
||||
if gtime.Second() - fileMtime < 10 {
|
||||
// 初始化时读取已有数据(文件修改时间在10秒以内)
|
||||
checkCommBuffer(path)
|
||||
} else {
|
||||
// 否则清空旧的数据内容
|
||||
commLocker.Lock()
|
||||
os.Truncate(path, 0)
|
||||
commLocker.UnLock()
|
||||
}
|
||||
// 文件事件监听,如果通信数据文件有任何变化,读取文件并添加到消息队列
|
||||
err := gfsnotify.Add(path, func(event *gfsnotify.Event) {
|
||||
checkCommBuffer(path)
|
||||
})
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
}
|
||||
|
||||
go autoActiveCheckComm()
|
||||
}
|
||||
|
||||
// 检测并初始化通信文件
|
||||
func checkAndInitCommFile(path string) {
|
||||
commLocker.Lock()
|
||||
defer commLocker.UnLock()
|
||||
if !gfile.Exists(path) {
|
||||
err := gfile.Create(path)
|
||||
if err != nil {
|
||||
glog.Error(err)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
// 检测写入权限
|
||||
if !gfile.IsWritable(path) {
|
||||
glog.Errorfln("%s is not writable for gproc", path)
|
||||
os.Exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
// 主动通信内容检测
|
||||
func autoActiveCheckComm() {
|
||||
path := getCommFilePath(Pid())
|
||||
for {
|
||||
time.Sleep(gPROC_COMM_ACTIVE_CHECK_INTERVAL)
|
||||
checkCommBuffer(path)
|
||||
}
|
||||
}
|
||||
|
||||
// 手动检查进程通信消息,如果存在消息曾推送到进程消息队列
|
||||
func checkCommBuffer(path string) {
|
||||
commLocker.Lock()
|
||||
// 读取进程间通信数据
|
||||
buffer := gfile.GetBinContents(path)
|
||||
if len(buffer) > 0 {
|
||||
os.Truncate(path, 0)
|
||||
}
|
||||
commLocker.UnLock()
|
||||
if len(buffer) > 0 {
|
||||
for _, v := range bufferToMsgs(buffer) {
|
||||
commQueue.PushBack(v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 获取其他进程传递到当前进程的消息包,阻塞执行
|
||||
func Receive() *Msg {
|
||||
if v := commQueue.PopFront(); v != nil {
|
||||
return v.(*Msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 向指定gproc进程发送数据
|
||||
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func Send(pid int, data []byte) error {
|
||||
var err error = nil
|
||||
b := make([]byte, 0)
|
||||
b = append(b, gbinary.EncodeInt32(int32(len(data) + 12))...)
|
||||
b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...)
|
||||
b = append(b, gbinary.EncodeUint32(checksum(data))...)
|
||||
b = append(b, data...)
|
||||
l := gflock.New(fmt.Sprintf("%d.lock", pid))
|
||||
l.Lock()
|
||||
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
|
||||
err = doSend(pid, b)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
l.UnLock()
|
||||
//glog.Printfln("%d to %d, %v, %d, %v", Pid(), pid, data, gfile.Size(getCommFilePath(pid)), err)
|
||||
return err
|
||||
}
|
||||
|
||||
// 执行进程间通信数据写入
|
||||
func doSend(pid int, buffer []byte) error {
|
||||
file, err := gfile.OpenWithFlag(getCommFilePath(pid), os.O_RDWR|os.O_CREATE|os.O_APPEND)
|
||||
if err != nil{
|
||||
return err
|
||||
}
|
||||
// 必须要Close才会更新文件的修改时间
|
||||
defer file.Close()
|
||||
// 获取原有文件内容大小
|
||||
stat, err := file.Stat()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
oldSize := stat.Size()
|
||||
// 执行数据写入
|
||||
writeSize, err := file.Write(buffer)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if writeSize < len(buffer) {
|
||||
return io.ErrShortWrite
|
||||
}
|
||||
// 写入成功之后获取最新文件内容大小,执行对比
|
||||
if stat, err := file.Stat(); err != nil {
|
||||
return err
|
||||
} else {
|
||||
// 由于文件锁机制的保证,同一时刻只会有一个进程(&协程)在执行写入,不会出现数据粘包情况
|
||||
// 这里从严谨性考虑增加大小判断,更进一步避免粘包,或者丢包情况
|
||||
if stat.Size() - int64(writeSize) != oldSize {
|
||||
return errors.New("error writing data")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
go startTcpListening()
|
||||
}
|
||||
|
||||
// 获取指定进程的通信文件地址
|
||||
@ -193,38 +52,4 @@ func getCommDirPath() string {
|
||||
tempDir = gfile.TempDir()
|
||||
}
|
||||
return tempDir + gfile.Separator + "gproc"
|
||||
}
|
||||
|
||||
// 数据解包,防止黏包
|
||||
func bufferToMsgs(buffer []byte) []*Msg {
|
||||
s := 0
|
||||
msgs := make([]*Msg, 0)
|
||||
for s < len(buffer) {
|
||||
length := gbinary.DecodeToInt(buffer[s : s + 4])
|
||||
if length < 0 || length > len(buffer) {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
|
||||
checksum2 := checksum(buffer[s + 12 : s + length])
|
||||
if checksum1 != checksum2 {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, &Msg {
|
||||
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
|
||||
Data : buffer[s + 12 : s + length],
|
||||
})
|
||||
s += length
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
|
||||
// 常见的二进制数据校验方式,生成校验结果
|
||||
func checksum(buffer []byte) uint32 {
|
||||
var checksum uint32
|
||||
for _, b := range buffer {
|
||||
checksum += uint32(b)
|
||||
}
|
||||
return checksum
|
||||
}
|
||||
15
g/os/gproc/gproc_comm_receive.go
Normal file
15
g/os/gproc/gproc_comm_receive.go
Normal file
@ -0,0 +1,15 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package gproc
|
||||
|
||||
// 获取其他进程传递到当前进程的消息包,阻塞执行
|
||||
func Receive() *Msg {
|
||||
if v := commReceiveQueue.PopFront(); v != nil {
|
||||
return v.(*Msg)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
79
g/os/gproc/gproc_comm_send.go
Normal file
79
g/os/gproc/gproc_comm_send.go
Normal file
@ -0,0 +1,79 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
package gproc
|
||||
|
||||
import (
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
"os"
|
||||
"fmt"
|
||||
"errors"
|
||||
)
|
||||
|
||||
const (
|
||||
gPROC_COMM_FAILURE_RETRY_COUNT = 3
|
||||
)
|
||||
|
||||
// 向指定gproc进程发送数据
|
||||
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func Send(pid int, data []byte) error {
|
||||
b := make([]byte, 0)
|
||||
b = append(b, gbinary.EncodeInt32(int32(len(data) + 12))...)
|
||||
b = append(b, gbinary.EncodeInt32(int32(os.Getpid()))...)
|
||||
b = append(b, gbinary.EncodeUint32(gtcp.Checksum(data))...)
|
||||
b = append(b, data...)
|
||||
if conn, err := getConnByPid(pid); err == nil {
|
||||
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
|
||||
if err = gtcp.Send(conn, b); err != nil {
|
||||
conn.Close()
|
||||
if conn, err = newConnByPid(pid); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
return err
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// 获取指定进程的TCP通信对象
|
||||
func getConnByPid(pid int) (net.Conn, error) {
|
||||
if v := commPidConnMap.Get(pid); v != nil {
|
||||
return v.(net.Conn), nil
|
||||
} else {
|
||||
return newConnByPid(pid)
|
||||
}
|
||||
}
|
||||
|
||||
// 创建与指定进程的TCP通信对象
|
||||
func newConnByPid(pid int) (net.Conn, error) {
|
||||
if port := getPortByPid(pid); port > 0 {
|
||||
if conn, err := gtcp.Conn("127.0.0.1", port); err == nil {
|
||||
commPidConnMap.Set(pid, conn)
|
||||
return conn, nil
|
||||
} else {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
return nil, errors.New(fmt.Sprintf("%d not found", pid))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// 获取指定进程监听的端口号
|
||||
func getPortByPid(pid int) int {
|
||||
path := getCommFilePath(pid)
|
||||
content := gfile.GetContents(path)
|
||||
return gconv.Int(content)
|
||||
}
|
||||
91
g/os/gproc/gproc_comm_tcp.go
Normal file
91
g/os/gproc/gproc_comm_tcp.go
Normal file
@ -0,0 +1,91 @@
|
||||
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
// "不要通过共享内存来通信,而应该通过通信来共享内存"
|
||||
|
||||
|
||||
package gproc
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"gitee.com/johng/gf/g/os/glog"
|
||||
"gitee.com/johng/gf/g/net/gtcp"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
"gitee.com/johng/gf/g/util/gconv"
|
||||
"gitee.com/johng/gf/g/encoding/gbinary"
|
||||
)
|
||||
|
||||
const (
|
||||
gPROC_DEFAULT_TCP_PORT = 10000
|
||||
)
|
||||
|
||||
// 创建本地进程TCP通信服务
|
||||
func startTcpListening() {
|
||||
var listen *net.TCPListener
|
||||
for i := gPROC_DEFAULT_TCP_PORT; ; i++ {
|
||||
addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("127.0.0.1:%d", i))
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
listen, err = net.ListenTCP("tcp", addr)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
// 将监听的端口保存到通信文件中(字符串类型存放)
|
||||
gfile.PutContents(getCommFilePath(Pid()), gconv.String(i))
|
||||
break
|
||||
}
|
||||
for {
|
||||
if conn, err := listen.Accept(); err != nil {
|
||||
glog.Error(err)
|
||||
} else if conn != nil {
|
||||
go tcpServiceHandler(conn)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TCP数据通信处理回调函数
|
||||
// 数据格式:总长度(32bit) | PID(32bit) | 校验(32bit) | 参数(变长)
|
||||
func tcpServiceHandler(conn net.Conn) {
|
||||
for {
|
||||
if buffer, err := gtcp.Receive(conn, gtcp.Retry{3, 10}); err == nil {
|
||||
if len(buffer) > 0 {
|
||||
for _, v := range bufferToMsgs(buffer) {
|
||||
commReceiveQueue.PushBack(v)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
fmt.Println(err)
|
||||
conn.Close()
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 数据解包,防止黏包
|
||||
func bufferToMsgs(buffer []byte) []*Msg {
|
||||
s := 0
|
||||
msgs := make([]*Msg, 0)
|
||||
for s < len(buffer) {
|
||||
length := gbinary.DecodeToInt(buffer[s : s + 4])
|
||||
if length < 0 || length > len(buffer) {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
checksum1 := gbinary.DecodeToUint32(buffer[s + 8 : s + 12])
|
||||
checksum2 := gtcp.Checksum(buffer[s + 12 : s + length])
|
||||
if checksum1 != checksum2 {
|
||||
s++
|
||||
continue
|
||||
}
|
||||
msgs = append(msgs, &Msg {
|
||||
Pid : gbinary.DecodeToInt(buffer[s + 4 : s + 8]),
|
||||
Data : buffer[s + 12 : s + length],
|
||||
})
|
||||
s += length
|
||||
}
|
||||
return msgs
|
||||
}
|
||||
@ -93,10 +93,12 @@ func (m *Manager) GetProcess(pid int) *Process {
|
||||
|
||||
// 添加一个已存在进程到进程管理器中
|
||||
func (m *Manager) AddProcess(pid int) {
|
||||
if process, err := os.FindProcess(pid); err == nil {
|
||||
p := m.NewProcess("", nil, nil)
|
||||
p.Process = process
|
||||
m.processes.Set(pid, p)
|
||||
if m.processes.Get(pid) == nil {
|
||||
if process, err := os.FindProcess(pid); err == nil {
|
||||
p := m.NewProcess("", nil, nil)
|
||||
p.Process = process
|
||||
m.processes.Set(pid, p)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -9,8 +9,8 @@ package gproc
|
||||
import (
|
||||
"os"
|
||||
"fmt"
|
||||
"errors"
|
||||
"os/exec"
|
||||
"errors"
|
||||
)
|
||||
|
||||
// 子进程
|
||||
@ -60,7 +60,7 @@ func (p *Process) Send(data []byte) error {
|
||||
if p.Process != nil {
|
||||
return Send(p.Process.Pid, data)
|
||||
}
|
||||
return errors.New("process not running")
|
||||
return errors.New("invalid process")
|
||||
}
|
||||
|
||||
|
||||
|
||||
23
geg/os/grpool/goroutine.go
Normal file
23
geg/os/grpool/goroutine.go
Normal file
@ -0,0 +1,23 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
)
|
||||
|
||||
|
||||
func main() {
|
||||
start := gtime.Millisecond()
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 10000000; i++ {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
time.Sleep(time.Millisecond)
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
wg.Wait()
|
||||
fmt.Println("time spent:", gtime.Millisecond() - start)
|
||||
}
|
||||
24
geg/os/grpool/grpool.go
Normal file
24
geg/os/grpool/grpool.go
Normal file
@ -0,0 +1,24 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"gitee.com/johng/gf/g/os/grpool"
|
||||
)
|
||||
|
||||
func main() {
|
||||
start := gtime.Millisecond()
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 10000000; i++ {
|
||||
wg.Add(1)
|
||||
grpool.Add(func() {
|
||||
time.Sleep(time.Millisecond)
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
fmt.Println(grpool.Size())
|
||||
fmt.Println("time spent:", gtime.Millisecond() - start)
|
||||
}
|
||||
@ -1,20 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"gitee.com/johng/gf/g/os/grpool"
|
||||
)
|
||||
|
||||
func main() {
|
||||
wg := sync.WaitGroup{}
|
||||
for i := 0; i < 10; i++ {
|
||||
wg.Add(1)
|
||||
v := i
|
||||
grpool.Add(func() {
|
||||
fmt.Println(v)
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
@ -2,22 +2,15 @@ package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/gfile"
|
||||
)
|
||||
|
||||
func main() {
|
||||
file, err := gfile.Open("/home/john/Documents/temp.txt")
|
||||
fmt.Println(err)
|
||||
gtime.SetInterval(time.Second, func() bool {
|
||||
if s, e := file.Stat(); e == nil {
|
||||
fmt.Println(s.ModTime().Unix())
|
||||
fmt.Println(gfile.MTime("/home/john/Documents/temp.txt"))
|
||||
}
|
||||
return true
|
||||
})
|
||||
|
||||
time.Sleep(time.Hour)
|
||||
|
||||
func test(data []byte) {
|
||||
data = append(data, byte('A'))
|
||||
fmt.Println(data)
|
||||
}
|
||||
|
||||
func main() {
|
||||
a := []byte("1")
|
||||
test(a)
|
||||
fmt.Println(a)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user