修正gqueue并发安全问题

This commit is contained in:
John
2018-03-13 15:54:56 +08:00
parent 5ea70888fe
commit 1f78e1602b

View File

@ -15,15 +15,16 @@ import (
"sync"
"errors"
"container/list"
"sync/atomic"
)
type Queue struct {
mu sync.RWMutex
mu sync.RWMutex // 用于队列并发安全处理
list *list.List // 数据队列
limit int // 队列限制大小
limits chan struct{} // 用于队列大小限制
events chan struct{} // 用于内部数据写入事件通知
closed bool // 队列是否关闭
limits chan struct{} // 用于队列写入限制
events chan struct{} // 用于队列出列限制
closed int32 // 队列是否关闭(0:未关闭1:关闭)
}
// 队列大小为非必须参数,默认不限制
@ -34,6 +35,7 @@ func New(limit...int) *Queue {
}
return &Queue {
list : list.New(),
limit : 0,
limits : make(chan struct{}, size),
events : make(chan struct{}, math.MaxInt64),
}
@ -41,74 +43,88 @@ func New(limit...int) *Queue {
// 将数据压入队列, 队尾
func (q *Queue) PushBack(v interface{}) error {
q.mu.RLock()
if q.closed {
q.mu.RUnlock()
if q.isClosed() {
return errors.New("closed")
}
if q.limit > 0 {
q.limits <- struct{}{}
}
q.mu.Lock()
q.list.PushBack(v)
q.events <- struct{}{}
q.mu.RUnlock()
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 将数据压入队列, 队头
func (q *Queue) PushFront(v interface{}) error {
q.mu.RLock()
if q.closed {
q.mu.RUnlock()
if q.isClosed() {
return errors.New("closed")
}
// 限制队列大小使用channel进行阻塞限制
if q.limit > 0 {
q.limits <- struct{}{}
}
q.mu.Lock()
q.list.PushFront(v)
q.events <- struct{}{}
q.mu.RUnlock()
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopFront() interface{} {
<- q.events
if q.limit > 0 {
<- q.limits
} else {
<- q.events
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil
}
// 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopBack() interface{} {
<- q.events
if q.limit > 0 {
<- q.limits
} else {
<- q.events
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil
}
// 关闭队列(通知所有通过Pop*阻塞的协程退出)
func (q *Queue) Close() {
q.mu.Lock()
if !q.closed {
q.closed = true
if !q.isClosed() {
atomic.StoreInt32(&q.closed, 1)
close(q.limits)
close(q.events)
}
q.mu.Unlock()
}
// 获取当前队列大小
func (q *Queue) Size() int {
return len(q.events)
}
}
// 队列是否关闭
func (q *Queue) isClosed() bool {
return atomic.LoadInt32(&q.closed) > 0
}