From 5f4afc711149a6c82c61a2a926fc021925506500 Mon Sep 17 00:00:00 2001 From: John Date: Mon, 15 Jan 2018 17:23:22 +0800 Subject: [PATCH 1/3] =?UTF-8?q?=E6=94=B9=E8=BF=9B=E5=92=8C=E5=AE=8C?= =?UTF-8?q?=E5=96=84=E5=B9=B6=E5=8F=91=E5=AE=89=E5=85=A8=E5=AE=B9=E5=99=A8?= =?UTF-8?q?=E9=83=A8=E5=88=86=E6=96=B9=E6=B3=95=EF=BC=8C=E5=88=9D=E6=AD=A5?= =?UTF-8?q?=E5=AE=8C=E6=88=90goroutine=E6=B1=A0=E5=8C=85=EF=BC=8C=E5=BE=85?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/container/glist/safelist.go | 65 +++++++++++++++++++++++++------ g/container/gset/int_set.go | 9 +++++ g/container/gset/interface_set.go | 16 ++++++-- g/container/gset/string_set.go | 9 +++++ g/os/groutine/groutine.go | 51 ++++++++++++++++++++++++ g/os/groutine/groutine_api.go | 32 +++++++++++++++ g/os/groutine/groutine_job.go | 34 ++++++++++++++++ geg/os/groutine.go | 27 +++++++++++++ geg/other/test.go | 4 +- 9 files changed, 230 insertions(+), 17 deletions(-) create mode 100644 g/os/groutine/groutine.go create mode 100644 g/os/groutine/groutine_api.go create mode 100644 g/os/groutine/groutine_job.go create mode 100644 geg/os/groutine.go diff --git a/g/container/glist/safelist.go b/g/container/glist/safelist.go index 20ba7a1d9..c1a241f8c 100644 --- a/g/container/glist/safelist.go +++ b/g/container/glist/safelist.go @@ -78,6 +78,18 @@ func (this *SafeList) PopBack() interface{} { return nil } +// 从链表头端出栈数据项(删除) +func (this *SafeList) PopFront() interface{} { + this.Lock() + if elem := this.L.Front(); elem != nil { + item := this.L.Remove(elem) + this.Unlock() + return item + } + this.Unlock() + return nil +} + // 批量从链表尾端出栈数据项(删除) func (this *SafeList) BatchPopBack(max int) []interface{} { this.Lock() @@ -90,35 +102,66 @@ func (this *SafeList) BatchPopBack(max int) []interface{} { if count > max { count = max } - items := make([]interface{}, 0, count) + items := make([]interface{}, count) for i := 0; i < count; i++ { - item := this.L.Remove(this.L.Back()) - items = append(items, item) + items[i] = this.L.Remove(this.L.Back()) } this.Unlock() return items } -// 批量从链表尾端依次获取所有数据 -func (this *SafeList) PopBackAll() []interface{} { +// 批量从链表头端出栈数据项(删除) +func (this *SafeList) BatchPopFront(max int) []interface{} { this.Lock() - count := this.L.Len() if count == 0 { this.Unlock() return []interface{}{} } - items := make([]interface{}, 0, count) - for i := 0; i < count; i++ { - item := this.L.Remove(this.L.Back()) - items = append(items, item) + if count > max { + count = max + } + items := make([]interface{}, count) + for i := 0; i < count; i++ { + items[i] = this.L.Remove(this.L.Front()) } - this.Unlock() return items } +// 批量从链表尾端依次获取所有数据(删除) +func (this *SafeList) PopBackAll() []interface{} { + this.Lock() + count := this.L.Len() + if count == 0 { + this.Unlock() + return []interface{}{} + } + items := make([]interface{}, count) + for i := 0; i < count; i++ { + items[i] = this.L.Remove(this.L.Back()) + } + this.Unlock() + return items +} + +// 批量从链表头端依次获取所有数据(删除) +func (this *SafeList) PopFrontAll() []interface{} { + this.Lock() + count := this.L.Len() + if count == 0 { + this.Unlock() + return []interface{}{} + } + items := make([]interface{}, count) + for i := 0; i < count; i++ { + items[i] = this.L.Remove(this.L.Front()) + } + this.Unlock() + return items +} + // 删除数据项 func (this *SafeList) Remove(e *list.Element) interface{} { this.Lock() diff --git a/g/container/gset/int_set.go b/g/container/gset/int_set.go index 509e4234e..e0a31e147 100644 --- a/g/container/gset/int_set.go +++ b/g/container/gset/int_set.go @@ -22,6 +22,15 @@ func NewIntSet() *IntSet { return &IntSet{M: make(map[int]struct{})} } +// 给定回调函数对原始内容进行遍历 +func (this *IntSet) Iterator(f func (v int)) { + this.RLock() + for k, _ := range this.M { + f(k) + } + this.RUnlock() +} + // 设置键 func (this *IntSet) Add(item int) *IntSet { if this.Contains(item) { diff --git a/g/container/gset/interface_set.go b/g/container/gset/interface_set.go index 065f770a2..53416f956 100644 --- a/g/container/gset/interface_set.go +++ b/g/container/gset/interface_set.go @@ -21,7 +21,16 @@ func NewInterfaceSet() *InterfaceSet { return &InterfaceSet{M: make(map[interface{}]struct{})} } -// 设置键 +// 给定回调函数对原始内容进行遍历 +func (this *InterfaceSet) Iterator(f func (v interface{})) { + this.RLock() + for k, _ := range this.M { + f(k) + } + this.RUnlock() +} + +// 添加 func (this *InterfaceSet) Add(item interface{}) *InterfaceSet { if this.Contains(item) { return this @@ -32,7 +41,7 @@ func (this *InterfaceSet) Add(item interface{}) *InterfaceSet { return this } -// 批量添加设置键 +// 批量添加 func (this *InterfaceSet) BatchAdd(items []interface{}) *InterfaceSet { count := len(items) if count == 0 { @@ -97,13 +106,12 @@ func (this *InterfaceSet) Clear() { // 转换为数组 func (this *InterfaceSet) Slice() []interface{} { this.RLock() + i := 0 ret := make([]interface{}, len(this.M)) - i := 0 for item := range this.M { ret[i] = item i++ } - this.RUnlock() return ret } diff --git a/g/container/gset/string_set.go b/g/container/gset/string_set.go index 1f9495c74..d16ed21e7 100644 --- a/g/container/gset/string_set.go +++ b/g/container/gset/string_set.go @@ -21,6 +21,15 @@ func NewStringSet() *StringSet { return &StringSet{M: make(map[string]struct{})} } +// 给定回调函数对原始内容进行遍历 +func (this *StringSet) Iterator(f func (v string)) { + this.RLock() + for k, _ := range this.M { + f(k) + } + this.RUnlock() +} + // 设置键 func (this *StringSet) Add(item string) *StringSet { if this.Contains(item) { diff --git a/g/os/groutine/groutine.go b/g/os/groutine/groutine.go new file mode 100644 index 000000000..5a1843e60 --- /dev/null +++ b/g/os/groutine/groutine.go @@ -0,0 +1,51 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// Goroutine池. +package groutine + +import ( + "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/container/gset" + "sync" +) + +// goroutine池对象 +type Pool struct { + queue *glist.SafeList // 空闲任务队列*PoolJob) + pjobs *gset.InterfaceSet // 当前任务对象(*PoolJob) +} + +// goroutine任务 +type PoolJob struct { + mu sync.RWMutex + job chan func() // 当前任务(当为nil时表示关闭) + pool *Pool // 所属池 +} + +// 创建一个空的任务对象 +func (p *Pool) newJob() *PoolJob { + j := &PoolJob { + job : make(chan func(), 1), + pool : p, + } + j.start() + p.pjobs.Add(j) + return j +} + +// 添加任务对象到队列 +func (p *Pool) addJob(j *PoolJob) { + p.queue.PushBack(j) +} + +// 获取/创建任务 +func (p *Pool) getJob() *PoolJob { + if r := p.queue.PopFront(); r != nil { + return r.(*PoolJob) + } + return p.newJob() +} diff --git a/g/os/groutine/groutine_api.go b/g/os/groutine/groutine_api.go new file mode 100644 index 000000000..ea24960a9 --- /dev/null +++ b/g/os/groutine/groutine_api.go @@ -0,0 +1,32 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package groutine + +import ( + "gitee.com/johng/gf/g/container/gset" + "gitee.com/johng/gf/g/container/glist" +) + +// 创建goroutine池管理对象 +func New() *Pool { + return &Pool { + queue : glist.NewSafeList(), + pjobs : gset.NewInterfaceSet(), + } +} + +// 添加异步任务 +func (p *Pool) Add(f func()) { + p.getJob().setJob(f) +} + +// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 +func (p *Pool) Close() { + p.pjobs.Iterator(func(v interface{}){ + v.(*PoolJob).stop() + }) +} \ No newline at end of file diff --git a/g/os/groutine/groutine_job.go b/g/os/groutine/groutine_job.go new file mode 100644 index 000000000..8629c4b7f --- /dev/null +++ b/g/os/groutine/groutine_job.go @@ -0,0 +1,34 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package groutine + +// 开始任务 +func (j *PoolJob) start() { + go func() { + for { + if f := <- j.job; f != nil { + // 执行任务 + f() + // 执行完毕后添加到空闲队列 + j.pool.addJob(j) + } else { + break + } + } + }() +} + +// 关闭当前任务 +func (j *PoolJob) stop() { + j.job <- nil +} + +// 设置当前任务的执行函数 +func (j *PoolJob) setJob(f func()) { + j.job <- f +} + diff --git a/geg/os/groutine.go b/geg/os/groutine.go new file mode 100644 index 000000000..6113b20bd --- /dev/null +++ b/geg/os/groutine.go @@ -0,0 +1,27 @@ +package main + +import ( + "time" + "gitee.com/johng/gf/g/os/groutine" + "fmt" +) + +func job() { + time.Sleep(3*time.Second) + fmt.Println("job done") +} + +func main() { + p := groutine.New() + p.Add(job) + p.Add(job) + p.Add(job) + p.Add(job) + + + time.Sleep(1*time.Second) + + p.Close() + + time.Sleep(5*time.Second) +} diff --git a/geg/other/test.go b/geg/other/test.go index 2ca2821be..35397ac8e 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,9 +1,9 @@ package main import ( - "gitee.com/johng/gf/g/os/glog" + "fmt" ) func main() { - glog.Error("发生错误!") + fmt.Println(len(make(chan int, 10))) } \ No newline at end of file From 046ccd070986f7529f1e13caecee59c17244c2ae Mon Sep 17 00:00:00 2001 From: John Date: Mon, 15 Jan 2018 22:42:58 +0800 Subject: [PATCH 2/3] =?UTF-8?q?groutine=E6=80=A7=E8=83=BD=E6=B5=8B?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/os/groutine/groutine.go | 20 +++++++++++++++++--- g/os/groutine/groutine_api.go | 12 ++++++++---- g/os/groutine/groutine_job.go | 2 ++ g/os/groutine/groutine_test.go | 31 +++++++++++++++++++++++++++++++ geg/other/test.go | 13 ++++++++++++- 5 files changed, 70 insertions(+), 8 deletions(-) create mode 100644 g/os/groutine/groutine_test.go diff --git a/g/os/groutine/groutine.go b/g/os/groutine/groutine.go index 5a1843e60..f3c39d087 100644 --- a/g/os/groutine/groutine.go +++ b/g/os/groutine/groutine.go @@ -15,8 +15,9 @@ import ( // goroutine池对象 type Pool struct { - queue *glist.SafeList // 空闲任务队列*PoolJob) - pjobs *gset.InterfaceSet // 当前任务对象(*PoolJob) + jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) + queue *glist.SafeList // 空闲任务队列(*PoolJob) + funcs []chan func() // 待处理任务操作 } // goroutine任务 @@ -26,6 +27,19 @@ type PoolJob struct { pool *Pool // 所属池 } +// 任务分配循环 +func (p *Pool) loop() { + go func() { + for { + if f := <- p.funcs; f != nil { + p.getJob().setJob(f) + } else { + return + } + } + }() +} + // 创建一个空的任务对象 func (p *Pool) newJob() *PoolJob { j := &PoolJob { @@ -33,7 +47,7 @@ func (p *Pool) newJob() *PoolJob { pool : p, } j.start() - p.pjobs.Add(j) + p.jobs.Add(j) return j } diff --git a/g/os/groutine/groutine_api.go b/g/os/groutine/groutine_api.go index ea24960a9..e116c2e5c 100644 --- a/g/os/groutine/groutine_api.go +++ b/g/os/groutine/groutine_api.go @@ -13,20 +13,24 @@ import ( // 创建goroutine池管理对象 func New() *Pool { - return &Pool { + p := &Pool { + jobs : gset.NewInterfaceSet(), queue : glist.NewSafeList(), - pjobs : gset.NewInterfaceSet(), + funcs : make(chan func(), 1000000), } + p.loop() + return p } // 添加异步任务 func (p *Pool) Add(f func()) { - p.getJob().setJob(f) + p.funcs <- f } // 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 func (p *Pool) Close() { - p.pjobs.Iterator(func(v interface{}){ + p.funcs <- nil + p.jobs.Iterator(func(v interface{}){ v.(*PoolJob).stop() }) } \ No newline at end of file diff --git a/g/os/groutine/groutine_job.go b/g/os/groutine/groutine_job.go index 8629c4b7f..71ea8a117 100644 --- a/g/os/groutine/groutine_job.go +++ b/g/os/groutine/groutine_job.go @@ -13,6 +13,8 @@ func (j *PoolJob) start() { if f := <- j.job; f != nil { // 执行任务 f() + // 清空任务(GC可回收f对应资源) + j.job = nil // 执行完毕后添加到空闲队列 j.pool.addJob(j) } else { diff --git a/g/os/groutine/groutine_test.go b/g/os/groutine/groutine_test.go new file mode 100644 index 000000000..2258d7a9b --- /dev/null +++ b/g/os/groutine/groutine_test.go @@ -0,0 +1,31 @@ +package groutine_test + +import ( + "testing" + "gitee.com/johng/gf/g/os/groutine" +) + +func test() { + num := 0 + for i := 0; i < 1000000; i++ { + num += i + } +} + +var pool = groutine.New() + +func BenchmarkGroutine(b *testing.B) { + for i := 0; i < b.N; i++ { + pool.Add(test) + } + //pool.Close() +} + +//func BenchmarkGoRoutine(b *testing.B) { +// t := gtime.Microsecond() +// b.N = 100000 +// for i := 0; i < b.N; i++ { +// go test() +// } +// fmt.Println("BenchmarkGoRoutine costs:", gtime.Microsecond() - t) +//} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index 35397ac8e..b22f57484 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,9 +1,20 @@ package main import ( + "gitee.com/johng/gf/g/os/gtime" "fmt" + "gitee.com/johng/gf/g/container/glist" ) func main() { - fmt.Println(len(make(chan int, 10))) + + t1 := gtime.Microsecond() + c := make(chan func(), 10) + c <- func(){} + fmt.Println(gtime.Microsecond() - t1) + + t2 := gtime.Microsecond() + l := glist.NewSafeList() + l.PushBack(func() {}) + fmt.Println(gtime.Microsecond() - t2) } \ No newline at end of file From bbb50bf4eba979ca91e0236934e95f6a1d8dce08 Mon Sep 17 00:00:00 2001 From: John Date: Mon, 15 Jan 2018 23:26:56 +0800 Subject: [PATCH 3/3] =?UTF-8?q?=E6=94=B9=E8=BF=9Bgroutine=E8=AE=BE?= =?UTF-8?q?=E8=AE=A1=EF=BC=8C=E5=88=9D=E6=AD=A5=E5=AE=8C=E6=88=90=E6=80=A7?= =?UTF-8?q?=E8=83=BD=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/os/groutine/groutine_api.go | 31 ++++++++++++++--- g/os/groutine/groutine_job.go | 2 +- .../{groutine.go => groutine_pool.go} | 33 +++++-------------- g/os/groutine/groutine_test.go | 21 +++++++----- geg/other/test.go | 5 +-- 5 files changed, 51 insertions(+), 41 deletions(-) rename g/os/groutine/{groutine.go => groutine_pool.go} (56%) diff --git a/g/os/groutine/groutine_api.go b/g/os/groutine/groutine_api.go index e116c2e5c..d00e1a79b 100644 --- a/g/os/groutine/groutine_api.go +++ b/g/os/groutine/groutine_api.go @@ -4,19 +4,39 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. +// Goroutine池. +// 用于goroutine复用,提升异步操作执行效率. package groutine import ( + "math" + "sync" "gitee.com/johng/gf/g/container/gset" "gitee.com/johng/gf/g/container/glist" ) +// goroutine池对象 +type Pool struct { + jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) + queue *glist.SafeList // 空闲任务队列(*PoolJob) + funcs *glist.SafeList // 待处理任务操作队列 + events chan struct{} // 任务操作处理事件(用于任务事件通知) +} + +// goroutine任务 +type PoolJob struct { + mu sync.RWMutex + job chan func() // 当前任务(当为nil时表示关闭) + pool *Pool // 所属池 +} + // 创建goroutine池管理对象 func New() *Pool { p := &Pool { - jobs : gset.NewInterfaceSet(), - queue : glist.NewSafeList(), - funcs : make(chan func(), 1000000), + jobs : gset.NewInterfaceSet(), + queue : glist.NewSafeList(), + funcs : glist.NewSafeList(), + events : make(chan struct{}, math.MaxUint32), } p.loop() return p @@ -24,12 +44,13 @@ func New() *Pool { // 添加异步任务 func (p *Pool) Add(f func()) { - p.funcs <- f + p.funcs.PushBack(f) + p.events <- struct{}{} } // 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 func (p *Pool) Close() { - p.funcs <- nil + p.Add(nil) p.jobs.Iterator(func(v interface{}){ v.(*PoolJob).stop() }) diff --git a/g/os/groutine/groutine_job.go b/g/os/groutine/groutine_job.go index 71ea8a117..933dc5baa 100644 --- a/g/os/groutine/groutine_job.go +++ b/g/os/groutine/groutine_job.go @@ -26,7 +26,7 @@ func (j *PoolJob) start() { // 关闭当前任务 func (j *PoolJob) stop() { - j.job <- nil + j.setJob(nil) } // 设置当前任务的执行函数 diff --git a/g/os/groutine/groutine.go b/g/os/groutine/groutine_pool.go similarity index 56% rename from g/os/groutine/groutine.go rename to g/os/groutine/groutine_pool.go index f3c39d087..4aff9d7c6 100644 --- a/g/os/groutine/groutine.go +++ b/g/os/groutine/groutine_pool.go @@ -4,37 +4,20 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -// Goroutine池. package groutine -import ( - "gitee.com/johng/gf/g/container/glist" - "gitee.com/johng/gf/g/container/gset" - "sync" -) - -// goroutine池对象 -type Pool struct { - jobs *gset.InterfaceSet // 当前任务对象(*PoolJob) - queue *glist.SafeList // 空闲任务队列(*PoolJob) - funcs []chan func() // 待处理任务操作 -} - -// goroutine任务 -type PoolJob struct { - mu sync.RWMutex - job chan func() // 当前任务(当为nil时表示关闭) - pool *Pool // 所属池 -} - // 任务分配循环 func (p *Pool) loop() { go func() { for { - if f := <- p.funcs; f != nil { - p.getJob().setJob(f) - } else { - return + // 阻塞监听任务事件 + if _, ok := <- p.events; ok { + // 如果任务为nil,表示池关闭 + if r := p.funcs.PopFront(); r != nil { + p.getJob().setJob(r.(func())) + } else { + return + } } } }() diff --git a/g/os/groutine/groutine_test.go b/g/os/groutine/groutine_test.go index 2258d7a9b..2d0bcee75 100644 --- a/g/os/groutine/groutine_test.go +++ b/g/os/groutine/groutine_test.go @@ -1,3 +1,11 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + package groutine_test import ( @@ -21,11 +29,8 @@ func BenchmarkGroutine(b *testing.B) { //pool.Close() } -//func BenchmarkGoRoutine(b *testing.B) { -// t := gtime.Microsecond() -// b.N = 100000 -// for i := 0; i < b.N; i++ { -// go test() -// } -// fmt.Println("BenchmarkGoRoutine costs:", gtime.Microsecond() - t) -//} \ No newline at end of file +func BenchmarkGoRoutine(b *testing.B) { + for i := 0; i < b.N; i++ { + go test() + } +} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index b22f57484..3f77e3182 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -4,13 +4,14 @@ import ( "gitee.com/johng/gf/g/os/gtime" "fmt" "gitee.com/johng/gf/g/container/glist" + "math" ) func main() { t1 := gtime.Microsecond() - c := make(chan func(), 10) - c <- func(){} + c := make(chan struct{}, math.MaxInt64) + c <- struct{}{} fmt.Println(gtime.Microsecond() - t1) t2 := gtime.Microsecond()