diff --git a/g/os/grpool/grpool.go b/g/os/grpool/grpool.go new file mode 100644 index 000000000..f9ef097de --- /dev/null +++ b/g/os/grpool/grpool.go @@ -0,0 +1,120 @@ +// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// Goroutine池. +// 用于goroutine复用,提升异步操作执行效率. +// 需要注意的是,grpool提供给的公共池不提供关闭方法,自创建的池可以手动关闭掉。 +package grpool + +import ( + "math" + "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/container/gtype" +) + +// goroutine池对象 +type Pool struct { + workerChan chan struct{} // 使用channel限制最大的goroutine数量 + workerNum *gtype.Int // 当前正在运行的worker/goroutine数量 + jobQueue *glist.List // 待处理任务操作队列 + jobEvents chan struct{} // 任务添加事件(jobQueue+jobEvents结合使用) + closed *gtype.Bool +} + +// 默认的goroutine池管理对象 +// 该对象与进程同生命周期,无需Close +var defaultPool = New() + +// 创建goroutine池管理对象,给定过期时间(秒) +// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制 +func New(size...int) *Pool { + s := 0 + if len(size) > 0 { + s = size[0] + } + p := &Pool { + workerNum : gtype.NewInt(), + jobQueue : glist.New(), + jobEvents : make(chan struct{}, math.MaxInt32), + workerChan : make(chan struct{}, s), + closed : gtype.NewBool(), + } + return p +} + +// 添加异步任务(使用默认的池对象) +func Add(f func()) error { + return defaultPool.Add(f) +} + +// 查询当前goroutine总数 +func Size() int { + return defaultPool.workerNum.Val() +} + +// 查询当前等待处理的任务总数 +func Jobs() int { + return len(defaultPool.jobEvents) +} + +// 添加异步任务 +func (p *Pool) Add(f func()) error { + p.jobQueue.PushBack(f) + p.jobEvents <- struct{}{} + // 判断是否创建新的worker + if p.Jobs() > 1 || p.workerNum.Val() == 0 { + p.ForkWorker() + } + return nil +} + +// 查询当前goroutine worker总数 +func (p *Pool) Size() int { + return p.workerNum.Val() +} + +// 查询当前等待处理的任务总数 +func (p *Pool) Jobs() int { + return p.jobQueue.Len() +} + +// 创建新的worker执行任务 +func (p *Pool) ForkWorker() { + if cap(p.workerChan) > 0 { + // 如果worker数量已经达到限制,那么不创建新worker,直接返回 + if p.workerNum.Val() == cap(p.workerChan) { + return + } + p.workerNum.Add(1) + p.workerChan <- struct{}{} + } else { + p.workerNum.Add(1) + } + go func() { + for !p.closed.Val() { + select { + case <- p.jobEvents: + if job := p.jobQueue.PopFront(); job != nil { + job.(func())() + } else { + goto WorkerDone + } + default: + goto WorkerDone + } + } +WorkerDone: + p.workerNum.Add(-1) + if cap(p.workerChan) > 0 { + <- p.workerChan + } + }() +} + +// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 +func (p *Pool) Close() { + p.closed.Set(true) +} \ No newline at end of file diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go deleted file mode 100644 index 8a4153ced..000000000 --- a/g/os/grpool/grpool_api.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -// Goroutine池. -// 用于goroutine复用,提升异步操作执行效率. -// 需要注意的是,grpool提供给的公共池不提供关闭方法(但可以修改公共属性),自创建的池可以手动关闭掉。 -package grpool - -import ( - "math" - "gitee.com/johng/gf/g/container/glist" - "gitee.com/johng/gf/g/container/gtype" -) - -const ( - gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间(秒) - gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔(秒) -) - -// goroutine池对象 -type Pool struct { - size *gtype.Int // 限制最大的goroutine数量/协程数/worker数量 - expire *gtype.Int // goroutine过期时间(秒) - workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数) - blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量 - queue *glist.List // 空闲任务队列(*PoolJob) - jobs *glist.List // 待处理任务操作队列 - jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!) - freeEvents chan struct{} // 空闲协程通知事件 - stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) -} - -// 一个worker对应一个goroutine -type PoolWorker struct { - job chan func() // 当前任务(当为nil时表示关闭) - pool *Pool // 所属协程池 - update int64 // 更新时间 -} - -// 默认的goroutine池管理对象 -// 该对象与进程同生命周期,无需Close -var defaultPool = New(gDEFAULT_EXPIRE_TIME) - -// 创建goroutine池管理对象,给定过期时间(秒) -// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制 -func New(expire int, size...int) *Pool { - s := math.MaxInt32 - if len(size) > 0 { - s = size[0] - } - p := &Pool { - size : gtype.NewInt(s), - expire : gtype.NewInt(expire), - workerNum : gtype.NewInt(), - blockedNum : gtype.NewInt(), - queue : glist.New(), - jobs : glist.New(), - jobEvents : make(chan struct{}, math.MaxInt32), - freeEvents : make(chan struct{}, math.MaxInt32), - stopEvents : make(chan struct{}, 0), - } - p.startSchedLoop() - p.startClearLoop() - return p -} - -// 添加异步任务(使用默认的池对象) -func Add(f func()) error { - return defaultPool.Add(f) -} - -// 查询当前goroutine总数 -func Size() int { - return defaultPool.workerNum.Val() -} - -// 查询当前等待处理的任务总数 -func Jobs() int { - return len(defaultPool.jobEvents) -} - -// 动态改变默认池中goroutine的上线数量 -func SetSize(size int) { - defaultPool.SetSize(size) -} - -// 动态改变默认池中goroutine的过期时间 -func SetExpire(expire int) { - defaultPool.SetExpire(expire) -} - -// 添加异步任务 -func (p *Pool) Add(f func()) error { - p.jobs.PushBack(f) - p.jobEvents <- struct{}{} - return nil -} - -// 查询当前goroutine worker总数 -func (p *Pool) Size() int { - return p.workerNum.Val() -} - -// 查询当前等待处理的任务总数 -func (p *Pool) Jobs() int { - return len(p.jobEvents) -} - -// 动态改变当前池中goroutine的上线数量 -func (p *Pool) SetSize(size int) { - p.size.Set(size) -} - -// 动态改变当前池中goroutine的过期时间 -func (p *Pool) SetExpire(expire int) { - p.expire.Set(expire) -} - -// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 -func (p *Pool) Close() { - // 必须首先标识让任务过期自动关闭 - p.SetExpire(-1) - // 使用stopEvents事件通知所有的异步协程及清理协程自动退出 - close(p.stopEvents) -} \ No newline at end of file diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go deleted file mode 100644 index b5bba9fb6..000000000 --- a/g/os/grpool/grpool_pool.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package grpool - -import ( - "time" - "runtime" - "gitee.com/johng/gf/g/os/gtime" -) - -// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度, -// 使用抢占调度的目的是使得任务能够并发地快速被分配出去执行 -func (p *Pool) startSchedLoop() { - for i := 0; i < runtime.GOMAXPROCS(-1); i++ { - go func() { - for { - select { - case <-p.jobEvents: - p.getWorker().setJob(p.jobs.PopFront().(func())) - case <-p.stopEvents: - return - } - } - }() - } -} - -// 定时清理过期任务,单协程处理 -func (p *Pool) startClearLoop() { - go func() { - for { - select { - case <-p.stopEvents: - // 如果接收到关闭通知(池已经关闭),关闭所有worker后退出 - for { - if r := p.queue.PopFront(); r != nil { - // 主动关闭所有worker,防止goroutine泄露 - r.(*PoolWorker).stop() - } else { - break - } - } - return - - default: - time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) - // 保证没有工作任务的情况下,执行worker清理操作 - if len(p.jobEvents) == 0 { - var w *PoolWorker - for { - if r := p.queue.PopFront(); r != nil { - w = r.(*PoolWorker) - if gtime.Second() - int64(p.expire.Val()) > w.update { - w.stop() - } else { - p.queue.PushFront(w) - break - } - } else { - break - } - } - } - } - } - }() -} - -// 获取过期时间 -func (p *Pool) getExpire() int { - return p.expire.Val() -} - -// 创建一个空的任务对象 -func (p *Pool) newWorker() *PoolWorker { - // 如果达到goroutine数限制,那么阻塞等待有空闲goroutine后继续 - // 需要注意的是在高并发下workerNum的值可能会高于size, - // 从效率上考虑没有将workerNum和size都放到一个互斥锁中进行准确度控制, - // 精准是要付出代价的 - if p.workerNum.Val() >= p.size.Val() { - // (非精准控制)阻塞等待空闲的协程资源, - // 这是一个递归循环,因为该流程中存在协程抢占机制, - // 如果进入getJob方法没有抢占到协程资源,那么该任务执行会继续等待下一个freeEvents事件产生 - p.blockedNum.Add(1) - <- p.freeEvents - return p.getWorker() - } - w := &PoolWorker { - job : make(chan func(), 1), - pool : p, - } - w.start() - p.workerNum.Add(1) - return w -} - -// 添加worker对象到空闲队列 -func (p *Pool) addWorker(w *PoolWorker) bool { - if p.workerNum.Val() > p.size.Val() || w.pool.getExpire() == -1 { - return false - } - p.queue.PushBack(w) - // 如果当前的goroutine数量达到上线,那么需要使用空闲goroutine通知事件 - if p.blockedNum.Val() > 0 { - p.blockedNum.Add(-1) - p.freeEvents <- struct{}{} - } - return true -} - -// 获取/创建任务 -func (p *Pool) getWorker() *PoolWorker { - if r := p.queue.PopFront(); r != nil { - return r.(*PoolWorker) - } - return p.newWorker() -} diff --git a/g/os/grpool/grpool_pool_worker.go b/g/os/grpool/grpool_pool_worker.go deleted file mode 100644 index 65e363f04..000000000 --- a/g/os/grpool/grpool_pool_worker.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package grpool - -import "gitee.com/johng/gf/g/os/gtime" - -// 开始任务 -func (w *PoolWorker) start() { - go func() { - for { - if f := <- w.job; f != nil { - // 执行任务 - f() - // 更新活动时间(不存在并发安全问题) - w.update = gtime.Second() - // 执行完毕后添加到空闲队列 - if !w.pool.addWorker(w) { - break - } - } else { - break - } - } - w.pool.workerNum.Add(-1) - }() -} - -// 关闭当前任务 -func (w *PoolWorker) stop() { - w.setJob(nil) -} - -// 设置当前任务的执行函数 -func (w *PoolWorker) setJob(f func()) { - w.job <- f -} - diff --git a/g/os/grpool/grpool_test.go b/g/os/grpool/grpool_test.go index 5ddd7fb8a..1107499c2 100644 --- a/g/os/grpool/grpool_test.go +++ b/g/os/grpool/grpool_test.go @@ -7,30 +7,29 @@ package grpool_test import ( - "fmt" - "runtime" "testing" - "gitee.com/johng/gf/g/os/grpool" + "runtime" + "fmt" ) func increment() { - for i := 0; i < 1000000; i++ {} + for i := 0; i < 100000; i++ {} } -func Test_GrpoolMemUsage(t *testing.T) { - for i := 0; i < n; i++ { - grpool.Add(increment) - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - fmt.Println("mem usage:", mem.TotalAlloc/1024) -} - -//func Test_GroroutineMemUsage(t *testing.T) { +//func Test_GrpoolMemUsage(t *testing.T) { // for i := 0; i < n; i++ { -// go increment() +// grpool.Add(increment) // } // mem := runtime.MemStats{} // runtime.ReadMemStats(&mem) // fmt.Println("mem usage:", mem.TotalAlloc/1024) -//} \ No newline at end of file +//} + +func Test_GroroutineMemUsage(t *testing.T) { + for i := 0; i < n; i++ { + go increment() + } + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + fmt.Println("mem usage:", mem.TotalAlloc/1024) +} \ No newline at end of file diff --git a/geg/os/grpool/goroutine.go b/geg/os/grpool/goroutine.go index b977958a9..edcf8d529 100644 --- a/geg/os/grpool/goroutine.go +++ b/geg/os/grpool/goroutine.go @@ -11,10 +11,10 @@ import ( func main() { start := gtime.Millisecond() wg := sync.WaitGroup{} - for i := 0; i < 10000000; i++ { + for i := 0; i < 100000; i++ { wg.Add(1) go func() { - time.Sleep(time.Millisecond) + time.Sleep(time.Second) wg.Done() }() } diff --git a/geg/os/grpool/grpool.go b/geg/os/grpool/grpool.go index bad4bb92c..bcb1aef6e 100644 --- a/geg/os/grpool/grpool.go +++ b/geg/os/grpool/grpool.go @@ -11,10 +11,10 @@ import ( func main() { start := gtime.Millisecond() wg := sync.WaitGroup{} - for i := 0; i < 10000000; i++ { + for i := 0; i < 100000; i++ { wg.Add(1) grpool.Add(func() { - time.Sleep(time.Millisecond) + time.Sleep(time.Second) wg.Done() }) } diff --git a/geg/os/grpool/grpool1.go b/geg/os/grpool/grpool1.go index 2a1f41f5e..2b08a94c1 100644 --- a/geg/os/grpool/grpool1.go +++ b/geg/os/grpool/grpool1.go @@ -3,8 +3,8 @@ package main import ( "time" "fmt" - "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/os/grpool" + "gitee.com/johng/gf/g/os/gtime" ) func job() { @@ -12,21 +12,18 @@ func job() { } func main() { - grpool.SetSize(10) + pool := grpool.New(100) for i := 0; i < 1000; i++ { - grpool.Add(job) + pool.Add(job) } - fmt.Println("size:", grpool.Size()) - fmt.Println("jobs:", grpool.Jobs()) - gtime.SetInterval(2*time.Second, func() bool { - fmt.Println("size:", grpool.Size()) - fmt.Println("jobs:", grpool.Jobs()) - return true + fmt.Println("worker:", pool.Size()) + fmt.Println(" jobs:", pool.Jobs()) + gtime.SetInterval(time.Second, func() bool { + fmt.Println("worker:", pool.Size()) + fmt.Println(" jobs:", pool.Jobs()) + fmt.Println() + return true }) - gtime.SetInterval(5*time.Second, func() bool { - grpool.SetSize(2) - return true - }) select {} }