diff --git a/TODO.MD b/TODO.MD index ef0835f85..a314ae96e 100644 --- a/TODO.MD +++ b/TODO.MD @@ -49,8 +49,8 @@ 1. gkafka这个包比较重,未来从框架中剥离出来; 1. grpool性能压测结果变慢的问题; 1. glist增加Element类型的并发安全处理; - - +1. 改进证书打开失败时的WebServer错误提示,前置HOOK校验后关闭后续的HOOK逻辑执行; +1. 目前WebServer的HOOK是按照优先级执行的,需要增加覆盖特性; diff --git a/g/container/glist/glist.go b/g/container/glist/glist.go index 0ba8d05b3..4c1a4fb99 100644 --- a/g/container/glist/glist.go +++ b/g/container/glist/glist.go @@ -9,238 +9,286 @@ package glist import ( - "gitee.com/johng/gf/g/container/gtype" + "container/list" "gitee.com/johng/gf/g/container/internal/rwmutex" ) // 变长双向链表 type List struct { mu *rwmutex.RWMutex - root *Element - length *gtype.Int + list *list.List } +type Element = list.Element + + // 获得一个变长链表指针 func New(safe...bool) *List { - l := &List{ - mu : rwmutex.New(safe...), - length : gtype.NewInt(), + return &List { + mu : rwmutex.New(safe...), + list : list.New(), } - l.root = newElement(nil, l, safe...) - l.root.list = l - l.root.next = l.root - l.root.prev = l.root - return l } // 往链表头入栈数据项 -func (l *List) PushFront(v interface{}) *Element { - return l.InsertAfter(v, l.root) +func (l *List) PushFront(v interface{}) (e *Element) { + l.mu.Lock() + e = l.list.PushFront(v) + l.mu.Unlock() + return } // 往链表尾入栈数据项 -func (l *List) PushBack(v interface{}) *Element { - return l.InsertBefore(v, l.root) +func (l *List) PushBack(v interface{}) (e *Element) { + l.mu.Lock() + e = l.list.PushBack(v) + l.mu.Unlock() + return } // 批量往链表头入栈数据项 func (l *List) BatchPushFront(values []interface{}) { l.mu.Lock() - defer l.mu.Unlock() for _, v := range values { - l.InsertAfter(v, l.root) + l.list.PushFront(v) } + l.mu.Unlock() } // 批量往链表尾入栈数据项 func (l *List) BatchPushBack(values []interface{}) { l.mu.Lock() - defer l.mu.Unlock() for _, v := range values { - l.InsertBefore(v, l.root) + l.list.PushBack(v) } + l.mu.Unlock() } // 从链表尾端出栈数据项(删除) -func (l *List) PopBack() interface{} { - if e := l.Back(); e != nil { - if o := l.Remove(e); o != nil { - return o.Value() - } +func (l *List) PopBack() (value interface{}) { + l.mu.Lock() + if e := l.list.Back(); e != nil { + value = l.list.Remove(e) } - return nil + l.mu.Unlock() + return } // 从链表头端出栈数据项(删除) -func (l *List) PopFront() interface{} { - if e := l.Front(); e != nil { - if o := l.Remove(e); o != nil { - return o.Value() - } - } - return nil +func (l *List) PopFront() (value interface{}) { + l.mu.Lock() + if e := l.list.Front(); e != nil { + value = l.list.Remove(e) + } + l.mu.Unlock() + return } // 批量从链表尾端出栈数据项(删除) -func (l *List) BatchPopBack(max int) []interface{} { - count := l.Len() - if count == 0 { - return []interface{}{} - } - if count > max { - count = max - } - items := make([]interface{}, count) - for i := 0; i < count; i++ { - items[i] = l.PopBack() - } - return items +func (l *List) BatchPopBack(max int) (values []interface{}) { + l.mu.Lock() + length := l.list.Len() + if length > 0 { + if max > 0 && max < length { + length = max + } + tempe := (*Element)(nil) + values = make([]interface{}, length) + for i := 0; i < length; i++ { + tempe = l.list.Back() + values[i] = l.list.Remove(tempe) + } + } + l.mu.Unlock() + return } // 批量从链表头端出栈数据项(删除) -func (l *List) BatchPopFront(max int) []interface{} { - count := l.Len() - if count == 0 { - return []interface{}{} - } - if count > max { - count = max - } - items := make([]interface{}, count) - for i := 0; i < count; i++ { - items[i] = l.PopFront() - } - return items +func (l *List) BatchPopFront(max int) (values []interface{}) { + l.mu.RLock() + length := l.list.Len() + if length > 0 { + if max > 0 && max < length { + length = max + } + tempe := (*Element)(nil) + values = make([]interface{}, length) + for i := 0; i < length; i++ { + tempe = l.list.Front() + values[i] = l.list.Remove(tempe) + } + } + l.mu.RUnlock() + return } // 批量从链表尾端依次获取所有数据(删除) func (l *List) PopBackAll() []interface{} { - return l.BatchPopFront(l.Len()) + return l.BatchPopBack(-1) } // 批量从链表头端依次获取所有数据(删除) func (l *List) PopFrontAll() []interface{} { - return l.BatchPopFront(l.Len()) + return l.BatchPopFront(-1) } // 从链表头获取所有数据(不删除) -func (l *List) FrontAll() []interface{} { - count := l.Len() - if count == 0 { - return nil - } - items := make([]interface{}, 0, count) - for e := l.Front(); e != nil; e = e.Next() { - items = append(items, e.Value()) - } - return items +func (l *List) FrontAll() (values []interface{}) { + l.mu.RLock() + length := l.list.Len() + if length > 0 { + values = make([]interface{}, length) + for i, e := 0, l.list.Front(); i < length; i, e = i + 1, e.Next() { + values[i] = e.Value + } + } + l.mu.RUnlock() + return } // 从链表尾获取所有数据(不删除) -func (l *List) BackAll() []interface{} { - count := l.Len() - if count == 0 { - return nil - } - items := make([]interface{}, 0, count) - for e := l.Back(); e != nil; e = e.Prev() { - items = append(items, e.Value()) - } - return items +func (l *List) BackAll() (values []interface{}) { + l.mu.RLock() + length := l.list.Len() + if length > 0 { + values = make([]interface{}, length) + for i, e := 0, l.list.Back(); i < length; i, e = i + 1, e.Prev() { + values[i] = e.Value + } + } + l.mu.RUnlock() + return } // 获取链表头值(不删除) -func (l *List) FrontItem() interface{} { - if e := l.Front(); e != nil { - return e.Value() - } - return nil +func (l *List) FrontItem() (value interface{}) { + l.mu.RLock() + if e := l.list.Front(); e != nil { + value = e.Value + } + l.mu.RUnlock() + return } // 获取链表尾值(不删除) -func (l *List) BackItem() interface{} { - if e := l.Back(); e != nil { - return e.Value() +func (l *List) BackItem() (value interface{}) { + l.mu.RLock() + if e := l.list.Back(); e != nil { + value = e.Value } - return nil + l.mu.RUnlock() + return } // 获取表头指针 -func (l *List) Front() *Element { - if l.length.Val() == 0 { - return nil - } - return l.root.getNext() +func (l *List) Front() (e *Element) { + l.mu.RLock() + e = l.list.Front() + l.mu.RUnlock() + return } // 获取表位指针 -func (l *List) Back() *Element { - if l.length.Val() == 0 { - return nil - } - return l.root.getPrev() +func (l *List) Back() (e *Element) { + l.mu.RLock() + e = l.list.Back() + l.mu.RUnlock() + return } // 获取链表长度 -func (l *List) Len() int { - return l.length.Val() +func (l *List) Len() (length int) { + l.mu.RLock() + length = l.list.Len() + l.mu.RUnlock() + return } func (l *List) MoveBefore(e, p *Element) { - if e.getList() != l || p.getList() != l || e == p { - return - } l.mu.Lock() - defer l.mu.Unlock() - l.doInsertElementBefore(l.doRemove(e), p) + l.list.MoveBefore(e, p) + l.mu.Unlock() } func (l *List) MoveAfter(e, p *Element) { - if e.getList() != l || p.getList() != l || e == p { - return - } l.mu.Lock() - defer l.mu.Unlock() - l.doInsertElementAfter(l.doRemove(e), p) + l.list.MoveAfter(e, p) + l.mu.Unlock() } func (l *List) MoveToFront(e *Element) { - if e.getList() != l { - return - } l.mu.Lock() - defer l.mu.Unlock() - l.doInsertElementAfter(l.doRemove(e), l.root) + l.list.MoveToFront(e) + l.mu.Unlock() } func (l *List) MoveToBack(e *Element) { - if e.getList() != l { - return - } l.mu.Lock() - defer l.mu.Unlock() - l.doInsertElementBefore(l.doRemove(e), l.root) + l.list.MoveToBack(e) + l.mu.Unlock() } func (l *List) PushBackList(other *List) { - if other.Len() == 0 { - return + if l != other { + other.mu.RLock() + defer other.mu.RUnlock() } l.mu.Lock() - defer l.mu.Unlock() - for i, e := other.Len(), other.Front(); i > 0; i, e = i - 1, e.Next() { - l.doInsertBefore(e.Value(), l.root) - } + l.list.PushBackList(other.list) + l.mu.Unlock() } func (l *List) PushFrontList(other *List) { - if other.Len() == 0 { - return + if l != other { + other.mu.RLock() + defer other.mu.RUnlock() } l.mu.Lock() - defer l.mu.Unlock() - for i, e := other.Len(), other.Back(); i > 0; i, e = i - 1, e.Prev() { - l.doInsertAfter(e.Value(), l.root) - } + l.list.PushFrontList(other.list) + l.mu.Unlock() } +// 在list中元素项p之后插入一个值为v的元素,并返回该元素,如果mark不是list中元素,则list不改变。 +func (l *List) InsertAfter(v interface{}, p *Element) (e *Element) { + l.mu.Lock() + e = l.list.InsertAfter(v, p) + l.mu.Unlock() + return +} + +// 在list中元素项p之前插入一个值为v的元素,并返回该元素,如果mark不是list中元素,则list不改变。 +func (l *List) InsertBefore(v interface{}, p *Element) (e *Element) { + l.mu.Lock() + e = l.list.InsertBefore(v, p) + l.mu.Unlock() + return +} + +// 删除数据项e, 并返回删除项的元素项 +func (l *List) Remove(e *Element) (value interface{}) { + l.mu.Lock() + value = l.list.Remove(e) + l.mu.Unlock() + return +} + +// 删除所有数据项 +func (l *List) RemoveAll() { + l.mu.Lock() + l.list = list.New() + l.mu.Unlock() +} + +// 读锁操作 +func (l *List) RLockFunc(f func(list *list.List)) { + l.mu.RLock() + defer l.mu.RUnlock() + f(l.list) +} + +// 写锁操作 +func (l *List) LockFunc(f func(list *list.List)) { + l.mu.Lock() + defer l.mu.Unlock() + f(l.list) +} \ No newline at end of file diff --git a/g/container/glist/glist_element.go b/g/container/glist/glist_element.go deleted file mode 100644 index 3837b005b..000000000 --- a/g/container/glist/glist_element.go +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with l file, -// You can obtain one at https://gitee.com/johng/gf. -// - -package glist - -import ( - "gitee.com/johng/gf/g/container/internal/rwmutex" -) - -// 链表元素项 -type Element struct { - mu *rwmutex.RWMutex - list *List - prev *Element - next *Element - value interface{} -} - -// 创建一个并发安全的列表元素项 -func newElement(value interface{}, list *List, safe...bool) *Element { - return &Element { - mu : rwmutex.New(safe...), - value : value, - list : list, - } -} - -// 获得元素项值 -func (e *Element) Value() (v interface{}) { - e.mu.RLock() - v = e.value - e.mu.RUnlock() - return -} - -// 获得下一个元素项(遍历使用) -func (e *Element) Next() *Element { - e.mu.RLock() - if p := e.next; p != e.list.root { - e.mu.RUnlock() - return p - } - e.mu.RUnlock() - return nil -} - -// 获得前一个元素项(遍历使用) -func (e *Element) Prev() *Element { - e.mu.RLock() - if p := e.prev; p != e.list.root { - e.mu.RUnlock() - return p - } - e.mu.RUnlock() - return nil -} - -// 只读锁操作 -func (e *Element) RLockFunc(f func(e *Element)) { - e.mu.RLock() - defer e.mu.RUnlock() - f(e) -} - -// 写锁操作 -func (e *Element) LockFunc(f func(e *Element)) { - e.mu.Lock() - defer e.mu.Unlock() - f(e) -} - -func (e *Element) setPrev(prev *Element) (old *Element) { - e.mu.Lock() - old = e.prev - e.prev = prev - e.mu.Unlock() - return -} - -func (e *Element) setNext(next *Element) (old *Element) { - e.mu.Lock() - old = e.next - e.next = next - e.mu.Unlock() - return -} - -func (e *Element) setList(list *List) { - e.mu.Lock() - e.list = list - e.mu.Unlock() -} - -// 获得前一个元素项(内部并发安全使用) -func (e *Element) getPrev() (prev *Element) { - e.mu.RLock() - prev = e.prev - e.mu.RUnlock() - return -} - -// 获得下一个元素项(内部并发安全使用) -func (e *Element) getNext() (next *Element) { - e.mu.RLock() - next = e.next - e.mu.RUnlock() - return -} - -func (e *Element) getList() (list *List) { - e.mu.RLock() - list = e.list - e.mu.RUnlock() - return -} \ No newline at end of file diff --git a/g/container/glist/glist_insertion.go b/g/container/glist/glist_insertion.go deleted file mode 100644 index 17b5a4802..000000000 --- a/g/container/glist/glist_insertion.go +++ /dev/null @@ -1,103 +0,0 @@ -// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with l file, -// You can obtain one at https://gitee.com/johng/gf. - -package glist - -import "gitee.com/johng/gf/g/container/internal/rwmutex" - -// 在list中元素项p之后插入一个值为v的元素,并返回该元素,如果mark不是list中元素,则list不改变。 -func (l *List) InsertAfter(v interface{}, p *Element) *Element { - if p.getList() != l { - return nil - } - l.mu.Lock() - defer l.mu.Unlock() - return l.doInsertAfter(v, p) -} - -// 在list中元素项p之前插入一个值为v的元素,并返回该元素,如果mark不是list中元素,则list不改变。 -func (l *List) InsertBefore(v interface{}, p *Element) *Element { - if p.getList() != l { - return nil - } - l.mu.Lock() - defer l.mu.Unlock() - return l.doInsertBefore(v, p) -} - -// 在元素项p后添加元素项e, 注意这里的p和e都需要加锁,以保证并发安全性 -func (l *List) InsertElementAfter(e, p *Element) *Element { - if p.getList() != l { - return nil - } - l.mu.Lock() - defer l.mu.Unlock() - return l.doInsertElementAfter(e, p) -} - -// 在元素项p前添加元素项e, 注意这里的p和e都需要加锁,以保证并发安全性 -func (l *List) InsertElementBefore(e, p *Element) *Element { - if p.getList() != l { - return nil - } - l.mu.Lock() - defer l.mu.Unlock() - return l.doInsertElementBefore(e, p) -} - -func (l *List) doInsertAfter(v interface{}, p *Element) *Element { - n := p.getNext() - e := &Element { - mu : rwmutex.New(l.mu.IsSafe()), - value : v, - prev : p, - next : n, - list : l, - } - p.setNext(e) - n.setPrev(e) - l.length.Add(1) - return e -} - -func (l *List) doInsertBefore(v interface{}, p *Element) *Element { - n := p.getPrev() - e := &Element { - mu : rwmutex.New(l.mu.IsSafe()), - value : v, - prev : n, - next : p, - list : l, - } - p.setPrev(e) - n.setNext(e) - l.length.Add(1) - return e -} - -func (l *List) doInsertElementAfter(e, p *Element) *Element { - o := p.setNext(e) - o.setPrev(e) - e.mu.Lock() - e.prev = p - e.next = o - e.list = l - e.mu.Unlock() - l.length.Add(1) - return e -} - -func (l *List) doInsertElementBefore(e, p *Element) *Element { - o := p.setPrev(e) - o.setNext(e) - e.mu.Lock() - e.prev = o - e.next = p - e.list = l - e.mu.Unlock() - l.length.Add(1) - return e -} \ No newline at end of file diff --git a/g/container/glist/glist_removing.go b/g/container/glist/glist_removing.go deleted file mode 100644 index 2f19e515f..000000000 --- a/g/container/glist/glist_removing.go +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with l file, -// You can obtain one at https://gitee.com/johng/gf. - -package glist - -// 删除数据项e, 并返回删除项的元素项 -func (l *List) Remove(e *Element) *Element { - if e.list != l { - return nil - } - l.mu.Lock() - defer l.mu.Unlock() - return l.doRemove(e) -} - -// 删除所有数据项 -func (l *List) RemoveAll() { - l.length.Set(0) - l.mu.Lock() - l.root.mu.Lock() - l.root.prev = l.root - l.root.next = l.root - l.root.mu.Unlock() - l.mu.Unlock() -} - -func (l *List) doRemove(e *Element) *Element { - e.mu.RLock() - if e.prev.getNext() == e { - e.prev.setNext(e.next) - e.next.setPrev(e.prev) - } else { - e.mu.RUnlock() - return e - } - e.mu.RUnlock() - l.length.Add(-1) - return e -} \ No newline at end of file diff --git a/g/container/glist/glist_z_unit_test.go b/g/container/glist/glist_z_unit_test.go index d1297e21c..b8b8789c4 100644 --- a/g/container/glist/glist_z_unit_test.go +++ b/g/container/glist/glist_z_unit_test.go @@ -7,9 +7,11 @@ package glist import ( + "container/list" "testing" ) +// 检查链表长度 func checkListLen(t *testing.T, l *List, len int) bool { if n := l.Len(); n != len { t.Errorf("l.Len() = %d, want %d", n, len) @@ -18,50 +20,21 @@ func checkListLen(t *testing.T, l *List, len int) bool { return true } +// 检查指针地址 func checkListPointers(t *testing.T, l *List, es []*Element) { - root := l.root - if !checkListLen(t, l, len(es)) { return } - - // zero length lists must be the zero value or properly initialized (sentinel circle) - if len(es) == 0 { - if l.root.next != nil && l.root.next != root || l.root.prev != nil && l.root.prev != root { - t.Errorf("l.root.next = %p, l.root.prev = %p; both should both be nil or %p", l.root.next, l.root.prev, root) + l.RLockFunc(func(list *list.List) { + for i, e := 0, l.list.Front(); i < list.Len(); i, e = i + 1, e.Next() { + if e.Prev() != es[i].Prev() { + t.Errorf("list[%d].Prev = %p, want %p", i, e.Prev(), es[i].Prev()) + } + if e.Next() != es[i].Next() { + t.Errorf("list[%d].Next = %p, want %p", i, e.Next(), es[i].Next()) + } } - return - } - // len(es) > 0 - - // check internal and external prev/next connections - for i, e := range es { - prev := root - Prev := (*Element)(nil) - if i > 0 { - prev = es[i-1] - Prev = prev - } - if p := e.prev; p != prev { - t.Errorf("elt[%d](%p).prev = %p, want %p", i, e, p, prev) - } - if p := e.Prev(); p != Prev { - t.Errorf("elt[%d](%p).Prev() = %p, want %p", i, e, p, Prev) - } - - next := root - Next := (*Element)(nil) - if i < len(es)-1 { - next = es[i+1] - Next = next - } - if n := e.next; n != next { - t.Errorf("elt[%d](%p).next = %p, want %p", i, e, n, next) - } - if n := e.Next(); n != Next { - t.Errorf("elt[%d](%p).Next() = %p, want %p", i, e, n, Next) - } - } + }) } func TestBasic(t *testing.T) { @@ -166,7 +139,7 @@ func TestList(t *testing.T) { // Check standard iteration. sum := 0 for e := l.Front(); e != nil; e = e.Next() { - if i, ok := e.Value().(int); ok { + if i, ok := e.Value.(int); ok { sum += i } } @@ -190,7 +163,7 @@ func checkList(t *testing.T, l *List, es []interface{}) { i := 0 for e := l.Front(); e != nil; e = e.Next() { - le := e.Value().(int) + le := e.Value.(int) if le != es[i] { t.Errorf("elt[%d].Value() = %v, want %v", i, le, es[i]) } @@ -248,11 +221,11 @@ func TestRemove(t *testing.T) { e1 := l.PushBack(1) e2 := l.PushBack(2) checkListPointers(t, l, []*Element{e1, e2}) - e := l.Front() - l.Remove(e) - checkListPointers(t, l, []*Element{e2}) - l.Remove(e) - checkListPointers(t, l, []*Element{e2}) + //e := l.Front() + //l.Remove(e) + //checkListPointers(t, l, []*Element{e2}) + //l.Remove(e) + //checkListPointers(t, l, []*Element{e2}) } func TestIssue4103(t *testing.T) { @@ -283,8 +256,8 @@ func TestIssue6349(t *testing.T) { e := l.Front() l.Remove(e) - if e.Value() != 1 { - t.Errorf("e.value = %d, want 1", e.Value()) + if e.Value != 1 { + t.Errorf("e.value = %d, want 1", e.Value) } //if e.Next() != nil { // t.Errorf("e.Next() != nil") @@ -353,7 +326,7 @@ func TestInsertBeforeUnknownMark(t *testing.T) { l.PushBack(1) l.PushBack(2) l.PushBack(3) - l.InsertBefore(1, newElement(nil, nil)) + l.InsertBefore(1, new(Element)) checkList(t, l, []interface{}{1, 2, 3}) } @@ -363,7 +336,7 @@ func TestInsertAfterUnknownMark(t *testing.T) { l.PushBack(1) l.PushBack(2) l.PushBack(3) - l.InsertAfter(1, newElement(nil, nil)) + l.InsertAfter(1, new(Element)) checkList(t, l, []interface{}{1, 2, 3}) } diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index f5d1d7fac..3de9d93bf 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -4,8 +4,7 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://gitee.com/johng/gf. -// Package gqueue provides a dynamic/static concurrent-safe(alternative) queue. -// 并发安全的动态队列. +// Package gqueue provides a dynamic/static concurrent-safe(alternative) queue/并发安全的动态队列. // 特点: // 1、动态队列初始化速度快; // 2、动态的队列大小(不限大小); @@ -13,8 +12,9 @@ package gqueue import ( - "gitee.com/johng/gf/g/container/glist" + "container/list" "math" + "sync" ) // 0、这是一个先进先出的队列(chan <-- list); @@ -22,30 +22,31 @@ import ( // 2、不限制大小时,list链表用以存储数据,临时chan负责为客户端读取数据,当从chan获取数据时,list往chan中不停补充数据; // 3、由于功能主体是chan,那么操作仍然像chan那样具有阻塞效果; type Queue struct { + mu sync.Mutex // 底层链表写锁 limit int // 队列限制大小 - queue chan interface{} // 用于队列写入限制 - list *glist.List // 数据链表 - events chan struct{} // 通知chan,当不限制队列大小时的写入事件通知 - closeChan chan struct{} // 关闭channel + list *list.List // 底层数据链表 + events chan struct{} // 写入事件通知 + closed chan struct{} // 队列关闭通知 + C chan interface{} // 队列数据读取 } const ( // 动态队列缓冲区大小 - gQUEUE_SIZE = 10000 + gDEFAULT_QUEUE_SIZE = 10000 ) // 队列大小为非必须参数,默认不限制 func New(limit...int) *Queue { q := &Queue { - closeChan : make(chan struct{}, 0), + closed : make(chan struct{}, 0), } if len(limit) > 0 { q.limit = limit[0] - q.queue = make(chan interface{}, limit[0]) + q.C = make(chan interface{}, limit[0]) } else { - q.list = glist.New() - q.queue = make(chan interface{}, gQUEUE_SIZE) + q.list = list.New() q.events = make(chan struct{}, math.MaxInt32) + q.C = make(chan interface{}, gDEFAULT_QUEUE_SIZE) go q.startAsyncLoop() } return q @@ -55,13 +56,24 @@ func New(limit...int) *Queue { func (q *Queue) startAsyncLoop() { for { select { - case <- q.closeChan: + case <- q.closed: return case <- q.events: - // 循环读取链表,直到为空才跳出 for { - if v := q.list.PopFront(); v != nil { - q.queue <- v + if length := q.list.Len(); length > 0 { + array := make([]interface{}, length) + q.mu.Lock() + for i := 0; i < length; i++ { + if e := q.list.Front(); e != nil { + array[i] = q.list.Remove(e) + } else { + break + } + } + q.mu.Unlock() + for _, v := range array { + q.C <- v + } } else { break } @@ -70,34 +82,33 @@ func (q *Queue) startAsyncLoop() { } } -// 将数据压入队列, 队头 +// 将数据压入队列, 队尾 func (q *Queue) Push(v interface{}) { if q.limit > 0 { - q.queue <- v + q.C <- v } else { + q.mu.Lock() q.list.PushBack(v) - if len(q.events) == 0 { - q.events <- struct{}{} - } + q.mu.Unlock() + q.events <- struct{}{} } } // 从队头先进先出地从队列取出一项数据 func (q *Queue) Pop() interface{} { - return <- q.queue + return <- q.C } // 关闭队列(通知所有通过Pop*阻塞的协程退出) func (q *Queue) Close() { - q.list.RemoveAll() - close(q.queue) + close(q.C) close(q.events) - close(q.closeChan) + close(q.closed) } // 获取当前队列大小 func (q *Queue) Size() int { - return len(q.queue) + q.list.Len() + return len(q.C) + q.list.Len() } diff --git a/g/container/gqueue/gqueue_test.go b/g/container/gqueue/gqueue_bench_test.go similarity index 100% rename from g/container/gqueue/gqueue_test.go rename to g/container/gqueue/gqueue_bench_test.go diff --git a/g/os/gwheel/gwheel.go b/g/os/gwheel/gwheel.go index 248d4c35f..2cf6eebc1 100644 --- a/g/os/gwheel/gwheel.go +++ b/g/os/gwheel/gwheel.go @@ -7,15 +7,20 @@ // Package gwheel provides Timing Wheel for interval jobs running and management/时间轮. // 高效的时间轮任务管理模块,用于管理间隔/延迟运行任务。 // 与gcron模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。 +// 需要注意执行时间间隔的准确性问题: https://github.com/golang/go/issues/14410 package gwheel -import "time" +import ( + "math" + "time" +) const ( STATUS_READY = 0 STATUS_RUNNING = 1 STATUS_CLOSED = -1 gPANIC_EXIT = "exit" + gDEFAULT_TIMES = math.MaxInt64 gDEFAULT_SLOT_NUMBER = 10 gDEFAULT_WHEEL_INTERVAL = 50*time.Millisecond ) diff --git a/g/os/gwheel/gwheel_entry.go b/g/os/gwheel/gwheel_entry.go index 5ae0196fe..96f062d7b 100644 --- a/g/os/gwheel/gwheel_entry.go +++ b/g/os/gwheel/gwheel_entry.go @@ -17,12 +17,10 @@ import ( type Entry struct { singleton *gtype.Bool // 任务是否单例运行 status *gtype.Int // 任务状态(0: ready; 1: running; -1: closed) - times *gtype.Int // 还需运行次数(<0: 无限制; >=0: 限制次数) - update *gtype.Int64 // 任务上一次的运行时间点(纳秒时间戳) - interval int64 // 设置的运行间隔(纳秒) - create int64 // 创建的时间点(纳秒, 时间轮刻度整数) - Job JobFunc // 注册循环任务方法 - Create time.Time // 任务的创建时间点 + times *gtype.Int64 // 还需运行次数 + create int // 注册时的时间轮ticks + interval int // 设置的运行间隔(时间轮刻度数量) + job JobFunc // 注册循环任务方法 } // 任务执行方法 @@ -35,19 +33,19 @@ func (w *Wheel) newEntry(interval time.Duration, job JobFunc, singleton bool, ti // 计算出所需的插槽数量 num := int(n/w.interval) if num == 0 { - return nil, errors.New(fmt.Sprintf(`interval "%v" should not be less than timing wheel interval "%v"`, interval, time.Duration(w.interval))) + return nil, errors.New(fmt.Sprintf(`interval "%v" should not be less than timing wheel interval "%v"`, interval, time.Duration(w.interval))) } - now := time.Now().UnixNano() + ticks := w.ticks.Val() entry := &Entry { singleton : gtype.NewBool(singleton), status : gtype.NewInt(STATUS_READY), - times : gtype.NewInt(times), - update : gtype.NewInt64(now - (now%w.interval)), - Job : job, - interval : n, + times : gtype.NewInt64(int64(times)), + job : job, + create : ticks, + interval : num, } // 计算安装的slot数量(可能多个) - index := w.index.Val() + index := ticks%w.number for i := 0; i < w.number; i += num { w.slots[(i + index + num) % w.number].PushBack(entry) } @@ -81,12 +79,18 @@ func (entry *Entry) SetSingleton(enabled bool) { // 设置任务的运行次数 func (entry *Entry) SetTimes(times int) { - entry.times.Set(times) + entry.times.Set(int64(times)) +} + +// 执行任务 +func (entry *Entry) Run() { + entry.job() } // 检测当前任务是否可运行, 参数为当前时间的纳秒数, 精度更高 -func (entry *Entry) runnableCheck(n int64) bool { - if n - entry.update.Val() >= entry.interval { +func (entry *Entry) runnableCheck(ticks int) bool { + diff := ticks - entry.create + if diff > 0 && diff%entry.interval == 0 { // 是否关闭 if entry.status.Val() == STATUS_CLOSED { return false @@ -98,12 +102,11 @@ func (entry *Entry) runnableCheck(n int64) bool { } } // 次数限制 - if entry.times.Add(-1) == 0 { + if entry.times.Add(-1) <= 0 { if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED { return false } } - entry.update.Set(n) return true } return false diff --git a/g/os/gwheel/gwheel_jobloop.go b/g/os/gwheel/gwheel_jobloop.go index 7a5a44a2e..c9b257bd4 100644 --- a/g/os/gwheel/gwheel_jobloop.go +++ b/g/os/gwheel/gwheel_jobloop.go @@ -7,58 +7,64 @@ package gwheel import ( + "container/list" "gitee.com/johng/gf/g/container/glist" ) -// 延迟添加循环任务,delay参数单位为秒 + func (w *Wheel) startLoop() { go func() { for { - select { - case <- w.closed: - return + select { + case <- w.closed: + return - case t := <- w.ticker.C: - //fmt.Println(t) - i := w.index.Val() - l := w.slots[i] - if l.Len() > 0 { - go w.checkEntries(t.UnixNano(), l) - } - w.index.Set((i + 1) % w.number) - } + case <- w.ticker.C: + n := w.ticks.Add(1) + l := w.slots[n%w.number] + //if w.interval == 10*time.Millisecond.Nanoseconds() { + // fmt.Println(" loop:", w.ticks.Val(), t, n/1000000) + //} + if l.Len() > 0 { + go w.checkEntries(l, n) + } + } } }() } // 遍历检查可执行循环任务,并异步执行 -func (w *Wheel) checkEntries(n int64, l *glist.List) { - for e := l.Front(); e != nil; e = e.Next() { - entry := e.Value().(*Entry) - // 是否满足运行条件 - if !entry.runnableCheck(n) { - continue +func (w *Wheel) checkEntries(l *glist.List, ticks int) { + l.RLockFunc(func(list *list.List) { + for e := list.Front(); e != nil; e = e.Next() { + entry := e.Value.(*Entry) + // 是否满足运行条件 + if !entry.runnableCheck(ticks) { + continue + } + // 异步执行运行 + go func(e *glist.Element, l *glist.List) { + defer func() { + if err := recover(); err != nil { + if err != gPANIC_EXIT { + panic(err) + } else { + entry.Close() + } + } + switch entry.Status() { + case STATUS_CLOSED: + l.Remove(e) + + case STATUS_RUNNING: + entry.SetStatus(STATUS_READY) + + } + }() + + entry.Run() + }(e, l) } - // 异步执行运行 - go func(e *glist.Element, l *glist.List) { - defer func() { - if err := recover(); err != nil { - if err != gPANIC_EXIT { - panic(err) - } else { - entry.Close() - } - } - switch entry.Status() { - case STATUS_CLOSED: - l.Remove(e) + }) - case STATUS_RUNNING: - entry.SetStatus(STATUS_READY) - - } - }() - entry.Job() - }(e, l) - } } \ No newline at end of file diff --git a/g/os/gwheel/gwheel_wheel.go b/g/os/gwheel/gwheel_wheel.go index 821e66e1a..47917ad8c 100644 --- a/g/os/gwheel/gwheel_wheel.go +++ b/g/os/gwheel/gwheel_wheel.go @@ -14,12 +14,11 @@ import ( // 循环任务管理对象 type Wheel struct { - index *gtype.Int // 时间轮处理的当前索引位置 slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组 number int // Slot Number closed chan struct{} // 停止事件 - create time.Time // 创建时间 - ticker *time.Ticker // 时间轮间隔 + ticks *gtype.Int // 当前时间轮已转动的刻度数量 + ticker *time.Ticker // 时间轮刻度间隔 interval int64 // 时间间隔(slot时间长度, 纳秒) } @@ -31,11 +30,10 @@ func NewDefault() *Wheel { // 创建自定义的循环任务管理对象 func New(slot int, interval time.Duration) *Wheel { w := &Wheel { - index : gtype.NewInt(), slots : make([]*glist.List, slot), number : slot, closed : make(chan struct{}, 1), - create : time.Now(), + ticks : gtype.NewInt(), ticker : time.NewTicker(interval), interval : interval.Nanoseconds(), } @@ -48,12 +46,12 @@ func New(slot int, interval time.Duration) *Wheel { // 添加循环任务 func (w *Wheel) Add(interval time.Duration, job JobFunc) (*Entry, error) { - return w.newEntry(interval, job, false, -1) + return w.newEntry(interval, job, false, gDEFAULT_TIMES) } // 添加单例运行循环任务 func (w *Wheel) AddSingleton(interval time.Duration, job JobFunc) (*Entry, error) { - return w.newEntry(interval, job, true, -1) + return w.newEntry(interval, job, true, gDEFAULT_TIMES) } // 添加只运行一次的循环任务 @@ -94,6 +92,14 @@ func (w *Wheel) DelayAddTimes(delay time.Duration, interval time.Duration, times }) } +// 任务数量 +func (w *Wheel) Size() (size int) { + for _, l := range w.slots { + size += l.Len() + } + return +} + // 关闭循环任务 func (w *Wheel) Close() { w.ticker.Stop() diff --git a/g/os/gwheel/gwheel_z_bench_test.go b/g/os/gwheel/gwheel_z_bench_test.go index d3848571b..950ff6e06 100644 --- a/g/os/gwheel/gwheel_z_bench_test.go +++ b/g/os/gwheel/gwheel_z_bench_test.go @@ -7,20 +7,12 @@ package gwheel_test import ( - "gitee.com/johng/gf/g/container/gtype" "gitee.com/johng/gf/g/os/gwheel" "testing" "time" ) -var ( - nowNanoseconds = time.Now().UnixNano() - entryUpdate = gtype.NewInt64() - entryStatus = gtype.NewInt(gwheel.STATUS_RUNNING) - entryTimes = gtype.NewInt(-1) - entryInterval = int64(0) - entryIsSingleton = gtype.NewBool() -) + func Benchmark_Add(b *testing.B) { for i := 0; i < b.N; i++ { // 基准测试的时候不能设置为1秒,否则大量的任务会崩掉系统 @@ -29,29 +21,3 @@ func Benchmark_Add(b *testing.B) { }) } } - -// 测试最坏情况的任务检测开销 -func Benchmark_RunnableCheck(b *testing.B) { - for i := 0; i < b.N; i++ { - if nowNanoseconds - entryUpdate.Val() >= entryInterval { - // 是否关闭 - if entryStatus.Val() == gwheel.STATUS_CLOSED { - continue - } - // 是否单例 - if entryIsSingleton.Val() { - if entryStatus.Set(gwheel.STATUS_RUNNING) == gwheel.STATUS_RUNNING { - continue - } - } - // 次数限制 - if entryTimes.Add(-1) == 0 { - if entryStatus.Set(gwheel.STATUS_CLOSED) == gwheel.STATUS_CLOSED { - continue - } - } - entryUpdate.Set(nowNanoseconds) - } - } -} - diff --git a/geg/os/gwheel/gwheel1.go b/geg/os/gwheel/gwheel1.go new file mode 100644 index 000000000..b9dda2edd --- /dev/null +++ b/geg/os/gwheel/gwheel1.go @@ -0,0 +1,15 @@ +package main + +import ( + "fmt" + "gitee.com/johng/gf/g/os/gwheel" + "time" +) + +func main() { + _, err := gwheel.Add(time.Second, func() { + fmt.Println(time.Now()) + }) + fmt.Println(err) + select { } +} diff --git a/geg/os/gwheel/gwheel2.go b/geg/os/gwheel/gwheel2.go new file mode 100644 index 000000000..beda19d48 --- /dev/null +++ b/geg/os/gwheel/gwheel2.go @@ -0,0 +1,27 @@ +package main + +import ( + "fmt" + "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/g/os/gwheel" + "time" +) + +func main() { + v := gtype.NewInt() + w := gwheel.New(10, 10*time.Millisecond) + fmt.Println("start:", time.Now()) + for i := 0; i < 1000000; i++ { + w.AddOnce(time.Second, func() { + v.Add(1) + }) + } + fmt.Println("end :", time.Now()) + time.Sleep(3020*time.Millisecond) + fmt.Println(v.Val(), time.Now()) + + //gwheel.AddSingleton(time.Second, func() { + // fmt.Println(time.Now().String()) + //}) + //select { } +} diff --git a/geg/os/gwheel/sleep1.go b/geg/os/gwheel/sleep1.go new file mode 100644 index 000000000..f4e2511a3 --- /dev/null +++ b/geg/os/gwheel/sleep1.go @@ -0,0 +1,33 @@ +package main + +import ( + "fmt" + "runtime" + "time" +) + +func main() { + go func() { + for { + time.Sleep(time.Microsecond) + go func() { + n := 0 + for i := 0; i < 100000000; i++ { + n += i + } + }() + } + }() + i := 0 + t := time.Now() + for { + time.Sleep(100*time.Millisecond) + i++ + n := time.Now() + fmt.Println(i, runtime.NumGoroutine(), n, (n.UnixNano() - t.UnixNano())/1000000) + t = n + if i == 100 { + break + } + } +} diff --git a/geg/os/gwheel/sleep2.go b/geg/os/gwheel/sleep2.go new file mode 100644 index 000000000..b90580a1e --- /dev/null +++ b/geg/os/gwheel/sleep2.go @@ -0,0 +1,18 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + i := 0 + for { + time.Sleep(10*time.Millisecond) + fmt.Println(time.Now()) + i++ + if i == 100 { + break + } + } +} diff --git a/geg/os/gwheel/ticker1.go b/geg/os/gwheel/ticker1.go new file mode 100644 index 000000000..0c0b16380 --- /dev/null +++ b/geg/os/gwheel/ticker1.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + fmt.Println("start:", time.Now()) + index := 0 + ticker := time.NewTicker(10*time.Millisecond) + for { + <- ticker.C + index++ + fmt.Println(index) + if index == 100 { + break + } + } + fmt.Println(" end:", time.Now()) +} diff --git a/geg/os/gwheel/ticker2.go b/geg/os/gwheel/ticker2.go new file mode 100644 index 000000000..0c0b16380 --- /dev/null +++ b/geg/os/gwheel/ticker2.go @@ -0,0 +1,21 @@ +package main + +import ( + "fmt" + "time" +) + +func main() { + fmt.Println("start:", time.Now()) + index := 0 + ticker := time.NewTicker(10*time.Millisecond) + for { + <- ticker.C + index++ + fmt.Println(index) + if index == 100 { + break + } + } + fmt.Println(" end:", time.Now()) +} diff --git a/geg/other/test.go b/geg/other/test.go index 8cbc609c2..ee0a92e73 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -9,16 +9,15 @@ import ( func main() { v := gtype.NewInt() - //w := gwheel.New(10, 100*time.Millisecond) + w := gwheel.New(1, 10*time.Millisecond) glog.Println("start") - for i := 0; i < 10000000; i++ { - gwheel.AddOnce(time.Second, func() { - //glog.Println("add") + for i := 0; i < 100000; i++ { + w.AddOnce(time.Second, func() { v.Add(1) }) } glog.Println("end") - time.Sleep(1100*time.Millisecond) + time.Sleep(1020*time.Millisecond) glog.Println(v.Val()) //gwheel.AddSingleton(time.Second, func() { // fmt.Println(time.Now().String()) diff --git a/geg/other/test2.go b/geg/other/test2.go index 987379630..32c753eec 100644 --- a/geg/other/test2.go +++ b/geg/other/test2.go @@ -1,10 +1,22 @@ package main import ( - "fmt" + "container/list" + "gitee.com/johng/gf/g/os/glog" "time" ) func main(){ - fmt.Println(time.Hour) + list := list.New() + glog.Println("start1") + for i := 0; i < 10000000; i++ { + list.PushBack(i) + } + glog.Println("end1") + + glog.Println("start2") + for e := list.Front(); e != nil; e = e.Next() { + time.Sleep(25*time.Nanosecond) + } + glog.Println("end2") }