mirror of
https://gitee.com/johng/gf
synced 2026-06-06 16:21:40 +08:00
新增gchan包,对channel进行简单封装,便于优雅的channel操作处理;完善gqueue包功能
This commit is contained in:
54
g/container/gchan/gchan.go
Normal file
54
g/container/gchan/gchan.go
Normal file
@ -0,0 +1,54 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// 优雅的channel操作封装
|
||||
package gchan
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type Chan struct {
|
||||
mu sync.RWMutex
|
||||
list chan interface{}
|
||||
closed int32
|
||||
}
|
||||
|
||||
func New(limit int) *Chan {
|
||||
return &Chan {
|
||||
list : make(chan interface{}, limit),
|
||||
}
|
||||
}
|
||||
|
||||
// 将数据压入队列
|
||||
func (q *Chan) Push(v interface{}) error {
|
||||
if atomic.LoadInt32(&q.closed) > 0 {
|
||||
return errors.New("channel closed")
|
||||
}
|
||||
q.list <- v
|
||||
return nil
|
||||
}
|
||||
|
||||
// 先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
|
||||
// 第二个返回值表示队列是否关闭
|
||||
func (q *Chan) Pop() interface{} {
|
||||
return <- q.list
|
||||
}
|
||||
|
||||
// 关闭队列(通知所有通过Pop阻塞的协程退出)
|
||||
func (q *Chan) Close() {
|
||||
if atomic.LoadInt32(&q.closed) == 0 {
|
||||
atomic.StoreInt32(&q.closed, 1)
|
||||
close(q.list)
|
||||
}
|
||||
}
|
||||
|
||||
// 获取当前队列大小
|
||||
func (q *Chan) Size() int {
|
||||
return len(q.list)
|
||||
}
|
||||
32
g/container/gchan/gchan_test.go
Normal file
32
g/container/gchan/gchan_test.go
Normal file
@ -0,0 +1,32 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// go test *.go -bench=".*"
|
||||
|
||||
package gchan_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"gitee.com/johng/gf/g/container/gchan"
|
||||
)
|
||||
|
||||
var length = 10000000
|
||||
var q1 = gchan.New(length)
|
||||
var q2 = make(chan int, length)
|
||||
|
||||
func BenchmarkGqueuePushAndPop(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
q1.Push(i)
|
||||
q1.Pop()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkChannelPushAndPop(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
q2 <- i
|
||||
<- q2
|
||||
}
|
||||
}
|
||||
@ -11,72 +11,106 @@ import (
|
||||
"math"
|
||||
"sync"
|
||||
"container/list"
|
||||
"sync/atomic"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
mu sync.RWMutex
|
||||
list *list.List
|
||||
events chan struct{}
|
||||
list *list.List // 数据队列
|
||||
limit int // 队列限制大小
|
||||
limits chan struct{} // 用于队列大小限制
|
||||
events chan struct{} // 用于内部数据写入事件通知
|
||||
closed int32 // 队列是否关闭
|
||||
}
|
||||
|
||||
func New() *Queue {
|
||||
// 队列大小为非必须参数,默认不限制
|
||||
func New(limit...int) *Queue {
|
||||
size := 0
|
||||
if len(limit) > 0 {
|
||||
size = limit[0]
|
||||
}
|
||||
return &Queue {
|
||||
list : list.New(),
|
||||
limits : make(chan struct{}, size),
|
||||
events : make(chan struct{}, math.MaxInt64),
|
||||
}
|
||||
}
|
||||
|
||||
// 将数据压入队列, 队尾
|
||||
func (q *Queue) PushBack(v interface{}) {
|
||||
func (q *Queue) PushBack(v interface{}) error {
|
||||
if atomic.LoadInt32(&q.closed) > 0 {
|
||||
return errors.New("queue closed")
|
||||
}
|
||||
if q.limit > 0 {
|
||||
q.limits <- struct{}{}
|
||||
}
|
||||
q.mu.Lock()
|
||||
q.list.PushBack(v)
|
||||
q.mu.Unlock()
|
||||
q.events <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 将数据压入队列, 队头
|
||||
func (q *Queue) PushFront(v interface{}) {
|
||||
func (q *Queue) PushFront(v interface{}) error {
|
||||
if atomic.LoadInt32(&q.closed) > 0 {
|
||||
return errors.New("queue closed")
|
||||
}
|
||||
if q.limit > 0 {
|
||||
q.limits <- struct{}{}
|
||||
}
|
||||
q.mu.Lock()
|
||||
q.list.PushFront(v)
|
||||
q.mu.Unlock()
|
||||
q.events <- struct{}{}
|
||||
return nil
|
||||
}
|
||||
|
||||
// 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
|
||||
// 第二个返回值表示队列是否关闭
|
||||
func (q *Queue) PopFront() (interface{}, bool) {
|
||||
func (q *Queue) PopFront() interface{} {
|
||||
select {
|
||||
case <- q.events:
|
||||
if q.limit > 0 {
|
||||
<- q.limits
|
||||
}
|
||||
q.mu.Lock()
|
||||
if elem := q.list.Front(); elem != nil {
|
||||
item := q.list.Remove(elem)
|
||||
q.mu.Unlock()
|
||||
return item, true
|
||||
return item
|
||||
}
|
||||
q.mu.Unlock()
|
||||
}
|
||||
return nil, false
|
||||
return nil
|
||||
}
|
||||
|
||||
// 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
|
||||
// 第二个返回值表示队列是否关闭
|
||||
func (q *Queue) PopBack() (interface{}, bool) {
|
||||
func (q *Queue) PopBack() interface{} {
|
||||
select {
|
||||
case <- q.events:
|
||||
q.mu.Lock()
|
||||
if elem := q.list.Front(); elem != nil {
|
||||
item := q.list.Remove(elem)
|
||||
case <- q.events:
|
||||
if q.limit > 0 {
|
||||
<- q.limits
|
||||
}
|
||||
q.mu.Lock()
|
||||
if elem := q.list.Front(); elem != nil {
|
||||
item := q.list.Remove(elem)
|
||||
q.mu.Unlock()
|
||||
return item
|
||||
}
|
||||
q.mu.Unlock()
|
||||
return item, true
|
||||
}
|
||||
q.mu.Unlock()
|
||||
}
|
||||
return nil, false
|
||||
return nil
|
||||
}
|
||||
|
||||
// 关闭队列(通知所有通过Pop阻塞的协程退出)
|
||||
func (q *Queue) Close() {
|
||||
q.events <- struct{}{}
|
||||
if atomic.LoadInt32(&q.closed) == 0 {
|
||||
atomic.StoreInt32(&q.closed, 1)
|
||||
close(q.limits)
|
||||
close(q.events)
|
||||
}
|
||||
}
|
||||
|
||||
// 获取当前队列大小
|
||||
|
||||
37
g/container/gqueue/gqueue_test.go
Normal file
37
g/container/gqueue/gqueue_test.go
Normal file
@ -0,0 +1,37 @@
|
||||
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://gitee.com/johng/gf.
|
||||
|
||||
// go test *.go -bench=".*"
|
||||
|
||||
package gqueue_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"gitee.com/johng/gf/g/container/gqueue"
|
||||
)
|
||||
|
||||
var length = 10000000
|
||||
var q = gqueue.New(length)
|
||||
|
||||
func BenchmarkGqueueNew1000W(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
gqueue.New(length)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGqueuePush(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.PushBack(i)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkGqueuePushAndPop(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
q.PushBack(i)
|
||||
q.PopFront()
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,25 +1,19 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
)
|
||||
|
||||
func main() {
|
||||
events1 := make(chan int, 100)
|
||||
events2 := make(chan int, 100)
|
||||
go func() {
|
||||
for{
|
||||
select {
|
||||
case t1 := <-events1:
|
||||
fmt.Println(t1)
|
||||
case t2 := <-events2:
|
||||
fmt.Println(t2)
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
}()
|
||||
//go func() {
|
||||
// for{
|
||||
// v := <- events1
|
||||
// fmt.Println(v)
|
||||
// }
|
||||
//
|
||||
//}()
|
||||
|
||||
go func() {
|
||||
time.Sleep(2*time.Second)
|
||||
@ -28,6 +22,8 @@ func main() {
|
||||
time.Sleep(2*time.Second)
|
||||
close(events1)
|
||||
close(events2)
|
||||
events1 <- 1
|
||||
events2 <- 2
|
||||
}()
|
||||
|
||||
select {
|
||||
|
||||
Reference in New Issue
Block a user