mirror of
https://gitee.com/johng/gf
synced 2026-07-03 11:51:04 +08:00
feat(container/gqueue): add generic queuefeature (#4497)
add TQueue --------- Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: hailaz <739476267@qq.com>
This commit is contained in:
@ -17,20 +17,9 @@
|
||||
// 4. Blocking when reading data from queue;
|
||||
package gqueue
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
"github.com/gogf/gf/v2/container/glist"
|
||||
"github.com/gogf/gf/v2/container/gtype"
|
||||
)
|
||||
|
||||
// Queue is a concurrent-safe queue built on doubly linked list and channel.
|
||||
type Queue struct {
|
||||
limit int // Limit for queue size.
|
||||
list *glist.List // Underlying list structure for data maintaining.
|
||||
closed *gtype.Bool // Whether queue is closed.
|
||||
events chan struct{} // Events for data writing.
|
||||
C chan any // Underlying channel for data reading.
|
||||
*TQueue[any]
|
||||
}
|
||||
|
||||
const (
|
||||
@ -42,74 +31,35 @@ const (
|
||||
// Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default.
|
||||
// When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel.
|
||||
func New(limit ...int) *Queue {
|
||||
q := &Queue{
|
||||
closed: gtype.NewBool(),
|
||||
return &Queue{
|
||||
TQueue: NewTQueue[any](limit...),
|
||||
}
|
||||
if len(limit) > 0 && limit[0] > 0 {
|
||||
q.limit = limit[0]
|
||||
q.C = make(chan any, limit[0])
|
||||
} else {
|
||||
q.list = glist.New(true)
|
||||
q.events = make(chan struct{}, math.MaxInt32)
|
||||
q.C = make(chan any, defaultQueueSize)
|
||||
go q.asyncLoopFromListToChannel()
|
||||
}
|
||||
return q
|
||||
}
|
||||
|
||||
// Push pushes the data `v` into the queue.
|
||||
// Note that it would panic if Push is called after the queue is closed.
|
||||
func (q *Queue) Push(v any) {
|
||||
if q.limit > 0 {
|
||||
q.C <- v
|
||||
} else {
|
||||
q.list.PushBack(v)
|
||||
if len(q.events) < defaultQueueSize {
|
||||
q.events <- struct{}{}
|
||||
}
|
||||
}
|
||||
q.TQueue.Push(v)
|
||||
}
|
||||
|
||||
// Pop pops an item from the queue in FIFO way.
|
||||
// Note that it would return nil immediately if Pop is called after the queue is closed.
|
||||
func (q *Queue) Pop() any {
|
||||
return <-q.C
|
||||
return q.TQueue.Pop()
|
||||
}
|
||||
|
||||
// Close closes the queue.
|
||||
// Notice: It would notify all goroutines return immediately,
|
||||
// which are being blocked reading using Pop method.
|
||||
func (q *Queue) Close() {
|
||||
if !q.closed.Cas(false, true) {
|
||||
return
|
||||
}
|
||||
if q.events != nil {
|
||||
close(q.events)
|
||||
}
|
||||
if q.limit > 0 {
|
||||
close(q.C)
|
||||
} else {
|
||||
for range defaultBatchSize {
|
||||
q.Pop()
|
||||
}
|
||||
}
|
||||
q.TQueue.Close()
|
||||
}
|
||||
|
||||
// Len returns the length of the queue.
|
||||
// Note that the result might not be accurate if using unlimited queue size as there's an
|
||||
// asynchronous channel reading the list constantly.
|
||||
func (q *Queue) Len() (length int64) {
|
||||
bufferedSize := int64(len(q.C))
|
||||
if q.limit > 0 {
|
||||
return bufferedSize
|
||||
}
|
||||
// If the queue is unlimited and the buffered size is exactly the default size,
|
||||
// it means there might be some data in the list not synchronized to channel yet.
|
||||
// So we need to add 1 to the buffered size to make the result more accurate.
|
||||
if bufferedSize == defaultQueueSize {
|
||||
bufferedSize++
|
||||
}
|
||||
return int64(q.list.Size()) + bufferedSize
|
||||
return q.TQueue.Len()
|
||||
}
|
||||
|
||||
// Size is alias of Len.
|
||||
@ -118,34 +68,3 @@ func (q *Queue) Len() (length int64) {
|
||||
func (q *Queue) Size() int64 {
|
||||
return q.Len()
|
||||
}
|
||||
|
||||
// asyncLoopFromListToChannel starts an asynchronous goroutine,
|
||||
// which handles the data synchronization from list `q.list` to channel `q.C`.
|
||||
func (q *Queue) asyncLoopFromListToChannel() {
|
||||
defer func() {
|
||||
if q.closed.Val() {
|
||||
_ = recover()
|
||||
}
|
||||
}()
|
||||
for !q.closed.Val() {
|
||||
<-q.events
|
||||
for !q.closed.Val() {
|
||||
if bufferLength := q.list.Len(); bufferLength > 0 {
|
||||
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
|
||||
// If any error occurs here, it will be caught by recover and be ignored.
|
||||
for range bufferLength {
|
||||
q.C <- q.list.PopFront()
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Clear q.events to remain just one event to do the next synchronization check.
|
||||
for i := 0; i < len(q.events)-1; i++ {
|
||||
<-q.events
|
||||
}
|
||||
}
|
||||
// It should be here to close `q.C` if `q` is unlimited size.
|
||||
// It's the sender's responsibility to close channel when it should be closed.
|
||||
close(q.C)
|
||||
}
|
||||
|
||||
134
container/gqueue/gqueue_t.go
Normal file
134
container/gqueue/gqueue_t.go
Normal file
@ -0,0 +1,134 @@
|
||||
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
package gqueue
|
||||
|
||||
import (
|
||||
"math"
|
||||
|
||||
"github.com/gogf/gf/v2/container/glist"
|
||||
"github.com/gogf/gf/v2/container/gtype"
|
||||
)
|
||||
|
||||
// TQueue is a concurrent-safe queue built on doubly linked list and channel.
|
||||
type TQueue[T any] struct {
|
||||
limit int // Limit for queue size.
|
||||
list *glist.TList[T] // Underlying list structure for data maintaining.
|
||||
closed *gtype.Bool // Whether queue is closed.
|
||||
events chan struct{} // Events for data writing.
|
||||
C chan T // Underlying channel for data reading.
|
||||
}
|
||||
|
||||
// NewTQueue returns an empty queue object.
|
||||
// Optional parameter `limit` is used to limit the size of the queue, which is unlimited in default.
|
||||
// When `limit` is given, the queue will be static and high performance which is comparable with stdlib channel.
|
||||
func NewTQueue[T any](limit ...int) *TQueue[T] {
|
||||
q := &TQueue[T]{
|
||||
closed: gtype.NewBool(),
|
||||
}
|
||||
if len(limit) > 0 && limit[0] > 0 {
|
||||
q.limit = limit[0]
|
||||
q.C = make(chan T, limit[0])
|
||||
} else {
|
||||
q.list = glist.NewT[T](true)
|
||||
q.events = make(chan struct{}, math.MaxInt32)
|
||||
q.C = make(chan T, defaultQueueSize)
|
||||
go q.asyncLoopFromListToChannel()
|
||||
}
|
||||
return q
|
||||
}
|
||||
|
||||
// Push pushes the data `v` into the queue.
|
||||
// Note that it would panic if Push is called after the queue is closed.
|
||||
func (q *TQueue[T]) Push(v T) {
|
||||
if q.limit > 0 {
|
||||
q.C <- v
|
||||
} else {
|
||||
q.list.PushBack(v)
|
||||
if len(q.events) < defaultQueueSize {
|
||||
q.events <- struct{}{}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Pop pops an item from the queue in FIFO way.
|
||||
// Note that it would return nil immediately if Pop is called after the queue is closed.
|
||||
func (q *TQueue[T]) Pop() T {
|
||||
return <-q.C
|
||||
}
|
||||
|
||||
// Close closes the queue.
|
||||
// Notice: It would notify all goroutines return immediately,
|
||||
// which are being blocked reading using Pop method.
|
||||
func (q *TQueue[T]) Close() {
|
||||
if !q.closed.Cas(false, true) {
|
||||
return
|
||||
}
|
||||
if q.events != nil {
|
||||
close(q.events)
|
||||
}
|
||||
if q.limit > 0 {
|
||||
close(q.C)
|
||||
} else {
|
||||
for range defaultBatchSize {
|
||||
q.Pop()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Len returns the length of the queue.
|
||||
// Note that the result might not be accurate if using unlimited queue size as there's an
|
||||
// asynchronous channel reading the list constantly.
|
||||
func (q *TQueue[T]) Len() (length int64) {
|
||||
bufferedSize := int64(len(q.C))
|
||||
if q.limit > 0 {
|
||||
return bufferedSize
|
||||
}
|
||||
// If the queue is unlimited and the buffered size is exactly the default size,
|
||||
// it means there might be some data in the list not synchronized to channel yet.
|
||||
// So we need to add 1 to the buffered size to make the result more accurate.
|
||||
if bufferedSize == defaultQueueSize {
|
||||
bufferedSize++
|
||||
}
|
||||
return int64(q.list.Size()) + bufferedSize
|
||||
}
|
||||
|
||||
// Size is alias of Len.
|
||||
//
|
||||
// Deprecated: use Len instead.
|
||||
func (q *TQueue[T]) Size() int64 {
|
||||
return q.Len()
|
||||
}
|
||||
|
||||
// asyncLoopFromListToChannel starts an asynchronous goroutine,
|
||||
// which handles the data synchronization from list `q.list` to channel `q.C`.
|
||||
func (q *TQueue[T]) asyncLoopFromListToChannel() {
|
||||
defer func() {
|
||||
if q.closed.Val() {
|
||||
_ = recover()
|
||||
}
|
||||
}()
|
||||
for !q.closed.Val() {
|
||||
<-q.events
|
||||
for !q.closed.Val() {
|
||||
if bufferLength := q.list.Len(); bufferLength > 0 {
|
||||
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
|
||||
// If any error occurs here, it will be caught by recover and be ignored.
|
||||
for range bufferLength {
|
||||
q.C <- q.list.PopFront()
|
||||
}
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
// Clear q.events to remain just one event to do the next synchronization check.
|
||||
for i := 0; i < len(q.events)-1; i++ {
|
||||
<-q.events
|
||||
}
|
||||
}
|
||||
// It should be here to close `q.C` if `q` is unlimited size.
|
||||
// It's the sender's responsibility to close channel when it should be closed.
|
||||
close(q.C)
|
||||
}
|
||||
@ -128,3 +128,218 @@ func TestIssue4376(t *testing.T) {
|
||||
t.Log(gq.Len(), len(cq))
|
||||
})
|
||||
}
|
||||
|
||||
// Test static queue (with limit) close operation
|
||||
func TestQueue_StaticClose(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New(10)
|
||||
defer func() {
|
||||
if err := recover(); err == nil {
|
||||
t.Log("Close succeeded")
|
||||
}
|
||||
}()
|
||||
q.Push(1)
|
||||
q.Push(2)
|
||||
q.Close()
|
||||
// After closing, Pop should return nil
|
||||
v := q.Pop()
|
||||
t.Assert(v, nil)
|
||||
})
|
||||
}
|
||||
|
||||
// Test Size() method (deprecated alias of Len)
|
||||
func TestQueue_Size(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New(20)
|
||||
for i := range 10 {
|
||||
q.Push(i)
|
||||
}
|
||||
t.Assert(q.Size(), 10)
|
||||
t.Assert(q.Len(), 10)
|
||||
q.Close()
|
||||
})
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
for i := range 15 {
|
||||
q.Push(i)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
t.Assert(q.Size(), q.Len())
|
||||
q.Close()
|
||||
})
|
||||
}
|
||||
|
||||
// Test TQueue directly with generic type
|
||||
func TestTQueue_Generic(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
// Test with custom type
|
||||
q := gqueue.NewTQueue[string]()
|
||||
defer q.Close()
|
||||
q.Push("hello")
|
||||
q.Push("world")
|
||||
t.Assert(q.Pop(), "hello")
|
||||
t.Assert(q.Pop(), "world")
|
||||
})
|
||||
}
|
||||
|
||||
// Test TQueue Size method directly
|
||||
func TestTQueue_Size(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.NewTQueue[int]()
|
||||
defer q.Close()
|
||||
for i := range 10 {
|
||||
q.Push(i)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
// Size is an alias of Len for TQueue
|
||||
t.Assert(q.Size(), q.Len())
|
||||
})
|
||||
}
|
||||
|
||||
// Test TQueue with static limit
|
||||
func TestTQueue_StaticLimit(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.NewTQueue[int](5)
|
||||
defer q.Close()
|
||||
for i := range 5 {
|
||||
q.Push(i)
|
||||
}
|
||||
t.Assert(q.Len(), 5)
|
||||
for i := range 5 {
|
||||
t.Assert(q.Pop(), i)
|
||||
}
|
||||
t.Assert(q.Len(), 0)
|
||||
})
|
||||
}
|
||||
|
||||
// Test queue with large data push/pop
|
||||
func TestQueue_LargeDataScale(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
defer q.Close()
|
||||
n := 5000
|
||||
for i := range n {
|
||||
q.Push(i)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
// Pop should retrieve all items in order
|
||||
for i := range n {
|
||||
v := q.Pop()
|
||||
t.Assert(v, i)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Test double close (idempotent close)
|
||||
func TestQueue_DoubleClose(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
q.Push(1)
|
||||
q.Close()
|
||||
// Second close should not panic
|
||||
q.Close()
|
||||
t.Assert(q.Pop(), nil)
|
||||
})
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New(10)
|
||||
q.Push(1)
|
||||
q.Close()
|
||||
// Second close should not panic for static queue
|
||||
q.Close()
|
||||
// Pop from closed static queue returns the buffered value
|
||||
v := q.Pop()
|
||||
t.Assert(v, 1)
|
||||
})
|
||||
}
|
||||
|
||||
// Test concurrent push and pop
|
||||
func TestQueue_ConcurrentPushPop(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
defer q.Close()
|
||||
// Producer goroutine
|
||||
go func() {
|
||||
for i := range 100 {
|
||||
q.Push(i)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
q.Close()
|
||||
}()
|
||||
// Consumer
|
||||
count := 0
|
||||
for {
|
||||
v := q.Pop()
|
||||
if v == nil {
|
||||
break
|
||||
}
|
||||
count++
|
||||
}
|
||||
t.AssertGE(count, 1)
|
||||
})
|
||||
}
|
||||
|
||||
// Test Pop on empty queue returns nil when closed
|
||||
func TestQueue_PopEmptyClosed(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
q.Close()
|
||||
v := q.Pop()
|
||||
t.Assert(v, nil)
|
||||
})
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New(10)
|
||||
q.Close()
|
||||
v := q.Pop()
|
||||
t.Assert(v, nil)
|
||||
})
|
||||
}
|
||||
|
||||
// Test Len with dynamic queue at capacity boundary
|
||||
func TestQueue_LenAtBoundary(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
defer q.Close()
|
||||
// Push exactly defaultQueueSize items to test boundary condition
|
||||
for i := range 10000 {
|
||||
q.Push(i)
|
||||
}
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
len := q.Len()
|
||||
t.AssertGE(len, 0)
|
||||
})
|
||||
}
|
||||
|
||||
// Test Close on dynamic queue with pending asyncLoopFromListToChannel
|
||||
func TestQueue_CloseWithAsyncLoop(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
// Push some data to activate asyncLoopFromListToChannel
|
||||
for i := range 100 {
|
||||
q.Push(i)
|
||||
}
|
||||
// Immediately close
|
||||
q.Close()
|
||||
// Pop should return values until exhausted, then nil
|
||||
for {
|
||||
v := q.Pop()
|
||||
if v == nil {
|
||||
break
|
||||
}
|
||||
}
|
||||
t.Assert(q.Pop(), nil)
|
||||
})
|
||||
}
|
||||
|
||||
// Test static queue edge case with zero limit (should create unlimited queue)
|
||||
func TestQueue_ZeroLimitCreatesUnlimited(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New(0)
|
||||
defer q.Close()
|
||||
for i := range 100 {
|
||||
q.Push(i)
|
||||
}
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
len := q.Len()
|
||||
t.Assert(len, 100)
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user