diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go index 95beb37c6..2af65b644 100644 --- a/g/os/grpool/grpool_api.go +++ b/g/os/grpool/grpool_api.go @@ -21,10 +21,12 @@ const ( // goroutine池对象 type Pool struct { + size int32 // 限制最大的goroutine数量/线程数/worker数量 expire int32 // goroutine过期时间(秒) number int32 // 当前goroutine数量(非任务数) queue *glist.SafeList // 空闲任务队列(*PoolJob) funcs *glist.SafeList // 待处理任务操作队列 + freeEvents chan struct{} // 空闲线程通知事件 funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知) stopEvents chan struct{} // 池关闭事件(用于池相关异步线程通知) } @@ -41,11 +43,18 @@ type PoolJob struct { var defaultPool = New(gDEFAULT_EXPIRE_TIME) // 创建goroutine池管理对象,给定过期时间(秒) -func New(expire int) *Pool { +// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制 +func New(expire int, sizes...int) *Pool { + size := math.MaxInt32 + if len(sizes) > 0 { + size = sizes[0] + } p := &Pool { + size : int32(size), expire : int32(expire), queue : glist.NewSafeList(), funcs : glist.NewSafeList(), + freeEvents : make(chan struct{}, math.MaxUint32), funcEvents : make(chan struct{}, math.MaxUint32), stopEvents : make(chan struct{}, 1), } @@ -65,7 +74,17 @@ func Size() int { return int(atomic.LoadInt32(&defaultPool.number)) } -// 设置默认池中goroutine的过期时间 +// 查询当前等待处理的任务总数 +func Jobs() int { + return len(defaultPool.funcEvents) +} + +// 动态改变默认池中goroutine的上线数量 +func SetSize(size int) { + atomic.StoreInt32(&defaultPool.size, int32(size)) +} + +// 动态改变默认池中goroutine的过期时间 func SetExpire(expire int) { atomic.StoreInt32(&defaultPool.expire, int32(expire)) } @@ -81,7 +100,17 @@ func (p *Pool) Size() int { return int(atomic.LoadInt32(&p.number)) } -// 设置当前池中goroutine的过期时间 +// 查询当前等待处理的任务总数 +func (p *Pool) Jobs() int { + return len(p.funcEvents) +} + +// 动态改变当前池中goroutine的上线数量 +func (p *Pool) SetSize(size int) { + atomic.StoreInt32(&p.size, int32(size)) +} + +// 动态改变当前池中goroutine的过期时间 func (p *Pool) SetExpire(expire int) { atomic.StoreInt32(&p.expire, int32(expire)) } diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go index d7ebc6313..0bb1a5d25 100644 --- a/g/os/grpool/grpool_pool.go +++ b/g/os/grpool/grpool_pool.go @@ -12,7 +12,7 @@ import ( "gitee.com/johng/gf/g/os/gtime" ) -// 任务分配循环 +// 任务分配循环,这是一个独立的goroutine,单线程处理 func (p *Pool) startWorkLoop() { go func() { for { @@ -26,7 +26,7 @@ func (p *Pool) startWorkLoop() { }() } -// 定时清理过期任务 +// 定时清理过期任务,单线程处理 func (p *Pool) startClearLoop() { go func() { for { @@ -48,14 +48,20 @@ func (p *Pool) startClearLoop() { } } } - // 判断是池已经关闭,是则退出 - if len(p.stopEvents) > 0 { + // 判断是池已经关闭,并且所有goroutine已退出,那么该goroutine终止执行 + if len(p.stopEvents) > 0 && atomic.LoadInt32(&p.number) == 0 { break } } }() } +// 判断是否达到goroutine上上限 +func (p *Pool) reachSizeLimit() bool { + return false + return atomic.LoadInt32(&p.number) >= atomic.LoadInt32(&p.size) +} + // 获取过期时间 func (p *Pool) getExpire() int32 { return atomic.LoadInt32(&p.expire) @@ -63,6 +69,14 @@ func (p *Pool) getExpire() int32 { // 创建一个空的任务对象 func (p *Pool) newJob() *PoolJob { + // 如果达到goroutine数限制,那么阻塞等待有空闲goroutine后继续 + //if p.reachSizeLimit() { + // // 阻塞等待空闲goroutine + // select { + // case <- p.freeEvents: + // return p.getJob() + // } + //} j := &PoolJob { job : make(chan func(), 1), pool : p, @@ -77,7 +91,12 @@ func (p *Pool) addJob(j *PoolJob) bool { if j.pool.getExpire() == -1 { return false } - return p.queue.PushBack(j) != nil + p.queue.PushBack(j) + // 如果当前的goroutine数量达到上线,那么需要使用空闲goroutine通知事件 + //if p.reachSizeLimit() { + // p.freeEvents <- struct{}{} + //} + return true } // 获取/创建任务 diff --git a/g/os/grpool/grpool_testb1_test.go b/g/os/grpool/grpool_testb1_test.go index 16826af47..db82a0500 100644 --- a/g/os/grpool/grpool_testb1_test.go +++ b/g/os/grpool/grpool_testb1_test.go @@ -23,8 +23,8 @@ func BenchmarkGrpool_1(b *testing.B) { } } -func BenchmarkGoroutine_1(b *testing.B) { - for i := 0; i < b.N; i++ { - go increment1() - } -} \ No newline at end of file +//func BenchmarkGoroutine_1(b *testing.B) { +// for i := 0; i < b.N; i++ { +// go increment1() +// } +//} \ No newline at end of file diff --git a/geg/os/groutine.go b/geg/os/groutine.go index 0df3fcec7..ebb2120a8 100644 --- a/geg/os/groutine.go +++ b/geg/os/groutine.go @@ -7,19 +7,20 @@ import ( "gitee.com/johng/gf/g/os/grpool" ) -func job(i int) { - time.Sleep(2*time.Second) - //fmt.Println("job done:", i) +func job() { + time.Sleep(1*time.Second) } func main() { - for i := 0; i < 10; i++ { + grpool.SetSize(10) + for i := 0; i < 1000; i++ { grpool.Add(func() { - job(i) + job() }) } gtime.SetInterval(2*time.Second, func() bool { - fmt.Println(grpool.Size()) + fmt.Println("size:", grpool.Size()) + fmt.Println("jobs:", grpool.Jobs()) return true }) time.Sleep(5000*time.Second) diff --git a/geg/other/test.go b/geg/other/test.go index 3f77e3182..bcb8590d7 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,21 +1,10 @@ package main import ( - "gitee.com/johng/gf/g/os/gtime" "fmt" - "gitee.com/johng/gf/g/container/glist" - "math" + "runtime" ) func main() { - - t1 := gtime.Microsecond() - c := make(chan struct{}, math.MaxInt64) - c <- struct{}{} - fmt.Println(gtime.Microsecond() - t1) - - t2 := gtime.Microsecond() - l := glist.NewSafeList() - l.PushBack(func() {}) - fmt.Println(gtime.Microsecond() - t2) + fmt.Println(runtime.GOMAXPROCS(0)) } \ No newline at end of file