diff --git a/container/gqueue/gqueue.go b/container/gqueue/gqueue.go index e20e0ec94..65443a5da 100644 --- a/container/gqueue/gqueue.go +++ b/container/gqueue/gqueue.go @@ -89,7 +89,7 @@ func (q *Queue) Close() { if q.limit > 0 { close(q.C) } else { - for i := 0; i < defaultBatchSize; i++ { + for range defaultBatchSize { q.Pop() } } @@ -103,6 +103,12 @@ func (q *Queue) Len() (length int64) { if q.limit > 0 { return bufferedSize } + // If the queue is unlimited and the buffered size is exactly the default size, + // it means there might be some data in the list not synchronized to channel yet. + // So we need to add 1 to the buffered size to make the result more accurate. + if bufferedSize == defaultQueueSize { + bufferedSize++ + } return int64(q.list.Size()) + bufferedSize } @@ -126,7 +132,7 @@ func (q *Queue) asyncLoopFromListToChannel() { if bufferLength := q.list.Len(); bufferLength > 0 { // When q.C is closed, it will panic here, especially q.C is being blocked for writing. // If any error occurs here, it will be caught by recover and be ignored. - for i := 0; i < bufferLength; i++ { + for range bufferLength { q.C <- q.list.PopFront() } } else { diff --git a/container/gqueue/gqueue_z_unit_test.go b/container/gqueue/gqueue_z_unit_test.go index cf87f7a48..88ed6f962 100644 --- a/container/gqueue/gqueue_z_unit_test.go +++ b/container/gqueue/gqueue_z_unit_test.go @@ -24,7 +24,7 @@ func TestQueue_Len(t *testing.T) { ) for n := 10; n < maxTries; n++ { q1 := gqueue.New(maxNum) - for i := 0; i < maxNum; i++ { + for i := range maxNum { q1.Push(i) } t.Assert(q1.Len(), maxNum) @@ -38,7 +38,7 @@ func TestQueue_Len(t *testing.T) { ) for n := 10; n < maxTries; n++ { q1 := gqueue.New() - for i := 0; i < maxNum; i++ { + for i := range maxNum { q1.Push(i) } t.AssertLE(q1.Len(), maxNum) @@ -50,7 +50,8 @@ func TestQueue_Len(t *testing.T) { func TestQueue_Basic(t *testing.T) { gtest.C(t, func(t *gtest.T) { q := gqueue.New() - for i := 0; i < 100; i++ { + defer q.Close() + for i := range 100 { q.Push(i) } t.Assert(q.Pop(), 0) @@ -61,6 +62,7 @@ func TestQueue_Basic(t *testing.T) { func TestQueue_Pop(t *testing.T) { gtest.C(t, func(t *gtest.T) { q1 := gqueue.New() + defer q1.Close() q1.Push(1) q1.Push(2) q1.Push(3) @@ -73,27 +75,28 @@ func TestQueue_Pop(t *testing.T) { func TestQueue_Close(t *testing.T) { gtest.C(t, func(t *gtest.T) { q1 := gqueue.New() + defer q1.Close() q1.Push(1) q1.Push(2) // wait sync to channel time.Sleep(10 * time.Millisecond) t.Assert(q1.Len(), 2) - q1.Close() }) gtest.C(t, func(t *gtest.T) { q1 := gqueue.New(2) + defer q1.Close() q1.Push(1) q1.Push(2) // wait sync to channel time.Sleep(10 * time.Millisecond) t.Assert(q1.Len(), 2) - q1.Close() }) } func Test_Issue2509(t *testing.T) { gtest.C(t, func(t *gtest.T) { q := gqueue.New() + defer q.Close() q.Push(1) q.Push(2) q.Push(3) @@ -106,3 +109,22 @@ func Test_Issue2509(t *testing.T) { t.Assert(q.Len(), 0) }) } + +// Issue #4376 +func TestIssue4376(t *testing.T) { + gtest.C(t, func(t *gtest.T) { + gq := gqueue.New() + defer gq.Close() + cq := make(chan int, 100000) + defer close(cq) + + for i := range 11603 { + gq.Push(i) + cq <- i + } + // May be not equal because of the async channel reading goroutine. + t.Log(gq.Len(), len(cq)) + time.Sleep(50 * time.Millisecond) + t.Log(gq.Len(), len(cq)) + }) +}