diff --git a/g/os/groutine/groutine_api.go b/g/os/groutine/groutine_api.go index d00e1a79b..3dd84767a 100644 --- a/g/os/groutine/groutine_api.go +++ b/g/os/groutine/groutine_api.go @@ -10,48 +10,87 @@ package groutine import ( "math" - "sync" - "gitee.com/johng/gf/g/container/gset" + "sync/atomic" "gitee.com/johng/gf/g/container/glist" ) +const ( + gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间 + gDEFAULT_CLEAR_INTERVAL = 3 // 定期检查任务过期时间间隔 +) + // goroutine池对象 type Pool struct { - jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) - queue *glist.SafeList // 空闲任务队列(*PoolJob) - funcs *glist.SafeList // 待处理任务操作队列 - events chan struct{} // 任务操作处理事件(用于任务事件通知) + expire int32 // goroutine过期时间(秒) + number int32 // 当前goroutine数量(非任务数) + queue *glist.SafeList // 空闲任务队列(*PoolJob) + funcs *glist.SafeList // 待处理任务操作队列 + funcEvents chan struct{} // 任务操作处理事件(用于任务事件通知) + stopEvents chan struct{} // 池关闭事件(用于池相关异步线程通知) } // goroutine任务 type PoolJob struct { - mu sync.RWMutex job chan func() // 当前任务(当为nil时表示关闭) pool *Pool // 所属池 + update int64 // 更新时间 } -// 创建goroutine池管理对象 -func New() *Pool { +// 默认的goroutine池管理对象 +// 该对象与进程同生命周期,无需Close +var defaultPool = New(gDEFAULT_EXPIRE_TIME) + +// 创建goroutine池管理对象,给定过期时间(秒) +func New(expire int) *Pool { p := &Pool { - jobs : gset.NewInterfaceSet(), - queue : glist.NewSafeList(), - funcs : glist.NewSafeList(), - events : make(chan struct{}, math.MaxUint32), + expire : int32(expire), + queue : glist.NewSafeList(), + funcs : glist.NewSafeList(), + funcEvents : make(chan struct{}, math.MaxUint32), + stopEvents : make(chan struct{}, 1), } - p.loop() + p.workloop() + p.clearloop() return p } +// 添加异步任务(使用默认的池对象) +func Add(f func()) { + defaultPool.funcs.PushBack(f) + defaultPool.funcEvents <- struct{}{} +} + +// 查询当前goroutine总数 +func Size() int { + return int(atomic.LoadInt32(&defaultPool.number)) +} + +// 设置默认池中goroutine的过期时间 +func SetExpire(expire int) { + atomic.StoreInt32(&defaultPool.expire, int32(expire)) +} + // 添加异步任务 func (p *Pool) Add(f func()) { p.funcs.PushBack(f) - p.events <- struct{}{} + p.funcEvents <- struct{}{} +} + +// 查询当前goroutine总数 +func (p *Pool) Size() int { + return int(atomic.LoadInt32(&p.number)) +} + +// 设置当前池中goroutine的过期时间 +func (p *Pool) SetExpire(expire int) { + atomic.StoreInt32(&p.expire, int32(expire)) } // 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 func (p *Pool) Close() { - p.Add(nil) - p.jobs.Iterator(func(v interface{}){ - v.(*PoolJob).stop() - }) + // 必须首先标识让任务过期自动关闭 + p.SetExpire(-1) + p.stopEvents <- struct{}{} // 通知workloop + p.stopEvents <- struct{}{} // 通知clearloop + } \ No newline at end of file diff --git a/g/os/groutine/groutine_job.go b/g/os/groutine/groutine_job.go index 933dc5baa..64031296e 100644 --- a/g/os/groutine/groutine_job.go +++ b/g/os/groutine/groutine_job.go @@ -6,6 +6,8 @@ package groutine +import "gitee.com/johng/gf/g/os/gtime" + // 开始任务 func (j *PoolJob) start() { go func() { @@ -13,10 +15,12 @@ func (j *PoolJob) start() { if f := <- j.job; f != nil { // 执行任务 f() - // 清空任务(GC可回收f对应资源) - j.job = nil + // 更新活动时间 + j.update = gtime.Second() // 执行完毕后添加到空闲队列 - j.pool.addJob(j) + if !j.pool.addJob(j) { + break + } } else { break } diff --git a/g/os/groutine/groutine_pool.go b/g/os/groutine/groutine_pool.go index 4aff9d7c6..7e8b6241a 100644 --- a/g/os/groutine/groutine_pool.go +++ b/g/os/groutine/groutine_pool.go @@ -6,23 +6,61 @@ package groutine +import ( + "time" + "sync/atomic" + "gitee.com/johng/gf/g/os/gtime" +) + // 任务分配循环 -func (p *Pool) loop() { +func (p *Pool) workloop() { go func() { for { - // 阻塞监听任务事件 - if _, ok := <- p.events; ok { - // 如果任务为nil,表示池关闭 - if r := p.funcs.PopFront(); r != nil { - p.getJob().setJob(r.(func())) - } else { + select { + case <-p.funcEvents: + p.getJob().setJob(p.funcs.PopFront().(func())) + case <-p.stopEvents: return - } } } }() } +// 定时清理过期任务 +func (p *Pool) clearloop() { + go func() { + for { + time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) + if len(p.stopEvents) > 0 || len(p.funcEvents) == 0 { + var j *PoolJob + for { + if r := p.queue.PopFront(); r != nil { + j = r.(*PoolJob) + if gtime.Second() - int64(p.expire) > j.update { + j.stop() + atomic.AddInt32(&p.number, -1) + } else { + p.queue.PushFront(r) + break + } + } else { + break + } + } + } + // 判断是池已经关闭,是则退出 + if len(p.stopEvents) > 0 { + break + } + } + }() +} + +// 获取过期时间 +func (p *Pool) getExpire() int32 { + return atomic.LoadInt32(&p.expire) +} + // 创建一个空的任务对象 func (p *Pool) newJob() *PoolJob { j := &PoolJob { @@ -30,13 +68,16 @@ func (p *Pool) newJob() *PoolJob { pool : p, } j.start() - p.jobs.Add(j) + atomic.AddInt32(&p.number, 1) return j } // 添加任务对象到队列 -func (p *Pool) addJob(j *PoolJob) { - p.queue.PushBack(j) +func (p *Pool) addJob(j *PoolJob) bool { + if j.pool.getExpire() == -1 { + return false + } + return p.queue.PushBack(j) != nil } // 获取/创建任务 diff --git a/g/os/groutine/groutine_test.go b/g/os/groutine/groutine_test.go index 2d0bcee75..f05d2d826 100644 --- a/g/os/groutine/groutine_test.go +++ b/g/os/groutine/groutine_test.go @@ -20,13 +20,10 @@ func test() { } } -var pool = groutine.New() - func BenchmarkGroutine(b *testing.B) { for i := 0; i < b.N; i++ { - pool.Add(test) + groutine.Add(test) } - //pool.Close() } func BenchmarkGoRoutine(b *testing.B) { diff --git a/geg/os/groutine.go b/geg/os/groutine.go index 6113b20bd..2a778aaa1 100644 --- a/geg/os/groutine.go +++ b/geg/os/groutine.go @@ -4,24 +4,23 @@ import ( "time" "gitee.com/johng/gf/g/os/groutine" "fmt" + "gitee.com/johng/gf/g/os/gtime" ) -func job() { - time.Sleep(3*time.Second) - fmt.Println("job done") +func job(i int) { + time.Sleep(2*time.Second) + //fmt.Println("job done:", i) } func main() { - p := groutine.New() - p.Add(job) - p.Add(job) - p.Add(job) - p.Add(job) - - - time.Sleep(1*time.Second) - - p.Close() - - time.Sleep(5*time.Second) + for i := 0; i < 10; i++ { + groutine.Add(func() { + job(i) + }) + } + gtime.SetInterval(2*time.Second, func() bool { + fmt.Println(groutine.Size()) + return true + }) + time.Sleep(5000*time.Second) }