改进grpool代码质量,并改进对池化goroutine数量的限制设计

This commit is contained in:
John
2018-05-25 09:19:06 +08:00
parent 85c73d93f1
commit 1b80ca3c5d
4 changed files with 9 additions and 13 deletions

View File

@ -8,6 +8,7 @@
// 优点:
// 1、队列初始化速度快
// 2、可以向队头/队尾进行Push/Pop操作
// 3、取数据时如果队列为空那么会阻塞等待
package gqueue
import (

View File

@ -13,7 +13,6 @@ import (
"gitee.com/johng/gf/g/os/glog"
"github.com/fsnotify/fsnotify"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/os/grpool"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gqueue"
@ -152,11 +151,11 @@ func (w *Watcher) startEventLoop() {
continue
}
if l := w.callbacks.Get(event.Path); l != nil {
grpool.Add(func() {
for _, v := range l.(*glist.List).FrontAll() {
go func(list interface{}) {
for _, v := range list.(*glist.List).FrontAll() {
v.(func(event *Event))(event)
}
})
}(l)
}
} else {
break

View File

@ -1,4 +1,4 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
@ -11,7 +11,6 @@ package grpool
import (
"math"
"errors"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
)
@ -28,7 +27,7 @@ type Pool struct {
workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数)
blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量
queue *glist.List // 空闲任务队列(*PoolJob)
funcs *glist.List // 待处理任务操作队列
jobs *glist.List // 待处理任务操作队列
jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!)
freeEvents chan struct{} // 空闲协程通知事件
stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知)
@ -58,7 +57,7 @@ func New(expire int, size...int) *Pool {
workerNum : gtype.NewInt(),
blockedNum : gtype.NewInt(),
queue : glist.New(),
funcs : glist.New(),
jobs : glist.New(),
jobEvents : make(chan struct{}, math.MaxInt32),
freeEvents : make(chan struct{}, math.MaxInt32),
stopEvents : make(chan struct{}, 0),
@ -95,10 +94,7 @@ func SetExpire(expire int) {
// 添加异步任务
func (p *Pool) Add(f func()) error {
if len(p.stopEvents) > 0 {
return errors.New("pool closed")
}
p.funcs.PushBack(f)
p.jobs.PushBack(f)
p.jobEvents <- struct{}{}
return nil
}

View File

@ -20,7 +20,7 @@ func (p *Pool) startSchedLoop() {
for {
select {
case <-p.jobEvents:
p.getWorker().setJob(p.funcs.PopFront().(func()))
p.getWorker().setJob(p.jobs.PopFront().(func()))
case <-p.stopEvents:
return
}