diff --git a/os/gproc/gproc_comm.go b/os/gproc/gproc_comm.go index 40652bcc0..546142a16 100644 --- a/os/gproc/gproc_comm.go +++ b/os/gproc/gproc_comm.go @@ -9,6 +9,7 @@ package gproc import ( "context" "fmt" + "sync" "github.com/gogf/gf/v2/container/gmap" "github.com/gogf/gf/v2/errors/gerror" @@ -20,10 +21,10 @@ import ( // MsgRequest is the request structure for process communication. type MsgRequest struct { - SendPid int // Sender PID. - RecvPid int // Receiver PID. - Group string // Message group name. - Data []byte // Request data. + SenderPid int // Sender PID. + ReceiverPid int // Receiver PID. + Group string // Message group name. + Data []byte // Request data. } // MsgResponse is the response structure for process communication. @@ -47,37 +48,11 @@ var ( // commPidFolderPath specifies the folder path storing pid to port mapping files. commPidFolderPath string + + // commPidFolderPathOnce is used for lazy calculation for `commPidFolderPath` is necessary. + commPidFolderPathOnce sync.Once ) -func init() { - availablePaths := []string{ - "/var/tmp", - "/var/run", - } - if homePath, _ := gfile.Home(); homePath != "" { - availablePaths = append(availablePaths, gfile.Join(homePath, ".config")) - } - availablePaths = append(availablePaths, gfile.Temp()) - for _, availablePath := range availablePaths { - checkPath := gfile.Join(availablePath, defaultFolderNameForProcComm) - if !gfile.Exists(checkPath) && gfile.Mkdir(checkPath) != nil { - continue - } - if gfile.IsWritable(checkPath) { - commPidFolderPath = checkPath - break - } - } - if commPidFolderPath == "" { - intlog.Errorf( - context.TODO(), - `cannot find available folder for storing pid to port mapping files in paths: %+v, process communication feature will fail`, - availablePaths, - ) - } - -} - // getConnByPid creates and returns a TCP connection for specified pid. func getConnByPid(pid int) (*gtcp.PoolConn, error) { port := getPortByPid(pid) @@ -94,10 +69,49 @@ func getConnByPid(pid int) (*gtcp.PoolConn, error) { // getPortByPid returns the listening port for specified pid. // It returns 0 if no port found for the specified pid. func getPortByPid(pid int) int { - return gconv.Int(gfile.GetContentsWithCache(getCommFilePath(pid))) + path := getCommFilePath(pid) + if path == "" { + return 0 + } + return gconv.Int(gfile.GetContentsWithCache(path)) } // getCommFilePath returns the pid to port mapping file path for given pid. func getCommFilePath(pid int) string { - return gfile.Join(commPidFolderPath, gconv.String(pid)) + path, err := getCommPidFolderPath() + if err != nil { + intlog.Errorf(context.TODO(), `%+v`, err) + return "" + } + return gfile.Join(path, gconv.String(pid)) +} + +// getCommPidFolderPath retrieves and returns the available directory for storing pid mapping files. +func getCommPidFolderPath() (folderPath string, err error) { + commPidFolderPathOnce.Do(func() { + availablePaths := []string{ + "/var/tmp", + "/var/run", + } + if path, _ := gfile.Home(".config"); path != "" { + availablePaths = append(availablePaths, path) + } + availablePaths = append(availablePaths, gfile.Temp()) + for _, availablePath := range availablePaths { + checkPath := gfile.Join(availablePath, defaultFolderNameForProcComm) + if !gfile.Exists(checkPath) && gfile.Mkdir(checkPath) != nil { + continue + } + if gfile.IsWritable(checkPath) { + commPidFolderPath = checkPath + break + } + } + err = gerror.Newf( + `cannot find available folder for storing pid to port mapping files in paths: %+v`, + availablePaths, + ) + }) + folderPath = commPidFolderPath + return } diff --git a/os/gproc/gproc_comm_receive.go b/os/gproc/gproc_comm_receive.go index a8386cdc1..942c04e69 100644 --- a/os/gproc/gproc_comm_receive.go +++ b/os/gproc/gproc_comm_receive.go @@ -13,6 +13,7 @@ import ( "github.com/gogf/gf/v2/container/gqueue" "github.com/gogf/gf/v2/container/gtype" + "github.com/gogf/gf/v2/errors/gerror" "github.com/gogf/gf/v2/internal/json" "github.com/gogf/gf/v2/net/gtcp" "github.com/gogf/gf/v2/os/gfile" @@ -51,26 +52,27 @@ func Receive(group ...string) *MsgRequest { // receiveTcpListening scans local for available port and starts listening. func receiveTcpListening() { - var listen *net.TCPListener - // Scan the available port for listening. - for i := defaultTcpPortForProcComm; ; i++ { - addr, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("127.0.0.1:%d", i)) - if err != nil { - continue - } - listen, err = net.ListenTCP("tcp", addr) - if err != nil { - continue - } - // Save the port to the pid file. - if err := gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)); err != nil { - panic(err) - } - break + var ( + listen *net.TCPListener + conn net.Conn + port = gtcp.MustGetFreePort() + address = fmt.Sprintf("127.0.0.1:%d", port) + ) + tcpAddress, err := net.ResolveTCPAddr("tcp", address) + if err != nil { + panic(gerror.Wrap(err, `net.ResolveTCPAddr failed`)) + } + listen, err = net.ListenTCP("tcp", tcpAddress) + if err != nil { + panic(gerror.Wrapf(err, `net.ListenTCP failed for address "%s"`, address)) + } + // Save the port to the pid file. + if err = gfile.PutContents(getCommFilePath(Pid()), gconv.String(port)); err != nil { + panic(err) } // Start listening. for { - if conn, err := listen.Accept(); err != nil { + if conn, err = listen.Accept(); err != nil { glog.Error(context.TODO(), err) } else if conn != nil { go receiveTcpHandler(gtcp.NewConnByNetConn(conn)) @@ -96,9 +98,12 @@ func receiveTcpHandler(conn *gtcp.Conn) { if err = json.UnmarshalUseNumber(buffer, msg); err != nil { continue } - if msg.RecvPid != Pid() { + if msg.ReceiverPid != Pid() { // Not mine package. - response.Message = fmt.Sprintf("receiver pid not match, target: %d, current: %d", msg.RecvPid, Pid()) + response.Message = fmt.Sprintf( + "receiver pid not match, target: %d, current: %d", + msg.ReceiverPid, Pid(), + ) } else if v := commReceiveQueues.Get(msg.Group); v == nil { // Group check. response.Message = fmt.Sprintf("group [%s] does not exist", msg.Group) diff --git a/os/gproc/gproc_comm_send.go b/os/gproc/gproc_comm_send.go index fee372d38..a42ec47e3 100644 --- a/os/gproc/gproc_comm_send.go +++ b/os/gproc/gproc_comm_send.go @@ -17,10 +17,10 @@ import ( // Send sends data to specified process of given pid. func Send(pid int, data []byte, group ...string) error { msg := MsgRequest{ - SendPid: Pid(), - RecvPid: pid, - Group: defaultGroupNameForProcComm, - Data: data, + SenderPid: Pid(), + ReceiverPid: pid, + Group: defaultGroupNameForProcComm, + Data: data, } if len(group) > 0 { msg.Group = group[0]