mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
improve grpool
This commit is contained in:
@ -19,10 +19,10 @@ import (
|
||||
|
||||
// goroutine池对象
|
||||
type Pool struct {
|
||||
workerChan chan struct{} // 使用channel限制最大的goroutine数量
|
||||
workerNum *gtype.Int // 当前正在运行的worker/goroutine数量
|
||||
jobQueue *glist.List // 待处理任务操作队列
|
||||
jobEvents chan struct{} // 任务添加事件(jobQueue+jobEvents结合使用)
|
||||
limit int // 最大的goroutine数量限制
|
||||
count *gtype.Int // 当前正在运行的goroutine数量
|
||||
list *glist.List // 待处理任务操作列表
|
||||
events chan struct{} // 任务添加事件
|
||||
closed *gtype.Bool
|
||||
}
|
||||
|
||||
@ -30,18 +30,18 @@ type Pool struct {
|
||||
// 该对象与进程同生命周期,无需Close
|
||||
var defaultPool = New()
|
||||
|
||||
// 创建goroutine池管理对象, 参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制
|
||||
// 创建goroutine池管理对象,参数用于限制限制最大的goroutine数量,非必需参数,默认不做限制
|
||||
func New(size...int) *Pool {
|
||||
s := 0
|
||||
limit := -1
|
||||
if len(size) > 0 {
|
||||
s = size[0]
|
||||
limit = size[0]
|
||||
}
|
||||
p := &Pool {
|
||||
workerNum : gtype.NewInt(),
|
||||
jobQueue : glist.New(),
|
||||
jobEvents : make(chan struct{}, math.MaxInt32),
|
||||
workerChan : make(chan struct{}, s),
|
||||
closed : gtype.NewBool(),
|
||||
limit : limit,
|
||||
count : gtype.NewInt(),
|
||||
list : glist.New(),
|
||||
events : make(chan struct{}, math.MaxInt32),
|
||||
closed : gtype.NewBool(),
|
||||
}
|
||||
return p
|
||||
}
|
||||
@ -53,65 +53,57 @@ func Add(f func()) error {
|
||||
|
||||
// 查询当前goroutine总数
|
||||
func Size() int {
|
||||
return defaultPool.workerNum.Val()
|
||||
return defaultPool.count.Val()
|
||||
}
|
||||
|
||||
// 查询当前等待处理的任务总数
|
||||
func Jobs() int {
|
||||
return len(defaultPool.jobEvents)
|
||||
return len(defaultPool.events)
|
||||
}
|
||||
|
||||
// 添加异步任务
|
||||
func (p *Pool) Add(f func()) error {
|
||||
p.jobQueue.PushBack(f)
|
||||
p.jobEvents <- struct{}{}
|
||||
p.list.PushBack(f)
|
||||
p.events <- struct{}{}
|
||||
// 判断是否创建新的worker
|
||||
if p.Jobs() > 1 || p.workerNum.Val() == 0 {
|
||||
p.ForkWorker()
|
||||
if p.list.Len() > 1 || p.count.Val() == 0 {
|
||||
p.fork()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 查询当前goroutine worker总数
|
||||
// 查询当前goroutine总数
|
||||
func (p *Pool) Size() int {
|
||||
return p.workerNum.Val()
|
||||
return p.count.Val()
|
||||
}
|
||||
|
||||
// 查询当前等待处理的任务总数
|
||||
func (p *Pool) Jobs() int {
|
||||
return p.jobQueue.Len()
|
||||
return p.list.Len()
|
||||
}
|
||||
|
||||
// 创建新的worker执行任务
|
||||
func (p *Pool) ForkWorker() {
|
||||
if cap(p.workerChan) > 0 {
|
||||
// 如果worker数量已经达到限制,那么不创建新worker,直接返回
|
||||
if p.workerNum.Val() == cap(p.workerChan) {
|
||||
return
|
||||
}
|
||||
p.workerNum.Add(1)
|
||||
p.workerChan <- struct{}{}
|
||||
} else {
|
||||
p.workerNum.Add(1)
|
||||
func (p *Pool) fork() {
|
||||
// 如果worker数量已经达到限制,那么不创建新worker,直接返回
|
||||
if p.count.Val() == p.limit {
|
||||
return
|
||||
}
|
||||
p.count.Add(1)
|
||||
go func() {
|
||||
for !p.closed.Val() {
|
||||
select {
|
||||
case <- p.jobEvents:
|
||||
if job := p.jobQueue.PopFront(); job != nil {
|
||||
case <- p.events:
|
||||
if job := p.list.PopFront(); job != nil {
|
||||
job.(func())()
|
||||
} else {
|
||||
goto WorkerDone
|
||||
p.count.Add(-1)
|
||||
return
|
||||
}
|
||||
default:
|
||||
goto WorkerDone
|
||||
p.count.Add(-1)
|
||||
return
|
||||
}
|
||||
}
|
||||
WorkerDone:
|
||||
p.workerNum.Add(-1)
|
||||
if cap(p.workerChan) > 0 {
|
||||
<- p.workerChan
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
|
||||
57
g/os/grpool/grpool_unit_test.go
Normal file
57
g/os/grpool/grpool_unit_test.go
Normal file
@ -0,0 +1,57 @@
|
||||
// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
// go test *.go -bench=".*" -count=1
|
||||
|
||||
package grpool_test
|
||||
|
||||
import (
|
||||
"github.com/gogf/gf/g/container/garray"
|
||||
"github.com/gogf/gf/g/os/grpool"
|
||||
"github.com/gogf/gf/g/test/gtest"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
|
||||
func Test_Basic(t *testing.T) {
|
||||
gtest.Case(t, func() {
|
||||
wg := sync.WaitGroup{}
|
||||
array := garray.NewArray()
|
||||
size := 100000
|
||||
wg.Add(size)
|
||||
for i := 0; i < size; i++ {
|
||||
grpool.Add(func() {
|
||||
array.Append(1)
|
||||
wg.Done()
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
gtest.Assert(array.Len(), size)
|
||||
})
|
||||
|
||||
gtest.Case(t, func() {
|
||||
array := garray.NewArray()
|
||||
size := 100000
|
||||
pool := grpool.New(10000)
|
||||
for i := 0; i < size; i++ {
|
||||
pool.Add(func() {
|
||||
array.Append(1)
|
||||
time.Sleep(2*time.Second)
|
||||
})
|
||||
}
|
||||
time.Sleep(time.Second)
|
||||
gtest.Assert(pool.Size(), 10000)
|
||||
gtest.Assert(pool.Jobs(), 90000)
|
||||
gtest.Assert(array.Len(), 10000)
|
||||
pool.Close()
|
||||
time.Sleep(2*time.Second)
|
||||
gtest.Assert(pool.Size(), 10000)
|
||||
gtest.Assert(pool.Jobs(), 90000)
|
||||
gtest.Assert(array.Len(), 10000)
|
||||
})
|
||||
}
|
||||
@ -1,16 +0,0 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/gogf/gf/g/container/gpool"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
p := gpool.New(60000, nil, func(i interface{}) {
|
||||
fmt.Println("expired")
|
||||
})
|
||||
p.Put(1)
|
||||
time.Sleep(10000*time.Second)
|
||||
fmt.Println(p.Get())
|
||||
}
|
||||
32
geg/container/gpool/gpool_expirefunc.go
Normal file
32
geg/container/gpool/gpool_expirefunc.go
Normal file
@ -0,0 +1,32 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"github.com/gogf/gf/g/container/gpool"
|
||||
"github.com/gogf/gf/g/net/gtcp"
|
||||
"github.com/gogf/gf/g/os/glog"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 创建对象复用池,对象过期时间为3000毫秒,并给定创建及销毁方法
|
||||
p := gpool.New(3000, func() (interface{}, error) {
|
||||
return gtcp.NewConn("www.baidu.com:80")
|
||||
}, func(i interface{}) {
|
||||
glog.Println("expired")
|
||||
i.(*gtcp.Conn).Close()
|
||||
})
|
||||
conn, err := p.Get()
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
result, err := conn.(*gtcp.Conn).SendRecv([]byte("HEAD / HTTP/1.1\n\n"), -1)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
fmt.Println(string(result))
|
||||
// 丢回池中以便重复使用
|
||||
p.Put(conn)
|
||||
// 等待一定时间观察过期方法调用
|
||||
time.Sleep(4*time.Second)
|
||||
}
|
||||
@ -1,14 +1,73 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"log"
|
||||
"fmt"
|
||||
"github.com/gogf/gf/g/container/gmap"
|
||||
)
|
||||
|
||||
func main() {
|
||||
log.Printf("111")
|
||||
log.Printf("111")
|
||||
log.Print("222")
|
||||
log.Print("222")
|
||||
log.Println("333")
|
||||
log.Println("333")
|
||||
// 创建一个默认的gmap对象,
|
||||
// 默认情况下该gmap对象支持并发安全特性,
|
||||
// 初始化时可以给定true参数关闭并发安全特性,当做一个普通的map使用。
|
||||
m := gmap.New()
|
||||
|
||||
// 设置键值对
|
||||
for i := 0; i < 10; i++ {
|
||||
m.Set(i, i)
|
||||
}
|
||||
// 查询大小
|
||||
fmt.Println(m.Size())
|
||||
// 批量设置键值对(不同的数据类型对象参数不同)
|
||||
m.Sets(map[interface{}]interface{}{
|
||||
10 : 10,
|
||||
11 : 11,
|
||||
})
|
||||
fmt.Println(m.Size())
|
||||
|
||||
// 查询是否存在
|
||||
fmt.Println(m.Contains(1))
|
||||
|
||||
// 查询键值
|
||||
fmt.Println(m.Get(1))
|
||||
|
||||
// 删除数据项
|
||||
m.Remove(9)
|
||||
fmt.Println(m.Size())
|
||||
|
||||
// 批量删除
|
||||
m.Removes([]interface{}{10, 11})
|
||||
fmt.Println(m.Size())
|
||||
|
||||
// 当前键名列表(随机排序)
|
||||
fmt.Println(m.Keys())
|
||||
// 当前键值列表(随机排序)
|
||||
fmt.Println(m.Values())
|
||||
|
||||
// 查询键名,当键值不存在时,写入给定的默认值
|
||||
fmt.Println(m.GetOrSet(100, 100))
|
||||
|
||||
// 删除键值对,并返回对应的键值
|
||||
fmt.Println(m.Remove(100))
|
||||
|
||||
// 遍历map
|
||||
m.Iterator(func(k interface{}, v interface{}) bool {
|
||||
fmt.Printf("%v:%v ", k, v)
|
||||
return true
|
||||
})
|
||||
|
||||
// 自定义写锁操作
|
||||
m.LockFunc(func(m map[interface{}]interface{}) {
|
||||
m[99] = 99
|
||||
})
|
||||
|
||||
// 自定义读锁操作
|
||||
m.RLockFunc(func(m map[interface{}]interface{}) {
|
||||
fmt.Println(m[99])
|
||||
})
|
||||
|
||||
// 清空map
|
||||
m.Clear()
|
||||
|
||||
// 判断map是否为空
|
||||
fmt.Println(m.IsEmpty())
|
||||
}
|
||||
Reference in New Issue
Block a user