improve process comminucation folder creating for package gproc

This commit is contained in:
John Guo
2021-09-01 15:02:58 +08:00
parent 73d2b7ed06
commit 425ca8aa3e
6 changed files with 41 additions and 19 deletions

View File

@ -33,7 +33,7 @@ func GetContentsWithCache(path string, duration ...time.Duration) string {
return string(GetBytesWithCache(path, duration...))
}
// GetBinContents returns []byte content of given file by <path> from cache.
// GetBytesWithCache returns []byte content of given file by <path> from cache.
// If there's no content in the cache, it will read it from disk file specified by <path>.
// The parameter <expire> specifies the caching time for this file content in seconds.
func GetBytesWithCache(path string, duration ...time.Duration) []byte {

View File

@ -12,7 +12,7 @@ import (
"github.com/gogf/gf/internal/intlog"
)
// watchLoop starts the loop for event listening fro underlying inotify monitor.
// watchLoop starts the loop for event listening from underlying inotify monitor.
func (w *Watcher) watchLoop() {
go func() {
for {

View File

@ -7,10 +7,12 @@
package gproc
import (
"context"
"fmt"
"github.com/gogf/gf/container/gmap"
"github.com/gogf/gf/errors/gcode"
"github.com/gogf/gf/errors/gerror"
"github.com/gogf/gf/internal/intlog"
"github.com/gogf/gf/net/gtcp"
"github.com/gogf/gf/os/gfile"
"github.com/gogf/gf/util/gconv"
@ -32,9 +34,10 @@ type MsgResponse struct {
}
const (
defaultGroupNameFoProcComm = "" // Default group name.
defaultTcpPortForProcComm = 10000 // Starting port number for receiver listening.
maxLengthForProcMsgQueue = 10000 // Max size for each message queue of the group.
defaultFolderNameForProcComm = "gf_pid_port_mapping" // Default folder name for storing pid to port mapping files.
defaultGroupNameForProcComm = "" // Default group name.
defaultTcpPortForProcComm = 10000 // Starting port number for receiver listening.
maxLengthForProcMsgQueue = 10000 // Max size for each message queue of the group.
)
var (
@ -43,17 +46,36 @@ var (
commReceiveQueues = gmap.NewStrAnyMap(true)
// commPidFolderPath specifies the folder path storing pid to port mapping files.
commPidFolderPath = gfile.TempDir("gproc")
commPidFolderPath string
)
func init() {
// Automatically create the storage folder.
if !gfile.Exists(commPidFolderPath) {
err := gfile.Mkdir(commPidFolderPath)
if err != nil {
panic(fmt.Errorf(`create gproc folder failed: %v`, err))
availablePaths := []string{
"/var/tmp",
"/var/run",
}
if homePath, _ := gfile.Home(); homePath != "" {
availablePaths = append(availablePaths, gfile.Join(homePath, ".config"))
}
availablePaths = append(availablePaths, gfile.TempDir())
for _, availablePath := range availablePaths {
checkPath := gfile.Join(availablePath, defaultFolderNameForProcComm)
if !gfile.Exists(checkPath) && gfile.Mkdir(checkPath) != nil {
continue
}
if gfile.IsWritable(checkPath) {
commPidFolderPath = checkPath
break
}
}
if commPidFolderPath == "" {
intlog.Errorf(
context.TODO(),
`cannot find available folder for storing pid to port mapping files in paths: %+v, process communication feature will fail`,
availablePaths,
)
}
}
// getConnByPid creates and returns a TCP connection for specified pid.
@ -72,9 +94,7 @@ func getConnByPid(pid int) (*gtcp.PoolConn, error) {
// getPortByPid returns the listening port for specified pid.
// It returns 0 if no port found for the specified pid.
func getPortByPid(pid int) int {
path := getCommFilePath(pid)
content := gfile.GetContentsWithCache(path)
return gconv.Int(content)
return gconv.Int(gfile.GetContentsWithCache(getCommFilePath(pid)))
}
// getCommFilePath returns the pid to port mapping file path for given pid.

View File

@ -35,7 +35,7 @@ func Receive(group ...string) *MsgRequest {
if len(group) > 0 {
groupName = group[0]
} else {
groupName = defaultGroupNameFoProcComm
groupName = defaultGroupNameForProcComm
}
queue := commReceiveQueues.GetOrSetFuncLock(groupName, func() interface{} {
return gqueue.New(maxLengthForProcMsgQueue)
@ -79,8 +79,10 @@ func receiveTcpListening() {
// receiveTcpHandler is the connection handler for receiving data.
func receiveTcpHandler(conn *gtcp.Conn) {
var result []byte
var response MsgResponse
var (
result []byte
response MsgResponse
)
for {
response.Code = 0
response.Message = ""

View File

@ -18,7 +18,7 @@ func Send(pid int, data []byte, group ...string) error {
msg := MsgRequest{
SendPid: Pid(),
RecvPid: pid,
Group: defaultGroupNameFoProcComm,
Group: defaultGroupNameForProcComm,
Data: data,
}
if len(group) > 0 {

View File

@ -1,4 +1,4 @@
package gf
const VERSION = "v1.17.0"
const VERSION = "v1.16.6"
const AUTHORS = "john<john@goframe.org>"