diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index 8d61b108e..2aafec72c 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -20,15 +20,15 @@ package gqueue import ( "github.com/gogf/gf/g/container/glist" + "github.com/gogf/gf/g/container/gtype" "math" - "strings" ) type Queue struct { limit int // Limit for queue size. list *glist.List // Underlying list structure for data maintaining. + closed *gtype.Bool // Whether queue is closed. events chan struct{} // Events for data writing. - closed chan struct{} // Events for queue closing. C chan interface{} // Underlying channel for data reading. } @@ -42,7 +42,7 @@ const ( // When is given, the queue will be static and high performance which is comparable with stdlib channel. func New(limit...int) *Queue { q := &Queue { - closed : make(chan struct{}, 0), + closed : gtype.NewBool(), } if len(limit) > 0 { q.limit = limit[0] @@ -60,40 +60,26 @@ func New(limit...int) *Queue { // which handles the data synchronization from list to channel . func (q *Queue) startAsyncLoop() { defer func() { - if err := recover(); err != nil { - if e, ok := err.(error); ok { - // If any error occurs by writing to closed q.C, ignore it. - if strings.Contains(e.Error(), "closed channel") { - return - } - } - panic(err) + if q.closed.Val() { + _ = recover() } }() - for { - select { - case <- q.closed: - return - case <- q.events: - for { - if length := q.list.Len(); length > 0 { - array := make([]interface{}, length) - for i := 0; i < length; i++ { - if e := q.list.Front(); e != nil { - array[i] = q.list.Remove(e) - } else { - break - } - } - for _, v := range array { - // When q.C closes, it will panic here, especially q.C is being blocked for writing. - // It will be caught by recover and be ignored, if any error occurs here. - q.C <- v - } - } else { - break - } + for !q.closed.Val() { + <- q.events + if length := q.list.Len(); length > 0 { + array := make([]interface{}, length) + for i := 0; i < length; i++ { + if e := q.list.Front(); e != nil { + array[i] = q.list.Remove(e) + } else { + break } + } + for _, v := range array { + // When q.C closes, it will panic here, especially q.C is being blocked for writing. + // It will be caught by recover and be ignored, if any error occurs here. + q.C <- v + } } } } @@ -119,9 +105,7 @@ func (q *Queue) Pop() interface{} { // Notice: It would notify all goroutines return immediately, // which are being blocked reading using Pop method. func (q *Queue) Close() { - if q.closed != nil { - close(q.closed) - } + q.closed.Set(true) if q.events != nil { close(q.events) }