comment update for gtimer

This commit is contained in:
John
2020-01-21 14:46:23 +08:00
parent 8f2dcf21ff
commit 2ba0913bea
5 changed files with 160 additions and 139 deletions

View File

@ -1,32 +1,5 @@
package main
import (
"fmt"
"github.com/gogf/gf/frame/g"
"github.com/gogf/gf/util/gconv"
)
type TokenRequest struct {
Scope string
Watermark bool
Policy *g.Var
}
func main() {
// s := `
//{
// "policy": {"name":"john"},
// "scope": "pub-med-panel",
// "watermark": true
//}
//`
var t *TokenRequest
m := g.Map{
"policy": g.Map{"name": "john"},
"scope": "pub-med-panel",
"watermark": true,
}
err := gconv.Struct(m, &t)
fmt.Println(err)
fmt.Println(t.Policy)
}

View File

@ -6,10 +6,14 @@
// Package gtimer implements Hierarchical Timing Wheel for interval/delayed jobs running and management.
//
// 任务定时器,
// 高性能的分层时间轮任务管理模块,用于管理间隔/延迟运行任务。
// gcron模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。
// 需要注意执行时间间隔的准确性问题: https://github.com/golang/go/issues/14410
// This package is designed for management for millions of timing jobs.
// The differences between gtime and gcron are as follows:
// 1. gcron is implemented based on gtimer.
// 2. gtimer is designed for high performance and for millions of timing jobs.
// 3. gcron supports pattern grammar like linux crontab.
// 4. gtimer's benchmark OP is measured in nanoseconds, and gcron's benchmark OP is measured in microseconds.
//
// Note the common delay of the timer: https://github.com/golang/go/issues/14410
package gtimer
import (
@ -20,87 +24,103 @@ import (
)
const (
STATUS_READY = 0
STATUS_RUNNING = 1
STATUS_STOPPED = 2
STATUS_CLOSED = -1
gPANIC_EXIT = "exit"
gDEFAULT_TIMES = math.MaxInt32
gDEFAULT_SLOT_NUMBER = 10
gDEFAULT_WHEEL_INTERVAL = 50
gDEFAULT_WHEEL_LEVEL = 6
STATUS_READY = 0 // Job is ready for running.
STATUS_RUNNING = 1 // Job is already running.
STATUS_STOPPED = 2 // Job is stopped.
STATUS_CLOSED = -1 // Job is closed and waiting to be deleted.
gPANIC_EXIT = "exit" // Internal usage for custom job exit function with panic.
gDEFAULT_TIMES = math.MaxInt32 // Default limit running times, a big number.
gDEFAULT_SLOT_NUMBER = 10 // Default slot number.
gDEFAULT_WHEEL_INTERVAL = 50 // Default wheel interval.
gDEFAULT_WHEEL_LEVEL = 6 // Default wheel level.
)
var (
// 默认定时器属性参数值
defaultSlots = cmdenv.Get("gf.gtimer.slots", gDEFAULT_SLOT_NUMBER).Int()
defaultLevel = cmdenv.Get("gf.gtimer.level", gDEFAULT_WHEEL_LEVEL).Int()
defaultInterval = cmdenv.Get("gf.gtimer.interval", gDEFAULT_WHEEL_INTERVAL).Duration() * time.Millisecond
// 默认的wheel管理对象
defaultTimer = New(defaultSlots, defaultInterval, defaultLevel)
defaultTimer = New(defaultSlots, defaultInterval, defaultLevel)
)
// 类似与js中的SetTimeout,一段时间后执行回调函数。
// SetTimeout runs the job once after duration of <delay>.
// It is like the one in javascript.
func SetTimeout(delay time.Duration, job JobFunc) {
AddOnce(delay, job)
}
// 类似与js中的SetInterval,每隔一段时间执行指定回调函数。
// SetInterval runs the job every duration of <delay>.
// It is like the one in javascript.
func SetInterval(interval time.Duration, job JobFunc) {
Add(interval, job)
}
// 添加执行方法。
// Add adds a timing job to the default timer, which runs in interval of <interval>.
func Add(interval time.Duration, job JobFunc) *Entry {
return defaultTimer.Add(interval, job)
}
// 添加执行方法,更多参数控制。
// AddEntry adds a timing job to the default timer with detailed parameters.
//
// The parameter <interval> specifies the running interval of the job.
//
// The parameter <singleton> specifies whether the job running in singleton mode.
// There's only one of the same job is allowed running when its a singleton mode job.
//
// The parameter <times> specifies limit for the job running times, which means the job
// exits if its run times exceeds the <times>.
//
// The parameter <status> specifies the job status when it's firstly added to the timer.
func AddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
return defaultTimer.AddEntry(interval, job, singleton, times, status)
}
// 添加单例运行循环任务。
// AddSingleton is a convenience function for add singleton mode job.
func AddSingleton(interval time.Duration, job JobFunc) *Entry {
return defaultTimer.AddSingleton(interval, job)
}
// 添加只运行一次的循环任务。
// AddOnce is a convenience function for adding a job which only runs once and then exits.
func AddOnce(interval time.Duration, job JobFunc) *Entry {
return defaultTimer.AddOnce(interval, job)
}
// 添加运行指定次数的循环任务。
// AddTimes is a convenience function for adding a job which is limited running times.
func AddTimes(interval time.Duration, times int, job JobFunc) *Entry {
return defaultTimer.AddTimes(interval, times, job)
}
// 延迟添加循环任务。
// DelayAdd adds a timing job after delay of <interval> duration.
// Also see Add.
func DelayAdd(delay time.Duration, interval time.Duration, job JobFunc) {
defaultTimer.DelayAdd(delay, interval, job)
}
// 延迟添加循环任务, 支持完整的参数。
// DelayAddEntry adds a timing job after delay of <interval> duration.
// Also see AddEntry.
func DelayAddEntry(delay time.Duration, interval time.Duration, job JobFunc, singleton bool, times int, status int) {
defaultTimer.DelayAddEntry(delay, interval, job, singleton, times, status)
}
// 延迟添加单例循环任务delay参数单位为秒
// DelayAddSingleton adds a timing job after delay of <interval> duration.
// Also see AddSingleton.
func DelayAddSingleton(delay time.Duration, interval time.Duration, job JobFunc) {
defaultTimer.DelayAddSingleton(delay, interval, job)
}
// 延迟添加只运行一次的循环任务delay参数单位为秒
// DelayAddOnce adds a timing job after delay of <interval> duration.
// Also see AddOnce.
func DelayAddOnce(delay time.Duration, interval time.Duration, job JobFunc) {
defaultTimer.DelayAddOnce(delay, interval, job)
}
// 延迟添加运行指定次数的循环任务delay参数单位为秒
// DelayAddTimes adds a timing job after delay of <interval> duration.
// Also see AddTimes.
func DelayAddTimes(delay time.Duration, interval time.Duration, times int, job JobFunc) {
defaultTimer.DelayAddTimes(delay, interval, times, job)
}
// 在Job方法中调用停止并删除当前运行的任务。
// Exit is used in timing job, which exits and marks it closed from timer.
// The timing job will be removed from timer later.
func Exit() {
panic(gPANIC_EXIT)
}

View File

@ -12,25 +12,24 @@ import (
"github.com/gogf/gf/container/gtype"
)
// 循环任务项
// Entry is the timing job entry to wheel.
type Entry struct {
wheel *wheel // 所属时间轮
job JobFunc // 注册循环任务方法
singleton *gtype.Bool // 任务是否单例运行
status *gtype.Int // 任务状态(0: ready; 1: running; 2: stopped; -1: closed), 层级entry共享状态
times *gtype.Int // 还需运行次数
create int64 // 注册时的时间轮ticks
interval int64 // 设置的运行间隔(时间轮刻度数量)
createMs int64 // 创建时间(毫秒)
intervalMs int64 // 间隔时间(毫秒)
rawIntervalMs int64 // 原始间隔
wheel *wheel // Belonged wheel.
job JobFunc // The job function.
singleton *gtype.Bool // Singleton mode.
status *gtype.Int // Job status.
times *gtype.Int // Limit running times.
create int64 // Timer ticks when the job installed.
interval int64 // The interval ticks of the job.
createMs int64 // The timestamp in milliseconds when job installed.
intervalMs int64 // The interval milliseconds of the job.
rawIntervalMs int64 // Raw input interval in milliseconds.
}
// 任务执行方法
// JobFunc is the job function.
type JobFunc = func()
// 创建定时任务。
// 如果times参数<=0表示不限制运行次数。
// addEntry adds a timing job to the wheel.
func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
if times <= 0 {
times = gDEFAULT_TIMES
@ -38,8 +37,8 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti
ms := interval.Nanoseconds() / 1e6
num := ms / w.intervalMs
if num == 0 {
// 如果安装的任务间隔小于时间轮刻度,
// 那么将会在下一刻度被执行
// If the given interval is lesser than the one of the wheel,
// then sets it to one tick, which means it will be run in one interval.
num = 1
}
nowMs := time.Now().UnixNano() / 1e6
@ -56,12 +55,12 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti
intervalMs: ms,
rawIntervalMs: ms,
}
// 安装任务
// Install the job to the list of the slot.
w.slots[(ticks+num)%w.number].PushBack(entry)
return entry
}
// 创建定时任务给定父级Entry, 间隔参数参数为毫秒数.
// addEntryByParent adds a timing job with parent entry.
func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry {
num := interval / w.intervalMs
if num == 0 {
@ -85,52 +84,52 @@ func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry {
return entry
}
// 获取任务状态
// Status returns the status of the job.
func (entry *Entry) Status() int {
return entry.status.Val()
}
// 设置任务状态
// SetStatus custom sets the status for the job.
func (entry *Entry) SetStatus(status int) int {
return entry.status.Set(status)
}
// 启动当前任务
// Start starts the job.
func (entry *Entry) Start() {
entry.status.Set(STATUS_READY)
}
// 停止当前任务
// Stop stops the job.
func (entry *Entry) Stop() {
entry.status.Set(STATUS_STOPPED)
}
// 关闭当前任务
// Close closes the job, and then it will be removed from the timer.
func (entry *Entry) Close() {
entry.status.Set(STATUS_CLOSED)
}
// 是否单例运行
// IsSingleton checks and returns whether the job in singleton mode.
func (entry *Entry) IsSingleton() bool {
return entry.singleton.Val()
}
// 设置单例运行
// SetSingleton sets the job singleton mode.
func (entry *Entry) SetSingleton(enabled bool) {
entry.singleton.Set(enabled)
}
// 设置任务的运行次数
// SetTimes sets the limit running times for the job.
func (entry *Entry) SetTimes(times int) {
entry.times.Set(times)
}
// 执行任务
// Run runs the job.
func (entry *Entry) Run() {
entry.job()
}
// 检测当前任务是否可运行。
// check checks if the job should be run in given ticks and timestamp milliseconds.
func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool) {
switch entry.status.Val() {
case STATUS_STOPPED:
@ -138,44 +137,45 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool)
case STATUS_CLOSED:
return false, false
}
// 时间轮刻度判断,是否满足运行刻度条件,刻度判断的误差会比较大
// Firstly checks using the ticks, this may be low precision as one tick is a little bit long.
if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 {
// 分层转换处理
// If not the lowest level wheel.
if entry.wheel.level > 0 {
diffMs := nowMs - entry.createMs
switch {
// 表示新增(当添加任务后在下一时间轮刻度马上触发)
// Add it to the next slot, which means it will run on next interval.
case diffMs < entry.wheel.timer.intervalMs:
entry.wheel.slots[(nowTicks+entry.interval)%entry.wheel.number].PushBack(entry)
return false, false
// 正常任务
// Normal rolls on the job.
case diffMs >= entry.wheel.timer.intervalMs:
// 任务是否有必要进行分层转换
// Calculate the leftover milliseconds,
// if it is greater than the minimum interval, then re-install it.
if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs {
// 往底层添加,通过毫秒计算并重新添加任务到对应的时间轮上,减小运行误差
// Re-calculate and re-installs the job proper slot.
entry.wheel.timer.doAddEntryByParent(leftMs, entry)
return false, false
}
}
}
// 是否单例
// Singleton mode check.
if entry.IsSingleton() {
// 注意原子操作结果判断
// Note that it is atomic operation to ensure concurrent safety.
if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING {
return false, true
}
}
// 次数限制
// Limit running times.
times := entry.times.Add(-1)
if times <= 0 {
// 注意原子操作结果判断
// Note that it is atomic operation to ensure concurrent safety.
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED || times < 0 {
return false, false
}
}
// 是否不限制运行次数
// This means it does not limit the running times.
// I know it's ugly, but it is surely high performance for running times limit.
if times < 2000000000 && times > 1000000000 {
entry.times.Set(gDEFAULT_TIMES)
}

View File

@ -12,7 +12,7 @@ import (
"github.com/gogf/gf/container/glist"
)
// 开始循环
// start starts the ticker using a standalone goroutine.
func (w *wheel) start() {
go func() {
ticker := time.NewTicker(time.Duration(w.intervalMs) * time.Millisecond)
@ -34,7 +34,10 @@ func (w *wheel) start() {
}()
}
// 执行时间轮刻度逻辑
// proceed checks and rolls on the job.
// If a timing job is time for running, it runs in an asynchronous goroutine,
// or else it removes from current slot and re-installs the job to another wheel and slot
// according to its leftover interval in milliseconds.
func (w *wheel) proceed() {
n := w.ticks.Add(1)
l := w.slots[int(n%w.number)]
@ -49,10 +52,10 @@ func (w *wheel) proceed() {
} else {
entry = v.(*Entry)
}
// 是否满足运行条件
// Checks whether the time for running.
runnable, addable := entry.check(nowTicks, nowMs)
if runnable {
// 异步执行运行
// Just run it in another goroutine.
go func(entry *Entry) {
defer func() {
if err := recover(); err != nil {
@ -69,7 +72,7 @@ func (w *wheel) proceed() {
entry.job()
}(entry)
}
// 是否继续添运行, 滚动任务
// If rolls on the job.
if addable {
entry.wheel.timer.doAddEntryByParent(entry.rawIntervalMs, entry)
}

View File

@ -13,28 +13,31 @@ import (
"github.com/gogf/gf/container/gtype"
)
// 定时器/分层时间轮
// Timer is a Hierarchical Timing Wheel manager for timing jobs.
type Timer struct {
status *gtype.Int // 定时器状态
wheels []*wheel // 分层时间轮对象
length int // 分层层数
number int // 每一层Slot Number
intervalMs int64 // 最小时间刻度(毫秒)
status *gtype.Int // Timer status.
wheels []*wheel // The underlying wheels.
length int // Max level of the wheels.
number int // Slot Number of each wheel.
intervalMs int64 // Interval of the slot in milliseconds.
}
// 单层时间轮
// Wheel is a slot wrapper for timing job install and uninstall.
type wheel struct {
timer *Timer // 所属定时器
level int // 所属分层索引号
slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组
number int64 // Slot Number=len(slots)
ticks *gtype.Int64 // 当前时间轮已转动的刻度数量
totalMs int64 // 整个时间轮的时间长度(毫秒)=number*interval
createMs int64 // 创建时间(毫秒)
intervalMs int64 // 时间间隔(slot时间长度, 毫秒)
timer *Timer // Belonged timer.
level int // The level in the timer.
slots []*glist.List // Slot array.
number int64 // Slot Number=len(slots).
ticks *gtype.Int64 // Ticked count of the wheel, one tick is one of its interval passed.
totalMs int64 // Total duration in milliseconds=number*interval.
createMs int64 // Created timestamp in milliseconds.
intervalMs int64 // Interval in milliseconds, which is the duration of one slot.
}
// 创建分层时间轮
// New creates and returns a Hierarchical Timing Wheel designed timer.
// The parameter <interval> specifies the interval of the timer.
// The optional parameter <level> specifies the wheels count of the timer,
// which is gDEFAULT_WHEEL_LEVEL in default.
func New(slot int, interval time.Duration, level ...int) *Timer {
length := gDEFAULT_WHEEL_LEVEL
if len(level) > 0 {
@ -61,7 +64,7 @@ func New(slot int, interval time.Duration, level ...int) *Timer {
return t
}
// 创建自定义的循环任务管理对象
// newWheel creates and returns a single wheel.
func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel {
w := &wheel{
timer: t,
@ -79,99 +82,118 @@ func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel {
return w
}
// 添加循环任务
// Add adds a timing job to the timer, which runs in interval of <interval>.
func (t *Timer) Add(interval time.Duration, job JobFunc) *Entry {
return t.doAddEntry(interval, job, false, gDEFAULT_TIMES, STATUS_READY)
}
// 添加定时任务
// AddEntry adds a timing job to the timer with detailed parameters.
//
// The parameter <interval> specifies the running interval of the job.
//
// The parameter <singleton> specifies whether the job running in singleton mode.
// There's only one of the same job is allowed running when its a singleton mode job.
//
// The parameter <times> specifies limit for the job running times, which means the job
// exits if its run times exceeds the <times>.
//
// The parameter <status> specifies the job status when it's firstly added to the timer.
func (t *Timer) AddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
return t.doAddEntry(interval, job, singleton, times, status)
}
// 添加单例运行循环任务
// AddSingleton is a convenience function for add singleton mode job.
func (t *Timer) AddSingleton(interval time.Duration, job JobFunc) *Entry {
return t.doAddEntry(interval, job, true, gDEFAULT_TIMES, STATUS_READY)
}
// 添加只运行一次的循环任务
// AddOnce is a convenience function for adding a job which only runs once and then exits.
func (t *Timer) AddOnce(interval time.Duration, job JobFunc) *Entry {
return t.doAddEntry(interval, job, true, 1, STATUS_READY)
}
// 添加运行指定次数的循环任务。
// AddTimes is a convenience function for adding a job which is limited running times.
func (t *Timer) AddTimes(interval time.Duration, times int, job JobFunc) *Entry {
return t.doAddEntry(interval, job, true, times, STATUS_READY)
}
// 延迟添加循环任务。
// DelayAdd adds a timing job after delay of <interval> duration.
// Also see Add.
func (t *Timer) DelayAdd(delay time.Duration, interval time.Duration, job JobFunc) {
t.AddOnce(delay, func() {
t.Add(interval, job)
})
}
// 延迟添加循环任务, 支持完整的参数。
// DelayAddEntry adds a timing job after delay of <interval> duration.
// Also see AddEntry.
func (t *Timer) DelayAddEntry(delay time.Duration, interval time.Duration, job JobFunc, singleton bool, times int, status int) {
t.AddOnce(delay, func() {
t.AddEntry(interval, job, singleton, times, status)
})
}
// 延迟添加单例循环任务
// DelayAddSingleton adds a timing job after delay of <interval> duration.
// Also see AddSingleton.
func (t *Timer) DelayAddSingleton(delay time.Duration, interval time.Duration, job JobFunc) {
t.AddOnce(delay, func() {
t.AddSingleton(interval, job)
})
}
// 延迟添加只运行一次的循环任务
// DelayAddOnce adds a timing job after delay of <interval> duration.
// Also see AddOnce.
func (t *Timer) DelayAddOnce(delay time.Duration, interval time.Duration, job JobFunc) {
t.AddOnce(delay, func() {
t.AddOnce(interval, job)
})
}
// 延迟添加只运行一次的循环任务
// DelayAddTimes adds a timing job after delay of <interval> duration.
// Also see AddTimes.
func (t *Timer) DelayAddTimes(delay time.Duration, interval time.Duration, times int, job JobFunc) {
t.AddOnce(delay, func() {
t.AddTimes(interval, times, job)
})
}
// 启动定时器
// Start starts the timer.
func (t *Timer) Start() {
t.status.Set(STATUS_RUNNING)
}
// 定制定时器
// Stop stops the timer.
func (t *Timer) Stop() {
t.status.Set(STATUS_STOPPED)
}
// 关闭定时器
// Close closes the timer.
func (t *Timer) Close() {
t.status.Set(STATUS_CLOSED)
}
// 添加定时任务
// doAddEntry adds a timing job to timer for internal usage.
func (t *Timer) doAddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry {
return t.wheels[t.getLevelByIntervalMs(interval.Nanoseconds()/1e6)].addEntry(interval, job, singleton, times, status)
}
// 添加定时任务给定父级Entry, 间隔参数参数为毫秒数.
// doAddEntryByParent adds a timing job to timer with parent entry for internal usage.
func (t *Timer) doAddEntryByParent(interval int64, parent *Entry) *Entry {
return t.wheels[t.getLevelByIntervalMs(interval)].addEntryByParent(interval, parent)
}
// 根据intervalMs计算添加的分层索引
// getLevelByIntervalMs calculates and returns the level of timer wheel with given milliseconds.
func (t *Timer) getLevelByIntervalMs(intervalMs int64) int {
pos, cmp := t.binSearchIndex(intervalMs)
switch cmp {
// intervalMs与最后匹配值相等, 不添加到匹配得层,而是向下一层添加
// If equals to the last comparison value, do not add it directly to this wheel,
// but loop and continue comparison from the index to the first level,
// and add it to the proper level wheel.
case 0:
fallthrough
// intervalMs比最后匹配值小
// If lesser than the last comparison value,
// loop and continue comparison from the index to the first level,
// and add it to the proper level wheel.
case -1:
i := pos
for ; i > 0; i-- {
@ -181,7 +203,9 @@ func (t *Timer) getLevelByIntervalMs(intervalMs int64) int {
}
return i
// intervalMs比最后匹配值大
// If greater than the last comparison value,
// loop and continue comparison from the index to the last level,
// and add it to the proper level wheel.
case 1:
i := pos
for ; i < t.length-1; i++ {
@ -194,7 +218,8 @@ func (t *Timer) getLevelByIntervalMs(intervalMs int64) int {
return 0
}
// 二分查找当前任务可以添加的时间轮对象索引.
// binSearchIndex uses binary search algorithm for finding the possible level of the wheel
// for the interval value.
func (t *Timer) binSearchIndex(n int64) (index int, result int) {
min := 0
max := t.length - 1