diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index caaab61a4..2e22551c4 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -15,15 +15,16 @@ import ( "sync" "errors" "container/list" + "sync/atomic" ) type Queue struct { - mu sync.RWMutex + mu sync.RWMutex // 用于队列并发安全处理 list *list.List // 数据队列 limit int // 队列限制大小 - limits chan struct{} // 用于队列大小限制 - events chan struct{} // 用于内部数据写入事件通知 - closed bool // 队列是否关闭 + limits chan struct{} // 用于队列写入限制 + events chan struct{} // 用于队列出列限制 + closed int32 // 队列是否关闭(0:未关闭,1:关闭) } // 队列大小为非必须参数,默认不限制 @@ -34,6 +35,7 @@ func New(limit...int) *Queue { } return &Queue { list : list.New(), + limit : 0, limits : make(chan struct{}, size), events : make(chan struct{}, math.MaxInt64), } @@ -41,74 +43,88 @@ func New(limit...int) *Queue { // 将数据压入队列, 队尾 func (q *Queue) PushBack(v interface{}) error { - q.mu.RLock() - if q.closed { - q.mu.RUnlock() + if q.isClosed() { return errors.New("closed") } if q.limit > 0 { q.limits <- struct{}{} } + q.mu.Lock() q.list.PushBack(v) - q.events <- struct{}{} - q.mu.RUnlock() + q.mu.Unlock() + if q.limit == 0 { + q.events <- struct{}{} + } return nil } // 将数据压入队列, 队头 func (q *Queue) PushFront(v interface{}) error { - q.mu.RLock() - if q.closed { - q.mu.RUnlock() + if q.isClosed() { return errors.New("closed") } + // 限制队列大小,使用channel进行阻塞限制 if q.limit > 0 { q.limits <- struct{}{} } + q.mu.Lock() q.list.PushFront(v) - q.events <- struct{}{} - q.mu.RUnlock() + q.mu.Unlock() + if q.limit == 0 { + q.events <- struct{}{} + } return nil } // 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 func (q *Queue) PopFront() interface{} { - <- q.events if q.limit > 0 { <- q.limits + } else { + <- q.events } + q.mu.Lock() if elem := q.list.Front(); elem != nil { item := q.list.Remove(elem) + q.mu.Unlock() return item } + q.mu.Unlock() return nil } // 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待 func (q *Queue) PopBack() interface{} { - <- q.events if q.limit > 0 { <- q.limits + } else { + <- q.events } + q.mu.Lock() if elem := q.list.Front(); elem != nil { item := q.list.Remove(elem) + q.mu.Unlock() return item } + q.mu.Unlock() return nil } // 关闭队列(通知所有通过Pop*阻塞的协程退出) func (q *Queue) Close() { - q.mu.Lock() - if !q.closed { - q.closed = true + if !q.isClosed() { + atomic.StoreInt32(&q.closed, 1) close(q.limits) close(q.events) } - q.mu.Unlock() } // 获取当前队列大小 func (q *Queue) Size() int { return len(q.events) -} \ No newline at end of file +} + +// 队列是否关闭 +func (q *Queue) isClosed() bool { + return atomic.LoadInt32(&q.closed) > 0 +}