This commit is contained in:
John
2019-01-06 11:09:50 +08:00
parent eae857bcf7
commit bb5d84c29c
9 changed files with 83 additions and 40 deletions

View File

@ -284,11 +284,11 @@ func (l *List) remove(e *Element) *Element {
e.next.setPrev(e.prev)
e.mu.RUnlock()
e.mu.Lock()
e.next = nil
e.prev = nil
e.list = nil
e.mu.Unlock()
//e.mu.Lock()
//e.next = nil
//e.prev = nil
//e.list = nil
//e.mu.Unlock()
l.length.Add(-1)
return e
}

View File

@ -83,7 +83,6 @@ func (lru *memCacheLru) SyncAndClear() {
gwheel.Exit()
return
}
fmt.Println("loop", time.Now())
// 数据同步
for {
if v := lru.rawList.PopFront(); v != nil {
@ -100,7 +99,6 @@ func (lru *memCacheLru) SyncAndClear() {
// 数据清理
for i := lru.Size() - lru.cache.cap; i > 0; i-- {
if s := lru.Pop(); s != nil {
fmt.Println("clear:", s)
lru.cache.clearByKey(s, true)
}
}

View File

@ -11,6 +11,7 @@ import (
"fmt"
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gcron"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
@ -21,15 +22,18 @@ func TestCron_Add_Close(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
_, err1 := cron.Add("* * * * * *", func() {
glog.Println("cron1")
array.Append(1)
})
_, err2 := cron.Add("* * * * * *", func() {
glog.Println("cron2")
array.Append(1)
}, "test")
_, err3 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err4 := cron.Add("@every 2s", func() {
glog.Println("cron3")
array.Append(1)
})
gtest.Assert(err1, nil)
@ -37,14 +41,14 @@ func TestCron_Add_Close(t *testing.T) {
gtest.AssertNE(err3, nil)
gtest.Assert(err4, nil)
gtest.Assert(cron.Size(), 3)
time.Sleep(1200*time.Millisecond)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1200*time.Millisecond)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 5)
cron.Close()
time.Sleep(1200*time.Millisecond)
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1200*time.Millisecond)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}

View File

@ -5,8 +5,8 @@
// You can obtain one at https://gitee.com/johng/gf.
// Package gwheel provides Timing Wheel for interval jobs running and management/时间轮.
// 高效的时间轮任务执行管理,用于管理异步的间隔运行任务,或者异步只运行一次的任务(默认最小时间粒度为秒)
// 与其他定时任务管理模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。
// 高效的时间轮任务管理模块,用于管理间隔/延迟运行任务
// 与gcron模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。
package gwheel
import "time"
@ -16,8 +16,8 @@ const (
STATUS_RUNNING = 1
STATUS_CLOSED = -1
gPANIC_EXIT = "exit"
gDEFAULT_SLOT_NUMBER = 100
gDEFAULT_WHEEL_INTERVAL = 10*time.Millisecond
gDEFAULT_SLOT_NUMBER = 10
gDEFAULT_WHEEL_INTERVAL = 50*time.Millisecond
)
var (

View File

@ -35,7 +35,7 @@ func (w *Wheel) newEntry(interval time.Duration, job JobFunc, singleton bool, ti
// 计算出所需的插槽数量
num := int(n/w.interval)
if num == 0 {
return nil, errors.New(fmt.Sprintf(`interval "%v" should not be less than timing wheel interval`, interval))
return nil, errors.New(fmt.Sprintf(`interval "%v" should not be less than timing wheel interval "%v"`, interval, time.Duration(w.interval)))
}
now := time.Now()
nano := now.UnixNano()

View File

@ -39,7 +39,7 @@ func (w *Wheel) checkEntries(n int64, l *glist.List) {
entry := e.Value().(*Entry)
// 是否已停止运行, 那么移除
if entry.Status() == STATUS_CLOSED {
l.Remove(e)
//l.Remove(e)
continue
}
// 是否满足运行条件
@ -49,24 +49,18 @@ func (w *Wheel) checkEntries(n int64, l *glist.List) {
// 异步执行运行
go func(e *glist.Element, l *glist.List) {
defer func() {
if err := recover(); err != nil {
if err != gPANIC_EXIT {
panic(err)
} else {
entry.Close()
}
}
switch entry.Status() {
case STATUS_CLOSED:
l.Remove(e)
case STATUS_RUNNING:
entry.SetStatus(STATUS_READY)
}
if err := recover(); err != nil {
if err != gPANIC_EXIT {
panic(err)
} else {
entry.Close()
}
}
if entry.Status() == STATUS_RUNNING {
entry.SetStatus(STATUS_READY)
}
}()
entry.Job()
}(e, l)
}
}

View File

@ -7,11 +7,20 @@
package gwheel_test
import (
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gwheel"
"testing"
"time"
)
var (
nowNanoseconds = time.Now().UnixNano()
entryUpdate = gtype.NewInt64()
entryStatus = gtype.NewInt(gwheel.STATUS_RUNNING)
entryTimes = gtype.NewInt(-1)
entryInterval = int64(0)
entryIsSingleton = gtype.NewBool()
)
func Benchmark_Add(b *testing.B) {
for i := 0; i < b.N; i++ {
// 基准测试的时候不能设置为1秒否则大量的任务会崩掉系统
@ -20,3 +29,29 @@ func Benchmark_Add(b *testing.B) {
})
}
}
// 测试最坏情况的任务检测开销
func Benchmark_RunnableCheck(b *testing.B) {
for i := 0; i < b.N; i++ {
if nowNanoseconds - entryUpdate.Val() >= entryInterval {
// 是否关闭
if entryStatus.Val() == gwheel.STATUS_CLOSED {
continue
}
// 是否单例
if entryIsSingleton.Val() {
if entryStatus.Set(gwheel.STATUS_RUNNING) == gwheel.STATUS_RUNNING {
continue
}
}
// 次数限制
if entryTimes.Add(-1) == 0 {
if entryStatus.Set(gwheel.STATUS_CLOSED) == gwheel.STATUS_CLOSED {
continue
}
}
entryUpdate.Set(nowNanoseconds)
}
}
}

View File

@ -1,14 +1,27 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gwheel"
"time"
)
func main() {
gwheel.AddSingleton(time.Second, func() {
fmt.Println(time.Now().String())
})
select { }
v := gtype.NewInt()
w := gwheel.New(100, 10*time.Millisecond)
glog.Println("start")
for i := 0; i < 10000000; i++ {
w.AddOnce(time.Second, func() {
//glog.Println("add")
v.Add(1)
})
}
glog.Println("end")
time.Sleep(1100*time.Millisecond)
glog.Println(v.Val())
//gwheel.AddSingleton(time.Second, func() {
// fmt.Println(time.Now().String())
//})
//select { }
}

View File

@ -6,6 +6,5 @@ import (
)
func main(){
t := time.Hour
fmt.Println(t.Nanoseconds())
fmt.Println(time.Hour)
}