mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
完善groutine功能
This commit is contained in:
@ -10,48 +10,87 @@ package groutine
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
"gitee.com/johng/gf/g/container/gset"
|
||||
"sync/atomic"
|
||||
"gitee.com/johng/gf/g/container/glist"
|
||||
)
|
||||
|
||||
const (
|
||||
gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间
|
||||
gDEFAULT_CLEAR_INTERVAL = 3 // 定期检查任务过期时间间隔
|
||||
)
|
||||
|
||||
// goroutine池对象
|
||||
type Pool struct {
|
||||
jobs *gset.InterfaceSet // 当前任务对象(*PoolJob)
|
||||
queue *glist.SafeList // 空闲任务队列(*PoolJob)
|
||||
funcs *glist.SafeList // 待处理任务操作队列
|
||||
events chan struct{} // 任务操作处理事件(用于任务事件通知)
|
||||
expire int32 // goroutine过期时间(秒)
|
||||
number int32 // 当前goroutine数量(非任务数)
|
||||
queue *glist.SafeList // 空闲任务队列(*PoolJob)
|
||||
funcs *glist.SafeList // 待处理任务操作队列
|
||||
funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知)
|
||||
stopEvents chan struct{} // 池关闭事件(用于池相关异步线程通知)
|
||||
}
|
||||
|
||||
// goroutine任务
|
||||
type PoolJob struct {
|
||||
mu sync.RWMutex
|
||||
job chan func() // 当前任务(当为nil时表示关闭)
|
||||
pool *Pool // 所属池
|
||||
update int64 // 更新时间
|
||||
}
|
||||
|
||||
// 创建goroutine池管理对象
|
||||
func New() *Pool {
|
||||
// 默认的goroutine池管理对象
|
||||
// 该对象与进程同生命周期,无需Close
|
||||
var defaultPool = New(gDEFAULT_EXPIRE_TIME)
|
||||
|
||||
// 创建goroutine池管理对象,给定过期时间(秒)
|
||||
func New(expire int) *Pool {
|
||||
p := &Pool {
|
||||
jobs : gset.NewInterfaceSet(),
|
||||
queue : glist.NewSafeList(),
|
||||
funcs : glist.NewSafeList(),
|
||||
events : make(chan struct{}, math.MaxUint32),
|
||||
expire : int32(expire),
|
||||
queue : glist.NewSafeList(),
|
||||
funcs : glist.NewSafeList(),
|
||||
funcEvents : make(chan struct{}, math.MaxUint32),
|
||||
stopEvents : make(chan struct{}, 1),
|
||||
}
|
||||
p.loop()
|
||||
p.workloop()
|
||||
p.clearloop()
|
||||
return p
|
||||
}
|
||||
|
||||
// 添加异步任务(使用默认的池对象)
|
||||
func Add(f func()) {
|
||||
defaultPool.funcs.PushBack(f)
|
||||
defaultPool.funcEvents <- struct{}{}
|
||||
}
|
||||
|
||||
// 查询当前goroutine总数
|
||||
func Size() int {
|
||||
return int(atomic.LoadInt32(&defaultPool.number))
|
||||
}
|
||||
|
||||
// 设置默认池中goroutine的过期时间
|
||||
func SetExpire(expire int) {
|
||||
atomic.StoreInt32(&defaultPool.expire, int32(expire))
|
||||
}
|
||||
|
||||
// 添加异步任务
|
||||
func (p *Pool) Add(f func()) {
|
||||
p.funcs.PushBack(f)
|
||||
p.events <- struct{}{}
|
||||
p.funcEvents <- struct{}{}
|
||||
}
|
||||
|
||||
// 查询当前goroutine总数
|
||||
func (p *Pool) Size() int {
|
||||
return int(atomic.LoadInt32(&p.number))
|
||||
}
|
||||
|
||||
// 设置当前池中goroutine的过期时间
|
||||
func (p *Pool) SetExpire(expire int) {
|
||||
atomic.StoreInt32(&p.expire, int32(expire))
|
||||
}
|
||||
|
||||
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
|
||||
func (p *Pool) Close() {
|
||||
p.Add(nil)
|
||||
p.jobs.Iterator(func(v interface{}){
|
||||
v.(*PoolJob).stop()
|
||||
})
|
||||
// 必须首先标识让任务过期自动关闭
|
||||
p.SetExpire(-1)
|
||||
p.stopEvents <- struct{}{} // 通知workloop
|
||||
p.stopEvents <- struct{}{} // 通知clearloop
|
||||
|
||||
}
|
||||
@ -6,6 +6,8 @@
|
||||
|
||||
package groutine
|
||||
|
||||
import "gitee.com/johng/gf/g/os/gtime"
|
||||
|
||||
// 开始任务
|
||||
func (j *PoolJob) start() {
|
||||
go func() {
|
||||
@ -13,10 +15,12 @@ func (j *PoolJob) start() {
|
||||
if f := <- j.job; f != nil {
|
||||
// 执行任务
|
||||
f()
|
||||
// 清空任务(GC可回收f对应资源)
|
||||
j.job = nil
|
||||
// 更新活动时间
|
||||
j.update = gtime.Second()
|
||||
// 执行完毕后添加到空闲队列
|
||||
j.pool.addJob(j)
|
||||
if !j.pool.addJob(j) {
|
||||
break
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
|
||||
@ -6,23 +6,61 @@
|
||||
|
||||
package groutine
|
||||
|
||||
import (
|
||||
"time"
|
||||
"sync/atomic"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
)
|
||||
|
||||
// 任务分配循环
|
||||
func (p *Pool) loop() {
|
||||
func (p *Pool) workloop() {
|
||||
go func() {
|
||||
for {
|
||||
// 阻塞监听任务事件
|
||||
if _, ok := <- p.events; ok {
|
||||
// 如果任务为nil,表示池关闭
|
||||
if r := p.funcs.PopFront(); r != nil {
|
||||
p.getJob().setJob(r.(func()))
|
||||
} else {
|
||||
select {
|
||||
case <-p.funcEvents:
|
||||
p.getJob().setJob(p.funcs.PopFront().(func()))
|
||||
case <-p.stopEvents:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 定时清理过期任务
|
||||
func (p *Pool) clearloop() {
|
||||
go func() {
|
||||
for {
|
||||
time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second)
|
||||
if len(p.stopEvents) > 0 || len(p.funcEvents) == 0 {
|
||||
var j *PoolJob
|
||||
for {
|
||||
if r := p.queue.PopFront(); r != nil {
|
||||
j = r.(*PoolJob)
|
||||
if gtime.Second() - int64(p.expire) > j.update {
|
||||
j.stop()
|
||||
atomic.AddInt32(&p.number, -1)
|
||||
} else {
|
||||
p.queue.PushFront(r)
|
||||
break
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
// 判断是池已经关闭,是则退出
|
||||
if len(p.stopEvents) > 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// 获取过期时间
|
||||
func (p *Pool) getExpire() int32 {
|
||||
return atomic.LoadInt32(&p.expire)
|
||||
}
|
||||
|
||||
// 创建一个空的任务对象
|
||||
func (p *Pool) newJob() *PoolJob {
|
||||
j := &PoolJob {
|
||||
@ -30,13 +68,16 @@ func (p *Pool) newJob() *PoolJob {
|
||||
pool : p,
|
||||
}
|
||||
j.start()
|
||||
p.jobs.Add(j)
|
||||
atomic.AddInt32(&p.number, 1)
|
||||
return j
|
||||
}
|
||||
|
||||
// 添加任务对象到队列
|
||||
func (p *Pool) addJob(j *PoolJob) {
|
||||
p.queue.PushBack(j)
|
||||
func (p *Pool) addJob(j *PoolJob) bool {
|
||||
if j.pool.getExpire() == -1 {
|
||||
return false
|
||||
}
|
||||
return p.queue.PushBack(j) != nil
|
||||
}
|
||||
|
||||
// 获取/创建任务
|
||||
|
||||
@ -20,13 +20,10 @@ func test() {
|
||||
}
|
||||
}
|
||||
|
||||
var pool = groutine.New()
|
||||
|
||||
func BenchmarkGroutine(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
pool.Add(test)
|
||||
groutine.Add(test)
|
||||
}
|
||||
//pool.Close()
|
||||
}
|
||||
|
||||
func BenchmarkGoRoutine(b *testing.B) {
|
||||
|
||||
@ -4,24 +4,23 @@ import (
|
||||
"time"
|
||||
"gitee.com/johng/gf/g/os/groutine"
|
||||
"fmt"
|
||||
"gitee.com/johng/gf/g/os/gtime"
|
||||
)
|
||||
|
||||
func job() {
|
||||
time.Sleep(3*time.Second)
|
||||
fmt.Println("job done")
|
||||
func job(i int) {
|
||||
time.Sleep(2*time.Second)
|
||||
//fmt.Println("job done:", i)
|
||||
}
|
||||
|
||||
func main() {
|
||||
p := groutine.New()
|
||||
p.Add(job)
|
||||
p.Add(job)
|
||||
p.Add(job)
|
||||
p.Add(job)
|
||||
|
||||
|
||||
time.Sleep(1*time.Second)
|
||||
|
||||
p.Close()
|
||||
|
||||
time.Sleep(5*time.Second)
|
||||
for i := 0; i < 10; i++ {
|
||||
groutine.Add(func() {
|
||||
job(i)
|
||||
})
|
||||
}
|
||||
gtime.SetInterval(2*time.Second, func() bool {
|
||||
fmt.Println(groutine.Size())
|
||||
return true
|
||||
})
|
||||
time.Sleep(5000*time.Second)
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user