diff --git a/g/container/gchan/gchan.go b/g/container/gchan/gchan.go index 8c8b1a5a1..67b3ba906 100644 --- a/g/container/gchan/gchan.go +++ b/g/container/gchan/gchan.go @@ -40,8 +40,7 @@ func (q *Chan) Pop() interface{} { // 关闭队列(通知所有通过Pop阻塞的协程退出) func (q *Chan) Close() { - if !q.closed.Val() { - q.closed.Set(true) + if !q.closed.Set(true) { close(q.list) } } diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index f728344bb..7676685b7 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -29,26 +29,22 @@ type Queue struct { } const ( - // 默认临时队列大小,注意是临时的 - gDEFAULT_QUEUE_SIZE = 10000 + // 动态队列缓冲区大小 + gQUEUE_SIZE = 10000 ) // 队列大小为非必须参数,默认不限制 func New(limit...int) *Queue { - size := gDEFAULT_QUEUE_SIZE - if len(limit) > 0 { - size = limit[0] - } q := &Queue { - list : glist.New(), - queue : make(chan interface{}, size), - events : make(chan struct{}, math.MaxInt32), closeChan : make(chan struct{}, 0), } if len(limit) > 0 { - q.limit = size + q.limit = limit[0] + q.queue = make(chan interface{}, limit[0]) } else { - // 如果是动态队列大小,那么额外会运行一个goroutine + q.list = glist.New() + q.queue = make(chan interface{}, gQUEUE_SIZE) + q.events = make(chan struct{}, math.MaxInt32) go q.startAsyncLoop() } return q diff --git a/g/container/gqueue/gqueue_test.go b/g/container/gqueue/gqueue_test.go index 97ba32183..e55dacec8 100644 --- a/g/container/gqueue/gqueue_test.go +++ b/g/container/gqueue/gqueue_test.go @@ -13,36 +13,38 @@ import ( "gitee.com/johng/gf/g/container/gqueue" ) -var length = 10000000 +var bn = 20000000 +var length = 1000000 var qstatic = gqueue.New(length) var qdynamic = gqueue.New() var cany = make(chan interface{}, length) -var cint = make(chan int, length) -func Benchmark_GqueueStaticPushAndPop(b *testing.B) { +func Benchmark_Gqueue_StaticPushAndPop(b *testing.B) { + b.N = bn for i := 0; i < b.N; i++ { qstatic.Push(i) qstatic.Pop() } } -func Benchmark_GqueueDynamicPush(b *testing.B) { +func Benchmark_Gqueue_DynamicPush(b *testing.B) { + b.N = bn for i := 0; i < b.N; i++ { qdynamic.Push(i) } } -func Benchmark_ChannelInterfacePushAndPop(b *testing.B) { +func Benchmark_Gqueue_DynamicPop(b *testing.B) { + b.N = bn + for i := 0; i < b.N; i++ { + qdynamic.Pop() + } +} + +func Benchmark_Channel_PushAndPop(b *testing.B) { + b.N = bn for i := 0; i < b.N; i++ { cany <- i <- cany } } - -func Benchmark_ChannelIntPushAndPop(b *testing.B) { - for i := 0; i < b.N; i++ { - cint <- i - <- cint - } -} - diff --git a/geg/other/test.go b/geg/other/test.go index e68980d3d..f8db660d5 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,11 +2,32 @@ package main import ( "fmt" - "gitee.com/johng/gf/g/os/gfile" + "time" + "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/container/gqueue" ) func main() { - fmt.Println(gfile.TempDir()) - fmt.Println(gfile.SelfDir()) - fmt.Println(gfile.Pwd()) -} + q := gqueue.New() + // 数据生产者,每隔1秒往队列写数据 + gtime.SetInterval(time.Second, func() bool { + v := gtime.Now().String() + q.Push(v) + fmt.Println("Push:", v) + return true + }) + + // 3秒后关闭队列 + gtime.SetTimeout(3*time.Second, func() { + q.Close() + }) + + // 消费者,不停读取队列数据并输出到终端 + for { + if v := q.Pop(); v != nil { + fmt.Println("Pop:", v) + } else { + break + } + } +} \ No newline at end of file