From 2ae3e215558047eeb03b77ffa66207d7f63c3702 Mon Sep 17 00:00:00 2001 From: John Date: Mon, 15 Jan 2018 23:26:56 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=B9=E8=BF=9Bgroutine=E8=AE=BE=E8=AE=A1?= =?UTF-8?q?=EF=BC=8C=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E6=80=A7=E8=83=BD?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/os/groutine/groutine_api.go | 31 ++++++++++++++--- g/os/groutine/groutine_job.go | 2 +- .../{groutine.go => groutine_pool.go} | 33 +++++-------------- g/os/groutine/groutine_test.go | 21 +++++++----- geg/other/test.go | 5 +-- 5 files changed, 51 insertions(+), 41 deletions(-) rename g/os/groutine/{groutine.go => groutine_pool.go} (56%) diff --git a/g/os/groutine/groutine_api.go b/g/os/groutine/groutine_api.go index e116c2e5c..d00e1a79b 100644 --- a/g/os/groutine/groutine_api.go +++ b/g/os/groutine/groutine_api.go @@ -4,19 +4,39 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. +// Goroutine池. +// 用于goroutine复用,提升异步操作执行效率. package groutine import ( + "math" + "sync" "gitee.com/johng/gf/g/container/gset" "gitee.com/johng/gf/g/container/glist" ) +// goroutine池对象 +type Pool struct { + jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) + queue *glist.SafeList // 空闲任务队列(*PoolJob) + funcs *glist.SafeList // 待处理任务操作队列 + events chan struct{} // 任务操作处理事件(用于任务事件通知) +} + +// goroutine任务 +type PoolJob struct { + mu sync.RWMutex + job chan func() // 当前任务(当为nil时表示关闭) + pool *Pool // 所属池 +} + // 创建goroutine池管理对象 func New() *Pool { p := &Pool { - jobs : gset.NewInterfaceSet(), - queue : glist.NewSafeList(), - funcs : make(chan func(), 1000000), + jobs : gset.NewInterfaceSet(), + queue : glist.NewSafeList(), + funcs : glist.NewSafeList(), + events : make(chan struct{}, math.MaxUint32), } p.loop() return p @@ -24,12 +44,13 @@ func New() *Pool { // 添加异步任务 func (p *Pool) Add(f func()) { - p.funcs <- f + p.funcs.PushBack(f) + p.events <- struct{}{} } // 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 func (p *Pool) Close() { - p.funcs <- nil + p.Add(nil) p.jobs.Iterator(func(v interface{}){ v.(*PoolJob).stop() }) diff --git a/g/os/groutine/groutine_job.go b/g/os/groutine/groutine_job.go index 71ea8a117..933dc5baa 100644 --- a/g/os/groutine/groutine_job.go +++ b/g/os/groutine/groutine_job.go @@ -26,7 +26,7 @@ func (j *PoolJob) start() { // 关闭当前任务 func (j *PoolJob) stop() { - j.job <- nil + j.setJob(nil) } // 设置当前任务的执行函数 diff --git a/g/os/groutine/groutine.go b/g/os/groutine/groutine_pool.go similarity index 56% rename from g/os/groutine/groutine.go rename to g/os/groutine/groutine_pool.go index f3c39d087..4aff9d7c6 100644 --- a/g/os/groutine/groutine.go +++ b/g/os/groutine/groutine_pool.go @@ -4,37 +4,20 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -// Goroutine池. package groutine -import ( - "gitee.com/johng/gf/g/container/glist" - "gitee.com/johng/gf/g/container/gset" - "sync" -) - -// goroutine池对象 -type Pool struct { - jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) - queue *glist.SafeList // 空闲任务队列(*PoolJob) - funcs []chan func() // 待处理任务操作 -} - -// goroutine任务 -type PoolJob struct { - mu sync.RWMutex - job chan func() // 当前任务(当为nil时表示关闭) - pool *Pool // 所属池 -} - // 任务分配循环 func (p *Pool) loop() { go func() { for { - if f := <- p.funcs; f != nil { - p.getJob().setJob(f) - } else { - return + // 阻塞监听任务事件 + if _, ok := <- p.events; ok { + // 如果任务为nil,表示池关闭 + if r := p.funcs.PopFront(); r != nil { + p.getJob().setJob(r.(func())) + } else { + return + } } } }() diff --git a/g/os/groutine/groutine_test.go b/g/os/groutine/groutine_test.go index 2258d7a9b..2d0bcee75 100644 --- a/g/os/groutine/groutine_test.go +++ b/g/os/groutine/groutine_test.go @@ -1,3 +1,11 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + package groutine_test import ( @@ -21,11 +29,8 @@ func BenchmarkGroutine(b *testing.B) { //pool.Close() } -//func BenchmarkGoRoutine(b *testing.B) { -// t := gtime.Microsecond() -// b.N = 100000 -// for i := 0; i < b.N; i++ { -// go test() -// } -// fmt.Println("BenchmarkGoRoutine costs:", gtime.Microsecond() - t) -//} \ No newline at end of file +func BenchmarkGoRoutine(b *testing.B) { + for i := 0; i < b.N; i++ { + go test() + } +} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index b22f57484..3f77e3182 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -4,13 +4,14 @@ import ( "gitee.com/johng/gf/g/os/gtime" "fmt" "gitee.com/johng/gf/g/container/glist" + "math" ) func main() { t1 := gtime.Microsecond() - c := make(chan func(), 10) - c <- func(){} + c := make(chan struct{}, math.MaxInt64) + c <- struct{}{} fmt.Println(gtime.Microsecond() - t1) t2 := gtime.Microsecond()