diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index 1b3814a59..2aafec72c 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -20,14 +20,15 @@ package gqueue import ( "github.com/gogf/gf/g/container/glist" + "github.com/gogf/gf/g/container/gtype" "math" ) type Queue struct { limit int // Limit for queue size. list *glist.List // Underlying list structure for data maintaining. + closed *gtype.Bool // Whether queue is closed. events chan struct{} // Events for data writing. - closed chan struct{} // Events for queue closing. C chan interface{} // Underlying channel for data reading. } @@ -41,7 +42,7 @@ const ( // When is given, the queue will be static and high performance which is comparable with stdlib channel. func New(limit...int) *Queue { q := &Queue { - closed : make(chan struct{}, 0), + closed : gtype.NewBool(), } if len(limit) > 0 { q.limit = limit[0] @@ -58,28 +59,27 @@ func New(limit...int) *Queue { // startAsyncLoop starts an asynchronous goroutine, // which handles the data synchronization from list to channel . func (q *Queue) startAsyncLoop() { - for { - select { - case <- q.closed: - return - case <- q.events: - for { - if length := q.list.Len(); length > 0 { - array := make([]interface{}, length) - for i := 0; i < length; i++ { - if e := q.list.Front(); e != nil { - array[i] = q.list.Remove(e) - } else { - break - } - } - for _, v := range array { - q.C <- v - } - } else { - break - } + defer func() { + if q.closed.Val() { + _ = recover() + } + }() + for !q.closed.Val() { + <- q.events + if length := q.list.Len(); length > 0 { + array := make([]interface{}, length) + for i := 0; i < length; i++ { + if e := q.list.Front(); e != nil { + array[i] = q.list.Remove(e) + } else { + break } + } + for _, v := range array { + // When q.C closes, it will panic here, especially q.C is being blocked for writing. + // It will be caught by recover and be ignored, if any error occurs here. + q.C <- v + } } } } @@ -105,14 +105,12 @@ func (q *Queue) Pop() interface{} { // Notice: It would notify all goroutines return immediately, // which are being blocked reading using Pop method. func (q *Queue) Close() { - if q.C != nil { - close(q.C) - } + q.closed.Set(true) if q.events != nil { close(q.events) } - if q.closed != nil { - close(q.closed) + if q.C != nil { + close(q.C) } }