From dafffd46775fa86cccbe3b435c375d26a193458e Mon Sep 17 00:00:00 2001 From: John Date: Fri, 2 Feb 2018 16:56:53 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Egchan=E5=8C=85=EF=BC=8C?= =?UTF-8?q?=E5=AF=B9channel=E8=BF=9B=E8=A1=8C=E7=AE=80=E5=8D=95=E5=B0=81?= =?UTF-8?q?=E8=A3=85=EF=BC=8C=E4=BE=BF=E4=BA=8E=E4=BC=98=E9=9B=85=E7=9A=84?= =?UTF-8?q?channel=E6=93=8D=E4=BD=9C=E5=A4=84=E7=90=86=EF=BC=9B=E5=AE=8C?= =?UTF-8?q?=E5=96=84gqueue=E5=8C=85=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/container/gchan/gchan.go | 54 +++++++++++++++++++++++ g/container/gchan/gchan_test.go | 32 ++++++++++++++ g/container/gqueue/gqueue.go | 72 +++++++++++++++++++++++-------- g/container/gqueue/gqueue_test.go | 37 ++++++++++++++++ geg/other/test.go | 22 ++++------ 5 files changed, 185 insertions(+), 32 deletions(-) create mode 100644 g/container/gchan/gchan.go create mode 100644 g/container/gchan/gchan_test.go create mode 100644 g/container/gqueue/gqueue_test.go diff --git a/g/container/gchan/gchan.go b/g/container/gchan/gchan.go new file mode 100644 index 000000000..4c346fc7b --- /dev/null +++ b/g/container/gchan/gchan.go @@ -0,0 +1,54 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// 优雅的channel操作封装 +package gchan + +import ( + "sync" + "sync/atomic" + "errors" +) + +type Chan struct { + mu sync.RWMutex + list chan interface{} + closed int32 +} + +func New(limit int) *Chan { + return &Chan { + list : make(chan interface{}, limit), + } +} + +// 将数据压入队列 +func (q *Chan) Push(v interface{}) error { + if atomic.LoadInt32(&q.closed) > 0 { + return errors.New("channel closed") + } + q.list <- v + return nil +} + +// 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 +// 第二个返回值表示队列是否关闭 +func (q *Chan) Pop() interface{} { + return <- q.list +} + +// 关闭队列(通知所有通过Pop阻塞的协程退出) +func (q *Chan) Close() { + if atomic.LoadInt32(&q.closed) == 0 { + atomic.StoreInt32(&q.closed, 1) + close(q.list) + } +} + +// 获取当前队列大小 +func (q *Chan) Size() int { + return len(q.list) +} \ No newline at end of file diff --git a/g/container/gchan/gchan_test.go b/g/container/gchan/gchan_test.go new file mode 100644 index 000000000..59413a168 --- /dev/null +++ b/g/container/gchan/gchan_test.go @@ -0,0 +1,32 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + +package gchan_test + +import ( + "testing" + "gitee.com/johng/gf/g/container/gchan" +) + +var length = 10000000 +var q1 = gchan.New(length) +var q2 = make(chan int, length) + +func BenchmarkGqueuePushAndPop(b *testing.B) { + for i := 0; i < b.N; i++ { + q1.Push(i) + q1.Pop() + } +} + +func BenchmarkChannelPushAndPop(b *testing.B) { + for i := 0; i < b.N; i++ { + q2 <- i + <- q2 + } +} \ No newline at end of file diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index bfef610a4..bf1a9e44a 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -11,72 +11,106 @@ import ( "math" "sync" "container/list" + "sync/atomic" + "errors" ) type Queue struct { mu sync.RWMutex - list *list.List - events chan struct{} + list *list.List // 数据队列 + limit int // 队列限制大小 + limits chan struct{} // 用于队列大小限制 + events chan struct{} // 用于内部数据写入事件通知 + closed int32 // 队列是否关闭 } -func New() *Queue { +// 队列大小为非必须参数,默认不限制 +func New(limit...int) *Queue { + size := 0 + if len(limit) > 0 { + size = limit[0] + } return &Queue { list : list.New(), + limits : make(chan struct{}, size), events : make(chan struct{}, math.MaxInt64), } } // 将数据压入队列, 队尾 -func (q *Queue) PushBack(v interface{}) { +func (q *Queue) PushBack(v interface{}) error { + if atomic.LoadInt32(&q.closed) > 0 { + return errors.New("queue closed") + } + if q.limit > 0 { + q.limits <- struct{}{} + } q.mu.Lock() q.list.PushBack(v) q.mu.Unlock() q.events <- struct{}{} + return nil } // 将数据压入队列, 队头 -func (q *Queue) PushFront(v interface{}) { +func (q *Queue) PushFront(v interface{}) error { + if atomic.LoadInt32(&q.closed) > 0 { + return errors.New("queue closed") + } + if q.limit > 0 { + q.limits <- struct{}{} + } q.mu.Lock() q.list.PushFront(v) q.mu.Unlock() q.events <- struct{}{} + return nil } // 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 -// 第二个返回值表示队列是否关闭 -func (q *Queue) PopFront() (interface{}, bool) { +func (q *Queue) PopFront() interface{} { select { case <- q.events: + if q.limit > 0 { + <- q.limits + } q.mu.Lock() if elem := q.list.Front(); elem != nil { item := q.list.Remove(elem) q.mu.Unlock() - return item, true + return item } q.mu.Unlock() } - return nil, false + return nil } // 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 // 第二个返回值表示队列是否关闭 -func (q *Queue) PopBack() (interface{}, bool) { +func (q *Queue) PopBack() interface{} { select { - case <- q.events: - q.mu.Lock() - if elem := q.list.Front(); elem != nil { - item := q.list.Remove(elem) + case <- q.events: + if q.limit > 0 { + <- q.limits + } + q.mu.Lock() + if elem := q.list.Front(); elem != nil { + item := q.list.Remove(elem) + q.mu.Unlock() + return item + } q.mu.Unlock() - return item, true - } - q.mu.Unlock() } - return nil, false + return nil } // 关闭队列(通知所有通过Pop阻塞的协程退出) func (q *Queue) Close() { - q.events <- struct{}{} + if atomic.LoadInt32(&q.closed) == 0 { + atomic.StoreInt32(&q.closed, 1) + close(q.limits) + close(q.events) + } } // 获取当前队列大小 diff --git a/g/container/gqueue/gqueue_test.go b/g/container/gqueue/gqueue_test.go new file mode 100644 index 000000000..f177ce62e --- /dev/null +++ b/g/container/gqueue/gqueue_test.go @@ -0,0 +1,37 @@ +// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// go test *.go -bench=".*" + +package gqueue_test + +import ( + "testing" + "gitee.com/johng/gf/g/container/gqueue" +) + +var length = 10000000 +var q = gqueue.New(length) + +func BenchmarkGqueueNew1000W(b *testing.B) { + for i := 0; i < b.N; i++ { + gqueue.New(length) + } +} + +func BenchmarkGqueuePush(b *testing.B) { + for i := 0; i < b.N; i++ { + q.PushBack(i) + } +} + +func BenchmarkGqueuePushAndPop(b *testing.B) { + for i := 0; i < b.N; i++ { + q.PushBack(i) + q.PopFront() + } +} + diff --git a/geg/other/test.go b/geg/other/test.go index d9fc686f5..5e842e6a1 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,25 +1,19 @@ package main import ( - "fmt" "time" ) func main() { events1 := make(chan int, 100) events2 := make(chan int, 100) - go func() { - for{ - select { - case t1 := <-events1: - fmt.Println(t1) - case t2 := <-events2: - fmt.Println(t2) - - } - } - - }() + //go func() { + // for{ + // v := <- events1 + // fmt.Println(v) + // } + // + //}() go func() { time.Sleep(2*time.Second) @@ -28,6 +22,8 @@ func main() { time.Sleep(2*time.Second) close(events1) close(events2) + events1 <- 1 + events2 <- 2 }() select {