From 2ba0913beae77370dd161473279c919bcac8ab10 Mon Sep 17 00:00:00 2001 From: John Date: Tue, 21 Jan 2020 14:46:23 +0800 Subject: [PATCH] comment update for gtimer --- .example/other/test.go | 29 +----------- os/gtimer/gtimer.go | 78 ++++++++++++++++++------------ os/gtimer/gtimer_entry.go | 80 +++++++++++++++---------------- os/gtimer/gtimer_loop.go | 13 +++-- os/gtimer/gtimer_timer.go | 99 ++++++++++++++++++++++++--------------- 5 files changed, 160 insertions(+), 139 deletions(-) diff --git a/.example/other/test.go b/.example/other/test.go index 67760c9c5..790580777 100644 --- a/.example/other/test.go +++ b/.example/other/test.go @@ -1,32 +1,5 @@ package main -import ( - "fmt" - "github.com/gogf/gf/frame/g" - "github.com/gogf/gf/util/gconv" -) - -type TokenRequest struct { - Scope string - Watermark bool - Policy *g.Var -} - func main() { - // s := ` - //{ - // "policy": {"name":"john"}, - // "scope": "pub-med-panel", - // "watermark": true - //} - //` - var t *TokenRequest - m := g.Map{ - "policy": g.Map{"name": "john"}, - "scope": "pub-med-panel", - "watermark": true, - } - err := gconv.Struct(m, &t) - fmt.Println(err) - fmt.Println(t.Policy) + } diff --git a/os/gtimer/gtimer.go b/os/gtimer/gtimer.go index 1e7a87f5d..d7b9c302c 100644 --- a/os/gtimer/gtimer.go +++ b/os/gtimer/gtimer.go @@ -6,10 +6,14 @@ // Package gtimer implements Hierarchical Timing Wheel for interval/delayed jobs running and management. // -// 任务定时器, -// 高性能的分层时间轮任务管理模块,用于管理间隔/延迟运行任务。 -// 与gcron模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。 -// 需要注意执行时间间隔的准确性问题: https://github.com/golang/go/issues/14410 +// This package is designed for management for millions of timing jobs. +// The differences between gtime and gcron are as follows: +// 1. gcron is implemented based on gtimer. +// 2. gtimer is designed for high performance and for millions of timing jobs. +// 3. gcron supports pattern grammar like linux crontab. +// 4. gtimer's benchmark OP is measured in nanoseconds, and gcron's benchmark OP is measured in microseconds. +// +// Note the common delay of the timer: https://github.com/golang/go/issues/14410 package gtimer import ( @@ -20,87 +24,103 @@ import ( ) const ( - STATUS_READY = 0 - STATUS_RUNNING = 1 - STATUS_STOPPED = 2 - STATUS_CLOSED = -1 - gPANIC_EXIT = "exit" - gDEFAULT_TIMES = math.MaxInt32 - gDEFAULT_SLOT_NUMBER = 10 - gDEFAULT_WHEEL_INTERVAL = 50 - gDEFAULT_WHEEL_LEVEL = 6 + STATUS_READY = 0 // Job is ready for running. + STATUS_RUNNING = 1 // Job is already running. + STATUS_STOPPED = 2 // Job is stopped. + STATUS_CLOSED = -1 // Job is closed and waiting to be deleted. + gPANIC_EXIT = "exit" // Internal usage for custom job exit function with panic. + gDEFAULT_TIMES = math.MaxInt32 // Default limit running times, a big number. + gDEFAULT_SLOT_NUMBER = 10 // Default slot number. + gDEFAULT_WHEEL_INTERVAL = 50 // Default wheel interval. + gDEFAULT_WHEEL_LEVEL = 6 // Default wheel level. ) var ( - // 默认定时器属性参数值 defaultSlots = cmdenv.Get("gf.gtimer.slots", gDEFAULT_SLOT_NUMBER).Int() defaultLevel = cmdenv.Get("gf.gtimer.level", gDEFAULT_WHEEL_LEVEL).Int() defaultInterval = cmdenv.Get("gf.gtimer.interval", gDEFAULT_WHEEL_INTERVAL).Duration() * time.Millisecond - // 默认的wheel管理对象 - defaultTimer = New(defaultSlots, defaultInterval, defaultLevel) + defaultTimer = New(defaultSlots, defaultInterval, defaultLevel) ) -// 类似与js中的SetTimeout,一段时间后执行回调函数。 +// SetTimeout runs the job once after duration of . +// It is like the one in javascript. func SetTimeout(delay time.Duration, job JobFunc) { AddOnce(delay, job) } -// 类似与js中的SetInterval,每隔一段时间执行指定回调函数。 +// SetInterval runs the job every duration of . +// It is like the one in javascript. func SetInterval(interval time.Duration, job JobFunc) { Add(interval, job) } -// 添加执行方法。 +// Add adds a timing job to the default timer, which runs in interval of . func Add(interval time.Duration, job JobFunc) *Entry { return defaultTimer.Add(interval, job) } -// 添加执行方法,更多参数控制。 +// AddEntry adds a timing job to the default timer with detailed parameters. +// +// The parameter specifies the running interval of the job. +// +// The parameter specifies whether the job running in singleton mode. +// There's only one of the same job is allowed running when its a singleton mode job. +// +// The parameter specifies limit for the job running times, which means the job +// exits if its run times exceeds the . +// +// The parameter specifies the job status when it's firstly added to the timer. func AddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry { return defaultTimer.AddEntry(interval, job, singleton, times, status) } -// 添加单例运行循环任务。 +// AddSingleton is a convenience function for add singleton mode job. func AddSingleton(interval time.Duration, job JobFunc) *Entry { return defaultTimer.AddSingleton(interval, job) } -// 添加只运行一次的循环任务。 +// AddOnce is a convenience function for adding a job which only runs once and then exits. func AddOnce(interval time.Duration, job JobFunc) *Entry { return defaultTimer.AddOnce(interval, job) } -// 添加运行指定次数的循环任务。 +// AddTimes is a convenience function for adding a job which is limited running times. func AddTimes(interval time.Duration, times int, job JobFunc) *Entry { return defaultTimer.AddTimes(interval, times, job) } -// 延迟添加循环任务。 +// DelayAdd adds a timing job after delay of duration. +// Also see Add. func DelayAdd(delay time.Duration, interval time.Duration, job JobFunc) { defaultTimer.DelayAdd(delay, interval, job) } -// 延迟添加循环任务, 支持完整的参数。 +// DelayAddEntry adds a timing job after delay of duration. +// Also see AddEntry. func DelayAddEntry(delay time.Duration, interval time.Duration, job JobFunc, singleton bool, times int, status int) { defaultTimer.DelayAddEntry(delay, interval, job, singleton, times, status) } -// 延迟添加单例循环任务,delay参数单位为秒 +// DelayAddSingleton adds a timing job after delay of duration. +// Also see AddSingleton. func DelayAddSingleton(delay time.Duration, interval time.Duration, job JobFunc) { defaultTimer.DelayAddSingleton(delay, interval, job) } -// 延迟添加只运行一次的循环任务,delay参数单位为秒 +// DelayAddOnce adds a timing job after delay of duration. +// Also see AddOnce. func DelayAddOnce(delay time.Duration, interval time.Duration, job JobFunc) { defaultTimer.DelayAddOnce(delay, interval, job) } -// 延迟添加运行指定次数的循环任务,delay参数单位为秒 +// DelayAddTimes adds a timing job after delay of duration. +// Also see AddTimes. func DelayAddTimes(delay time.Duration, interval time.Duration, times int, job JobFunc) { defaultTimer.DelayAddTimes(delay, interval, times, job) } -// 在Job方法中调用,停止并删除当前运行的任务。 +// Exit is used in timing job, which exits and marks it closed from timer. +// The timing job will be removed from timer later. func Exit() { panic(gPANIC_EXIT) } diff --git a/os/gtimer/gtimer_entry.go b/os/gtimer/gtimer_entry.go index e1859e7d5..ff128da45 100644 --- a/os/gtimer/gtimer_entry.go +++ b/os/gtimer/gtimer_entry.go @@ -12,25 +12,24 @@ import ( "github.com/gogf/gf/container/gtype" ) -// 循环任务项 +// Entry is the timing job entry to wheel. type Entry struct { - wheel *wheel // 所属时间轮 - job JobFunc // 注册循环任务方法 - singleton *gtype.Bool // 任务是否单例运行 - status *gtype.Int // 任务状态(0: ready; 1: running; 2: stopped; -1: closed), 层级entry共享状态 - times *gtype.Int // 还需运行次数 - create int64 // 注册时的时间轮ticks - interval int64 // 设置的运行间隔(时间轮刻度数量) - createMs int64 // 创建时间(毫秒) - intervalMs int64 // 间隔时间(毫秒) - rawIntervalMs int64 // 原始间隔 + wheel *wheel // Belonged wheel. + job JobFunc // The job function. + singleton *gtype.Bool // Singleton mode. + status *gtype.Int // Job status. + times *gtype.Int // Limit running times. + create int64 // Timer ticks when the job installed. + interval int64 // The interval ticks of the job. + createMs int64 // The timestamp in milliseconds when job installed. + intervalMs int64 // The interval milliseconds of the job. + rawIntervalMs int64 // Raw input interval in milliseconds. } -// 任务执行方法 +// JobFunc is the job function. type JobFunc = func() -// 创建定时任务。 -// 如果times参数<=0,表示不限制运行次数。 +// addEntry adds a timing job to the wheel. func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry { if times <= 0 { times = gDEFAULT_TIMES @@ -38,8 +37,8 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti ms := interval.Nanoseconds() / 1e6 num := ms / w.intervalMs if num == 0 { - // 如果安装的任务间隔小于时间轮刻度, - // 那么将会在下一刻度被执行 + // If the given interval is lesser than the one of the wheel, + // then sets it to one tick, which means it will be run in one interval. num = 1 } nowMs := time.Now().UnixNano() / 1e6 @@ -56,12 +55,12 @@ func (w *wheel) addEntry(interval time.Duration, job JobFunc, singleton bool, ti intervalMs: ms, rawIntervalMs: ms, } - // 安装任务 + // Install the job to the list of the slot. w.slots[(ticks+num)%w.number].PushBack(entry) return entry } -// 创建定时任务,给定父级Entry, 间隔参数参数为毫秒数. +// addEntryByParent adds a timing job with parent entry. func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry { num := interval / w.intervalMs if num == 0 { @@ -85,52 +84,52 @@ func (w *wheel) addEntryByParent(interval int64, parent *Entry) *Entry { return entry } -// 获取任务状态 +// Status returns the status of the job. func (entry *Entry) Status() int { return entry.status.Val() } -// 设置任务状态 +// SetStatus custom sets the status for the job. func (entry *Entry) SetStatus(status int) int { return entry.status.Set(status) } -// 启动当前任务 +// Start starts the job. func (entry *Entry) Start() { entry.status.Set(STATUS_READY) } -// 停止当前任务 +// Stop stops the job. func (entry *Entry) Stop() { entry.status.Set(STATUS_STOPPED) } -// 关闭当前任务 +// Close closes the job, and then it will be removed from the timer. func (entry *Entry) Close() { entry.status.Set(STATUS_CLOSED) } -// 是否单例运行 +// IsSingleton checks and returns whether the job in singleton mode. func (entry *Entry) IsSingleton() bool { return entry.singleton.Val() } -// 设置单例运行 +// SetSingleton sets the job singleton mode. func (entry *Entry) SetSingleton(enabled bool) { entry.singleton.Set(enabled) } -// 设置任务的运行次数 +// SetTimes sets the limit running times for the job. func (entry *Entry) SetTimes(times int) { entry.times.Set(times) } -// 执行任务 +// Run runs the job. func (entry *Entry) Run() { entry.job() } -// 检测当前任务是否可运行。 +// check checks if the job should be run in given ticks and timestamp milliseconds. func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool) { switch entry.status.Val() { case STATUS_STOPPED: @@ -138,44 +137,45 @@ func (entry *Entry) check(nowTicks int64, nowMs int64) (runnable, addable bool) case STATUS_CLOSED: return false, false } - // 时间轮刻度判断,是否满足运行刻度条件,刻度判断的误差会比较大 + // Firstly checks using the ticks, this may be low precision as one tick is a little bit long. if diff := nowTicks - entry.create; diff > 0 && diff%entry.interval == 0 { - // 分层转换处理 + // If not the lowest level wheel. if entry.wheel.level > 0 { diffMs := nowMs - entry.createMs switch { - // 表示新增(当添加任务后在下一时间轮刻度马上触发) + // Add it to the next slot, which means it will run on next interval. case diffMs < entry.wheel.timer.intervalMs: entry.wheel.slots[(nowTicks+entry.interval)%entry.wheel.number].PushBack(entry) return false, false - // 正常任务 + // Normal rolls on the job. case diffMs >= entry.wheel.timer.intervalMs: - // 任务是否有必要进行分层转换 + // Calculate the leftover milliseconds, + // if it is greater than the minimum interval, then re-install it. if leftMs := entry.intervalMs - diffMs; leftMs > entry.wheel.timer.intervalMs { - - // 往底层添加,通过毫秒计算并重新添加任务到对应的时间轮上,减小运行误差 + // Re-calculate and re-installs the job proper slot. entry.wheel.timer.doAddEntryByParent(leftMs, entry) return false, false } } } - // 是否单例 + // Singleton mode check. if entry.IsSingleton() { - // 注意原子操作结果判断 + // Note that it is atomic operation to ensure concurrent safety. if entry.status.Set(STATUS_RUNNING) == STATUS_RUNNING { return false, true } } - // 次数限制 + // Limit running times. times := entry.times.Add(-1) if times <= 0 { - // 注意原子操作结果判断 + // Note that it is atomic operation to ensure concurrent safety. if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED || times < 0 { return false, false } } - // 是否不限制运行次数 + // This means it does not limit the running times. + // I know it's ugly, but it is surely high performance for running times limit. if times < 2000000000 && times > 1000000000 { entry.times.Set(gDEFAULT_TIMES) } diff --git a/os/gtimer/gtimer_loop.go b/os/gtimer/gtimer_loop.go index e2a79701b..5dc652fc2 100644 --- a/os/gtimer/gtimer_loop.go +++ b/os/gtimer/gtimer_loop.go @@ -12,7 +12,7 @@ import ( "github.com/gogf/gf/container/glist" ) -// 开始循环 +// start starts the ticker using a standalone goroutine. func (w *wheel) start() { go func() { ticker := time.NewTicker(time.Duration(w.intervalMs) * time.Millisecond) @@ -34,7 +34,10 @@ func (w *wheel) start() { }() } -// 执行时间轮刻度逻辑 +// proceed checks and rolls on the job. +// If a timing job is time for running, it runs in an asynchronous goroutine, +// or else it removes from current slot and re-installs the job to another wheel and slot +// according to its leftover interval in milliseconds. func (w *wheel) proceed() { n := w.ticks.Add(1) l := w.slots[int(n%w.number)] @@ -49,10 +52,10 @@ func (w *wheel) proceed() { } else { entry = v.(*Entry) } - // 是否满足运行条件 + // Checks whether the time for running. runnable, addable := entry.check(nowTicks, nowMs) if runnable { - // 异步执行运行 + // Just run it in another goroutine. go func(entry *Entry) { defer func() { if err := recover(); err != nil { @@ -69,7 +72,7 @@ func (w *wheel) proceed() { entry.job() }(entry) } - // 是否继续添运行, 滚动任务 + // If rolls on the job. if addable { entry.wheel.timer.doAddEntryByParent(entry.rawIntervalMs, entry) } diff --git a/os/gtimer/gtimer_timer.go b/os/gtimer/gtimer_timer.go index 62ac2fd04..9eb2edfc4 100644 --- a/os/gtimer/gtimer_timer.go +++ b/os/gtimer/gtimer_timer.go @@ -13,28 +13,31 @@ import ( "github.com/gogf/gf/container/gtype" ) -// 定时器/分层时间轮 +// Timer is a Hierarchical Timing Wheel manager for timing jobs. type Timer struct { - status *gtype.Int // 定时器状态 - wheels []*wheel // 分层时间轮对象 - length int // 分层层数 - number int // 每一层Slot Number - intervalMs int64 // 最小时间刻度(毫秒) + status *gtype.Int // Timer status. + wheels []*wheel // The underlying wheels. + length int // Max level of the wheels. + number int // Slot Number of each wheel. + intervalMs int64 // Interval of the slot in milliseconds. } -// 单层时间轮 +// Wheel is a slot wrapper for timing job install and uninstall. type wheel struct { - timer *Timer // 所属定时器 - level int // 所属分层索引号 - slots []*glist.List // 所有的循环任务项, 按照Slot Number进行分组 - number int64 // Slot Number=len(slots) - ticks *gtype.Int64 // 当前时间轮已转动的刻度数量 - totalMs int64 // 整个时间轮的时间长度(毫秒)=number*interval - createMs int64 // 创建时间(毫秒) - intervalMs int64 // 时间间隔(slot时间长度, 毫秒) + timer *Timer // Belonged timer. + level int // The level in the timer. + slots []*glist.List // Slot array. + number int64 // Slot Number=len(slots). + ticks *gtype.Int64 // Ticked count of the wheel, one tick is one of its interval passed. + totalMs int64 // Total duration in milliseconds=number*interval. + createMs int64 // Created timestamp in milliseconds. + intervalMs int64 // Interval in milliseconds, which is the duration of one slot. } -// 创建分层时间轮 +// New creates and returns a Hierarchical Timing Wheel designed timer. +// The parameter specifies the interval of the timer. +// The optional parameter specifies the wheels count of the timer, +// which is gDEFAULT_WHEEL_LEVEL in default. func New(slot int, interval time.Duration, level ...int) *Timer { length := gDEFAULT_WHEEL_LEVEL if len(level) > 0 { @@ -61,7 +64,7 @@ func New(slot int, interval time.Duration, level ...int) *Timer { return t } -// 创建自定义的循环任务管理对象 +// newWheel creates and returns a single wheel. func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel { w := &wheel{ timer: t, @@ -79,99 +82,118 @@ func (t *Timer) newWheel(level int, slot int, interval time.Duration) *wheel { return w } -// 添加循环任务 +// Add adds a timing job to the timer, which runs in interval of . func (t *Timer) Add(interval time.Duration, job JobFunc) *Entry { return t.doAddEntry(interval, job, false, gDEFAULT_TIMES, STATUS_READY) } -// 添加定时任务 +// AddEntry adds a timing job to the timer with detailed parameters. +// +// The parameter specifies the running interval of the job. +// +// The parameter specifies whether the job running in singleton mode. +// There's only one of the same job is allowed running when its a singleton mode job. +// +// The parameter specifies limit for the job running times, which means the job +// exits if its run times exceeds the . +// +// The parameter specifies the job status when it's firstly added to the timer. func (t *Timer) AddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry { return t.doAddEntry(interval, job, singleton, times, status) } -// 添加单例运行循环任务 +// AddSingleton is a convenience function for add singleton mode job. func (t *Timer) AddSingleton(interval time.Duration, job JobFunc) *Entry { return t.doAddEntry(interval, job, true, gDEFAULT_TIMES, STATUS_READY) } -// 添加只运行一次的循环任务 +// AddOnce is a convenience function for adding a job which only runs once and then exits. func (t *Timer) AddOnce(interval time.Duration, job JobFunc) *Entry { return t.doAddEntry(interval, job, true, 1, STATUS_READY) } -// 添加运行指定次数的循环任务。 +// AddTimes is a convenience function for adding a job which is limited running times. func (t *Timer) AddTimes(interval time.Duration, times int, job JobFunc) *Entry { return t.doAddEntry(interval, job, true, times, STATUS_READY) } -// 延迟添加循环任务。 +// DelayAdd adds a timing job after delay of duration. +// Also see Add. func (t *Timer) DelayAdd(delay time.Duration, interval time.Duration, job JobFunc) { t.AddOnce(delay, func() { t.Add(interval, job) }) } -// 延迟添加循环任务, 支持完整的参数。 +// DelayAddEntry adds a timing job after delay of duration. +// Also see AddEntry. func (t *Timer) DelayAddEntry(delay time.Duration, interval time.Duration, job JobFunc, singleton bool, times int, status int) { t.AddOnce(delay, func() { t.AddEntry(interval, job, singleton, times, status) }) } -// 延迟添加单例循环任务 +// DelayAddSingleton adds a timing job after delay of duration. +// Also see AddSingleton. func (t *Timer) DelayAddSingleton(delay time.Duration, interval time.Duration, job JobFunc) { t.AddOnce(delay, func() { t.AddSingleton(interval, job) }) } -// 延迟添加只运行一次的循环任务 +// DelayAddOnce adds a timing job after delay of duration. +// Also see AddOnce. func (t *Timer) DelayAddOnce(delay time.Duration, interval time.Duration, job JobFunc) { t.AddOnce(delay, func() { t.AddOnce(interval, job) }) } -// 延迟添加只运行一次的循环任务 +// DelayAddTimes adds a timing job after delay of duration. +// Also see AddTimes. func (t *Timer) DelayAddTimes(delay time.Duration, interval time.Duration, times int, job JobFunc) { t.AddOnce(delay, func() { t.AddTimes(interval, times, job) }) } -// 启动定时器 +// Start starts the timer. func (t *Timer) Start() { t.status.Set(STATUS_RUNNING) } -// 定制定时器 +// Stop stops the timer. func (t *Timer) Stop() { t.status.Set(STATUS_STOPPED) } -// 关闭定时器 +// Close closes the timer. func (t *Timer) Close() { t.status.Set(STATUS_CLOSED) } -// 添加定时任务 +// doAddEntry adds a timing job to timer for internal usage. func (t *Timer) doAddEntry(interval time.Duration, job JobFunc, singleton bool, times int, status int) *Entry { return t.wheels[t.getLevelByIntervalMs(interval.Nanoseconds()/1e6)].addEntry(interval, job, singleton, times, status) } -// 添加定时任务,给定父级Entry, 间隔参数参数为毫秒数. +// doAddEntryByParent adds a timing job to timer with parent entry for internal usage. func (t *Timer) doAddEntryByParent(interval int64, parent *Entry) *Entry { return t.wheels[t.getLevelByIntervalMs(interval)].addEntryByParent(interval, parent) } -// 根据intervalMs计算添加的分层索引 +// getLevelByIntervalMs calculates and returns the level of timer wheel with given milliseconds. func (t *Timer) getLevelByIntervalMs(intervalMs int64) int { pos, cmp := t.binSearchIndex(intervalMs) switch cmp { - // intervalMs与最后匹配值相等, 不添加到匹配得层,而是向下一层添加 + // If equals to the last comparison value, do not add it directly to this wheel, + // but loop and continue comparison from the index to the first level, + // and add it to the proper level wheel. case 0: fallthrough - // intervalMs比最后匹配值小 + // If lesser than the last comparison value, + // loop and continue comparison from the index to the first level, + // and add it to the proper level wheel. case -1: i := pos for ; i > 0; i-- { @@ -181,7 +203,9 @@ func (t *Timer) getLevelByIntervalMs(intervalMs int64) int { } return i - // intervalMs比最后匹配值大 + // If greater than the last comparison value, + // loop and continue comparison from the index to the last level, + // and add it to the proper level wheel. case 1: i := pos for ; i < t.length-1; i++ { @@ -194,7 +218,8 @@ func (t *Timer) getLevelByIntervalMs(intervalMs int64) int { return 0 } -// 二分查找当前任务可以添加的时间轮对象索引. +// binSearchIndex uses binary search algorithm for finding the possible level of the wheel +// for the interval value. func (t *Timer) binSearchIndex(n int64) (index int, result int) { min := 0 max := t.length - 1