grpool测试

This commit is contained in:
John
2018-01-17 12:49:36 +08:00
parent 4de91912c9
commit 1100d0ff86
5 changed files with 70 additions and 32 deletions

View File

@ -21,10 +21,12 @@ const (
// goroutine池对象
type Pool struct {
size int32 // 限制最大的goroutine数量/线程数/worker数量
expire int32 // goroutine过期时间(秒)
number int32 // 当前goroutine数量(非任务数)
queue *glist.SafeList // 空闲任务队列(*PoolJob)
funcs *glist.SafeList // 待处理任务操作队列
freeEvents chan struct{} // 空闲线程通知事件
funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知)
stopEvents chan struct{} // 池关闭事件(用于池相关异步线程通知)
}
@ -41,11 +43,18 @@ type PoolJob struct {
var defaultPool = New(gDEFAULT_EXPIRE_TIME)
// 创建goroutine池管理对象给定过期时间(秒)
func New(expire int) *Pool {
// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量非必需参数默认不做限制
func New(expire int, sizes...int) *Pool {
size := math.MaxInt32
if len(sizes) > 0 {
size = sizes[0]
}
p := &Pool {
size : int32(size),
expire : int32(expire),
queue : glist.NewSafeList(),
funcs : glist.NewSafeList(),
freeEvents : make(chan struct{}, math.MaxUint32),
funcEvents : make(chan struct{}, math.MaxUint32),
stopEvents : make(chan struct{}, 1),
}
@ -65,7 +74,17 @@ func Size() int {
return int(atomic.LoadInt32(&defaultPool.number))
}
// 设置默认池中goroutine的过期时间
// 查询当前等待处理的任务总数
func Jobs() int {
return len(defaultPool.funcEvents)
}
// 动态改变默认池中goroutine的上线数量
func SetSize(size int) {
atomic.StoreInt32(&defaultPool.size, int32(size))
}
// 动态改变默认池中goroutine的过期时间
func SetExpire(expire int) {
atomic.StoreInt32(&defaultPool.expire, int32(expire))
}
@ -81,7 +100,17 @@ func (p *Pool) Size() int {
return int(atomic.LoadInt32(&p.number))
}
// 设置当前池中goroutine的过期时间
// 查询当前等待处理的任务总数
func (p *Pool) Jobs() int {
return len(p.funcEvents)
}
// 动态改变当前池中goroutine的上线数量
func (p *Pool) SetSize(size int) {
atomic.StoreInt32(&p.size, int32(size))
}
// 动态改变当前池中goroutine的过期时间
func (p *Pool) SetExpire(expire int) {
atomic.StoreInt32(&p.expire, int32(expire))
}

View File

@ -12,7 +12,7 @@ import (
"gitee.com/johng/gf/g/os/gtime"
)
// 任务分配循环
// 任务分配循环这是一个独立的goroutine单线程处理
func (p *Pool) startWorkLoop() {
go func() {
for {
@ -26,7 +26,7 @@ func (p *Pool) startWorkLoop() {
}()
}
// 定时清理过期任务
// 定时清理过期任务,单线程处理
func (p *Pool) startClearLoop() {
go func() {
for {
@ -48,14 +48,20 @@ func (p *Pool) startClearLoop() {
}
}
}
// 判断是池已经关闭,是则退出
if len(p.stopEvents) > 0 {
// 判断是池已经关闭,并且所有goroutine已退出那么该goroutine终止执行
if len(p.stopEvents) > 0 && atomic.LoadInt32(&p.number) == 0 {
break
}
}
}()
}
// 判断是否达到goroutine上上限
func (p *Pool) reachSizeLimit() bool {
return false
return atomic.LoadInt32(&p.number) >= atomic.LoadInt32(&p.size)
}
// 获取过期时间
func (p *Pool) getExpire() int32 {
return atomic.LoadInt32(&p.expire)
@ -63,6 +69,14 @@ func (p *Pool) getExpire() int32 {
// 创建一个空的任务对象
func (p *Pool) newJob() *PoolJob {
// 如果达到goroutine数限制那么阻塞等待有空闲goroutine后继续
//if p.reachSizeLimit() {
// // 阻塞等待空闲goroutine
// select {
// case <- p.freeEvents:
// return p.getJob()
// }
//}
j := &PoolJob {
job : make(chan func(), 1),
pool : p,
@ -77,7 +91,12 @@ func (p *Pool) addJob(j *PoolJob) bool {
if j.pool.getExpire() == -1 {
return false
}
return p.queue.PushBack(j) != nil
p.queue.PushBack(j)
// 如果当前的goroutine数量达到上线那么需要使用空闲goroutine通知事件
//if p.reachSizeLimit() {
// p.freeEvents <- struct{}{}
//}
return true
}
// 获取/创建任务

View File

@ -23,8 +23,8 @@ func BenchmarkGrpool_1(b *testing.B) {
}
}
func BenchmarkGoroutine_1(b *testing.B) {
for i := 0; i < b.N; i++ {
go increment1()
}
}
//func BenchmarkGoroutine_1(b *testing.B) {
// for i := 0; i < b.N; i++ {
// go increment1()
// }
//}

View File

@ -7,19 +7,20 @@ import (
"gitee.com/johng/gf/g/os/grpool"
)
func job(i int) {
time.Sleep(2*time.Second)
//fmt.Println("job done:", i)
func job() {
time.Sleep(1*time.Second)
}
func main() {
for i := 0; i < 10; i++ {
grpool.SetSize(10)
for i := 0; i < 1000; i++ {
grpool.Add(func() {
job(i)
job()
})
}
gtime.SetInterval(2*time.Second, func() bool {
fmt.Println(grpool.Size())
fmt.Println("size:", grpool.Size())
fmt.Println("jobs:", grpool.Jobs())
return true
})
time.Sleep(5000*time.Second)

View File

@ -1,21 +1,10 @@
package main
import (
"gitee.com/johng/gf/g/os/gtime"
"fmt"
"gitee.com/johng/gf/g/container/glist"
"math"
"runtime"
)
func main() {
t1 := gtime.Microsecond()
c := make(chan struct{}, math.MaxInt64)
c <- struct{}{}
fmt.Println(gtime.Microsecond() - t1)
t2 := gtime.Microsecond()
l := glist.NewSafeList()
l.PushBack(func() {})
fmt.Println(gtime.Microsecond() - t2)
fmt.Println(runtime.GOMAXPROCS(0))
}