Merge branch 'master' of https://github.com/gogf/gf into gogf-master2

This commit is contained in:
jroam
2019-06-17 21:51:38 +08:00
21 changed files with 485 additions and 256 deletions

View File

@ -4,7 +4,6 @@
[![Go Doc](https://godoc.org/github.com/gogf/gf?status.svg)](https://godoc.org/github.com/gogf/gf/g#pkg-subdirectories)
[![Build Status](https://travis-ci.org/gogf/gf.svg?branch=master)](https://travis-ci.org/gogf/gf)
[![Go Report](https://goreportcard.com/badge/github.com/gogf/gf)](https://goreportcard.com/report/github.com/gogf/gf)
[![Code Coverage](https://codecov.io/gh/gogf/gf/branch/master/graph/badge.svg)](https://codecov.io/gh/gogf/gf/branch/master)
[![Production Ready](https://img.shields.io/badge/production-ready-blue.svg)](https://github.com/gogf/gf)
[![License](https://img.shields.io/github/license/gogf/gf.svg?style=flat)](https://github.com/gogf/gf)

View File

@ -4,7 +4,6 @@
[![Go Doc](https://godoc.org/github.com/gogf/gf?status.svg)](https://godoc.org/github.com/gogf/gf/g#pkg-subdirectories)
[![Build Status](https://travis-ci.org/gogf/gf.svg?branch=master)](https://travis-ci.org/gogf/gf)
[![Go Report](https://goreportcard.com/badge/github.com/gogf/gf)](https://goreportcard.com/report/github.com/gogf/gf)
[![Code Coverage](https://codecov.io/gh/gogf/gf/branch/master/graph/badge.svg)](https://codecov.io/gh/gogf/gf/branch/master)
[![Production Ready](https://img.shields.io/badge/production-ready-blue.svg)](https://github.com/gogf/gf)
[![License](https://img.shields.io/github/license/gogf/gf.svg?style=flat)](https://github.com/gogf/gf)

View File

@ -50,7 +50,6 @@
1. grpool增加支持阻塞添加任务接口
# DONE
1. gconv完善针对不同类型的判断例如尽量减少sprintf("%v", xxx)来执行string类型的转换
2. ghttp.Server请求执行中增加服务退出的方法不再执行后续操作

View File

@ -93,11 +93,9 @@ func (l *List) PopBacks(max int) (values []interface{}) {
if max > 0 && max < length {
length = max
}
tempe := (*Element)(nil)
values = make([]interface{}, length)
for i := 0; i < length; i++ {
tempe = l.list.Back()
values[i] = l.list.Remove(tempe)
values[i] = l.list.Remove(l.list.Back())
}
}
l.mu.Unlock()
@ -107,20 +105,18 @@ func (l *List) PopBacks(max int) (values []interface{}) {
// PopFronts removes <max> elements from front of <l>
// and returns values of the removed elements as slice.
func (l *List) PopFronts(max int) (values []interface{}) {
l.mu.RLock()
l.mu.Lock()
length := l.list.Len()
if length > 0 {
if max > 0 && max < length {
length = max
}
tempe := (*Element)(nil)
values = make([]interface{}, length)
for i := 0; i < length; i++ {
tempe = l.list.Front()
values[i] = l.list.Remove(tempe)
values[i] = l.list.Remove(l.list.Front())
}
}
l.mu.RUnlock()
l.mu.Unlock()
return
}

View File

@ -19,9 +19,9 @@
package gqueue
import (
"github.com/gogf/gf/g/container/glist"
"github.com/gogf/gf/g/container/gtype"
"math"
"github.com/gogf/gf/g/container/glist"
"github.com/gogf/gf/g/container/gtype"
"math"
)
type Queue struct {
@ -35,6 +35,8 @@ type Queue struct {
const (
// Size for queue buffer.
gDEFAULT_QUEUE_SIZE = 10000
// Max batch size per-fetching from list.
gDEFAULT_MAX_BATCH_SIZE = 10
)
// New returns an empty queue object.
@ -66,22 +68,28 @@ func (q *Queue) startAsyncLoop() {
}()
for !q.closed.Val() {
<- q.events
if length := q.list.Len(); length > 0 {
array := make([]interface{}, length)
for i := 0; i < length; i++ {
if e := q.list.Front(); e != nil {
array[i] = q.list.Remove(e)
} else {
break
}
}
for _, v := range array {
// When q.C closes, it will panic here, especially q.C is being blocked for writing.
// It will be caught by recover and be ignored, if any error occurs here.
q.C <- v
}
for !q.closed.Val() {
if length := q.list.Len(); length > 0 {
if length > gDEFAULT_MAX_BATCH_SIZE {
length = gDEFAULT_MAX_BATCH_SIZE
}
for _, v := range q.list.PopFronts(length) {
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
// If any error occurs here, it will be caught by recover and be ignored.
q.C <- v
}
} else {
break
}
}
// Clear q.events to remain just one event to do the next synchronization check.
for i := 0; i < len(q.events) - 1; i++ {
<- q.events
}
}
// It should be here to close q.C.
// It's the sender's responsibility to close channel when it should be closed.
close(q.C)
}
// Push pushes the data <v> into the queue.
@ -91,7 +99,9 @@ func (q *Queue) Push(v interface{}) {
q.C <- v
} else {
q.list.PushBack(v)
q.events <- struct{}{}
if len(q.events) < gDEFAULT_QUEUE_SIZE {
q.events <- struct{}{}
}
}
}
@ -106,11 +116,11 @@ func (q *Queue) Pop() interface{} {
// which are being blocked reading using Pop method.
func (q *Queue) Close() {
q.closed.Set(true)
if q.events != nil {
close(q.events)
}
if q.C != nil {
close(q.C)
if q.events != nil {
close(q.events)
}
for i := 0; i < gDEFAULT_MAX_BATCH_SIZE; i++ {
q.Pop()
}
}

View File

@ -1,3 +1,11 @@
// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// go test *.go -bench=".*" -benchmem
package gqueue_test
import (
@ -7,16 +15,27 @@ import (
)
func TestQueue_Len(t *testing.T) {
maxs := 100
for n := 10; n < maxs; n++ {
q1 := gqueue.New(maxs)
for i := 0; i < maxs; i++ {
max := 100
for n := 10; n < max; n++ {
q1 := gqueue.New(max)
for i := 0; i < max; i++ {
q1.Push(i)
}
gtest.Assert(q1.Len(), maxs)
gtest.Assert(q1.Len(), max)
gtest.Assert(q1.Size(), max)
}
}
func TestQueue_Basic(t *testing.T) {
q := gqueue.New()
for i := 0; i < 100; i++ {
q.Push(i)
}
gtest.Assert(q.Pop(), 0)
gtest.Assert(q.Pop(), 1)
}
func TestQueue_Pop(t *testing.T) {
q1 := gqueue.New()
q1.Push(1)

View File

@ -8,104 +8,179 @@
package gvar
import (
"github.com/gogf/gf/g/container/gtype"
"github.com/gogf/gf/g/os/gtime"
"github.com/gogf/gf/g/util/gconv"
"time"
"github.com/gogf/gf/g/container/gtype"
"github.com/gogf/gf/g/os/gtime"
"github.com/gogf/gf/g/util/gconv"
"time"
)
// Var is an universal variable type.
type Var struct {
value interface{} // Underlying value.
safe bool // Concurrent safe or not.
value interface{} // Underlying value.
safe bool // Concurrent safe or not.
}
// New returns a new Var with given <value>.
// The parameter <unsafe> used to specify whether using Var in un-concurrent-safety,
// which is false in default, means concurrent-safe.
func New(value interface{}, unsafe...bool) *Var {
v := &Var{}
if len(unsafe) == 0 || !unsafe[0] {
v.safe = true
v.value = gtype.NewInterface(value)
} else {
v.value = value
}
return v
func New(value interface{}, unsafe ...bool) *Var {
v := &Var{}
if len(unsafe) == 0 || !unsafe[0] {
v.safe = true
v.value = gtype.NewInterface(value)
} else {
v.value = value
}
return v
}
// Set sets <value> to <v>, and returns the old value.
func (v *Var) Set(value interface{}) (old interface{}) {
if v.safe {
old = v.value.(*gtype.Interface).Set(value)
} else {
old = v.value
v.value = value
}
return
if v.safe {
old = v.value.(*gtype.Interface).Set(value)
} else {
old = v.value
v.value = value
}
return
}
// Val returns the current value of <v>.
func (v *Var) Val() interface{} {
if v.safe {
return v.value.(*gtype.Interface).Val()
} else {
return v.value
}
if v.safe {
return v.value.(*gtype.Interface).Val()
}
return v.value
}
// See Val().
// Interface is alias of Val.
func (v *Var) Interface() interface{} {
return v.Val()
return v.Val()
}
// Time converts and returns <v> as time.Time.
// The parameter <format> specifies the format of the time string using gtime,
// eg: Y-m-d H:i:s.
func (v *Var) Time(format...string) time.Time {
return gconv.Time(v.Val(), format...)
func (v *Var) Time(format ...string) time.Time {
return gconv.Time(v.Val(), format...)
}
// TimeDuration converts and returns <v> as time.Duration.
// Duration converts and returns <v> as time.Duration.
// If value of <v> is string, then it uses time.ParseDuration for conversion.
func (v *Var) Duration() time.Duration {
return gconv.Duration(v.Val())
return gconv.Duration(v.Val())
}
// GTime converts and returns <v> as *gtime.Time.
// The parameter <format> specifies the format of the time string using gtime,
// eg: Y-m-d H:i:s.
func (v *Var) GTime(format...string) *gtime.Time {
return gconv.GTime(v.Val(), format...)
func (v *Var) GTime(format ...string) *gtime.Time {
return gconv.GTime(v.Val(), format...)
}
// Struct maps value of <v> to <objPointer>.
// The parameter <objPointer> should be a pointer to a struct instance.
// The parameter <mapping> is used to specify the key-to-attribute mapping rules.
func (v *Var) Struct(pointer interface{}, mapping...map[string]string) error {
return gconv.Struct(v.Val(), pointer, mapping...)
func (v *Var) Struct(pointer interface{}, mapping ...map[string]string) error {
return gconv.Struct(v.Val(), pointer, mapping...)
}
func (v *Var) IsNil() bool { return v.Val() == nil }
func (v *Var) Bytes() []byte { return gconv.Bytes(v.Val()) }
func (v *Var) String() string { return gconv.String(v.Val()) }
func (v *Var) Bool() bool { return gconv.Bool(v.Val()) }
// IsNil checks whether <v> is nil.
func (v *Var) IsNil() bool {
return v.Val() == nil
}
func (v *Var) Int() int { return gconv.Int(v.Val()) }
func (v *Var) Int8() int8 { return gconv.Int8(v.Val()) }
func (v *Var) Int16() int16 { return gconv.Int16(v.Val()) }
func (v *Var) Int32() int32 { return gconv.Int32(v.Val()) }
func (v *Var) Int64() int64 { return gconv.Int64(v.Val()) }
// Bytes converts and returns <v> as []byte.
func (v *Var) Bytes() []byte {
return gconv.Bytes(v.Val())
}
func (v *Var) Uint() uint { return gconv.Uint(v.Val()) }
func (v *Var) Uint8() uint8 { return gconv.Uint8(v.Val()) }
func (v *Var) Uint16() uint16 { return gconv.Uint16(v.Val()) }
func (v *Var) Uint32() uint32 { return gconv.Uint32(v.Val()) }
func (v *Var) Uint64() uint64 { return gconv.Uint64(v.Val()) }
// String converts and returns <v> as string.
func (v *Var) String() string {
return gconv.String(v.Val())
}
func (v *Var) Float32() float32 { return gconv.Float32(v.Val()) }
func (v *Var) Float64() float64 { return gconv.Float64(v.Val()) }
// Bool converts and returns <v> as bool.
func (v *Var) Bool() bool {
return gconv.Bool(v.Val())
}
func (v *Var) Ints() []int { return gconv.Ints(v.Val()) }
func (v *Var) Floats() []float64 { return gconv.Floats(v.Val()) }
func (v *Var) Strings() []string { return gconv.Strings(v.Val()) }
func (v *Var) Interfaces() []interface{} { return gconv.Interfaces(v.Val()) }
// Int converts and returns <v> as int.
func (v *Var) Int() int {
return gconv.Int(v.Val())
}
// Int8 converts and returns <v> as int8.
func (v *Var) Int8() int8 {
return gconv.Int8(v.Val())
}
// Int16 converts and returns <v> as int16.
func (v *Var) Int16() int16 {
return gconv.Int16(v.Val())
}
// Int32 converts and returns <v> as int32.
func (v *Var) Int32() int32 {
return gconv.Int32(v.Val())
}
// Int64 converts and returns <v> as int64.
func (v *Var) Int64() int64 {
return gconv.Int64(v.Val())
}
// Uint converts and returns <v> as uint.
func (v *Var) Uint() uint {
return gconv.Uint(v.Val())
}
// Uint8 converts and returns <v> as uint8.
func (v *Var) Uint8() uint8 {
return gconv.Uint8(v.Val())
}
// Uint16 converts and returns <v> as uint16.
func (v *Var) Uint16() uint16 {
return gconv.Uint16(v.Val())
}
// Uint32 converts and returns <v> as uint32.
func (v *Var) Uint32() uint32 {
return gconv.Uint32(v.Val())
}
// Uint64 converts and returns <v> as uint64.
func (v *Var) Uint64() uint64 {
return gconv.Uint64(v.Val())
}
// Float32 converts and returns <v> as float32.
func (v *Var) Float32() float32 {
return gconv.Float32(v.Val())
}
// Float64 converts and returns <v> as float64.
func (v *Var) Float64() float64 {
return gconv.Float64(v.Val())
}
// Ints converts and returns <v> as []int.
func (v *Var) Ints() []int {
return gconv.Ints(v.Val())
}
// Floats converts and returns <v> as []float64.
func (v *Var) Floats() []float64 {
return gconv.Floats(v.Val())
}
// Strings converts and returns <v> as []string.
func (v *Var) Strings() []string {
return gconv.Strings(v.Val())
}
// Interfaces converts and returns <v> as []interfaces{}.
func (v *Var) Interfaces() []interface{} {
return gconv.Interfaces(v.Val())
}

View File

@ -0,0 +1,149 @@
package gfpool_test
import (
"github.com/gogf/gf/g/os/gfile"
"github.com/gogf/gf/g/os/gfpool"
"github.com/gogf/gf/g/os/glog"
"github.com/gogf/gf/g/test/gtest"
"os"
"testing"
"time"
)
// TestOpen test open file cache
func TestOpen(t *testing.T) {
testFile := start("TestOpen.txt")
gtest.Case(t, func() {
f, err := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666)
gtest.AssertEQ(err, nil)
f.Close()
f2, err1 := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666)
gtest.AssertEQ(err1, nil)
gtest.AssertEQ(f, f2)
f2.Close()
// Deprecated test
f3, err2 := gfpool.OpenFile(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666)
gtest.AssertEQ(err2, nil)
gtest.AssertEQ(f, f3)
f3.Close()
})
stop(testFile)
}
// TestOpenErr test open file error
func TestOpenErr(t *testing.T) {
gtest.Case(t, func() {
testErrFile := "errorPath"
_, err := gfpool.Open(testErrFile, os.O_RDWR, 0666)
gtest.AssertNE(err, nil)
// delete file error
testFile := start("TestOpenDeleteErr.txt")
pool := gfpool.New(testFile, os.O_RDWR, 0666)
_, err1 := pool.File()
gtest.AssertEQ(err1, nil)
stop(testFile)
_, err1 = pool.File()
gtest.AssertNE(err1, nil)
// append mode delete file error and create again
testFile = start("TestOpenCreateErr.txt")
pool = gfpool.New(testFile, os.O_CREATE, 0666)
_, err1 = pool.File()
gtest.AssertEQ(err1, nil)
stop(testFile)
_, err1 = pool.File()
gtest.AssertEQ(err1, nil)
// append mode delete file error
testFile = start("TestOpenAppendErr.txt")
pool = gfpool.New(testFile, os.O_APPEND, 0666)
_, err1 = pool.File()
gtest.AssertEQ(err1, nil)
stop(testFile)
_, err1 = pool.File()
gtest.AssertNE(err1, nil)
// trunc mode delete file error
testFile = start("TestOpenTruncErr.txt")
pool = gfpool.New(testFile, os.O_TRUNC, 0666)
_, err1 = pool.File()
gtest.AssertEQ(err1, nil)
stop(testFile)
_, err1 = pool.File()
gtest.AssertNE(err1, nil)
})
}
// TestOpenExpire test open file cache expire
func TestOpenExpire(t *testing.T) {
testFile := start("TestOpenExpire.txt")
gtest.Case(t, func() {
f, err := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666, 100)
gtest.AssertEQ(err, nil)
f.Close()
time.Sleep(150 * time.Millisecond)
f2, err1 := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666, 100)
gtest.AssertEQ(err1, nil)
gtest.AssertNE(f, f2)
f2.Close()
// Deprecated test
f3, err2 := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666, 100)
gtest.AssertEQ(err2, nil)
gtest.AssertEQ(f2, f3)
f3.Close()
})
stop(testFile)
}
// TestNewPool test gfpool new function
func TestNewPool(t *testing.T) {
testFile := start("TestNewPool.txt")
gtest.Case(t, func() {
f, err := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666)
gtest.AssertEQ(err, nil)
f.Close()
pool := gfpool.New(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666)
f2, err1 := pool.File()
// pool not equal
gtest.AssertEQ(err1, nil)
gtest.AssertNE(f, f2)
f2.Close()
pool.Close()
})
stop(testFile)
}
// test before
func start(name string) string {
testFile := os.TempDir() + string(os.PathSeparator) + name
if gfile.Exists(testFile) {
gfile.Remove(testFile)
}
content := "123"
gfile.PutContents(testFile, content)
return testFile
}
// test after
func stop(testFile string) {
if gfile.Exists(testFile) {
err := gfile.Remove(testFile)
if err != nil {
glog.Error(err)
}
}
}

View File

@ -4,11 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
// Package gproc implements communication and management of processes.
//
// 进程管理/通信,
// 本进程管理从syscall, os.StartProcess, exec.Cmd都使用过,
// 最后采用了exec.Cmd来实现多进程管理这是一个顶层的跨平台封装兼容性更好另外两个是偏底层的接口。
// Package gproc implements management and communication for processes.
package gproc
import (
@ -62,11 +58,11 @@ func IsChild() bool {
}
// 设置gproc父进程ID当ppid为0时表示该进程为gproc主进程否则为gproc子进程
func SetPPid(ppid int) {
func SetPPid(ppid int) error {
if ppid > 0 {
os.Setenv(gPROC_ENV_KEY_PPID_KEY, gconv.String(ppid))
return os.Setenv(gPROC_ENV_KEY_PPID_KEY, gconv.String(ppid))
} else {
os.Unsetenv(gPROC_ENV_KEY_PPID_KEY)
return os.Unsetenv(gPROC_ENV_KEY_PPID_KEY)
}
}

View File

@ -15,18 +15,21 @@ import (
"github.com/gogf/gf/g/container/gmap"
)
// 进程通信数据结构
type Msg struct {
SendPid int `json:"spid"` // 发送进程ID
RecvPid int `json:"rpid"` // 接收进程ID
Group string `json:"group"` // 分组名称
Data []byte `json:"data"` // 原始数据
}
// 本地进程通信接收消息队列(按照分组进行构建的map键值为*gqueue.Queue对象)
var commReceiveQueues = gmap.NewStrAnyMap()
// (用于发送)已建立的PID对应的Conn通信对象键值为一个Pool防止并行使用同一个通信对象造成数据重叠
var commPidConnMap = gmap.NewIntAnyMap()
// TCP通信数据结构定义
type Msg struct {
Pid int // PID来源哪个进程
Data []byte // 数据
Group string // 分组名称
}
// 获取指定进程的通信文件地址
func getCommFilePath(pid int) string {

View File

@ -10,15 +10,15 @@
package gproc
import (
"fmt"
"net"
"github.com/gogf/gf/g/os/glog"
"github.com/gogf/gf/g/net/gtcp"
"github.com/gogf/gf/g/os/gfile"
"github.com/gogf/gf/g/util/gconv"
"github.com/gogf/gf/g/encoding/gbinary"
"github.com/gogf/gf/g/container/gqueue"
"github.com/gogf/gf/g/container/gtype"
"encoding/json"
"fmt"
"github.com/gogf/gf/g/container/gqueue"
"github.com/gogf/gf/g/container/gtype"
"github.com/gogf/gf/g/net/gtcp"
"github.com/gogf/gf/g/os/gfile"
"github.com/gogf/gf/g/os/glog"
"github.com/gogf/gf/g/util/gconv"
"net"
)
const (
@ -35,7 +35,7 @@ var (
// 进程只有在执行该方法后才会打开请求端口,默认情况下不允许进程间通信。
func Receive(group...string) *Msg {
// 一个进程只能开启一个监听goroutine
if tcpListened.Set(true) == false {
if !tcpListened.Val() && tcpListened.Set(true) == false {
go startTcpListening()
}
queue := (*gqueue.Queue)(nil)
@ -75,7 +75,9 @@ func startTcpListening() {
continue
}
// 将监听的端口保存到通信文件中(字符串类型存放)
gfile.PutContents(getCommFilePath(Pid()), gconv.String(i))
if err := gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)); err != nil {
glog.Error(err)
}
break
}
for {
@ -89,74 +91,41 @@ func startTcpListening() {
// TCP数据通信处理回调函数
func tcpServiceHandler(conn *gtcp.Conn) {
retry := gtcp.Retry {
Count : 3,
Interval: 10,
}
option := gtcp.PkgOption{
Retry : gtcp.Retry {
Count : 3,
Interval: 10,
},
}
for {
var result []byte
buffer, err := conn.Recv(-1, retry)
buffer, err := conn.RecvPkg(option)
if len(buffer) > 0 {
var msgs []*Msg
for _, msg := range bufferToMsgs(buffer) {
if v := commReceiveQueues.Get(msg.Group); v != nil {
msgs = append(msgs, msg)
} else {
result = []byte(fmt.Sprintf("group [%s] does not exist", msg.Group))
break
}
msg := new(Msg)
if err := json.Unmarshal(buffer, msg); err != nil {
glog.Error(err)
continue
}
// 成功时会返回ok给peer
if len(result) == 0 {
if v := commReceiveQueues.Get(msg.Group); v == nil {
result = []byte(fmt.Sprintf("group [%s] does not exist", msg.Group))
break
} else {
result = []byte("ok")
for _, msg := range msgs {
if v := commReceiveQueues.Get(msg.Group); v != nil {
v.(*gqueue.Queue).Push(msg)
}
if v := commReceiveQueues.Get(msg.Group); v != nil {
v.(*gqueue.Queue).Push(msg)
}
}
}
// 产生错误(或者对方已经关闭链接)时,退出接收循环
if err == nil {
conn.Send(result, retry)
if err := conn.SendPkg(result, option); err != nil {
glog.Error(err)
}
} else {
conn.Close()
if err := conn.Close(); err != nil {
glog.Error(err)
}
return
}
}
}
// 数据解包,防止黏包
// 数据格式:总长度(24bit)|发送进程PID(24bit)|接收进程PID(24bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长)
func bufferToMsgs(buffer []byte) []*Msg {
s := 0
msgs := make([]*Msg, 0)
for s < len(buffer) {
// 长度解析及校验
length := gbinary.DecodeToInt(buffer[s : s + 3])
if length < 14 || length > len(buffer) {
s++
continue
}
// 分组信息解析
groupLen := gbinary.DecodeToInt(buffer[s + 9 : s + 10])
// checksum校验(仅对参数做校验,提高校验效率)
checksum1 := gbinary.DecodeToUint32(buffer[s + 10 + groupLen : s + 10 + groupLen + 4])
checksum2 := gtcp.Checksum(buffer[s + 10 + groupLen + 4 : s + length])
if checksum1 != checksum2 {
s++
continue
}
// 接收进程PID校验
if Pid() == gbinary.DecodeToInt(buffer[s + 6 : s + 9]) {
msgs = append(msgs, &Msg {
Pid : gbinary.DecodeToInt(buffer[s + 3 : s + 6]),
Data : buffer[s + 10 + groupLen + 4 : s + length],
Group : string(buffer[s + 10 : s + 10 + groupLen]),
})
}
s += length
}
return msgs
}

View File

@ -7,16 +7,15 @@
package gproc
import (
"bytes"
"errors"
"fmt"
"github.com/gogf/gf/g/encoding/gbinary"
"github.com/gogf/gf/g/net/gtcp"
"github.com/gogf/gf/g/os/gfcache"
"github.com/gogf/gf/g/os/glog"
"github.com/gogf/gf/g/util/gconv"
"io"
"time"
"bytes"
"encoding/json"
"errors"
"fmt"
"github.com/gogf/gf/g/net/gtcp"
"github.com/gogf/gf/g/os/gfcache"
"github.com/gogf/gf/g/util/gconv"
"io"
"time"
)
const (
@ -27,53 +26,53 @@ const (
)
// 向指定gproc进程发送数据.
// 数据格式:总长度(24bit)|发送进程PID(24bit)|接收进程PID(24bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长)
func Send(pid int, data []byte, group...string) error {
groupName := gPROC_COMM_DEAFULT_GRUOP_NAME
if len(group) > 0 {
groupName = group[0]
}
buffer := make([]byte, 0)
buffer = append(buffer, gbinary.EncodeByLength(3, len(groupName) + len(data) + 14)...)
buffer = append(buffer, gbinary.EncodeByLength(3, Pid())...)
buffer = append(buffer, gbinary.EncodeByLength(3, pid)...)
buffer = append(buffer, gbinary.EncodeByLength(1, len(groupName))...)
buffer = append(buffer, []byte(groupName)...)
buffer = append(buffer, gbinary.EncodeUint32(gtcp.Checksum(data))...)
buffer = append(buffer, data...)
// 执行发送流程
var err error
msg := Msg{
SendPid : Pid(),
RecvPid : pid,
Group : gPROC_COMM_DEAFULT_GRUOP_NAME,
Data : data,
}
if len(group) > 0 {
msg.Group = group[0]
}
msgBytes, err := json.Marshal(msg)
if err != nil {
return err
}
var buf []byte
var conn *gtcp.Conn
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
if conn, err = getConnByPid(pid); err == nil {
defer conn.Close()
buf, err = conn.SendRecvWithTimeout(buffer, -1, gPROC_COMM_SEND_TIMEOUT*time.Millisecond)
if len(buf) > 0 {
// 如果有返回值,如果不是"ok",那么表示是错误信息
if !bytes.EqualFold(buf, []byte("ok")) {
err = errors.New(string(buf))
break
}
}
// EOF不算异常错误
if err == nil || err == io.EOF {
break
} else {
glog.Error(err)
}
var conn *gtcp.PoolConn
// 循环获取连接TCP对象
for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- {
if conn, err = getConnByPid(pid); err == nil {
break
}
time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond)
}
if conn == nil {
return err
}
defer conn.Close()
// 执行数据发送
buf, err = conn.SendRecvPkgWithTimeout(msgBytes, gPROC_COMM_SEND_TIMEOUT*time.Millisecond)
if len(buf) > 0 {
if !bytes.EqualFold(buf, []byte("ok")) {
err = errors.New(string(buf))
}
time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond)
}
// EOF不算异常错误
if err == io.EOF {
err = nil
}
return err
}
// 获取指定进程的TCP通信对象
func getConnByPid(pid int) (*gtcp.Conn, error) {
func getConnByPid(pid int) (*gtcp.PoolConn, error) {
port := getPortByPid(pid)
if port > 0 {
if conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil {
if conn, err := gtcp.NewPoolConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil {
return conn, nil
} else {
return nil, err

View File

@ -20,6 +20,7 @@ import (
"sync"
)
// View object for template engine.
type View struct {
mu sync.RWMutex
paths *garray.StringArray // Searching path array.
@ -28,10 +29,10 @@ type View struct {
delimiters []string // Customized template delimiters.
}
// Template params type.
// Params is type for template params.
type Params = map[string]interface{}
// Customized template function map type.
// FuncMap is type for custom template functions.
type FuncMap = map[string]interface{}
// Default view object.
@ -218,7 +219,6 @@ func (view *View) AddPath(path string) error {
return nil
}
view.paths.Append(realPath)
//glog.Debug("[gview] AddPath:", realPath)
return nil
}

View File

@ -0,0 +1,18 @@
// Copyright 2018 gf Author(https://github.com/gogf/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://github.com/gogf/gf.
package gtest_test
import (
"github.com/gogf/gf/g/test/gtest"
"testing"
)
func TestCase(t *testing.T) {
gtest.Case(t, func() {
gtest.Assert(1, 1)
})
}

View File

@ -30,7 +30,7 @@ func main() {
case "heartbeat":
onClientHeartBeat(conn, msg)
default:
glog.Errorfln("invalid message: %v", msg)
glog.Errorf("invalid message: %v", msg)
break
}
}

View File

@ -10,8 +10,8 @@ import (
func main() {
l := gflock.New("demo.lock")
l.Lock()
glog.Printfln("locked by pid: %d", gproc.Pid())
glog.Printf("locked by pid: %d", gproc.Pid())
time.Sleep(3 * time.Second)
l.UnLock()
glog.Printfln("unlocked by pid: %d", gproc.Pid())
glog.Printf("unlocked by pid: %d", gproc.Pid())
}

View File

@ -10,18 +10,18 @@ import (
// 父子进程基本演示
func main() {
if gproc.IsChild() {
glog.Printfln("%d: Hi, I am child, waiting 3 seconds to die", gproc.Pid())
glog.Printf("%d: Hi, I am child, waiting 3 seconds to die", gproc.Pid())
time.Sleep(time.Second)
glog.Printfln("%d: 1", gproc.Pid())
glog.Printf("%d: 1", gproc.Pid())
time.Sleep(time.Second)
glog.Printfln("%d: 2", gproc.Pid())
glog.Printf("%d: 2", gproc.Pid())
time.Sleep(time.Second)
glog.Printfln("%d: 3", gproc.Pid())
glog.Printf("%d: 3", gproc.Pid())
} else {
m := gproc.NewManager()
p := m.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
p.Wait()
glog.Printfln("%d: child died", gproc.Pid())
glog.Printf("%d: child died", gproc.Pid())
}
}

View File

@ -11,25 +11,25 @@ import (
// 请使用go build编译后运行不要使用IDE运行因为IDE大多采用的是子进程方式执行。
func main() {
if gproc.IsChild() {
glog.Printfln("%d: I am child, waiting 10 seconds to die", gproc.Pid())
glog.Printf("%d: I am child, waiting 10 seconds to die", gproc.Pid())
//p, err := os.FindProcess(os.Getppid())
//fmt.Println(err)
//p.Kill()
time.Sleep(2 * time.Second)
glog.Printfln("%d: 2", gproc.Pid())
glog.Printf("%d: 2", gproc.Pid())
time.Sleep(2 * time.Second)
glog.Printfln("%d: 4", gproc.Pid())
glog.Printf("%d: 4", gproc.Pid())
time.Sleep(2 * time.Second)
glog.Printfln("%d: 6", gproc.Pid())
glog.Printf("%d: 6", gproc.Pid())
time.Sleep(2 * time.Second)
glog.Printfln("%d: 8", gproc.Pid())
glog.Printf("%d: 8", gproc.Pid())
time.Sleep(2 * time.Second)
glog.Printfln("%d: died", gproc.Pid())
glog.Printf("%d: died", gproc.Pid())
} else {
p := gproc.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
glog.Printfln("%d: I am main, waiting 3 seconds to die", gproc.Pid())
glog.Printf("%d: I am main, waiting 3 seconds to die", gproc.Pid())
time.Sleep(3 * time.Second)
glog.Printfln("%d: died", gproc.Pid())
glog.Printf("%d: died", gproc.Pid())
}
}

View File

@ -11,7 +11,7 @@ import (
// 查看父子进程的环境变量
func main() {
time.Sleep(5 * time.Second)
glog.Printfln("%d: %v", gproc.Pid(), genv.All())
glog.Printf("%d: %v", gproc.Pid(), genv.All())
p := gproc.NewProcess(os.Args[0], os.Args, os.Environ())
p.Start()
}

View File

@ -8,6 +8,7 @@ import (
"github.com/gogf/gf/g/os/glog"
"github.com/gogf/gf/g/os/gproc"
"github.com/gogf/gf/g/os/gtime"
"github.com/gogf/gf/g/os/gtimer"
"os"
"time"
)
@ -15,11 +16,10 @@ import (
func main() {
fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild())
if gproc.IsChild() {
gtime.SetInterval(time.Second, func() bool {
gtimer.SetInterval(time.Second, func() {
if err := gproc.Send(gproc.PPid(), []byte(gtime.Datetime())); err != nil {
glog.Error(err)
}
return true
})
select {}
} else {
@ -28,7 +28,7 @@ func main() {
p.Start()
for {
msg := gproc.Receive()
fmt.Printf("%d: receive from %d, data: %s\n", gproc.Pid(), msg.Pid, string(msg.Data))
fmt.Printf("%d: receive from %d, data: %s\n", gproc.Pid(), msg.SendPid, string(msg.Data))
}
}
}

View File

@ -1,17 +1,15 @@
package main
import (
"github.com/gogf/gf/g/container/gqueue"
"github.com/gogf/gf/g/test/gtest"
"fmt"
"sync"
)
func main() {
max := 100
q := gqueue.New(max)
for i := 1; i < max; i++ {
q.Push(i)
}
q.Close()
gtest.Assert(q.Len(), 1)
wg := sync.WaitGroup{}
wg.Add(1)
wg.Add(-100)
wg.Add()
wg.Wait()
fmt.Println(1)
}