diff --git a/g/os/grpool/grpool.go b/g/os/grpool/grpool.go index 82a0c6318..cb4c9866a 100644 --- a/g/os/grpool/grpool.go +++ b/g/os/grpool/grpool.go @@ -19,10 +19,10 @@ import ( // goroutine池对象 type Pool struct { - workerChan chan struct{} // 使用channel限制最大的goroutine数量 - workerNum *gtype.Int // 当前正在运行的worker/goroutine数量 - jobQueue *glist.List // 待处理任务操作队列 - jobEvents chan struct{} // 任务添加事件(jobQueue+jobEvents结合使用) + limit int // 最大的goroutine数量限制 + count *gtype.Int // 当前正在运行的goroutine数量 + list *glist.List // 待处理任务操作列表 + events chan struct{} // 任务添加事件 closed *gtype.Bool } @@ -30,18 +30,18 @@ type Pool struct { // 该对象与进程同生命周期,无需Close var defaultPool = New() -// 创建goroutine池管理对象, 参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制 +// 创建goroutine池管理对象,参数用于限制限制最大的goroutine数量,非必需参数,默认不做限制 func New(size...int) *Pool { - s := 0 + limit := -1 if len(size) > 0 { - s = size[0] + limit = size[0] } p := &Pool { - workerNum : gtype.NewInt(), - jobQueue : glist.New(), - jobEvents : make(chan struct{}, math.MaxInt32), - workerChan : make(chan struct{}, s), - closed : gtype.NewBool(), + limit : limit, + count : gtype.NewInt(), + list : glist.New(), + events : make(chan struct{}, math.MaxInt32), + closed : gtype.NewBool(), } return p } @@ -53,65 +53,57 @@ func Add(f func()) error { // 查询当前goroutine总数 func Size() int { - return defaultPool.workerNum.Val() + return defaultPool.count.Val() } // 查询当前等待处理的任务总数 func Jobs() int { - return len(defaultPool.jobEvents) + return len(defaultPool.events) } // 添加异步任务 func (p *Pool) Add(f func()) error { - p.jobQueue.PushBack(f) - p.jobEvents <- struct{}{} + p.list.PushBack(f) + p.events <- struct{}{} // 判断是否创建新的worker - if p.Jobs() > 1 || p.workerNum.Val() == 0 { - p.ForkWorker() + if p.list.Len() > 1 || p.count.Val() == 0 { + p.fork() } return nil } -// 查询当前goroutine worker总数 +// 查询当前goroutine总数 func (p *Pool) Size() int { - return p.workerNum.Val() + return p.count.Val() } // 查询当前等待处理的任务总数 func (p *Pool) Jobs() int { - return p.jobQueue.Len() + return p.list.Len() } // 创建新的worker执行任务 -func (p *Pool) ForkWorker() { - if cap(p.workerChan) > 0 { - // 如果worker数量已经达到限制,那么不创建新worker,直接返回 - if p.workerNum.Val() == cap(p.workerChan) { - return - } - p.workerNum.Add(1) - p.workerChan <- struct{}{} - } else { - p.workerNum.Add(1) +func (p *Pool) fork() { + // 如果worker数量已经达到限制,那么不创建新worker,直接返回 + if p.count.Val() == p.limit { + return } + p.count.Add(1) go func() { for !p.closed.Val() { select { - case <- p.jobEvents: - if job := p.jobQueue.PopFront(); job != nil { + case <- p.events: + if job := p.list.PopFront(); job != nil { job.(func())() } else { - goto WorkerDone + p.count.Add(-1) + return } default: - goto WorkerDone + p.count.Add(-1) + return } } -WorkerDone: - p.workerNum.Add(-1) - if cap(p.workerChan) > 0 { - <- p.workerChan - } }() } diff --git a/g/os/grpool/grpool_unit_test.go b/g/os/grpool/grpool_unit_test.go new file mode 100644 index 000000000..799e027cc --- /dev/null +++ b/g/os/grpool/grpool_unit_test.go @@ -0,0 +1,57 @@ +// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://github.com/gogf/gf. + +// go test *.go -bench=".*" -count=1 + +package grpool_test + +import ( + "github.com/gogf/gf/g/container/garray" + "github.com/gogf/gf/g/os/grpool" + "github.com/gogf/gf/g/test/gtest" + "sync" + "testing" + "time" +) + + +func Test_Basic(t *testing.T) { + gtest.Case(t, func() { + wg := sync.WaitGroup{} + array := garray.NewArray() + size := 100000 + wg.Add(size) + for i := 0; i < size; i++ { + grpool.Add(func() { + array.Append(1) + wg.Done() + }) + } + wg.Wait() + gtest.Assert(array.Len(), size) + }) + + gtest.Case(t, func() { + array := garray.NewArray() + size := 100000 + pool := grpool.New(10000) + for i := 0; i < size; i++ { + pool.Add(func() { + array.Append(1) + time.Sleep(2*time.Second) + }) + } + time.Sleep(time.Second) + gtest.Assert(pool.Size(), 10000) + gtest.Assert(pool.Jobs(), 90000) + gtest.Assert(array.Len(), 10000) + pool.Close() + time.Sleep(2*time.Second) + gtest.Assert(pool.Size(), 10000) + gtest.Assert(pool.Jobs(), 90000) + gtest.Assert(array.Len(), 10000) + }) +} \ No newline at end of file diff --git a/geg/container/gpool/gpool_expire.go b/geg/container/gpool/gpool_expire.go deleted file mode 100644 index 1a047a625..000000000 --- a/geg/container/gpool/gpool_expire.go +++ /dev/null @@ -1,16 +0,0 @@ -package main - -import ( - "fmt" - "github.com/gogf/gf/g/container/gpool" - "time" -) - -func main() { - p := gpool.New(60000, nil, func(i interface{}) { - fmt.Println("expired") - }) - p.Put(1) - time.Sleep(10000*time.Second) - fmt.Println(p.Get()) -} diff --git a/geg/container/gpool/gpool_expirefunc.go b/geg/container/gpool/gpool_expirefunc.go new file mode 100644 index 000000000..4341ef271 --- /dev/null +++ b/geg/container/gpool/gpool_expirefunc.go @@ -0,0 +1,32 @@ +package main + +import ( + "fmt" + "github.com/gogf/gf/g/container/gpool" + "github.com/gogf/gf/g/net/gtcp" + "github.com/gogf/gf/g/os/glog" + "time" +) + +func main() { + // 创建对象复用池,对象过期时间为3000毫秒,并给定创建及销毁方法 + p := gpool.New(3000, func() (interface{}, error) { + return gtcp.NewConn("www.baidu.com:80") + }, func(i interface{}) { + glog.Println("expired") + i.(*gtcp.Conn).Close() + }) + conn, err := p.Get() + if err != nil { + panic(err) + } + result, err := conn.(*gtcp.Conn).SendRecv([]byte("HEAD / HTTP/1.1\n\n"), -1) + if err != nil { + panic(err) + } + fmt.Println(string(result)) + // 丢回池中以便重复使用 + p.Put(conn) + // 等待一定时间观察过期方法调用 + time.Sleep(4*time.Second) +} diff --git a/geg/other/test.go b/geg/other/test.go index abd15f7ed..224ca34a3 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,14 +1,73 @@ package main import ( - "log" + "fmt" + "github.com/gogf/gf/g/container/gmap" ) func main() { - log.Printf("111") - log.Printf("111") - log.Print("222") - log.Print("222") - log.Println("333") - log.Println("333") + // 创建一个默认的gmap对象, + // 默认情况下该gmap对象支持并发安全特性, + // 初始化时可以给定true参数关闭并发安全特性,当做一个普通的map使用。 + m := gmap.New() + + // 设置键值对 + for i := 0; i < 10; i++ { + m.Set(i, i) + } + // 查询大小 + fmt.Println(m.Size()) + // 批量设置键值对(不同的数据类型对象参数不同) + m.Sets(map[interface{}]interface{}{ + 10 : 10, + 11 : 11, + }) + fmt.Println(m.Size()) + + // 查询是否存在 + fmt.Println(m.Contains(1)) + + // 查询键值 + fmt.Println(m.Get(1)) + + // 删除数据项 + m.Remove(9) + fmt.Println(m.Size()) + + // 批量删除 + m.Removes([]interface{}{10, 11}) + fmt.Println(m.Size()) + + // 当前键名列表(随机排序) + fmt.Println(m.Keys()) + // 当前键值列表(随机排序) + fmt.Println(m.Values()) + + // 查询键名,当键值不存在时,写入给定的默认值 + fmt.Println(m.GetOrSet(100, 100)) + + // 删除键值对,并返回对应的键值 + fmt.Println(m.Remove(100)) + + // 遍历map + m.Iterator(func(k interface{}, v interface{}) bool { + fmt.Printf("%v:%v ", k, v) + return true + }) + + // 自定义写锁操作 + m.LockFunc(func(m map[interface{}]interface{}) { + m[99] = 99 + }) + + // 自定义读锁操作 + m.RLockFunc(func(m map[interface{}]interface{}) { + fmt.Println(m[99]) + }) + + // 清空map + m.Clear() + + // 判断map是否为空 + fmt.Println(m.IsEmpty()) } \ No newline at end of file