goroutine池改进中

This commit is contained in:
John
2018-05-25 08:27:17 +08:00
parent 6efcd0e8f2
commit 85c73d93f1
5 changed files with 102 additions and 93 deletions

View File

@ -11,10 +11,9 @@ package grpool
import (
"math"
"runtime"
"sync/atomic"
"gitee.com/johng/gf/g/container/glist"
"errors"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
)
const (
@ -24,17 +23,18 @@ const (
// goroutine池对象
type Pool struct {
size int32 // 限制最大的goroutine数量/协程数/worker数量
expire int32 // goroutine过期时间(秒)
number int32 // 当前goroutine数量(非任务数)
queue *glist.List // 空闲任务队列(*PoolJob)
funcs *glist.List // 待处理任务操作队列
freeEvents chan struct{} // 空闲协程通知事件
funcEvents chan struct{} // 任务添加事件(兄弟们该干活了!)
stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知)
size *gtype.Int // 限制最大的goroutine数量/协程数/worker数量
expire *gtype.Int // goroutine过期时间(秒)
workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数)
blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量
queue *glist.List // 空闲任务队列(*PoolJob)
funcs *glist.List // 待处理任务操作队列
jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!)
freeEvents chan struct{} // 空闲协程通知事件
stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知)
}
// goroutine worker
// 一个worker对应一个goroutine
type PoolWorker struct {
job chan func() // 当前任务(当为nil时表示关闭)
pool *Pool // 所属协程池
@ -53,15 +53,17 @@ func New(expire int, size...int) *Pool {
s = size[0]
}
p := &Pool {
size : int32(s),
expire : int32(expire),
queue : glist.New(),
funcs : glist.New(),
freeEvents : make(chan struct{}, math.MaxInt32),
funcEvents : make(chan struct{}, math.MaxInt32),
stopEvents : make(chan struct{}, runtime.GOMAXPROCS(-1) + 1),
size : gtype.NewInt(s),
expire : gtype.NewInt(expire),
workerNum : gtype.NewInt(),
blockedNum : gtype.NewInt(),
queue : glist.New(),
funcs : glist.New(),
jobEvents : make(chan struct{}, math.MaxInt32),
freeEvents : make(chan struct{}, math.MaxInt32),
stopEvents : make(chan struct{}, 0),
}
p.startWorkLoop()
p.startSchedLoop()
p.startClearLoop()
return p
}
@ -73,22 +75,22 @@ func Add(f func()) error {
// 查询当前goroutine总数
func Size() int {
return int(atomic.LoadInt32(&defaultPool.number))
return defaultPool.workerNum.Val()
}
// 查询当前等待处理的任务总数
func Jobs() int {
return len(defaultPool.funcEvents)
return len(defaultPool.jobEvents)
}
// 动态改变默认池中goroutine的上线数量
func SetSize(size int) {
atomic.StoreInt32(&defaultPool.size, int32(size))
defaultPool.SetSize(size)
}
// 动态改变默认池中goroutine的过期时间
func SetExpire(expire int) {
atomic.StoreInt32(&defaultPool.expire, int32(expire))
defaultPool.SetExpire(expire)
}
// 添加异步任务
@ -97,28 +99,28 @@ func (p *Pool) Add(f func()) error {
return errors.New("pool closed")
}
p.funcs.PushBack(f)
p.funcEvents <- struct{}{}
p.jobEvents <- struct{}{}
return nil
}
// 查询当前goroutine worker总数
func (p *Pool) Size() int {
return int(atomic.LoadInt32(&p.number))
return p.workerNum.Val()
}
// 查询当前等待处理的任务总数
func (p *Pool) Jobs() int {
return len(p.funcEvents)
return len(p.jobEvents)
}
// 动态改变当前池中goroutine的上线数量
func (p *Pool) SetSize(size int) {
atomic.StoreInt32(&p.size, int32(size))
p.size.Set(size)
}
// 动态改变当前池中goroutine的过期时间
func (p *Pool) SetExpire(expire int) {
atomic.StoreInt32(&p.expire, int32(expire))
p.expire.Set(expire)
}
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
@ -126,7 +128,5 @@ func (p *Pool) Close() {
// 必须首先标识让任务过期自动关闭
p.SetExpire(-1)
// 使用stopEvents事件通知所有的异步协程及清理协程自动退出
for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ {
p.stopEvents <- struct{}{}
}
close(p.stopEvents)
}

View File

@ -9,18 +9,18 @@ package grpool
import (
"time"
"runtime"
"sync/atomic"
"gitee.com/johng/gf/g/os/gtime"
)
// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度
func (p *Pool) startWorkLoop() {
// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度
// 使用抢占调度的目的是使得任务能够并发地快速被分配出去执行
func (p *Pool) startSchedLoop() {
for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
go func() {
for {
select {
case <-p.funcEvents:
p.getJob().setJob(p.funcs.PopFront().(func()))
case <-p.jobEvents:
p.getWorker().setJob(p.funcs.PopFront().(func()))
case <-p.stopEvents:
return
}
@ -29,92 +29,93 @@ func (p *Pool) startWorkLoop() {
}
}
// 定时清理过期任务,单线程处理
// 定时清理过期任务,单程处理
func (p *Pool) startClearLoop() {
go func() {
for {
time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second)
// 保证没有工作任务的情况下执行worker清理操作
if len(p.funcEvents) == 0 {
var w *PoolWorker
for {
if r := p.queue.PopFront(); r != nil {
w = r.(*PoolWorker)
if gtime.Second() - int64(p.expire) > w.update {
w.stop()
atomic.AddInt32(&p.number, -1)
select {
case <-p.stopEvents:
// 如果接收到关闭通知(池已经关闭)关闭所有worker后退出
for {
if r := p.queue.PopFront(); r != nil {
// 主动关闭所有worker防止goroutine泄露
r.(*PoolWorker).stop()
} else {
p.queue.PushFront(w)
break
}
} else {
break
}
}
}
// 如果接收到关闭通知(池已经关闭)闭所有worker后退出
if len(p.stopEvents) > 0 {
for {
if r := p.queue.PopFront(); r != nil {
// 主动关闭所有worker防止goroutine泄露
r.(*PoolWorker).stop()
atomic.AddInt32(&p.number, -1)
} else {
break
return
default:
time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second)
// 保证没有工作任务的情况下执行worker清理操作
if len(p.jobEvents) == 0 {
var w *PoolWorker
for {
if r := p.queue.PopFront(); r != nil {
w = r.(*PoolWorker)
if gtime.Second() - int64(p.expire.Val()) > w.update {
w.stop()
} else {
p.queue.PushFront(w)
break
}
} else {
break
}
}
}
}
break
}
}
}()
}
// 判断是否达到goroutine上上限
func (p *Pool) reachSizeLimit() bool {
return atomic.LoadInt32(&p.number) >= atomic.LoadInt32(&p.size)
}
// 获取过期时间
func (p *Pool) getExpire() int32 {
return atomic.LoadInt32(&p.expire)
func (p *Pool) getExpire() int {
return p.expire.Val()
}
// 创建一个空的任务对象
func (p *Pool) newJob() *PoolWorker {
func (p *Pool) newWorker() *PoolWorker {
// 如果达到goroutine数限制那么阻塞等待有空闲goroutine后继续
if p.reachSizeLimit() {
// 阻塞等待空闲的协程资源
// 需要注意的是在高并发下workerNum的值可能会高于size
// 从效率上考虑没有将workerNum和size都放到一个互斥锁中进行准确度控制
// 精准是要付出代价的
if p.workerNum.Val() >= p.size.Val() {
// (非精准控制)阻塞等待空闲的协程资源,
// 这是一个递归循环,因为该流程中存在协程抢占机制,
// 如果进入getJob方法没有抢占到协程资源那么该任务执行会继续等待下一个freeEvents
// 如果进入getJob方法没有抢占到协程资源那么该任务执行会继续等待下一个freeEvents事件产生
p.blockedNum.Add(1)
<- p.freeEvents
return p.getJob()
return p.getWorker()
}
w := &PoolWorker {
job : make(chan func(), 1),
pool : p,
}
w.start()
atomic.AddInt32(&p.number, 1)
p.workerNum.Add(1)
return w
}
// 添加任务对象到队列
func (p *Pool) addJob(w *PoolWorker) bool {
if w.pool.getExpire() == -1 {
// 添加worker对象到空闲队列
func (p *Pool) addWorker(w *PoolWorker) bool {
if p.workerNum.Val() > p.size.Val() || w.pool.getExpire() == -1 {
return false
}
p.queue.PushBack(w)
// 如果当前的goroutine数量达到上线那么需要使用空闲goroutine通知事件
if p.reachSizeLimit() {
if p.blockedNum.Val() > 0 {
p.blockedNum.Add(-1)
p.freeEvents <- struct{}{}
}
return true
}
// 获取/创建任务
func (p *Pool) getJob() *PoolWorker {
func (p *Pool) getWorker() *PoolWorker {
if r := p.queue.PopFront(); r != nil {
return r.(*PoolWorker)
}
return p.newJob()
return p.newWorker()
}

View File

@ -15,16 +15,17 @@ func (w *PoolWorker) start() {
if f := <- w.job; f != nil {
// 执行任务
f()
// 更新活动时间
// 更新活动时间(不存在并发安全问题)
w.update = gtime.Second()
// 执行完毕后添加到空闲队列
if !w.pool.addJob(w) {
if !w.pool.addWorker(w) {
break
}
} else {
break
}
}
w.pool.workerNum.Add(-1)
}()
}

View File

@ -26,11 +26,11 @@ func Test_GrpoolMemUsage(t *testing.T) {
fmt.Println("mem usage:", mem.TotalAlloc/1024)
}
func Test_GroroutineMemUsage(t *testing.T) {
for i := 0; i < n; i++ {
go increment()
}
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
fmt.Println("mem usage:", mem.TotalAlloc/1024)
}
//func Test_GroroutineMemUsage(t *testing.T) {
// for i := 0; i < n; i++ {
// go increment()
// }
// mem := runtime.MemStats{}
// runtime.ReadMemStats(&mem)
// fmt.Println("mem usage:", mem.TotalAlloc/1024)
//}

View File

@ -16,10 +16,17 @@ func main() {
for i := 0; i < 1000; i++ {
grpool.Add(job)
}
fmt.Println("size:", grpool.Size())
fmt.Println("jobs:", grpool.Jobs())
gtime.SetInterval(2*time.Second, func() bool {
fmt.Println("size:", grpool.Size())
fmt.Println("jobs:", grpool.Jobs())
return true
})
gtime.SetInterval(5*time.Second, func() bool {
grpool.SetSize(2)
return true
})
select {}
}