mirror of
https://gitee.com/johng/gf
synced 2026-07-04 21:03:13 +08:00
fix(container/gqueue): Optimize queue length calculation and loop structure in test cases (#4455)
fixed #4376
This commit is contained in:
@ -89,7 +89,7 @@ func (q *Queue) Close() {
|
||||
if q.limit > 0 {
|
||||
close(q.C)
|
||||
} else {
|
||||
for i := 0; i < defaultBatchSize; i++ {
|
||||
for range defaultBatchSize {
|
||||
q.Pop()
|
||||
}
|
||||
}
|
||||
@ -103,6 +103,12 @@ func (q *Queue) Len() (length int64) {
|
||||
if q.limit > 0 {
|
||||
return bufferedSize
|
||||
}
|
||||
// If the queue is unlimited and the buffered size is exactly the default size,
|
||||
// it means there might be some data in the list not synchronized to channel yet.
|
||||
// So we need to add 1 to the buffered size to make the result more accurate.
|
||||
if bufferedSize == defaultQueueSize {
|
||||
bufferedSize++
|
||||
}
|
||||
return int64(q.list.Size()) + bufferedSize
|
||||
}
|
||||
|
||||
@ -126,7 +132,7 @@ func (q *Queue) asyncLoopFromListToChannel() {
|
||||
if bufferLength := q.list.Len(); bufferLength > 0 {
|
||||
// When q.C is closed, it will panic here, especially q.C is being blocked for writing.
|
||||
// If any error occurs here, it will be caught by recover and be ignored.
|
||||
for i := 0; i < bufferLength; i++ {
|
||||
for range bufferLength {
|
||||
q.C <- q.list.PopFront()
|
||||
}
|
||||
} else {
|
||||
|
||||
@ -24,7 +24,7 @@ func TestQueue_Len(t *testing.T) {
|
||||
)
|
||||
for n := 10; n < maxTries; n++ {
|
||||
q1 := gqueue.New(maxNum)
|
||||
for i := 0; i < maxNum; i++ {
|
||||
for i := range maxNum {
|
||||
q1.Push(i)
|
||||
}
|
||||
t.Assert(q1.Len(), maxNum)
|
||||
@ -38,7 +38,7 @@ func TestQueue_Len(t *testing.T) {
|
||||
)
|
||||
for n := 10; n < maxTries; n++ {
|
||||
q1 := gqueue.New()
|
||||
for i := 0; i < maxNum; i++ {
|
||||
for i := range maxNum {
|
||||
q1.Push(i)
|
||||
}
|
||||
t.AssertLE(q1.Len(), maxNum)
|
||||
@ -50,7 +50,8 @@ func TestQueue_Len(t *testing.T) {
|
||||
func TestQueue_Basic(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
for i := 0; i < 100; i++ {
|
||||
defer q.Close()
|
||||
for i := range 100 {
|
||||
q.Push(i)
|
||||
}
|
||||
t.Assert(q.Pop(), 0)
|
||||
@ -61,6 +62,7 @@ func TestQueue_Basic(t *testing.T) {
|
||||
func TestQueue_Pop(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q1 := gqueue.New()
|
||||
defer q1.Close()
|
||||
q1.Push(1)
|
||||
q1.Push(2)
|
||||
q1.Push(3)
|
||||
@ -73,27 +75,28 @@ func TestQueue_Pop(t *testing.T) {
|
||||
func TestQueue_Close(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q1 := gqueue.New()
|
||||
defer q1.Close()
|
||||
q1.Push(1)
|
||||
q1.Push(2)
|
||||
// wait sync to channel
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
t.Assert(q1.Len(), 2)
|
||||
q1.Close()
|
||||
})
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q1 := gqueue.New(2)
|
||||
defer q1.Close()
|
||||
q1.Push(1)
|
||||
q1.Push(2)
|
||||
// wait sync to channel
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
t.Assert(q1.Len(), 2)
|
||||
q1.Close()
|
||||
})
|
||||
}
|
||||
|
||||
func Test_Issue2509(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
q := gqueue.New()
|
||||
defer q.Close()
|
||||
q.Push(1)
|
||||
q.Push(2)
|
||||
q.Push(3)
|
||||
@ -106,3 +109,22 @@ func Test_Issue2509(t *testing.T) {
|
||||
t.Assert(q.Len(), 0)
|
||||
})
|
||||
}
|
||||
|
||||
// Issue #4376
|
||||
func TestIssue4376(t *testing.T) {
|
||||
gtest.C(t, func(t *gtest.T) {
|
||||
gq := gqueue.New()
|
||||
defer gq.Close()
|
||||
cq := make(chan int, 100000)
|
||||
defer close(cq)
|
||||
|
||||
for i := range 11603 {
|
||||
gq.Push(i)
|
||||
cq <- i
|
||||
}
|
||||
// May be not equal because of the async channel reading goroutine.
|
||||
t.Log(gq.Len(), len(cq))
|
||||
time.Sleep(50 * time.Millisecond)
|
||||
t.Log(gq.Len(), len(cq))
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user