mirror of
https://gitee.com/johng/gf
synced 2026-06-07 02:12:11 +08:00
gcron, gtimer updates
This commit is contained in:
@ -23,6 +23,7 @@ func TestCron_Entry_Operations(t *testing.T) {
|
||||
cron := gcron.New()
|
||||
array := garray.New(0, 0)
|
||||
cron.DelayAddTimes(500*time.Millisecond, "* * * * * *", 2, func() {
|
||||
glog.Println("add times")
|
||||
array.Append(1)
|
||||
})
|
||||
gtest.Assert(cron.Size(), 0)
|
||||
@ -52,7 +53,7 @@ func TestCron_Entry_Operations(t *testing.T) {
|
||||
gtest.Assert(cron.Size(), 1)
|
||||
entry.Start()
|
||||
glog.Println("start")
|
||||
time.Sleep(1000*time.Millisecond)
|
||||
time.Sleep(1200*time.Millisecond)
|
||||
gtest.Assert(array.Len(), 2)
|
||||
gtest.Assert(cron.Size(), 1)
|
||||
entry.Close()
|
||||
|
||||
@ -126,11 +126,12 @@ func (entry *Entry) Run() {
|
||||
}
|
||||
|
||||
// 检测当前任务是否可运行。
|
||||
func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
|
||||
func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool) {
|
||||
switch entry.status.Val() {
|
||||
case STATUS_STOPPED: fallthrough
|
||||
case STATUS_STOPPED:
|
||||
return false, true
|
||||
case STATUS_CLOSED:
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
// 时间轮客户端判断,是否满足运行刻度条件,误差会比较大
|
||||
if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 {
|
||||
@ -141,7 +142,7 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
|
||||
// 表示新增(当添加任务后在下一时间轮刻度马上触发)
|
||||
case diffMs < entry.wheel.timer.intervalMs:
|
||||
entry.wheel.slots[(nowTicks+entry.interval)%entry.wheel.number].PushBack(entry)
|
||||
return false
|
||||
return false, false
|
||||
|
||||
// 正常任务
|
||||
case diffMs >= entry.wheel.timer.intervalMs:
|
||||
@ -149,7 +150,7 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
|
||||
if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs {
|
||||
// 往底层添加,通过毫秒计算并重新添加任务到对应的时间轮上,减小运行误差
|
||||
entry.wheel.timer.doAddEntryByParent(leftMs, entry)
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -157,7 +158,7 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
|
||||
if entry.IsSingleton() {
|
||||
// 注意原子操作结果判断
|
||||
if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING {
|
||||
return false
|
||||
return false, true
|
||||
}
|
||||
}
|
||||
// 次数限制
|
||||
@ -165,7 +166,7 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
|
||||
if times <= 0 {
|
||||
// 注意原子操作结果判断
|
||||
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED || times < 0 {
|
||||
return false
|
||||
return false, false
|
||||
}
|
||||
}
|
||||
// 是否不限制运行次数
|
||||
@ -173,7 +174,7 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) bool {
|
||||
times = gDEFAULT_TIMES
|
||||
entry.times.Set(gDEFAULT_TIMES)
|
||||
}
|
||||
return true
|
||||
return true, true
|
||||
}
|
||||
return false
|
||||
return false, true
|
||||
}
|
||||
|
||||
@ -49,7 +49,8 @@ func (w *wheel) proceed() {
|
||||
entry = v.(*Entry)
|
||||
}
|
||||
// 是否满足运行条件
|
||||
if entry.check(nowTicks, nowMs) {
|
||||
runnable, addable := entry.check(nowTicks, nowMs)
|
||||
if runnable {
|
||||
// 异步执行运行
|
||||
go func(entry *Entry) {
|
||||
defer func() {
|
||||
@ -68,7 +69,7 @@ func (w *wheel) proceed() {
|
||||
}(entry)
|
||||
}
|
||||
// 是否继续添运行, 滚动任务
|
||||
if entry.status.Val() != STATUS_CLOSED {
|
||||
if addable {
|
||||
entry.wheel.timer.doAddEntryByParent(entry.rawIntervalMs, entry)
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user