From 85c73d93f1ce542c7e152f8bbf39dd1800483cec Mon Sep 17 00:00:00 2001 From: John Date: Fri, 25 May 2018 08:27:17 +0800 Subject: [PATCH] =?UTF-8?q?goroutine=E6=B1=A0=E6=94=B9=E8=BF=9B=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/os/grpool/grpool_api.go | 64 +++++++++---------- g/os/grpool/grpool_pool.go | 103 +++++++++++++++--------------- g/os/grpool/grpool_pool_worker.go | 5 +- g/os/grpool/grpool_test.go | 16 ++--- geg/os/grpool/grpool1.go | 7 ++ 5 files changed, 102 insertions(+), 93 deletions(-) diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go index 7e7b98773..5b0fc14d5 100644 --- a/g/os/grpool/grpool_api.go +++ b/g/os/grpool/grpool_api.go @@ -11,10 +11,9 @@ package grpool import ( "math" - "runtime" - "sync/atomic" - "gitee.com/johng/gf/g/container/glist" "errors" + "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/container/gtype" ) const ( @@ -24,17 +23,18 @@ const ( // goroutine池对象 type Pool struct { - size int32 // 限制最大的goroutine数量/协程数/worker数量 - expire int32 // goroutine过期时间(秒) - number int32 // 当前goroutine数量(非任务数) - queue *glist.List // 空闲任务队列(*PoolJob) - funcs *glist.List // 待处理任务操作队列 - freeEvents chan struct{} // 空闲协程通知事件 - funcEvents chan struct{} // 任务添加事件(兄弟们该干活了!) - stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) + size *gtype.Int // 限制最大的goroutine数量/协程数/worker数量 + expire *gtype.Int // goroutine过期时间(秒) + workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数) + blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量 + queue *glist.List // 空闲任务队列(*PoolJob) + funcs *glist.List // 待处理任务操作队列 + jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!) + freeEvents chan struct{} // 空闲协程通知事件 + stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) } -// goroutine worker +// 一个worker对应一个goroutine type PoolWorker struct { job chan func() // 当前任务(当为nil时表示关闭) pool *Pool // 所属协程池 @@ -53,15 +53,17 @@ func New(expire int, size...int) *Pool { s = size[0] } p := &Pool { - size : int32(s), - expire : int32(expire), - queue : glist.New(), - funcs : glist.New(), - freeEvents : make(chan struct{}, math.MaxInt32), - funcEvents : make(chan struct{}, math.MaxInt32), - stopEvents : make(chan struct{}, runtime.GOMAXPROCS(-1) + 1), + size : gtype.NewInt(s), + expire : gtype.NewInt(expire), + workerNum : gtype.NewInt(), + blockedNum : gtype.NewInt(), + queue : glist.New(), + funcs : glist.New(), + jobEvents : make(chan struct{}, math.MaxInt32), + freeEvents : make(chan struct{}, math.MaxInt32), + stopEvents : make(chan struct{}, 0), } - p.startWorkLoop() + p.startSchedLoop() p.startClearLoop() return p } @@ -73,22 +75,22 @@ func Add(f func()) error { // 查询当前goroutine总数 func Size() int { - return int(atomic.LoadInt32(&defaultPool.number)) + return defaultPool.workerNum.Val() } // 查询当前等待处理的任务总数 func Jobs() int { - return len(defaultPool.funcEvents) + return len(defaultPool.jobEvents) } // 动态改变默认池中goroutine的上线数量 func SetSize(size int) { - atomic.StoreInt32(&defaultPool.size, int32(size)) + defaultPool.SetSize(size) } // 动态改变默认池中goroutine的过期时间 func SetExpire(expire int) { - atomic.StoreInt32(&defaultPool.expire, int32(expire)) + defaultPool.SetExpire(expire) } // 添加异步任务 @@ -97,28 +99,28 @@ func (p *Pool) Add(f func()) error { return errors.New("pool closed") } p.funcs.PushBack(f) - p.funcEvents <- struct{}{} + p.jobEvents <- struct{}{} return nil } // 查询当前goroutine worker总数 func (p *Pool) Size() int { - return int(atomic.LoadInt32(&p.number)) + return p.workerNum.Val() } // 查询当前等待处理的任务总数 func (p *Pool) Jobs() int { - return len(p.funcEvents) + return len(p.jobEvents) } // 动态改变当前池中goroutine的上线数量 func (p *Pool) SetSize(size int) { - atomic.StoreInt32(&p.size, int32(size)) + p.size.Set(size) } // 动态改变当前池中goroutine的过期时间 func (p *Pool) SetExpire(expire int) { - atomic.StoreInt32(&p.expire, int32(expire)) + p.expire.Set(expire) } // 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 @@ -126,7 +128,5 @@ func (p *Pool) Close() { // 必须首先标识让任务过期自动关闭 p.SetExpire(-1) // 使用stopEvents事件通知所有的异步协程及清理协程自动退出 - for i := 0; i < runtime.GOMAXPROCS(-1) + 1; i++ { - p.stopEvents <- struct{}{} - } + close(p.stopEvents) } \ No newline at end of file diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go index da2417773..c29d78923 100644 --- a/g/os/grpool/grpool_pool.go +++ b/g/os/grpool/grpool_pool.go @@ -9,18 +9,18 @@ package grpool import ( "time" "runtime" - "sync/atomic" "gitee.com/johng/gf/g/os/gtime" ) -// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度 -func (p *Pool) startWorkLoop() { +// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度, +// 使用抢占调度的目的是使得任务能够并发地快速被分配出去执行 +func (p *Pool) startSchedLoop() { for i := 0; i < runtime.GOMAXPROCS(-1); i++ { go func() { for { select { - case <-p.funcEvents: - p.getJob().setJob(p.funcs.PopFront().(func())) + case <-p.jobEvents: + p.getWorker().setJob(p.funcs.PopFront().(func())) case <-p.stopEvents: return } @@ -29,92 +29,93 @@ func (p *Pool) startWorkLoop() { } } -// 定时清理过期任务,单线程处理 +// 定时清理过期任务,单协程处理 func (p *Pool) startClearLoop() { go func() { for { - time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) - // 保证没有工作任务的情况下,执行worker清理操作 - if len(p.funcEvents) == 0 { - var w *PoolWorker - for { - if r := p.queue.PopFront(); r != nil { - w = r.(*PoolWorker) - if gtime.Second() - int64(p.expire) > w.update { - w.stop() - atomic.AddInt32(&p.number, -1) + select { + case <-p.stopEvents: + // 如果接收到关闭通知(池已经关闭),关闭所有worker后退出 + for { + if r := p.queue.PopFront(); r != nil { + // 主动关闭所有worker,防止goroutine泄露 + r.(*PoolWorker).stop() } else { - p.queue.PushFront(w) break } - } else { - break } - } - } - // 如果接收到关闭通知(池已经关闭),闭所有worker后退出 - if len(p.stopEvents) > 0 { - for { - if r := p.queue.PopFront(); r != nil { - // 主动关闭所有worker,防止goroutine泄露 - r.(*PoolWorker).stop() - atomic.AddInt32(&p.number, -1) - } else { - break + return + + default: + time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) + // 保证没有工作任务的情况下,执行worker清理操作 + if len(p.jobEvents) == 0 { + var w *PoolWorker + for { + if r := p.queue.PopFront(); r != nil { + w = r.(*PoolWorker) + if gtime.Second() - int64(p.expire.Val()) > w.update { + w.stop() + } else { + p.queue.PushFront(w) + break + } + } else { + break + } + } } - } - break } } }() } -// 判断是否达到goroutine上上限 -func (p *Pool) reachSizeLimit() bool { - return atomic.LoadInt32(&p.number) >= atomic.LoadInt32(&p.size) -} - // 获取过期时间 -func (p *Pool) getExpire() int32 { - return atomic.LoadInt32(&p.expire) +func (p *Pool) getExpire() int { + return p.expire.Val() } // 创建一个空的任务对象 -func (p *Pool) newJob() *PoolWorker { +func (p *Pool) newWorker() *PoolWorker { // 如果达到goroutine数限制,那么阻塞等待有空闲goroutine后继续 - if p.reachSizeLimit() { - // 阻塞等待空闲的协程资源, + // 需要注意的是在高并发下workerNum的值可能会高于size, + // 从效率上考虑没有将workerNum和size都放到一个互斥锁中进行准确度控制, + // 精准是要付出代价的 + if p.workerNum.Val() >= p.size.Val() { + // (非精准控制)阻塞等待空闲的协程资源, // 这是一个递归循环,因为该流程中存在协程抢占机制, - // 如果进入getJob方法没有抢占到协程资源,那么该任务执行会继续等待下一个freeEvents + // 如果进入getJob方法没有抢占到协程资源,那么该任务执行会继续等待下一个freeEvents事件产生 + p.blockedNum.Add(1) <- p.freeEvents - return p.getJob() + return p.getWorker() } w := &PoolWorker { job : make(chan func(), 1), pool : p, } w.start() - atomic.AddInt32(&p.number, 1) + p.workerNum.Add(1) return w } -// 添加任务对象到队列 -func (p *Pool) addJob(w *PoolWorker) bool { - if w.pool.getExpire() == -1 { +// 添加worker对象到空闲队列 +func (p *Pool) addWorker(w *PoolWorker) bool { + if p.workerNum.Val() > p.size.Val() || w.pool.getExpire() == -1 { return false } p.queue.PushBack(w) // 如果当前的goroutine数量达到上线,那么需要使用空闲goroutine通知事件 - if p.reachSizeLimit() { + if p.blockedNum.Val() > 0 { + p.blockedNum.Add(-1) p.freeEvents <- struct{}{} } return true } // 获取/创建任务 -func (p *Pool) getJob() *PoolWorker { +func (p *Pool) getWorker() *PoolWorker { if r := p.queue.PopFront(); r != nil { return r.(*PoolWorker) } - return p.newJob() + return p.newWorker() } diff --git a/g/os/grpool/grpool_pool_worker.go b/g/os/grpool/grpool_pool_worker.go index 7126cd107..65e363f04 100644 --- a/g/os/grpool/grpool_pool_worker.go +++ b/g/os/grpool/grpool_pool_worker.go @@ -15,16 +15,17 @@ func (w *PoolWorker) start() { if f := <- w.job; f != nil { // 执行任务 f() - // 更新活动时间 + // 更新活动时间(不存在并发安全问题) w.update = gtime.Second() // 执行完毕后添加到空闲队列 - if !w.pool.addJob(w) { + if !w.pool.addWorker(w) { break } } else { break } } + w.pool.workerNum.Add(-1) }() } diff --git a/g/os/grpool/grpool_test.go b/g/os/grpool/grpool_test.go index 7c1c25550..5ddd7fb8a 100644 --- a/g/os/grpool/grpool_test.go +++ b/g/os/grpool/grpool_test.go @@ -26,11 +26,11 @@ func Test_GrpoolMemUsage(t *testing.T) { fmt.Println("mem usage:", mem.TotalAlloc/1024) } -func Test_GroroutineMemUsage(t *testing.T) { - for i := 0; i < n; i++ { - go increment() - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - fmt.Println("mem usage:", mem.TotalAlloc/1024) -} \ No newline at end of file +//func Test_GroroutineMemUsage(t *testing.T) { +// for i := 0; i < n; i++ { +// go increment() +// } +// mem := runtime.MemStats{} +// runtime.ReadMemStats(&mem) +// fmt.Println("mem usage:", mem.TotalAlloc/1024) +//} \ No newline at end of file diff --git a/geg/os/grpool/grpool1.go b/geg/os/grpool/grpool1.go index b6a29ea4e..2a1f41f5e 100644 --- a/geg/os/grpool/grpool1.go +++ b/geg/os/grpool/grpool1.go @@ -16,10 +16,17 @@ func main() { for i := 0; i < 1000; i++ { grpool.Add(job) } + fmt.Println("size:", grpool.Size()) + fmt.Println("jobs:", grpool.Jobs()) gtime.SetInterval(2*time.Second, func() bool { fmt.Println("size:", grpool.Size()) fmt.Println("jobs:", grpool.Jobs()) return true }) + + gtime.SetInterval(5*time.Second, func() bool { + grpool.SetSize(2) + return true + }) select {} }