改进grpool,简化设计

This commit is contained in:
john
2018-07-27 15:48:49 +08:00
parent 2cd676e941
commit a98a94bec6
8 changed files with 149 additions and 323 deletions

120
g/os/grpool/grpool.go Normal file
View File

@ -0,0 +1,120 @@
// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Goroutine池.
// 用于goroutine复用提升异步操作执行效率.
// 需要注意的是grpool提供给的公共池不提供关闭方法自创建的池可以手动关闭掉。
package grpool
import (
"math"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
)
// goroutine池对象
type Pool struct {
workerChan chan struct{} // 使用channel限制最大的goroutine数量
workerNum *gtype.Int // 当前正在运行的worker/goroutine数量
jobQueue *glist.List // 待处理任务操作队列
jobEvents chan struct{} // 任务添加事件(jobQueue+jobEvents结合使用)
closed *gtype.Bool
}
// 默认的goroutine池管理对象
// 该对象与进程同生命周期无需Close
var defaultPool = New()
// 创建goroutine池管理对象给定过期时间(秒)
// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量非必需参数默认不做限制
func New(size...int) *Pool {
s := 0
if len(size) > 0 {
s = size[0]
}
p := &Pool {
workerNum : gtype.NewInt(),
jobQueue : glist.New(),
jobEvents : make(chan struct{}, math.MaxInt32),
workerChan : make(chan struct{}, s),
closed : gtype.NewBool(),
}
return p
}
// 添加异步任务(使用默认的池对象)
func Add(f func()) error {
return defaultPool.Add(f)
}
// 查询当前goroutine总数
func Size() int {
return defaultPool.workerNum.Val()
}
// 查询当前等待处理的任务总数
func Jobs() int {
return len(defaultPool.jobEvents)
}
// 添加异步任务
func (p *Pool) Add(f func()) error {
p.jobQueue.PushBack(f)
p.jobEvents <- struct{}{}
// 判断是否创建新的worker
if p.Jobs() > 1 || p.workerNum.Val() == 0 {
p.ForkWorker()
}
return nil
}
// 查询当前goroutine worker总数
func (p *Pool) Size() int {
return p.workerNum.Val()
}
// 查询当前等待处理的任务总数
func (p *Pool) Jobs() int {
return p.jobQueue.Len()
}
// 创建新的worker执行任务
func (p *Pool) ForkWorker() {
if cap(p.workerChan) > 0 {
// 如果worker数量已经达到限制那么不创建新worker直接返回
if p.workerNum.Val() == cap(p.workerChan) {
return
}
p.workerNum.Add(1)
p.workerChan <- struct{}{}
} else {
p.workerNum.Add(1)
}
go func() {
for !p.closed.Val() {
select {
case <- p.jobEvents:
if job := p.jobQueue.PopFront(); job != nil {
job.(func())()
} else {
goto WorkerDone
}
default:
goto WorkerDone
}
}
WorkerDone:
p.workerNum.Add(-1)
if cap(p.workerChan) > 0 {
<- p.workerChan
}
}()
}
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
func (p *Pool) Close() {
p.closed.Set(true)
}

View File

@ -1,128 +0,0 @@
// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Goroutine池.
// 用于goroutine复用提升异步操作执行效率.
// 需要注意的是grpool提供给的公共池不提供关闭方法(但可以修改公共属性),自创建的池可以手动关闭掉。
package grpool
import (
"math"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
)
const (
gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间(秒)
gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔(秒)
)
// goroutine池对象
type Pool struct {
size *gtype.Int // 限制最大的goroutine数量/协程数/worker数量
expire *gtype.Int // goroutine过期时间(秒)
workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数)
blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量
queue *glist.List // 空闲任务队列(*PoolJob)
jobs *glist.List // 待处理任务操作队列
jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!)
freeEvents chan struct{} // 空闲协程通知事件
stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知)
}
// 一个worker对应一个goroutine
type PoolWorker struct {
job chan func() // 当前任务(当为nil时表示关闭)
pool *Pool // 所属协程池
update int64 // 更新时间
}
// 默认的goroutine池管理对象
// 该对象与进程同生命周期无需Close
var defaultPool = New(gDEFAULT_EXPIRE_TIME)
// 创建goroutine池管理对象给定过期时间(秒)
// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量非必需参数默认不做限制
func New(expire int, size...int) *Pool {
s := math.MaxInt32
if len(size) > 0 {
s = size[0]
}
p := &Pool {
size : gtype.NewInt(s),
expire : gtype.NewInt(expire),
workerNum : gtype.NewInt(),
blockedNum : gtype.NewInt(),
queue : glist.New(),
jobs : glist.New(),
jobEvents : make(chan struct{}, math.MaxInt32),
freeEvents : make(chan struct{}, math.MaxInt32),
stopEvents : make(chan struct{}, 0),
}
p.startSchedLoop()
p.startClearLoop()
return p
}
// 添加异步任务(使用默认的池对象)
func Add(f func()) error {
return defaultPool.Add(f)
}
// 查询当前goroutine总数
func Size() int {
return defaultPool.workerNum.Val()
}
// 查询当前等待处理的任务总数
func Jobs() int {
return len(defaultPool.jobEvents)
}
// 动态改变默认池中goroutine的上线数量
func SetSize(size int) {
defaultPool.SetSize(size)
}
// 动态改变默认池中goroutine的过期时间
func SetExpire(expire int) {
defaultPool.SetExpire(expire)
}
// 添加异步任务
func (p *Pool) Add(f func()) error {
p.jobs.PushBack(f)
p.jobEvents <- struct{}{}
return nil
}
// 查询当前goroutine worker总数
func (p *Pool) Size() int {
return p.workerNum.Val()
}
// 查询当前等待处理的任务总数
func (p *Pool) Jobs() int {
return len(p.jobEvents)
}
// 动态改变当前池中goroutine的上线数量
func (p *Pool) SetSize(size int) {
p.size.Set(size)
}
// 动态改变当前池中goroutine的过期时间
func (p *Pool) SetExpire(expire int) {
p.expire.Set(expire)
}
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
func (p *Pool) Close() {
// 必须首先标识让任务过期自动关闭
p.SetExpire(-1)
// 使用stopEvents事件通知所有的异步协程及清理协程自动退出
close(p.stopEvents)
}

View File

@ -1,121 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package grpool
import (
"time"
"runtime"
"gitee.com/johng/gf/g/os/gtime"
)
// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度,
// 使用抢占调度的目的是使得任务能够并发地快速被分配出去执行
func (p *Pool) startSchedLoop() {
for i := 0; i < runtime.GOMAXPROCS(-1); i++ {
go func() {
for {
select {
case <-p.jobEvents:
p.getWorker().setJob(p.jobs.PopFront().(func()))
case <-p.stopEvents:
return
}
}
}()
}
}
// 定时清理过期任务,单协程处理
func (p *Pool) startClearLoop() {
go func() {
for {
select {
case <-p.stopEvents:
// 如果接收到关闭通知(池已经关闭)关闭所有worker后退出
for {
if r := p.queue.PopFront(); r != nil {
// 主动关闭所有worker防止goroutine泄露
r.(*PoolWorker).stop()
} else {
break
}
}
return
default:
time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second)
// 保证没有工作任务的情况下执行worker清理操作
if len(p.jobEvents) == 0 {
var w *PoolWorker
for {
if r := p.queue.PopFront(); r != nil {
w = r.(*PoolWorker)
if gtime.Second() - int64(p.expire.Val()) > w.update {
w.stop()
} else {
p.queue.PushFront(w)
break
}
} else {
break
}
}
}
}
}
}()
}
// 获取过期时间
func (p *Pool) getExpire() int {
return p.expire.Val()
}
// 创建一个空的任务对象
func (p *Pool) newWorker() *PoolWorker {
// 如果达到goroutine数限制那么阻塞等待有空闲goroutine后继续
// 需要注意的是在高并发下workerNum的值可能会高于size
// 从效率上考虑没有将workerNum和size都放到一个互斥锁中进行准确度控制
// 精准是要付出代价的
if p.workerNum.Val() >= p.size.Val() {
// (非精准控制)阻塞等待空闲的协程资源,
// 这是一个递归循环,因为该流程中存在协程抢占机制,
// 如果进入getJob方法没有抢占到协程资源那么该任务执行会继续等待下一个freeEvents事件产生
p.blockedNum.Add(1)
<- p.freeEvents
return p.getWorker()
}
w := &PoolWorker {
job : make(chan func(), 1),
pool : p,
}
w.start()
p.workerNum.Add(1)
return w
}
// 添加worker对象到空闲队列
func (p *Pool) addWorker(w *PoolWorker) bool {
if p.workerNum.Val() > p.size.Val() || w.pool.getExpire() == -1 {
return false
}
p.queue.PushBack(w)
// 如果当前的goroutine数量达到上线那么需要使用空闲goroutine通知事件
if p.blockedNum.Val() > 0 {
p.blockedNum.Add(-1)
p.freeEvents <- struct{}{}
}
return true
}
// 获取/创建任务
func (p *Pool) getWorker() *PoolWorker {
if r := p.queue.PopFront(); r != nil {
return r.(*PoolWorker)
}
return p.newWorker()
}

View File

@ -1,41 +0,0 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package grpool
import "gitee.com/johng/gf/g/os/gtime"
// 开始任务
func (w *PoolWorker) start() {
go func() {
for {
if f := <- w.job; f != nil {
// 执行任务
f()
// 更新活动时间(不存在并发安全问题)
w.update = gtime.Second()
// 执行完毕后添加到空闲队列
if !w.pool.addWorker(w) {
break
}
} else {
break
}
}
w.pool.workerNum.Add(-1)
}()
}
// 关闭当前任务
func (w *PoolWorker) stop() {
w.setJob(nil)
}
// 设置当前任务的执行函数
func (w *PoolWorker) setJob(f func()) {
w.job <- f
}

View File

@ -7,30 +7,29 @@
package grpool_test
import (
"fmt"
"runtime"
"testing"
"gitee.com/johng/gf/g/os/grpool"
"runtime"
"fmt"
)
func increment() {
for i := 0; i < 1000000; i++ {}
for i := 0; i < 100000; i++ {}
}
func Test_GrpoolMemUsage(t *testing.T) {
for i := 0; i < n; i++ {
grpool.Add(increment)
}
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
fmt.Println("mem usage:", mem.TotalAlloc/1024)
}
//func Test_GroroutineMemUsage(t *testing.T) {
//func Test_GrpoolMemUsage(t *testing.T) {
// for i := 0; i < n; i++ {
// go increment()
// grpool.Add(increment)
// }
// mem := runtime.MemStats{}
// runtime.ReadMemStats(&mem)
// fmt.Println("mem usage:", mem.TotalAlloc/1024)
//}
//}
func Test_GroroutineMemUsage(t *testing.T) {
for i := 0; i < n; i++ {
go increment()
}
mem := runtime.MemStats{}
runtime.ReadMemStats(&mem)
fmt.Println("mem usage:", mem.TotalAlloc/1024)
}

View File

@ -11,10 +11,10 @@ import (
func main() {
start := gtime.Millisecond()
wg := sync.WaitGroup{}
for i := 0; i < 10000000; i++ {
for i := 0; i < 100000; i++ {
wg.Add(1)
go func() {
time.Sleep(time.Millisecond)
time.Sleep(time.Second)
wg.Done()
}()
}

View File

@ -11,10 +11,10 @@ import (
func main() {
start := gtime.Millisecond()
wg := sync.WaitGroup{}
for i := 0; i < 10000000; i++ {
for i := 0; i < 100000; i++ {
wg.Add(1)
grpool.Add(func() {
time.Sleep(time.Millisecond)
time.Sleep(time.Second)
wg.Done()
})
}

View File

@ -3,8 +3,8 @@ package main
import (
"time"
"fmt"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/grpool"
"gitee.com/johng/gf/g/os/gtime"
)
func job() {
@ -12,21 +12,18 @@ func job() {
}
func main() {
grpool.SetSize(10)
pool := grpool.New(100)
for i := 0; i < 1000; i++ {
grpool.Add(job)
pool.Add(job)
}
fmt.Println("size:", grpool.Size())
fmt.Println("jobs:", grpool.Jobs())
gtime.SetInterval(2*time.Second, func() bool {
fmt.Println("size:", grpool.Size())
fmt.Println("jobs:", grpool.Jobs())
return true
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
gtime.SetInterval(time.Second, func() bool {
fmt.Println("worker:", pool.Size())
fmt.Println(" jobs:", pool.Jobs())
fmt.Println()
return true
})
gtime.SetInterval(5*time.Second, func() bool {
grpool.SetSize(2)
return true
})
select {}
}