mirror of
https://gitee.com/johng/gf
synced 2026-06-07 10:22:11 +08:00
fix data race issue of gqueue
This commit is contained in:
@ -15,9 +15,8 @@
|
||||
package gqueue
|
||||
|
||||
import (
|
||||
"container/list"
|
||||
"github.com/gogf/gf/g/container/glist"
|
||||
"math"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// 1、这是一个先进先出的队列(chan <-- list);
|
||||
@ -28,9 +27,8 @@ import (
|
||||
//
|
||||
// 4、由于功能主体是chan,那么操作仍然像chan那样具有阻塞效果;
|
||||
type Queue struct {
|
||||
mu sync.Mutex // 底层链表写锁
|
||||
limit int // 队列限制大小
|
||||
list *list.List // 底层数据链表
|
||||
list *glist.List // 底层数据链表
|
||||
events chan struct{} // 写入事件通知
|
||||
closed chan struct{} // 队列关闭通知
|
||||
C chan interface{} // 队列数据读取
|
||||
@ -50,7 +48,7 @@ func New(limit...int) *Queue {
|
||||
q.limit = limit[0]
|
||||
q.C = make(chan interface{}, limit[0])
|
||||
} else {
|
||||
q.list = list.New()
|
||||
q.list = glist.New()
|
||||
q.events = make(chan struct{}, math.MaxInt32)
|
||||
q.C = make(chan interface{}, gDEFAULT_QUEUE_SIZE)
|
||||
go q.startAsyncLoop()
|
||||
@ -68,7 +66,6 @@ func (q *Queue) startAsyncLoop() {
|
||||
for {
|
||||
if length := q.list.Len(); length > 0 {
|
||||
array := make([]interface{}, length)
|
||||
q.mu.Lock()
|
||||
for i := 0; i < length; i++ {
|
||||
if e := q.list.Front(); e != nil {
|
||||
array[i] = q.list.Remove(e)
|
||||
@ -76,7 +73,6 @@ func (q *Queue) startAsyncLoop() {
|
||||
break
|
||||
}
|
||||
}
|
||||
q.mu.Unlock()
|
||||
for _, v := range array {
|
||||
q.C <- v
|
||||
}
|
||||
@ -93,9 +89,7 @@ func (q *Queue) Push(v interface{}) {
|
||||
if q.limit > 0 {
|
||||
q.C <- v
|
||||
} else {
|
||||
q.mu.Lock()
|
||||
q.list.PushBack(v)
|
||||
q.mu.Unlock()
|
||||
q.events <- struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
@ -18,7 +18,7 @@ func (w *Watcher) startWatchLoop() {
|
||||
// 关闭事件
|
||||
case <- w.closeChan: return
|
||||
|
||||
// 监听事件
|
||||
// 监听事件
|
||||
case ev := <- w.watcher.Events:
|
||||
//fmt.Println("ev:", ev.String())
|
||||
w.cache.SetIfNotExist(ev.String(), func() interface{} {
|
||||
|
||||
@ -232,7 +232,7 @@ func AssertNI(value, expect interface{}) {
|
||||
|
||||
// 提示错误不退出进程执行
|
||||
func Error(message...interface{}) {
|
||||
fmt.Fprintf(os.Stderr, "[ERROR] %s\n%s", fmt.Sprint(message...), getBacktrace())
|
||||
panic(fmt.Sprintf(`[ERROR] %s\n%s`, fmt.Sprint(message...), getBacktrace()))
|
||||
}
|
||||
|
||||
// 提示错误并退出进程执行
|
||||
|
||||
Reference in New Issue
Block a user