diff --git a/README.MD b/README.MD index f363f0e54..2b8740977 100644 --- a/README.MD +++ b/README.MD @@ -4,7 +4,6 @@ [![Go Doc](https://godoc.org/github.com/gogf/gf?status.svg)](https://godoc.org/github.com/gogf/gf/g#pkg-subdirectories) [![Build Status](https://travis-ci.org/gogf/gf.svg?branch=master)](https://travis-ci.org/gogf/gf) -[![Go Report](https://goreportcard.com/badge/github.com/gogf/gf)](https://goreportcard.com/report/github.com/gogf/gf) [![Code Coverage](https://codecov.io/gh/gogf/gf/branch/master/graph/badge.svg)](https://codecov.io/gh/gogf/gf/branch/master) [![Production Ready](https://img.shields.io/badge/production-ready-blue.svg)](https://github.com/gogf/gf) [![License](https://img.shields.io/github/license/gogf/gf.svg?style=flat)](https://github.com/gogf/gf) diff --git a/README_ZH.MD b/README_ZH.MD index 3a6a8ea45..7e44f2e81 100644 --- a/README_ZH.MD +++ b/README_ZH.MD @@ -4,7 +4,6 @@ [![Go Doc](https://godoc.org/github.com/gogf/gf?status.svg)](https://godoc.org/github.com/gogf/gf/g#pkg-subdirectories) [![Build Status](https://travis-ci.org/gogf/gf.svg?branch=master)](https://travis-ci.org/gogf/gf) -[![Go Report](https://goreportcard.com/badge/github.com/gogf/gf)](https://goreportcard.com/report/github.com/gogf/gf) [![Code Coverage](https://codecov.io/gh/gogf/gf/branch/master/graph/badge.svg)](https://codecov.io/gh/gogf/gf/branch/master) [![Production Ready](https://img.shields.io/badge/production-ready-blue.svg)](https://github.com/gogf/gf) [![License](https://img.shields.io/github/license/gogf/gf.svg?style=flat)](https://github.com/gogf/gf) diff --git a/TODO.MD b/TODO.MD index 072b2c7d4..c56610e9c 100644 --- a/TODO.MD +++ b/TODO.MD @@ -50,7 +50,6 @@ 1. grpool增加支持阻塞添加任务接口; - # DONE 1. gconv完善针对不同类型的判断,例如:尽量减少sprintf("%v", xxx)来执行string类型的转换; 2. ghttp.Server请求执行中增加服务退出的方法,不再执行后续操作; diff --git a/g/container/glist/glist.go b/g/container/glist/glist.go index fa0361702..a3ddd5651 100644 --- a/g/container/glist/glist.go +++ b/g/container/glist/glist.go @@ -93,11 +93,9 @@ func (l *List) PopBacks(max int) (values []interface{}) { if max > 0 && max < length { length = max } - tempe := (*Element)(nil) values = make([]interface{}, length) for i := 0; i < length; i++ { - tempe = l.list.Back() - values[i] = l.list.Remove(tempe) + values[i] = l.list.Remove(l.list.Back()) } } l.mu.Unlock() @@ -107,20 +105,18 @@ func (l *List) PopBacks(max int) (values []interface{}) { // PopFronts removes elements from front of // and returns values of the removed elements as slice. func (l *List) PopFronts(max int) (values []interface{}) { - l.mu.RLock() + l.mu.Lock() length := l.list.Len() if length > 0 { if max > 0 && max < length { length = max } - tempe := (*Element)(nil) values = make([]interface{}, length) for i := 0; i < length; i++ { - tempe = l.list.Front() - values[i] = l.list.Remove(tempe) + values[i] = l.list.Remove(l.list.Front()) } } - l.mu.RUnlock() + l.mu.Unlock() return } diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index 2aafec72c..75c1b33c7 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -19,9 +19,9 @@ package gqueue import ( - "github.com/gogf/gf/g/container/glist" - "github.com/gogf/gf/g/container/gtype" - "math" + "github.com/gogf/gf/g/container/glist" + "github.com/gogf/gf/g/container/gtype" + "math" ) type Queue struct { @@ -35,6 +35,8 @@ type Queue struct { const ( // Size for queue buffer. gDEFAULT_QUEUE_SIZE = 10000 + // Max batch size per-fetching from list. + gDEFAULT_MAX_BATCH_SIZE = 10 ) // New returns an empty queue object. @@ -66,22 +68,28 @@ func (q *Queue) startAsyncLoop() { }() for !q.closed.Val() { <- q.events - if length := q.list.Len(); length > 0 { - array := make([]interface{}, length) - for i := 0; i < length; i++ { - if e := q.list.Front(); e != nil { - array[i] = q.list.Remove(e) - } else { - break - } - } - for _, v := range array { - // When q.C closes, it will panic here, especially q.C is being blocked for writing. - // It will be caught by recover and be ignored, if any error occurs here. - q.C <- v - } + for !q.closed.Val() { + if length := q.list.Len(); length > 0 { + if length > gDEFAULT_MAX_BATCH_SIZE { + length = gDEFAULT_MAX_BATCH_SIZE + } + for _, v := range q.list.PopFronts(length) { + // When q.C is closed, it will panic here, especially q.C is being blocked for writing. + // If any error occurs here, it will be caught by recover and be ignored. + q.C <- v + } + } else { + break + } + } + // Clear q.events to remain just one event to do the next synchronization check. + for i := 0; i < len(q.events) - 1; i++ { + <- q.events } } + // It should be here to close q.C. + // It's the sender's responsibility to close channel when it should be closed. + close(q.C) } // Push pushes the data into the queue. @@ -91,7 +99,9 @@ func (q *Queue) Push(v interface{}) { q.C <- v } else { q.list.PushBack(v) - q.events <- struct{}{} + if len(q.events) < gDEFAULT_QUEUE_SIZE { + q.events <- struct{}{} + } } } @@ -106,11 +116,11 @@ func (q *Queue) Pop() interface{} { // which are being blocked reading using Pop method. func (q *Queue) Close() { q.closed.Set(true) - if q.events != nil { - close(q.events) - } - if q.C != nil { - close(q.C) + if q.events != nil { + close(q.events) + } + for i := 0; i < gDEFAULT_MAX_BATCH_SIZE; i++ { + q.Pop() } } diff --git a/g/container/gqueue/gqueue_unit_test.go b/g/container/gqueue/gqueue_unit_test.go index eb8908a14..8b947ca75 100644 --- a/g/container/gqueue/gqueue_unit_test.go +++ b/g/container/gqueue/gqueue_unit_test.go @@ -1,3 +1,11 @@ +// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// go test *.go -bench=".*" -benchmem + package gqueue_test import ( @@ -7,16 +15,27 @@ import ( ) func TestQueue_Len(t *testing.T) { - maxs := 100 - for n := 10; n < maxs; n++ { - q1 := gqueue.New(maxs) - for i := 0; i < maxs; i++ { + + max := 100 + for n := 10; n < max; n++ { + q1 := gqueue.New(max) + for i := 0; i < max; i++ { q1.Push(i) } - gtest.Assert(q1.Len(), maxs) + gtest.Assert(q1.Len(), max) + gtest.Assert(q1.Size(), max) } } +func TestQueue_Basic(t *testing.T) { + q := gqueue.New() + for i := 0; i < 100; i++ { + q.Push(i) + } + gtest.Assert(q.Pop(), 0) + gtest.Assert(q.Pop(), 1) +} + func TestQueue_Pop(t *testing.T) { q1 := gqueue.New() q1.Push(1) diff --git a/g/container/gvar/gvar.go b/g/container/gvar/gvar.go index 728fe2570..05b3098e8 100644 --- a/g/container/gvar/gvar.go +++ b/g/container/gvar/gvar.go @@ -8,104 +8,179 @@ package gvar import ( - "github.com/gogf/gf/g/container/gtype" - "github.com/gogf/gf/g/os/gtime" - "github.com/gogf/gf/g/util/gconv" - "time" + "github.com/gogf/gf/g/container/gtype" + "github.com/gogf/gf/g/os/gtime" + "github.com/gogf/gf/g/util/gconv" + "time" ) +// Var is an universal variable type. type Var struct { - value interface{} // Underlying value. - safe bool // Concurrent safe or not. + value interface{} // Underlying value. + safe bool // Concurrent safe or not. } // New returns a new Var with given . // The parameter used to specify whether using Var in un-concurrent-safety, // which is false in default, means concurrent-safe. -func New(value interface{}, unsafe...bool) *Var { - v := &Var{} - if len(unsafe) == 0 || !unsafe[0] { - v.safe = true - v.value = gtype.NewInterface(value) - } else { - v.value = value - } - return v +func New(value interface{}, unsafe ...bool) *Var { + v := &Var{} + if len(unsafe) == 0 || !unsafe[0] { + v.safe = true + v.value = gtype.NewInterface(value) + } else { + v.value = value + } + return v } // Set sets to , and returns the old value. func (v *Var) Set(value interface{}) (old interface{}) { - if v.safe { - old = v.value.(*gtype.Interface).Set(value) - } else { - old = v.value - v.value = value - } - return + if v.safe { + old = v.value.(*gtype.Interface).Set(value) + } else { + old = v.value + v.value = value + } + return } // Val returns the current value of . func (v *Var) Val() interface{} { - if v.safe { - return v.value.(*gtype.Interface).Val() - } else { - return v.value - } + if v.safe { + return v.value.(*gtype.Interface).Val() + } + return v.value } -// See Val(). +// Interface is alias of Val. func (v *Var) Interface() interface{} { - return v.Val() + return v.Val() } // Time converts and returns as time.Time. // The parameter specifies the format of the time string using gtime, // eg: Y-m-d H:i:s. -func (v *Var) Time(format...string) time.Time { - return gconv.Time(v.Val(), format...) +func (v *Var) Time(format ...string) time.Time { + return gconv.Time(v.Val(), format...) } -// TimeDuration converts and returns as time.Duration. +// Duration converts and returns as time.Duration. // If value of is string, then it uses time.ParseDuration for conversion. func (v *Var) Duration() time.Duration { - return gconv.Duration(v.Val()) + return gconv.Duration(v.Val()) } // GTime converts and returns as *gtime.Time. // The parameter specifies the format of the time string using gtime, // eg: Y-m-d H:i:s. -func (v *Var) GTime(format...string) *gtime.Time { - return gconv.GTime(v.Val(), format...) +func (v *Var) GTime(format ...string) *gtime.Time { + return gconv.GTime(v.Val(), format...) } // Struct maps value of to . // The parameter should be a pointer to a struct instance. // The parameter is used to specify the key-to-attribute mapping rules. -func (v *Var) Struct(pointer interface{}, mapping...map[string]string) error { - return gconv.Struct(v.Val(), pointer, mapping...) +func (v *Var) Struct(pointer interface{}, mapping ...map[string]string) error { + return gconv.Struct(v.Val(), pointer, mapping...) } -func (v *Var) IsNil() bool { return v.Val() == nil } -func (v *Var) Bytes() []byte { return gconv.Bytes(v.Val()) } -func (v *Var) String() string { return gconv.String(v.Val()) } -func (v *Var) Bool() bool { return gconv.Bool(v.Val()) } +// IsNil checks whether is nil. +func (v *Var) IsNil() bool { + return v.Val() == nil +} -func (v *Var) Int() int { return gconv.Int(v.Val()) } -func (v *Var) Int8() int8 { return gconv.Int8(v.Val()) } -func (v *Var) Int16() int16 { return gconv.Int16(v.Val()) } -func (v *Var) Int32() int32 { return gconv.Int32(v.Val()) } -func (v *Var) Int64() int64 { return gconv.Int64(v.Val()) } +// Bytes converts and returns as []byte. +func (v *Var) Bytes() []byte { + return gconv.Bytes(v.Val()) +} -func (v *Var) Uint() uint { return gconv.Uint(v.Val()) } -func (v *Var) Uint8() uint8 { return gconv.Uint8(v.Val()) } -func (v *Var) Uint16() uint16 { return gconv.Uint16(v.Val()) } -func (v *Var) Uint32() uint32 { return gconv.Uint32(v.Val()) } -func (v *Var) Uint64() uint64 { return gconv.Uint64(v.Val()) } +// String converts and returns as string. +func (v *Var) String() string { + return gconv.String(v.Val()) +} -func (v *Var) Float32() float32 { return gconv.Float32(v.Val()) } -func (v *Var) Float64() float64 { return gconv.Float64(v.Val()) } +// Bool converts and returns as bool. +func (v *Var) Bool() bool { + return gconv.Bool(v.Val()) +} -func (v *Var) Ints() []int { return gconv.Ints(v.Val()) } -func (v *Var) Floats() []float64 { return gconv.Floats(v.Val()) } -func (v *Var) Strings() []string { return gconv.Strings(v.Val()) } -func (v *Var) Interfaces() []interface{} { return gconv.Interfaces(v.Val()) } +// Int converts and returns as int. +func (v *Var) Int() int { + return gconv.Int(v.Val()) +} + +// Int8 converts and returns as int8. +func (v *Var) Int8() int8 { + return gconv.Int8(v.Val()) +} + +// Int16 converts and returns as int16. +func (v *Var) Int16() int16 { + return gconv.Int16(v.Val()) +} + +// Int32 converts and returns as int32. +func (v *Var) Int32() int32 { + return gconv.Int32(v.Val()) +} + +// Int64 converts and returns as int64. +func (v *Var) Int64() int64 { + return gconv.Int64(v.Val()) +} + +// Uint converts and returns as uint. +func (v *Var) Uint() uint { + return gconv.Uint(v.Val()) +} + +// Uint8 converts and returns as uint8. +func (v *Var) Uint8() uint8 { + return gconv.Uint8(v.Val()) +} + +// Uint16 converts and returns as uint16. +func (v *Var) Uint16() uint16 { + return gconv.Uint16(v.Val()) +} + +// Uint32 converts and returns as uint32. +func (v *Var) Uint32() uint32 { + return gconv.Uint32(v.Val()) +} + +// Uint64 converts and returns as uint64. +func (v *Var) Uint64() uint64 { + return gconv.Uint64(v.Val()) +} + +// Float32 converts and returns as float32. +func (v *Var) Float32() float32 { + return gconv.Float32(v.Val()) +} + +// Float64 converts and returns as float64. +func (v *Var) Float64() float64 { + return gconv.Float64(v.Val()) +} + +// Ints converts and returns as []int. +func (v *Var) Ints() []int { + return gconv.Ints(v.Val()) +} + +// Floats converts and returns as []float64. +func (v *Var) Floats() []float64 { + return gconv.Floats(v.Val()) +} + +// Strings converts and returns as []string. +func (v *Var) Strings() []string { + return gconv.Strings(v.Val()) +} + +// Interfaces converts and returns as []interfaces{}. +func (v *Var) Interfaces() []interface{} { + return gconv.Interfaces(v.Val()) +} diff --git a/g/os/gfpool/gfpool_z_unit_test.go b/g/os/gfpool/gfpool_z_unit_test.go new file mode 100644 index 000000000..67946950a --- /dev/null +++ b/g/os/gfpool/gfpool_z_unit_test.go @@ -0,0 +1,149 @@ +package gfpool_test + +import ( + "github.com/gogf/gf/g/os/gfile" + "github.com/gogf/gf/g/os/gfpool" + "github.com/gogf/gf/g/os/glog" + "github.com/gogf/gf/g/test/gtest" + "os" + "testing" + "time" +) + +// TestOpen test open file cache +func TestOpen(t *testing.T) { + testFile := start("TestOpen.txt") + + gtest.Case(t, func() { + f, err := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666) + gtest.AssertEQ(err, nil) + f.Close() + + f2, err1 := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666) + gtest.AssertEQ(err1, nil) + gtest.AssertEQ(f, f2) + f2.Close() + + // Deprecated test + f3, err2 := gfpool.OpenFile(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666) + gtest.AssertEQ(err2, nil) + gtest.AssertEQ(f, f3) + f3.Close() + + }) + + stop(testFile) +} + +// TestOpenErr test open file error +func TestOpenErr(t *testing.T) { + gtest.Case(t, func() { + testErrFile := "errorPath" + _, err := gfpool.Open(testErrFile, os.O_RDWR, 0666) + gtest.AssertNE(err, nil) + + // delete file error + testFile := start("TestOpenDeleteErr.txt") + pool := gfpool.New(testFile, os.O_RDWR, 0666) + _, err1 := pool.File() + gtest.AssertEQ(err1, nil) + stop(testFile) + _, err1 = pool.File() + gtest.AssertNE(err1, nil) + + // append mode delete file error and create again + testFile = start("TestOpenCreateErr.txt") + pool = gfpool.New(testFile, os.O_CREATE, 0666) + _, err1 = pool.File() + gtest.AssertEQ(err1, nil) + stop(testFile) + _, err1 = pool.File() + gtest.AssertEQ(err1, nil) + + // append mode delete file error + testFile = start("TestOpenAppendErr.txt") + pool = gfpool.New(testFile, os.O_APPEND, 0666) + _, err1 = pool.File() + gtest.AssertEQ(err1, nil) + stop(testFile) + _, err1 = pool.File() + gtest.AssertNE(err1, nil) + + // trunc mode delete file error + testFile = start("TestOpenTruncErr.txt") + pool = gfpool.New(testFile, os.O_TRUNC, 0666) + _, err1 = pool.File() + gtest.AssertEQ(err1, nil) + stop(testFile) + _, err1 = pool.File() + gtest.AssertNE(err1, nil) + }) +} + +// TestOpenExpire test open file cache expire +func TestOpenExpire(t *testing.T) { + testFile := start("TestOpenExpire.txt") + + gtest.Case(t, func() { + f, err := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666, 100) + gtest.AssertEQ(err, nil) + f.Close() + + time.Sleep(150 * time.Millisecond) + f2, err1 := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666, 100) + gtest.AssertEQ(err1, nil) + gtest.AssertNE(f, f2) + f2.Close() + + // Deprecated test + f3, err2 := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666, 100) + gtest.AssertEQ(err2, nil) + gtest.AssertEQ(f2, f3) + f3.Close() + }) + + stop(testFile) +} + +// TestNewPool test gfpool new function +func TestNewPool(t *testing.T) { + testFile := start("TestNewPool.txt") + + gtest.Case(t, func() { + f, err := gfpool.Open(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666) + gtest.AssertEQ(err, nil) + f.Close() + + pool := gfpool.New(testFile, os.O_RDWR|os.O_CREATE|os.O_TRUNC|os.O_APPEND, 0666) + f2, err1 := pool.File() + // pool not equal + gtest.AssertEQ(err1, nil) + gtest.AssertNE(f, f2) + f2.Close() + + pool.Close() + }) + + stop(testFile) +} + +// test before +func start(name string) string { + testFile := os.TempDir() + string(os.PathSeparator) + name + if gfile.Exists(testFile) { + gfile.Remove(testFile) + } + content := "123" + gfile.PutContents(testFile, content) + return testFile +} + +// test after +func stop(testFile string) { + if gfile.Exists(testFile) { + err := gfile.Remove(testFile) + if err != nil { + glog.Error(err) + } + } +} diff --git a/g/os/gproc/gproc.go b/g/os/gproc/gproc.go index 69dad15cf..655dd7c16 100644 --- a/g/os/gproc/gproc.go +++ b/g/os/gproc/gproc.go @@ -4,11 +4,7 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://github.com/gogf/gf. -// Package gproc implements communication and management of processes. -// -// 进程管理/通信, -// 本进程管理从syscall, os.StartProcess, exec.Cmd都使用过, -// 最后采用了exec.Cmd来实现多进程管理,这是一个顶层的跨平台封装,兼容性更好,另外两个是偏底层的接口。 +// Package gproc implements management and communication for processes. package gproc import ( @@ -62,11 +58,11 @@ func IsChild() bool { } // 设置gproc父进程ID,当ppid为0时表示该进程为gproc主进程,否则为gproc子进程 -func SetPPid(ppid int) { +func SetPPid(ppid int) error { if ppid > 0 { - os.Setenv(gPROC_ENV_KEY_PPID_KEY, gconv.String(ppid)) + return os.Setenv(gPROC_ENV_KEY_PPID_KEY, gconv.String(ppid)) } else { - os.Unsetenv(gPROC_ENV_KEY_PPID_KEY) + return os.Unsetenv(gPROC_ENV_KEY_PPID_KEY) } } diff --git a/g/os/gproc/gproc_comm.go b/g/os/gproc/gproc_comm.go index 441bcb0c0..8424aeff7 100644 --- a/g/os/gproc/gproc_comm.go +++ b/g/os/gproc/gproc_comm.go @@ -15,18 +15,21 @@ import ( "github.com/gogf/gf/g/container/gmap" ) +// 进程通信数据结构 +type Msg struct { + SendPid int `json:"spid"` // 发送进程ID + RecvPid int `json:"rpid"` // 接收进程ID + Group string `json:"group"` // 分组名称 + Data []byte `json:"data"` // 原始数据 +} + // 本地进程通信接收消息队列(按照分组进行构建的map,键值为*gqueue.Queue对象) var commReceiveQueues = gmap.NewStrAnyMap() // (用于发送)已建立的PID对应的Conn通信对象,键值为一个Pool,防止并行使用同一个通信对象造成数据重叠 var commPidConnMap = gmap.NewIntAnyMap() -// TCP通信数据结构定义 -type Msg struct { - Pid int // PID,来源哪个进程 - Data []byte // 数据 - Group string // 分组名称 -} + // 获取指定进程的通信文件地址 func getCommFilePath(pid int) string { diff --git a/g/os/gproc/gproc_comm_receive.go b/g/os/gproc/gproc_comm_receive.go index 759faae95..2e086f9c7 100644 --- a/g/os/gproc/gproc_comm_receive.go +++ b/g/os/gproc/gproc_comm_receive.go @@ -10,15 +10,15 @@ package gproc import ( - "fmt" - "net" - "github.com/gogf/gf/g/os/glog" - "github.com/gogf/gf/g/net/gtcp" - "github.com/gogf/gf/g/os/gfile" - "github.com/gogf/gf/g/util/gconv" - "github.com/gogf/gf/g/encoding/gbinary" - "github.com/gogf/gf/g/container/gqueue" - "github.com/gogf/gf/g/container/gtype" + "encoding/json" + "fmt" + "github.com/gogf/gf/g/container/gqueue" + "github.com/gogf/gf/g/container/gtype" + "github.com/gogf/gf/g/net/gtcp" + "github.com/gogf/gf/g/os/gfile" + "github.com/gogf/gf/g/os/glog" + "github.com/gogf/gf/g/util/gconv" + "net" ) const ( @@ -35,7 +35,7 @@ var ( // 进程只有在执行该方法后才会打开请求端口,默认情况下不允许进程间通信。 func Receive(group...string) *Msg { // 一个进程只能开启一个监听goroutine - if tcpListened.Set(true) == false { + if !tcpListened.Val() && tcpListened.Set(true) == false { go startTcpListening() } queue := (*gqueue.Queue)(nil) @@ -75,7 +75,9 @@ func startTcpListening() { continue } // 将监听的端口保存到通信文件中(字符串类型存放) - gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)) + if err := gfile.PutContents(getCommFilePath(Pid()), gconv.String(i)); err != nil { + glog.Error(err) + } break } for { @@ -89,74 +91,41 @@ func startTcpListening() { // TCP数据通信处理回调函数 func tcpServiceHandler(conn *gtcp.Conn) { - retry := gtcp.Retry { - Count : 3, - Interval: 10, - } + option := gtcp.PkgOption{ + Retry : gtcp.Retry { + Count : 3, + Interval: 10, + }, + } for { var result []byte - buffer, err := conn.Recv(-1, retry) + buffer, err := conn.RecvPkg(option) if len(buffer) > 0 { - var msgs []*Msg - for _, msg := range bufferToMsgs(buffer) { - if v := commReceiveQueues.Get(msg.Group); v != nil { - msgs = append(msgs, msg) - } else { - result = []byte(fmt.Sprintf("group [%s] does not exist", msg.Group)) - break - } + msg := new(Msg) + if err := json.Unmarshal(buffer, msg); err != nil { + glog.Error(err) + continue } - // 成功时会返回ok给peer - if len(result) == 0 { + if v := commReceiveQueues.Get(msg.Group); v == nil { + result = []byte(fmt.Sprintf("group [%s] does not exist", msg.Group)) + break + } else { result = []byte("ok") - for _, msg := range msgs { - if v := commReceiveQueues.Get(msg.Group); v != nil { - v.(*gqueue.Queue).Push(msg) - } + if v := commReceiveQueues.Get(msg.Group); v != nil { + v.(*gqueue.Queue).Push(msg) } } } - // 产生错误(或者对方已经关闭链接)时,退出接收循环 if err == nil { - conn.Send(result, retry) + if err := conn.SendPkg(result, option); err != nil { + glog.Error(err) + } } else { - conn.Close() + if err := conn.Close(); err != nil { + glog.Error(err) + } return } } } -// 数据解包,防止黏包 -// 数据格式:总长度(24bit)|发送进程PID(24bit)|接收进程PID(24bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长) -func bufferToMsgs(buffer []byte) []*Msg { - s := 0 - msgs := make([]*Msg, 0) - for s < len(buffer) { - // 长度解析及校验 - length := gbinary.DecodeToInt(buffer[s : s + 3]) - if length < 14 || length > len(buffer) { - s++ - continue - } - // 分组信息解析 - groupLen := gbinary.DecodeToInt(buffer[s + 9 : s + 10]) - // checksum校验(仅对参数做校验,提高校验效率) - checksum1 := gbinary.DecodeToUint32(buffer[s + 10 + groupLen : s + 10 + groupLen + 4]) - checksum2 := gtcp.Checksum(buffer[s + 10 + groupLen + 4 : s + length]) - if checksum1 != checksum2 { - s++ - continue - } - // 接收进程PID校验 - if Pid() == gbinary.DecodeToInt(buffer[s + 6 : s + 9]) { - msgs = append(msgs, &Msg { - Pid : gbinary.DecodeToInt(buffer[s + 3 : s + 6]), - Data : buffer[s + 10 + groupLen + 4 : s + length], - Group : string(buffer[s + 10 : s + 10 + groupLen]), - }) - } - s += length - } - return msgs -} - diff --git a/g/os/gproc/gproc_comm_send.go b/g/os/gproc/gproc_comm_send.go index bb7873200..26246010a 100644 --- a/g/os/gproc/gproc_comm_send.go +++ b/g/os/gproc/gproc_comm_send.go @@ -7,16 +7,15 @@ package gproc import ( - "bytes" - "errors" - "fmt" - "github.com/gogf/gf/g/encoding/gbinary" - "github.com/gogf/gf/g/net/gtcp" - "github.com/gogf/gf/g/os/gfcache" - "github.com/gogf/gf/g/os/glog" - "github.com/gogf/gf/g/util/gconv" - "io" - "time" + "bytes" + "encoding/json" + "errors" + "fmt" + "github.com/gogf/gf/g/net/gtcp" + "github.com/gogf/gf/g/os/gfcache" + "github.com/gogf/gf/g/util/gconv" + "io" + "time" ) const ( @@ -27,53 +26,53 @@ const ( ) // 向指定gproc进程发送数据. -// 数据格式:总长度(24bit)|发送进程PID(24bit)|接收进程PID(24bit)|分组长度(8bit)|分组名称(变长)|校验(32bit)|参数(变长) func Send(pid int, data []byte, group...string) error { - groupName := gPROC_COMM_DEAFULT_GRUOP_NAME - if len(group) > 0 { - groupName = group[0] - } - buffer := make([]byte, 0) - buffer = append(buffer, gbinary.EncodeByLength(3, len(groupName) + len(data) + 14)...) - buffer = append(buffer, gbinary.EncodeByLength(3, Pid())...) - buffer = append(buffer, gbinary.EncodeByLength(3, pid)...) - buffer = append(buffer, gbinary.EncodeByLength(1, len(groupName))...) - buffer = append(buffer, []byte(groupName)...) - buffer = append(buffer, gbinary.EncodeUint32(gtcp.Checksum(data))...) - buffer = append(buffer, data...) - // 执行发送流程 - var err error + msg := Msg{ + SendPid : Pid(), + RecvPid : pid, + Group : gPROC_COMM_DEAFULT_GRUOP_NAME, + Data : data, + } + if len(group) > 0 { + msg.Group = group[0] + } + msgBytes, err := json.Marshal(msg) + if err != nil { + return err + } var buf []byte - var conn *gtcp.Conn - for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { - if conn, err = getConnByPid(pid); err == nil { - defer conn.Close() - buf, err = conn.SendRecvWithTimeout(buffer, -1, gPROC_COMM_SEND_TIMEOUT*time.Millisecond) - if len(buf) > 0 { - // 如果有返回值,如果不是"ok",那么表示是错误信息 - if !bytes.EqualFold(buf, []byte("ok")) { - err = errors.New(string(buf)) - break - } - } - // EOF不算异常错误 - if err == nil || err == io.EOF { - break - } else { - glog.Error(err) - } + var conn *gtcp.PoolConn + // 循环获取连接TCP对象 + for i := gPROC_COMM_FAILURE_RETRY_COUNT; i > 0; i-- { + if conn, err = getConnByPid(pid); err == nil { + break + } + time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond) + } + if conn == nil { + return err + } + defer conn.Close() + // 执行数据发送 + buf, err = conn.SendRecvPkgWithTimeout(msgBytes, gPROC_COMM_SEND_TIMEOUT*time.Millisecond) + if len(buf) > 0 { + if !bytes.EqualFold(buf, []byte("ok")) { + err = errors.New(string(buf)) } - time.Sleep(gPROC_COMM_FAILURE_RETRY_TIMEOUT*time.Millisecond) + } + // EOF不算异常错误 + if err == io.EOF { + err = nil } return err } // 获取指定进程的TCP通信对象 -func getConnByPid(pid int) (*gtcp.Conn, error) { +func getConnByPid(pid int) (*gtcp.PoolConn, error) { port := getPortByPid(pid) if port > 0 { - if conn, err := gtcp.NewConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil { + if conn, err := gtcp.NewPoolConn(fmt.Sprintf("127.0.0.1:%d", port)); err == nil { return conn, nil } else { return nil, err diff --git a/g/os/gview/gview.go b/g/os/gview/gview.go index bc690967d..edd96d213 100644 --- a/g/os/gview/gview.go +++ b/g/os/gview/gview.go @@ -20,6 +20,7 @@ import ( "sync" ) +// View object for template engine. type View struct { mu sync.RWMutex paths *garray.StringArray // Searching path array. @@ -28,10 +29,10 @@ type View struct { delimiters []string // Customized template delimiters. } -// Template params type. +// Params is type for template params. type Params = map[string]interface{} -// Customized template function map type. +// FuncMap is type for custom template functions. type FuncMap = map[string]interface{} // Default view object. @@ -218,7 +219,6 @@ func (view *View) AddPath(path string) error { return nil } view.paths.Append(realPath) - //glog.Debug("[gview] AddPath:", realPath) return nil } diff --git a/g/test/gtest/gtest_test.go b/g/test/gtest/gtest_test.go new file mode 100644 index 000000000..3ba96669f --- /dev/null +++ b/g/test/gtest/gtest_test.go @@ -0,0 +1,18 @@ +// Copyright 2018 gf Author(https://github.com/gogf/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +package gtest_test + +import ( + "github.com/gogf/gf/g/test/gtest" + "testing" +) + +func TestCase(t *testing.T) { + gtest.Case(t, func() { + gtest.Assert(1, 1) + }) +} \ No newline at end of file diff --git a/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go b/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go index 029a61430..60ddd204f 100644 --- a/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go +++ b/geg/net/gtcp/pkg_operations/common/gtcp_common_server.go @@ -30,7 +30,7 @@ func main() { case "heartbeat": onClientHeartBeat(conn, msg) default: - glog.Errorfln("invalid message: %v", msg) + glog.Errorf("invalid message: %v", msg) break } } diff --git a/geg/os/gflock/gflock.go b/geg/os/gflock/gflock.go index 649b70653..f6c2b929a 100644 --- a/geg/os/gflock/gflock.go +++ b/geg/os/gflock/gflock.go @@ -10,8 +10,8 @@ import ( func main() { l := gflock.New("demo.lock") l.Lock() - glog.Printfln("locked by pid: %d", gproc.Pid()) + glog.Printf("locked by pid: %d", gproc.Pid()) time.Sleep(3 * time.Second) l.UnLock() - glog.Printfln("unlocked by pid: %d", gproc.Pid()) + glog.Printf("unlocked by pid: %d", gproc.Pid()) } diff --git a/geg/os/gproc/gproc.go b/geg/os/gproc/gproc.go index b84d000b7..f0599d4bd 100644 --- a/geg/os/gproc/gproc.go +++ b/geg/os/gproc/gproc.go @@ -10,18 +10,18 @@ import ( // 父子进程基本演示 func main() { if gproc.IsChild() { - glog.Printfln("%d: Hi, I am child, waiting 3 seconds to die", gproc.Pid()) + glog.Printf("%d: Hi, I am child, waiting 3 seconds to die", gproc.Pid()) time.Sleep(time.Second) - glog.Printfln("%d: 1", gproc.Pid()) + glog.Printf("%d: 1", gproc.Pid()) time.Sleep(time.Second) - glog.Printfln("%d: 2", gproc.Pid()) + glog.Printf("%d: 2", gproc.Pid()) time.Sleep(time.Second) - glog.Printfln("%d: 3", gproc.Pid()) + glog.Printf("%d: 3", gproc.Pid()) } else { m := gproc.NewManager() p := m.NewProcess(os.Args[0], os.Args, os.Environ()) p.Start() p.Wait() - glog.Printfln("%d: child died", gproc.Pid()) + glog.Printf("%d: child died", gproc.Pid()) } } diff --git a/geg/os/gproc/gproc3.go b/geg/os/gproc/gproc3.go index 856cb1c19..a44fefe7e 100644 --- a/geg/os/gproc/gproc3.go +++ b/geg/os/gproc/gproc3.go @@ -11,25 +11,25 @@ import ( // 请使用go build编译后运行,不要使用IDE运行,因为IDE大多采用的是子进程方式执行。 func main() { if gproc.IsChild() { - glog.Printfln("%d: I am child, waiting 10 seconds to die", gproc.Pid()) + glog.Printf("%d: I am child, waiting 10 seconds to die", gproc.Pid()) //p, err := os.FindProcess(os.Getppid()) //fmt.Println(err) //p.Kill() time.Sleep(2 * time.Second) - glog.Printfln("%d: 2", gproc.Pid()) + glog.Printf("%d: 2", gproc.Pid()) time.Sleep(2 * time.Second) - glog.Printfln("%d: 4", gproc.Pid()) + glog.Printf("%d: 4", gproc.Pid()) time.Sleep(2 * time.Second) - glog.Printfln("%d: 6", gproc.Pid()) + glog.Printf("%d: 6", gproc.Pid()) time.Sleep(2 * time.Second) - glog.Printfln("%d: 8", gproc.Pid()) + glog.Printf("%d: 8", gproc.Pid()) time.Sleep(2 * time.Second) - glog.Printfln("%d: died", gproc.Pid()) + glog.Printf("%d: died", gproc.Pid()) } else { p := gproc.NewProcess(os.Args[0], os.Args, os.Environ()) p.Start() - glog.Printfln("%d: I am main, waiting 3 seconds to die", gproc.Pid()) + glog.Printf("%d: I am main, waiting 3 seconds to die", gproc.Pid()) time.Sleep(3 * time.Second) - glog.Printfln("%d: died", gproc.Pid()) + glog.Printf("%d: died", gproc.Pid()) } } diff --git a/geg/os/gproc/gproc4.go b/geg/os/gproc/gproc4.go index 6bcdbbb0b..cdc24ddcf 100644 --- a/geg/os/gproc/gproc4.go +++ b/geg/os/gproc/gproc4.go @@ -11,7 +11,7 @@ import ( // 查看父子进程的环境变量 func main() { time.Sleep(5 * time.Second) - glog.Printfln("%d: %v", gproc.Pid(), genv.All()) + glog.Printf("%d: %v", gproc.Pid(), genv.All()) p := gproc.NewProcess(os.Args[0], os.Args, os.Environ()) p.Start() } diff --git a/geg/os/gproc/gproc_comm.go b/geg/os/gproc/gproc_comm.go index 670201a9e..1a2d912ef 100644 --- a/geg/os/gproc/gproc_comm.go +++ b/geg/os/gproc/gproc_comm.go @@ -8,6 +8,7 @@ import ( "github.com/gogf/gf/g/os/glog" "github.com/gogf/gf/g/os/gproc" "github.com/gogf/gf/g/os/gtime" + "github.com/gogf/gf/g/os/gtimer" "os" "time" ) @@ -15,11 +16,10 @@ import ( func main() { fmt.Printf("%d: I am child? %v\n", gproc.Pid(), gproc.IsChild()) if gproc.IsChild() { - gtime.SetInterval(time.Second, func() bool { + gtimer.SetInterval(time.Second, func() { if err := gproc.Send(gproc.PPid(), []byte(gtime.Datetime())); err != nil { glog.Error(err) } - return true }) select {} } else { @@ -28,7 +28,7 @@ func main() { p.Start() for { msg := gproc.Receive() - fmt.Printf("%d: receive from %d, data: %s\n", gproc.Pid(), msg.Pid, string(msg.Data)) + fmt.Printf("%d: receive from %d, data: %s\n", gproc.Pid(), msg.SendPid, string(msg.Data)) } } } diff --git a/geg/other/test.go b/geg/other/test.go index 40c961e28..7e31feb29 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,17 +1,15 @@ package main import ( - "github.com/gogf/gf/g/container/gqueue" - "github.com/gogf/gf/g/test/gtest" + "fmt" + "sync" ) func main() { - max := 100 - q := gqueue.New(max) - for i := 1; i < max; i++ { - q.Push(i) - } - q.Close() - gtest.Assert(q.Len(), 1) - + wg := sync.WaitGroup{} + wg.Add(1) + wg.Add(-100) + wg.Add() + wg.Wait() + fmt.Println(1) }