From 5888943c5a14aba885187f120f4e52804fbbc783 Mon Sep 17 00:00:00 2001 From: John Date: Thu, 8 Mar 2018 15:33:12 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3grpool=E7=9A=84goroutine?= =?UTF-8?q?=E6=B3=84=E9=9C=B2=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/os/grpool/grpool_api.go | 10 ++-- g/os/grpool/grpool_pool.go | 46 +++++++++++-------- .../{grpool_job.go => grpool_pool_worker.go} | 16 +++---- 3 files changed, 41 insertions(+), 31 deletions(-) rename g/os/grpool/{grpool_job.go => grpool_pool_worker.go} (73%) diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go index 5beb6a5eb..12bdfec8e 100644 --- a/g/os/grpool/grpool_api.go +++ b/g/os/grpool/grpool_api.go @@ -28,12 +28,12 @@ type Pool struct { queue *glist.List // 空闲任务队列(*PoolJob) funcs *glist.List // 待处理任务操作队列 freeEvents chan struct{} // 空闲协程通知事件 - funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知) + funcEvents chan struct{} // 任务添加事件(兄弟们该干活了!) stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) } -// goroutine任务 -type PoolJob struct { +// goroutine worker +type PoolWorker struct { job chan func() // 当前任务(当为nil时表示关闭) pool *Pool // 所属协程池 update int64 // 更新时间 @@ -95,7 +95,7 @@ func (p *Pool) Add(f func()) { p.funcEvents <- struct{}{} } -// 查询当前goroutine总数 +// 查询当前goroutine worker总数 func (p *Pool) Size() int { return int(atomic.LoadInt32(&p.number)) } @@ -119,7 +119,7 @@ func (p *Pool) SetExpire(expire int) { func (p *Pool) Close() { // 必须首先标识让任务过期自动关闭 p.SetExpire(-1) - // 使用stopEvents事件通知所有的异步协程自动退出 + // 使用stopEvents事件通知所有的异步协程及清理协程自动退出 for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ { p.stopEvents <- struct{}{} } diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go index 3d21da382..d1ab3d593 100644 --- a/g/os/grpool/grpool_pool.go +++ b/g/os/grpool/grpool_pool.go @@ -33,21 +33,18 @@ func (p *Pool) startWorkLoop() { func (p *Pool) startClearLoop() { go func() { for { - // 如果接收到关闭通知(池已经关闭),那么不再执行清理操作,直接退出 - if len(p.stopEvents) > 0 { - break - } time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) + // 保证没有工作任务的情况下,执行worker清理操作 if len(p.funcEvents) == 0 { - var j *PoolJob + var w *PoolWorker for { if r := p.queue.PopFront(); r != nil { - j = r.(*PoolJob) - if gtime.Second() - int64(p.expire) > j.update { - j.stop() + w = r.(*PoolWorker) + if gtime.Second() - int64(p.expire) > w.update { + w.stop() atomic.AddInt32(&p.number, -1) } else { - p.queue.PushFront(r) + p.queue.PushFront(w) break } } else { @@ -55,6 +52,19 @@ func (p *Pool) startClearLoop() { } } } + // 如果接收到关闭通知(池已经关闭),那么不再执行清理操作,关闭所有worker后退出 + if len(p.stopEvents) > 0 { + for { + if r := p.queue.PopFront(); r != nil { + // 主动关闭所有work,防止goroutine泄露 + r.(*PoolWorker).stop() + atomic.AddInt32(&p.number, -1) + } else { + break + } + } + break + } } }() } @@ -70,7 +80,7 @@ func (p *Pool) getExpire() int32 { } // 创建一个空的任务对象 -func (p *Pool) newJob() *PoolJob { +func (p *Pool) newJob() *PoolWorker { // 如果达到goroutine数限制,那么阻塞等待有空闲goroutine后继续 if p.reachSizeLimit() { // 阻塞等待空闲的协程资源, @@ -79,21 +89,21 @@ func (p *Pool) newJob() *PoolJob { <- p.freeEvents return p.getJob() } - j := &PoolJob { + w := &PoolWorker { job : make(chan func(), 1), pool : p, } - j.start() + w.start() atomic.AddInt32(&p.number, 1) - return j + return w } // 添加任务对象到队列 -func (p *Pool) addJob(j *PoolJob) bool { - if j.pool.getExpire() == -1 { +func (p *Pool) addJob(w *PoolWorker) bool { + if w.pool.getExpire() == -1 { return false } - p.queue.PushBack(j) + p.queue.PushBack(w) // 如果当前的goroutine数量达到上线,那么需要使用空闲goroutine通知事件 if p.reachSizeLimit() { p.freeEvents <- struct{}{} @@ -102,9 +112,9 @@ func (p *Pool) addJob(j *PoolJob) bool { } // 获取/创建任务 -func (p *Pool) getJob() *PoolJob { +func (p *Pool) getJob() *PoolWorker { if r := p.queue.PopFront(); r != nil { - return r.(*PoolJob) + return r.(*PoolWorker) } return p.newJob() } diff --git a/g/os/grpool/grpool_job.go b/g/os/grpool/grpool_pool_worker.go similarity index 73% rename from g/os/grpool/grpool_job.go rename to g/os/grpool/grpool_pool_worker.go index 939ba39b9..7126cd107 100644 --- a/g/os/grpool/grpool_job.go +++ b/g/os/grpool/grpool_pool_worker.go @@ -9,16 +9,16 @@ package grpool import "gitee.com/johng/gf/g/os/gtime" // 开始任务 -func (j *PoolJob) start() { +func (w *PoolWorker) start() { go func() { for { - if f := <- j.job; f != nil { + if f := <- w.job; f != nil { // 执行任务 f() // 更新活动时间 - j.update = gtime.Second() + w.update = gtime.Second() // 执行完毕后添加到空闲队列 - if !j.pool.addJob(j) { + if !w.pool.addJob(w) { break } } else { @@ -29,12 +29,12 @@ func (j *PoolJob) start() { } // 关闭当前任务 -func (j *PoolJob) stop() { - j.setJob(nil) +func (w *PoolWorker) stop() { + w.setJob(nil) } // 设置当前任务的执行函数 -func (j *PoolJob) setJob(f func()) { - j.job <- f +func (w *PoolWorker) setJob(f func()) { + w.job <- f }