Files
gf/g/container/gqueue/gqueue.go

136 lines
3.1 KiB
Go
Raw Normal View History

// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
2018-02-28 12:01:14 +08:00
// 并发安全的动态队列.
2018-08-23 21:55:27 +08:00
// 特点:
2018-02-07 09:42:18 +08:00
// 1、队列初始化速度快
// 2、可以向队头/队尾进行Push/Pop操作
// 3、取数据时如果队列为空那么会阻塞等待
package gqueue
import (
"math"
"sync"
"errors"
2018-02-07 09:42:18 +08:00
"container/list"
2018-03-29 13:46:05 +08:00
"gitee.com/johng/gf/g/container/gtype"
)
type Queue struct {
2018-03-13 15:54:56 +08:00
mu sync.RWMutex // 用于队列并发安全处理
list *list.List // 数据队列
limit int // 队列限制大小
2018-03-13 15:54:56 +08:00
limits chan struct{} // 用于队列写入限制
events chan struct{} // 用于队列出列限制
2018-03-29 13:46:05 +08:00
closed *gtype.Bool // 队列是否关闭
}
// 队列大小为非必须参数,默认不限制
func New(limit...int) *Queue {
size := 0
if len(limit) > 0 {
size = limit[0]
}
return &Queue {
list : list.New(),
limit : size,
limits : make(chan struct{}, size),
2018-03-29 13:46:05 +08:00
events : make(chan struct{}, math.MaxInt32),
closed : gtype.NewBool(),
}
}
// 将数据压入队列, 队尾
func (q *Queue) PushBack(v interface{}) error {
2018-03-29 13:46:05 +08:00
if q.closed.Val() {
2018-02-07 09:42:18 +08:00
return errors.New("closed")
}
if q.limit > 0 {
q.limits <- struct{}{}
}
2018-03-13 15:54:56 +08:00
q.mu.Lock()
q.list.PushBack(v)
2018-03-13 15:54:56 +08:00
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 将数据压入队列, 队头
func (q *Queue) PushFront(v interface{}) error {
2018-03-29 13:46:05 +08:00
if q.closed.Val() {
2018-02-07 09:42:18 +08:00
return errors.New("closed")
}
2018-03-13 15:54:56 +08:00
// 限制队列大小使用channel进行阻塞限制
if q.limit > 0 {
q.limits <- struct{}{}
}
2018-03-13 15:54:56 +08:00
q.mu.Lock()
q.list.PushFront(v)
2018-03-13 15:54:56 +08:00
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopFront() interface{} {
2018-03-29 13:46:05 +08:00
if q.closed.Val() {
return nil
}
if q.limit > 0 {
<- q.limits
2018-03-13 15:54:56 +08:00
} else {
<- q.events
}
2018-03-13 15:54:56 +08:00
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
2018-03-13 15:54:56 +08:00
q.mu.Unlock()
return item
}
2018-03-13 15:54:56 +08:00
q.mu.Unlock()
return nil
}
// 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopBack() interface{} {
2018-03-29 13:46:05 +08:00
if q.closed.Val() {
return nil
}
if q.limit > 0 {
<- q.limits
2018-03-13 15:54:56 +08:00
} else {
<- q.events
}
2018-03-13 15:54:56 +08:00
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
2018-03-13 15:54:56 +08:00
q.mu.Unlock()
return item
}
2018-03-13 15:54:56 +08:00
q.mu.Unlock()
return nil
}
2018-02-07 09:42:18 +08:00
// 关闭队列(通知所有通过Pop*阻塞的协程退出)
func (q *Queue) Close() {
2018-03-29 13:46:05 +08:00
if !q.closed.Val() {
q.closed.Set(true)
close(q.limits)
close(q.events)
}
}
// 获取当前队列大小
func (q *Queue) Size() int {
return len(q.events)
2018-03-13 15:54:56 +08:00
}
2018-03-29 13:46:05 +08:00