mirror of
https://gitee.com/johng/gf
synced 2026-06-07 10:22:11 +08:00
fix issue in gqueue.Close
This commit is contained in:
@ -21,6 +21,7 @@ package gqueue
|
||||
import (
|
||||
"github.com/gogf/gf/g/container/glist"
|
||||
"math"
|
||||
"strings"
|
||||
)
|
||||
|
||||
type Queue struct {
|
||||
@ -58,6 +59,17 @@ func New(limit...int) *Queue {
|
||||
// startAsyncLoop starts an asynchronous goroutine,
|
||||
// which handles the data synchronization from list <q.list> to channel <q.C>.
|
||||
func (q *Queue) startAsyncLoop() {
|
||||
defer func() {
|
||||
if err := recover(); err != nil {
|
||||
if e, ok := err.(error); ok {
|
||||
// If any error occurs by writing to closed q.C, ignore it.
|
||||
if strings.Contains(e.Error(), "closed channel") {
|
||||
return
|
||||
}
|
||||
}
|
||||
panic(err)
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <- q.closed:
|
||||
@ -74,6 +86,8 @@ func (q *Queue) startAsyncLoop() {
|
||||
}
|
||||
}
|
||||
for _, v := range array {
|
||||
// When q.C closes, it will panic here, especially q.C is being blocked for writing.
|
||||
// It will be caught by recover and be ignored, if any error occurs here.
|
||||
q.C <- v
|
||||
}
|
||||
} else {
|
||||
@ -105,14 +119,14 @@ func (q *Queue) Pop() interface{} {
|
||||
// Notice: It would notify all goroutines return immediately,
|
||||
// which are being blocked reading using Pop method.
|
||||
func (q *Queue) Close() {
|
||||
if q.C != nil {
|
||||
close(q.C)
|
||||
if q.closed != nil {
|
||||
close(q.closed)
|
||||
}
|
||||
if q.events != nil {
|
||||
close(q.events)
|
||||
}
|
||||
if q.closed != nil {
|
||||
close(q.closed)
|
||||
if q.C != nil {
|
||||
close(q.C)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -1,50 +1,49 @@
|
||||
// Copyright 2017 gf Author(https://github.com/gogf/gf). All Rights Reserved.
|
||||
//
|
||||
// This Source Code Form is subject to the terms of the MIT License.
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
// go test *.go -bench=".*" -benchmem
|
||||
|
||||
package gqueue_test
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"github.com/gogf/gf/g/container/gqueue"
|
||||
"github.com/gogf/gf/g/test/gtest"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var bn = 20000000
|
||||
var length = 1000000
|
||||
var qstatic = gqueue.New(length)
|
||||
var qdynamic = gqueue.New()
|
||||
var cany = make(chan interface{}, length)
|
||||
|
||||
func Benchmark_Gqueue_StaticPushAndPop(b *testing.B) {
|
||||
b.N = bn
|
||||
for i := 0; i < b.N; i++ {
|
||||
qstatic.Push(i)
|
||||
qstatic.Pop()
|
||||
func TestQueue_Len(t *testing.T) {
|
||||
q1 := gqueue.New(300)
|
||||
for i := 0; i < 200; i++ {
|
||||
q1.Push(i)
|
||||
}
|
||||
gtest.Assert(q1.Len(), 200)
|
||||
}
|
||||
|
||||
func Benchmark_Gqueue_DynamicPush(b *testing.B) {
|
||||
b.N = bn
|
||||
for i := 0; i < b.N; i++ {
|
||||
qdynamic.Push(i)
|
||||
func TestQueue_Pop(t *testing.T) {
|
||||
q1 := gqueue.New()
|
||||
|
||||
q1.Push(1)
|
||||
q1.Push(2)
|
||||
i1 := q1.Pop()
|
||||
gtest.Assert(i1, 1)
|
||||
q1.Close()
|
||||
i1 = q1.Pop()
|
||||
gtest.Assert(i1, 2)
|
||||
|
||||
maxs := 12
|
||||
q2 := gqueue.New(maxs)
|
||||
for i := 0; i < maxs; i++ {
|
||||
q2.Push(i)
|
||||
}
|
||||
|
||||
i3 := q2.Pop()
|
||||
gtest.Assert(i3, 0)
|
||||
|
||||
}
|
||||
|
||||
func Benchmark_Gqueue_DynamicPop(b *testing.B) {
|
||||
b.N = bn
|
||||
for i := 0; i < b.N; i++ {
|
||||
qdynamic.Pop()
|
||||
}
|
||||
}
|
||||
|
||||
func Benchmark_Channel_PushAndPop(b *testing.B) {
|
||||
b.N = bn
|
||||
for i := 0; i < b.N; i++ {
|
||||
cany <- i
|
||||
<- cany
|
||||
}
|
||||
func TestQueue_Close(t *testing.T) {
|
||||
q1 := gqueue.New()
|
||||
q1.Push(1)
|
||||
q1.Push(2)
|
||||
gtest.Assert(q1.Len(), 2)
|
||||
|
||||
q1.Close()
|
||||
gtest.Assert(q1.Len(), 2)
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user