改进和完善并发安全容器部分方法,初步完成goroutine池包,待测试

This commit is contained in:
John
2018-01-15 17:23:22 +08:00
parent 9615ff2ef1
commit 4b5ad193c1
9 changed files with 230 additions and 17 deletions

View File

@ -78,6 +78,18 @@ func (this *SafeList) PopBack() interface{} {
return nil
}
// 从链表头端出栈数据项(删除)
func (this *SafeList) PopFront() interface{} {
this.Lock()
if elem := this.L.Front(); elem != nil {
item := this.L.Remove(elem)
this.Unlock()
return item
}
this.Unlock()
return nil
}
// 批量从链表尾端出栈数据项(删除)
func (this *SafeList) BatchPopBack(max int) []interface{} {
this.Lock()
@ -90,35 +102,66 @@ func (this *SafeList) BatchPopBack(max int) []interface{} {
if count > max {
count = max
}
items := make([]interface{}, 0, count)
items := make([]interface{}, count)
for i := 0; i < count; i++ {
item := this.L.Remove(this.L.Back())
items = append(items, item)
items[i] = this.L.Remove(this.L.Back())
}
this.Unlock()
return items
}
// 批量从链表尾端依次获取所有数据
func (this *SafeList) PopBackAll() []interface{} {
// 批量从链表头端出栈数据项(删除)
func (this *SafeList) BatchPopFront(max int) []interface{} {
this.Lock()
count := this.L.Len()
if count == 0 {
this.Unlock()
return []interface{}{}
}
items := make([]interface{}, 0, count)
for i := 0; i < count; i++ {
item := this.L.Remove(this.L.Back())
items = append(items, item)
if count > max {
count = max
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = this.L.Remove(this.L.Front())
}
this.Unlock()
return items
}
// 批量从链表尾端依次获取所有数据(删除)
func (this *SafeList) PopBackAll() []interface{} {
this.Lock()
count := this.L.Len()
if count == 0 {
this.Unlock()
return []interface{}{}
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = this.L.Remove(this.L.Back())
}
this.Unlock()
return items
}
// 批量从链表头端依次获取所有数据(删除)
func (this *SafeList) PopFrontAll() []interface{} {
this.Lock()
count := this.L.Len()
if count == 0 {
this.Unlock()
return []interface{}{}
}
items := make([]interface{}, count)
for i := 0; i < count; i++ {
items[i] = this.L.Remove(this.L.Front())
}
this.Unlock()
return items
}
// 删除数据项
func (this *SafeList) Remove(e *list.Element) interface{} {
this.Lock()

View File

@ -22,6 +22,15 @@ func NewIntSet() *IntSet {
return &IntSet{M: make(map[int]struct{})}
}
// 给定回调函数对原始内容进行遍历
func (this *IntSet) Iterator(f func (v int)) {
this.RLock()
for k, _ := range this.M {
f(k)
}
this.RUnlock()
}
// 设置键
func (this *IntSet) Add(item int) *IntSet {
if this.Contains(item) {

View File

@ -21,7 +21,16 @@ func NewInterfaceSet() *InterfaceSet {
return &InterfaceSet{M: make(map[interface{}]struct{})}
}
// 设置键
// 给定回调函数对原始内容进行遍历
func (this *InterfaceSet) Iterator(f func (v interface{})) {
this.RLock()
for k, _ := range this.M {
f(k)
}
this.RUnlock()
}
// 添加
func (this *InterfaceSet) Add(item interface{}) *InterfaceSet {
if this.Contains(item) {
return this
@ -32,7 +41,7 @@ func (this *InterfaceSet) Add(item interface{}) *InterfaceSet {
return this
}
// 批量添加设置键
// 批量添加
func (this *InterfaceSet) BatchAdd(items []interface{}) *InterfaceSet {
count := len(items)
if count == 0 {
@ -97,13 +106,12 @@ func (this *InterfaceSet) Clear() {
// 转换为数组
func (this *InterfaceSet) Slice() []interface{} {
this.RLock()
i := 0
ret := make([]interface{}, len(this.M))
i := 0
for item := range this.M {
ret[i] = item
i++
}
this.RUnlock()
return ret
}

View File

@ -21,6 +21,15 @@ func NewStringSet() *StringSet {
return &StringSet{M: make(map[string]struct{})}
}
// 给定回调函数对原始内容进行遍历
func (this *StringSet) Iterator(f func (v string)) {
this.RLock()
for k, _ := range this.M {
f(k)
}
this.RUnlock()
}
// 设置键
func (this *StringSet) Add(item string) *StringSet {
if this.Contains(item) {

51
g/os/groutine/groutine.go Normal file
View File

@ -0,0 +1,51 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Goroutine池.
package groutine
import (
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gset"
"sync"
)
// goroutine池对象
type Pool struct {
queue *glist.SafeList // 空闲任务队列*PoolJob)
pjobs *gset.InterfaceSet // 当前任务对象(*PoolJob)
}
// goroutine任务
type PoolJob struct {
mu sync.RWMutex
job chan func() // 当前任务(当为nil时表示关闭)
pool *Pool // 所属池
}
// 创建一个空的任务对象
func (p *Pool) newJob() *PoolJob {
j := &PoolJob {
job : make(chan func(), 1),
pool : p,
}
j.start()
p.pjobs.Add(j)
return j
}
// 添加任务对象到队列
func (p *Pool) addJob(j *PoolJob) {
p.queue.PushBack(j)
}
// 获取/创建任务
func (p *Pool) getJob() *PoolJob {
if r := p.queue.PopFront(); r != nil {
return r.(*PoolJob)
}
return p.newJob()
}

View File

@ -0,0 +1,32 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package groutine
import (
"gitee.com/johng/gf/g/container/gset"
"gitee.com/johng/gf/g/container/glist"
)
// 创建goroutine池管理对象
func New() *Pool {
return &Pool {
queue : glist.NewSafeList(),
pjobs : gset.NewInterfaceSet(),
}
}
// 添加异步任务
func (p *Pool) Add(f func()) {
p.getJob().setJob(f)
}
// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行
func (p *Pool) Close() {
p.pjobs.Iterator(func(v interface{}){
v.(*PoolJob).stop()
})
}

View File

@ -0,0 +1,34 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package groutine
// 开始任务
func (j *PoolJob) start() {
go func() {
for {
if f := <- j.job; f != nil {
// 执行任务
f()
// 执行完毕后添加到空闲队列
j.pool.addJob(j)
} else {
break
}
}
}()
}
// 关闭当前任务
func (j *PoolJob) stop() {
j.job <- nil
}
// 设置当前任务的执行函数
func (j *PoolJob) setJob(f func()) {
j.job <- f
}

27
geg/os/groutine.go Normal file
View File

@ -0,0 +1,27 @@
package main
import (
"time"
"gitee.com/johng/gf/g/os/groutine"
"fmt"
)
func job() {
time.Sleep(3*time.Second)
fmt.Println("job done")
}
func main() {
p := groutine.New()
p.Add(job)
p.Add(job)
p.Add(job)
p.Add(job)
time.Sleep(1*time.Second)
p.Close()
time.Sleep(5*time.Second)
}

View File

@ -1,9 +1,9 @@
package main
import (
"gitee.com/johng/gf/g/os/glog"
"fmt"
)
func main() {
glog.Error("发生错误!")
fmt.Println(len(make(chan int, 10)))
}