修正grpool的goroutine泄露问题

This commit is contained in:
John
2018-03-08 15:33:12 +08:00
parent 603dde5f16
commit 5888943c5a
3 changed files with 41 additions and 31 deletions

View File

@ -28,12 +28,12 @@ type Pool struct {
queue *glist.List // 空闲任务队列(*PoolJob)
funcs *glist.List // 待处理任务操作队列
freeEvents chan struct{} // 空闲协程通知事件
funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知)
funcEvents chan struct{} // 任务添加事件(兄弟们该干活了!)
stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知)
}
// goroutine任务
type PoolJob struct {
// goroutine worker
type PoolWorker struct {
job chan func() // 当前任务(当为nil时表示关闭)
pool *Pool // 所属协程池
update int64 // 更新时间
@ -95,7 +95,7 @@ func (p *Pool) Add(f func()) {
p.funcEvents <- struct{}{}
}
// 查询当前goroutine总数
// 查询当前goroutine worker总数
func (p *Pool) Size() int {
return int(atomic.LoadInt32(&p.number))
}
@ -119,7 +119,7 @@ func (p *Pool) SetExpire(expire int) {
func (p *Pool) Close() {
// 必须首先标识让任务过期自动关闭
p.SetExpire(-1)
// 使用stopEvents事件通知所有的异步协程自动退出
// 使用stopEvents事件通知所有的异步协程及清理协程自动退出
for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ {
p.stopEvents <- struct{}{}
}

View File

@ -33,21 +33,18 @@ func (p *Pool) startWorkLoop() {
func (p *Pool) startClearLoop() {
go func() {
for {
// 如果接收到关闭通知(池已经关闭),那么不再执行清理操作,直接退出
if len(p.stopEvents) > 0 {
break
}
time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second)
// 保证没有工作任务的情况下执行worker清理操作
if len(p.funcEvents) == 0 {
var j *PoolJob
var w *PoolWorker
for {
if r := p.queue.PopFront(); r != nil {
j = r.(*PoolJob)
if gtime.Second() - int64(p.expire) > j.update {
j.stop()
w = r.(*PoolWorker)
if gtime.Second() - int64(p.expire) > w.update {
w.stop()
atomic.AddInt32(&p.number, -1)
} else {
p.queue.PushFront(r)
p.queue.PushFront(w)
break
}
} else {
@ -55,6 +52,19 @@ func (p *Pool) startClearLoop() {
}
}
}
// 如果接收到关闭通知(池已经关闭)那么不再执行清理操作关闭所有worker后退出
if len(p.stopEvents) > 0 {
for {
if r := p.queue.PopFront(); r != nil {
// 主动关闭所有work防止goroutine泄露
r.(*PoolWorker).stop()
atomic.AddInt32(&p.number, -1)
} else {
break
}
}
break
}
}
}()
}
@ -70,7 +80,7 @@ func (p *Pool) getExpire() int32 {
}
// 创建一个空的任务对象
func (p *Pool) newJob() *PoolJob {
func (p *Pool) newJob() *PoolWorker {
// 如果达到goroutine数限制那么阻塞等待有空闲goroutine后继续
if p.reachSizeLimit() {
// 阻塞等待空闲的协程资源,
@ -79,21 +89,21 @@ func (p *Pool) newJob() *PoolJob {
<- p.freeEvents
return p.getJob()
}
j := &PoolJob {
w := &PoolWorker {
job : make(chan func(), 1),
pool : p,
}
j.start()
w.start()
atomic.AddInt32(&p.number, 1)
return j
return w
}
// 添加任务对象到队列
func (p *Pool) addJob(j *PoolJob) bool {
if j.pool.getExpire() == -1 {
func (p *Pool) addJob(w *PoolWorker) bool {
if w.pool.getExpire() == -1 {
return false
}
p.queue.PushBack(j)
p.queue.PushBack(w)
// 如果当前的goroutine数量达到上线那么需要使用空闲goroutine通知事件
if p.reachSizeLimit() {
p.freeEvents <- struct{}{}
@ -102,9 +112,9 @@ func (p *Pool) addJob(j *PoolJob) bool {
}
// 获取/创建任务
func (p *Pool) getJob() *PoolJob {
func (p *Pool) getJob() *PoolWorker {
if r := p.queue.PopFront(); r != nil {
return r.(*PoolJob)
return r.(*PoolWorker)
}
return p.newJob()
}

View File

@ -9,16 +9,16 @@ package grpool
import "gitee.com/johng/gf/g/os/gtime"
// 开始任务
func (j *PoolJob) start() {
func (w *PoolWorker) start() {
go func() {
for {
if f := <- j.job; f != nil {
if f := <- w.job; f != nil {
// 执行任务
f()
// 更新活动时间
j.update = gtime.Second()
w.update = gtime.Second()
// 执行完毕后添加到空闲队列
if !j.pool.addJob(j) {
if !w.pool.addJob(w) {
break
}
} else {
@ -29,12 +29,12 @@ func (j *PoolJob) start() {
}
// 关闭当前任务
func (j *PoolJob) stop() {
j.setJob(nil)
func (w *PoolWorker) stop() {
w.setJob(nil)
}
// 设置当前任务的执行函数
func (j *PoolJob) setJob(f func()) {
j.job <- f
func (w *PoolWorker) setJob(f func()) {
w.job <- f
}