This commit is contained in:
John
2018-12-30 14:53:16 +08:00
parent e558863743
commit ccf837b2bf
25 changed files with 636 additions and 352 deletions

View File

@ -46,6 +46,7 @@
1. strpos/stripos/strrpos/strripos: http://php.net/manual/en/function.stripos.php
1. 改进WebServer获取POST参数处理逻辑当提交非form数据时例如json数据针对某些方法可以直接解析
1. WebServer增加可选择的路由覆盖配置默认情况下不覆盖
1. grpool性能压测结果变慢的问题
# DONE

View File

@ -34,7 +34,7 @@ func (t *Int) Val() int {
return int(atomic.LoadInt64(&t.val))
}
// 数值增加delta并返回的数值
// 数值增加delta并返回**新**的数值
func (t *Int) Add(delta int) int {
return int(atomic.AddInt64(&t.val, int64(delta)))
}

View File

@ -3,7 +3,6 @@ package rwmutex
import "sync"
// RWMutex的封装支持对并发安全开启/关闭的控制。
// 但是只能初始化时确定并发安全性,不能在运行时动态修改并发安全特性设置。
type RWMutex struct {
sync.RWMutex
safe bool

View File

@ -3,7 +3,6 @@ package rwmutex
import "sync"
// RWMutex的封装支持对并发安全开启/关闭的控制。
// 但是只能初始化时确定并发安全性,不能在运行时动态修改并发安全特性设置。
type RWMutex struct {
sync.RWMutex
safe bool

View File

@ -4,6 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Package gcache provides memory cache for process/缓存模块.
// 并发安全的单进程高速缓存.
package gcache

View File

@ -6,16 +6,17 @@
// go test *.go -bench=".*" -benchmem
package gcache
package gcache_test
import (
"gitee.com/johng/gf/g/os/gcache"
"testing"
"sync"
)
var (
c = New()
clru = New(10000)
c = gcache.New()
clru = gcache.New(10000)
mInt = make(map[int]int)
mMap = make(map[interface{}]interface{})

View File

@ -7,6 +7,7 @@
package gcache
import (
"gitee.com/johng/gf/g/os/gtimew"
"sync/atomic"
"unsafe"
)
@ -22,7 +23,7 @@ func New(lruCap...int) *Cache {
c := &Cache {
memCache : newMemCache(lruCap...),
}
go c.autoLoop()
gtimew.AddSingleton(1, c.syncEventAndClearExpired)
return c
}

View File

@ -11,10 +11,10 @@ import (
"gitee.com/johng/gf/g/container/gset"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/util/gconv"
"math"
"sync"
"time"
)
@ -26,8 +26,8 @@ type memCache struct {
cap int // 控制缓存池大小超过大小则按照LRU算法进行缓存过期处理(默认为0表示不进行限制)
data map[interface{}]memCacheItem // 缓存数据(所有的缓存数据存放哈希表)
expireTimes map[interface{}]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新)键值为10秒级时间戳
expireSets map[int64]*gset.Set // 分组过期时间对应的键名列表(用于自动过期快速删除)键值为10秒级时间戳
expireTimes map[interface{}]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新)键值为1秒级时间戳
expireSets map[int64]*gset.Set // 分组过期时间对应的键名列表(用于自动过期快速删除)键值为1秒级时间戳
lru *memCacheLru // LRU缓存限制(只有限定cap池大小时才启用)
lruGetList *glist.List // Get操作的LRU记录
@ -69,17 +69,17 @@ func newMemCache(lruCap...int) *memCache {
return c
}
// 计算过期缓存的键名(将毫秒换算成秒的整数毫秒按照10秒进行分组)
// 计算过期缓存的键名(将毫秒换算成秒的整数毫秒按照1秒进行分组)
func (c *memCache) makeExpireKey(expire int64) int64 {
return int64(math.Ceil(float64(expire/10000) + 1)*10000)
return int64(math.Ceil(float64(expire/1000) + 1)*1000)
}
// 获取一个过期键名存放Set,如果没有则返回nil
// 获取一个过期键名存放Set, 如果没有则返回nil
func (c *memCache) getExpireSet(expire int64) (expireSet *gset.Set) {
c.expireSetMu.RLock()
expireSet, _ = c.expireSets[expire]
c.expireSetMu.RUnlock()
return nil
return
}
// 获取或者创建一个过期键名存放Set(由于是异步单线程执行因此不会出现创建set时的覆盖问题)
@ -287,74 +287,70 @@ func (c *memCache) Close() {
// 数据异步任务循环:
// 1、将事件列表中的数据异步处理并同步结果到expireTimes和expireSets属性中
// 2、清理过期键值对数据
func (c *memCache) autoLoop() {
func (c *memCache) syncEventAndClearExpired() {
event := (*memCacheEvent)(nil)
oldExpireTime := int64(0)
newExpireTime := int64(0)
if c.closed.Val() {
gtimew.ExitJob()
return
}
// ========================
// 数据同步处理
// ========================
for {
if c.closed.Val() {
return
v := c.eventList.PopFront()
if v == nil {
break
}
// ========================
// 数据同步处理
// ========================
event = v.(*memCacheEvent)
// 获得旧的过期时间分组
c.expireTimeMu.RLock()
oldExpireTime = c.expireTimes[event.k]
c.expireTimeMu.RUnlock()
// 计算新的过期时间分组
newExpireTime = c.makeExpireKey(event.e)
if newExpireTime != oldExpireTime {
c.getOrNewExpireSet(newExpireTime).Add(event.k)
if oldExpireTime != 0 {
c.getOrNewExpireSet(oldExpireTime).Remove(event.k)
}
// 重新设置对应键名的过期时间
c.expireTimeMu.Lock()
c.expireTimes[event.k] = newExpireTime
c.expireTimeMu.Unlock()
}
// 写入操作也会增加到LRU(Least Recently Used)操作记录
if c.cap > 0 {
c.lru.Push(event.k)
}
}
// 异步处理读取操作的LRU列表
if c.cap > 0 && c.lruGetList.Len() > 0 {
for {
v := c.eventList.PopFront()
if v == nil {
if v := c.lruGetList.PopFront(); v != nil {
c.lru.Push(v)
} else {
break
}
event = v.(*memCacheEvent)
// 获得旧的过期时间分组
c.expireTimeMu.RLock()
oldExpireTime = c.expireTimes[event.k]
c.expireTimeMu.RUnlock()
// 计算新的过期时间分组
newExpireTime = c.makeExpireKey(event.e)
if newExpireTime != oldExpireTime {
c.getOrNewExpireSet(newExpireTime).Add(event.k)
if oldExpireTime != 0 {
c.getOrNewExpireSet(oldExpireTime).Remove(event.k)
}
// 重新设置对应键名的过期时间
c.expireTimeMu.Lock()
c.expireTimes[event.k] = newExpireTime
c.expireTimeMu.Unlock()
}
// 写入操作也会增加到LRU(Least Recently Used)操作记录
if c.cap > 0 {
c.lru.Push(event.k)
}
}
// 异步处理读取操作的LRU列表
if c.cap > 0 && c.lruGetList.Len() > 0 {
for {
if v := c.lruGetList.PopFront(); v != nil {
c.lru.Push(v)
} else {
break
}
}
}
// ========================
// 缓存过期处理
// ========================
ek := c.makeExpireKey(gtime.Millisecond())
eks := []int64{ek - 1000, ek - 2000, ek - 3000, ek - 4000, ek - 5000}
for _, expireTime := range eks {
if expireSet := c.getExpireSet(expireTime); expireSet != nil {
// 遍历Set执行数据过期删除
expireSet.Iterator(func(key interface{}) bool {
return c.clearByKey(key)
})
// Set数据处理完之后删除该Set
c.expireSetMu.Lock()
delete(c.expireSets, expireTime)
c.expireSetMu.Unlock()
}
// ========================
// 缓存过期处理
// ========================
ek := c.makeExpireKey(gtime.Millisecond())
eks := []int64{ek - 10000, ek - 20000, ek - 30000, ek - 40000, ek - 50000}
for _, expireTime := range eks {
if expireSet := c.getExpireSet(expireTime); expireSet != nil {
// 遍历Set执行数据过期删除
expireSet.Iterator(func(key interface{}) bool {
return c.clearByKey(key)
})
// Set数据处理完之后删除该Set
c.expireSetMu.Lock()
delete(c.expireSets, expireTime)
c.expireSetMu.Unlock()
}
}
// 每间隔1秒批量处理一次
time.Sleep(time.Second)
}
}
@ -374,7 +370,9 @@ func (c *memCache) clearByKey(key interface{}, force...bool) bool {
c.expireTimeMu.Unlock()
// 删除LRU管理对象中指定键名
c.lru.Remove(key)
if c.cap > 0 {
c.lru.Remove(key)
}
return true
}

View File

@ -12,7 +12,7 @@ import (
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gtype"
"time"
"gitee.com/johng/gf/g/os/gtimew"
)
// LRU算法实现对象底层双向链表使用了标准库的list.List
@ -33,7 +33,7 @@ func newMemCacheLru(cache *memCache) *memCacheLru {
rawList : glist.New(),
closed : gtype.NewBool(),
}
go lru.StartAutoLoop()
gtimew.AddSingleton(1, lru.SyncAndClear)
return lru
}
@ -78,30 +78,28 @@ func (lru *memCacheLru) Print() {
}
// 异步执行协程将queue中的数据同步到list中
func (lru *memCacheLru) StartAutoLoop() {
func (lru *memCacheLru) SyncAndClear() {
if lru.closed.Val() {
gtimew.ExitJob()
return
}
// 数据同步
for {
if lru.closed.Val() {
return
}
// 数据同步
for {
if v := lru.rawList.PopFront(); v != nil {
// 删除对应链表项
if v := lru.data.Get(v); v != nil {
lru.list.Remove(v.(*list.Element))
}
// 将数据插入到链表头,并记录对应的链表项到哈希表中,便于检索
lru.data.Set(v, lru.list.PushFront(v))
} else {
break
if v := lru.rawList.PopFront(); v != nil {
// 删除对应链表项
if v := lru.data.Get(v); v != nil {
lru.list.Remove(v.(*list.Element))
}
// 将数据插入到链表头,并记录对应的链表项到哈希表中,便于检索
lru.data.Set(v, lru.list.PushFront(v))
} else {
break
}
// 数据清理
for i := lru.Size() - lru.cache.cap; i > 0; i-- {
if s := lru.Pop(); s != nil {
lru.cache.clearByKey(s, true)
}
}
// 数据清理
for i := lru.Size() - lru.cache.cap; i > 0; i-- {
if s := lru.Pop(); s != nil {
lru.cache.clearByKey(s, true)
}
time.Sleep(time.Second)
}
}

View File

@ -0,0 +1,65 @@
// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// go test *.go -bench=".*" -benchmem
package gcache_test
import (
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestCache_Set(t *testing.T) {
gtest.Case(func() {
cache := gcache.New()
cache.Set(1, 11, 0)
gtest.Assert(cache.Get(1), 11)
})
}
func TestCache_Set_Expire(t *testing.T) {
gtest.Case(func() {
cache := gcache.New()
cache.Set(2, 22, 100)
gtest.Assert(cache.Get(2), 22)
time.Sleep(200*time.Millisecond)
gtest.Assert(cache.Get(2), nil)
time.Sleep(3*time.Second)
gtest.Assert(cache.Size(), 0)
})
}
func TestCache_Keys_Values(t *testing.T) {
gtest.Case(func() {
cache := gcache.New()
for i := 0; i < 10; i++ {
cache.Set(i, i*10, 0)
}
gtest.Assert(len(cache.Keys()), 10)
gtest.Assert(len(cache.Values()), 10)
gtest.AssertIN(0, cache.Keys())
gtest.AssertIN(90, cache.Values())
})
}
func TestCache_LRU(t *testing.T) {
gtest.Case(func() {
cache := gcache.New(2)
for i := 0; i < 10; i++ {
cache.Set(i, i, 0)
}
gtest.Assert(cache.Size(), 10)
gtest.Assert(cache.Get(6), 6)
time.Sleep(3*time.Second)
gtest.Assert(cache.Size(), 2)
gtest.Assert(cache.Get(6), 6)
gtest.Assert(cache.Get(1), nil)
})
}

View File

@ -13,7 +13,7 @@ import (
func Benchmark_Add(b *testing.B) {
for i := 0; i < b.N; i++ {
gcron.Add("* * * * * *", func() {
gcron.Add("1 1 1 1 1 1", func() {
})
}

View File

@ -15,60 +15,50 @@ import (
"time"
)
var (
cron1 = gcron.New()
cron2 = gcron.New()
)
func TestCron_Add_Close(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
_, err1 := cron1.Add("* * * * * *", func() {
_, err1 := cron.Add("* * * * * *", func() {
array.Append(1)
})
_, err2 := cron1.Add("* * * * * *", func() {
_, err2 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err3 := cron1.Add("* * * * * *", func() {
_, err3 := cron.Add("* * * * * *", func() {
array.Append(1)
}, "test")
_, err4 := cron1.Add("@every 2s", func() {
_, err4 := cron.Add("@every 2s", func() {
array.Append(1)
})
gtest.Assert(err1, nil)
gtest.Assert(err2, nil)
gtest.AssertNE(err3, nil)
gtest.Assert(err4, nil)
gtest.Assert(len(cron.Entries()), 3)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 5)
cron1.Close()
cron.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
}
func TestCron_Entries(t *testing.T) {
entries := cron1.Entries()
gtest.Assert(len(entries), 3)
}
func TestCron_DelayAdd(t *testing.T) {
cron2.Add("* * * * * *", func() {}, "add")
cron2.DelayAdd(1, "* * * * * *", func() {}, "delay_add")
gtest.Assert(len(cron2.Entries()), 1)
func TestCron_Mathod(t *testing.T) {
cron := gcron.New()
cron.Add("* * * * * *", func() {}, "add")
cron.DelayAdd(1, "* * * * * *", func() {}, "delay_add")
gtest.Assert(len(cron.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(len(cron2.Entries()), 2)
}
gtest.Assert(len(cron.Entries()), 2)
func TestCron_Remove(t *testing.T) {
cron2.Remove("delay_add")
gtest.Assert(len(cron2.Entries()), 1)
}
cron.Remove("delay_add")
gtest.Assert(len(cron.Entries()), 1)
func TestCron_Search(t *testing.T) {
entry1 := cron2.Search("add")
entry2 := cron2.Search("test-none")
entry1 := cron.Search("add")
entry2 := cron.Search("test-none")
gtest.AssertNE(entry1, nil)
gtest.Assert(entry2, nil)
}

View File

@ -14,18 +14,15 @@ import (
"time"
)
var (
singletonCron = gcron.New()
)
func TestCron_AddSingleton(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
singletonCron.AddSingleton("* * * * * *", func() {
cron.AddSingleton("* * * * * *", func() {
array.Append(1)
time.Sleep(5*time.Second)
})
gtest.Assert(len(singletonCron.Entries()), 1)
gtest.Assert(len(cron.Entries()), 1)
time.Sleep(3500*time.Millisecond)
gtest.Assert(array.Len(), 1)
}

View File

@ -14,20 +14,17 @@ import (
"time"
)
var (
onceCron = gcron.New()
)
func TestCron_AddOnce(t *testing.T) {
cron := gcron.New()
array := garray.New(0, 0)
singletonCron.AddOnce("* * * * * *", func() {
cron.AddOnce("* * * * * *", func() {
array.Append(1)
})
singletonCron.AddOnce("* * * * * *", func() {
cron.AddOnce("* * * * * *", func() {
array.Append(1)
})
gtest.Assert(len(singletonCron.Entries()), 2)
gtest.Assert(len(cron.Entries()), 2)
time.Sleep(2500*time.Millisecond)
gtest.Assert(array.Len(), 2)
gtest.Assert(len(singletonCron.Entries()), 0)
gtest.Assert(len(cron.Entries()), 0)
}

View File

@ -7,11 +7,8 @@
package gmlock
import (
"fmt"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/os/gcron"
"gitee.com/johng/gf/g/os/gtime"
"time"
"gitee.com/johng/gf/g/os/gtimew"
)
// 内存锁管理对象
@ -81,7 +78,7 @@ func (l *Locker) doLock(key string, expire int, try bool) bool {
if ok && expire > 0 {
// 异步goroutine计时处理
wid := mu.wid.Val()
gtime.SetTimeout(time.Duration(expire)*time.Millisecond, func() {
gtimew.AddOnce(expire, func() {
if wid == mu.wid.Val() {
mu.Unlock()
}
@ -101,7 +98,7 @@ func (l *Locker) doRLock(key string, expire int, try bool) bool {
}
if ok && expire > 0 {
rid := mu.rid.Val()
gcron.AddOnce(fmt.Sprintf(`@every %ds`, expire), func() {
gtimew.AddOnce(expire, func() {
if rid == mu.rid.Val() {
mu.RUnlock()
}

View File

@ -31,15 +31,6 @@ func NewMutex() *Mutex {
}
}
// 不阻塞Lock
func (l *Mutex) TryLock() bool {
if l.wcount.Val() == 0 && l.rcount.Val() == 0 {
l.Lock()
return true
}
return false
}
func (l *Mutex) Lock() {
l.wcount.Add(1)
l.mu.Lock()
@ -49,20 +40,12 @@ func (l *Mutex) Lock() {
// 安全的Unlock
func (l *Mutex) Unlock() {
if l.wcount.Val() > 0 {
l.mu.Unlock()
l.wcount.Add(-1)
if l.wcount.Add(-1) >= 0 {
l.mu.Unlock()
}
}
}
// 不阻塞RLock
func (l *Mutex) TryRLock() bool {
if l.wcount.Val() == 0 {
l.RLock()
return true
}
return false
}
func (l *Mutex) RLock() {
l.rcount.Add(1)
l.mu.RLock()
@ -72,7 +55,35 @@ func (l *Mutex) RLock() {
// 安全的RUnlock
func (l *Mutex) RUnlock() {
if l.rcount.Val() > 0 {
l.mu.RUnlock()
l.rcount.Add(-1)
if l.wcount.Add(-1) >= 0 {
l.mu.RUnlock()
}
}
}
// 不阻塞Lock
func (l *Mutex) TryLock() bool {
// 初步读写次数检查, 但无法保证原子性
if l.wcount.Val() == 0 && l.rcount.Val() == 0 {
// 第二次检查, 保证原子操作
if l.wcount.Add(1) == 1 {
l.mu.Lock()
l.wid.Set(gtime.Nanosecond())
return true
}
}
return false
}
// 不阻塞RLock
func (l *Mutex) TryRLock() bool {
// 只要不存在写锁
if l.wcount.Val() == 0 {
l.rcount.Add(1)
l.mu.RLock()
l.rid.Set(gtime.Nanosecond())
return true
}
return false
}

View File

@ -1,103 +0,0 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimec
import (
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
"time"
)
// 循环任务管理对象
type Circle struct {
status *gtype.Int // 循环任务状态(0: 未执行; 1: 运行中; -1:删除关闭)
entries *glist.List // 所有的循环任务项
}
// 创建自定义的循环任务管理对象
func New() *Circle {
circle := &Circle {
status : gtype.NewInt(STATUS_RUNNING),
entries : glist.New(),
}
circle.startLoop()
return circle
}
// 添加循环任务
func (c *Circle) Add(interval int, job func()) *Entry {
entry := newEntry(interval, job, MODE_NORMAL)
c.entries.PushBack(entry)
return entry
}
// 添加单例运行循环任务
func (c *Circle) AddSingleton(interval int, job func()) *Entry {
entry := newEntry(interval, job, MODE_SINGLETON)
c.entries.PushBack(entry)
return entry
}
// 添加只运行一次的循环任务
func (c *Circle) AddOnce(interval int, job func()) *Entry {
entry := newEntry(interval, job, MODE_ONCE)
c.entries.PushBack(entry)
return entry
}
// 延迟添加循环任务delay参数单位为秒
func (c *Circle) DelayAdd(delay int, interval int, job func()) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
c.Add(interval, job)
}()
}
// 延迟添加单例循环任务delay参数单位为秒
func (c *Circle) DelayAddSingleton(delay int, interval int, job func()) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
c.AddSingleton(interval, job)
}()
}
// 延迟添加只运行一次的循环任务delay参数单位为秒
func (c *Circle) DelayAddOnce(delay int, interval int, job func()) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
c.AddOnce(interval, job)
}()
}
// 关闭循环任务
func (c *Circle) Close() {
c.status.Set(STATUS_CLOSED)
}
//// 获取所有已注册的循环任务项(按照注册时间从小到大进行排序)
//func (c *Circle) Entries() []*Entry {
// array := garray.NewSortedArray(c.entries.Len(), func(v1, v2 interface{}) int {
// entry1 := v1.(*Entry)
// entry2 := v2.(*Entry)
// if entry1.Create > entry2.Create {
// return 1
// }
// return -1
// }, false)
// c.entries.RLockFunc(func(m map[string]interface{}) {
// for _, v := range m {
// array.Add(v.(*Entry))
// }
// })
// entries := make([]*Entry, array.Len())
// array.RLockFunc(func(array []interface{}) {
// for k, v := range array {
// entries[k] = v.(*Entry)
// }
// })
// return entries
//}

View File

@ -4,10 +4,10 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Package gtimec provides Time Circle for interval job running management/时间轮.
// Package gtimew provides Time Wheel for interval jobs running management/时间轮.
// 高效的时间轮任务执行管理,用于管理异步的间隔运行任务,或者异步延迟只运行一次的任务(最小时间粒度为秒)。
// 与其他定时任务管理模块的区别是,时间轮模块只管理间隔执行任务,并且更注重执行效率(纳秒级别)。
package gtimec
package gtimew
const (
MODE_NORMAL = 0
@ -17,44 +17,51 @@ const (
STATUS_READY = 0
STATUS_RUNNING = 1
STATUS_CLOSED = -1
gPANIC_EXIT = "exit"
)
var (
// 默认的circle管理对象
defaultCircle = New()
// 默认的wheel管理对象
defaultWheel = New()
)
// 添加执行方法,可以给定名字,以便于后续执行删除
func Add(interval int, job func()) *Entry {
return defaultCircle.Add(interval, job)
func Add(interval int, job JobFunc) *Entry {
return defaultWheel.Add(interval, job)
}
// 添加单例运行循环任务
func AddSingleton(interval int, job func()) *Entry {
return defaultCircle.AddSingleton(interval, job)
func AddSingleton(interval int, job JobFunc) *Entry {
return defaultWheel.AddSingleton(interval, job)
}
// 添加只运行一次的循环任务
func AddOnce(interval int, job func()) *Entry {
return defaultCircle.AddOnce(interval, job)
func AddOnce(interval int, job JobFunc) *Entry {
return defaultWheel.AddOnce(interval, job)
}
// 延迟添加循环任务delay参数单位为秒
func DelayAdd(delay int, interval int, job func()) {
defaultCircle.DelayAdd(delay, interval, job)
func DelayAdd(delay int, interval int, job JobFunc) {
defaultWheel.DelayAdd(delay, interval, job)
}
// 延迟添加单例循环任务delay参数单位为秒
func DelayAddSingleton(delay int, interval int, job func()) {
defaultCircle.DelayAddSingleton(delay, interval, job)
func DelayAddSingleton(delay int, interval int, job JobFunc) {
defaultWheel.DelayAddSingleton(delay, interval, job)
}
// 延迟添加只运行一次的循环任务delay参数单位为秒
func DelayAddOnce(delay int, interval int, job func()) {
defaultCircle.DelayAddOnce(delay, interval, job)
func DelayAddOnce(delay int, interval int, job JobFunc) {
defaultWheel.DelayAddOnce(delay, interval, job)
}
//// 获取所有已注册的循环任务项
//func Entries() []*Entry {
// return defaultCircle.Entries()
//}
// 获取所有已注册的循环任务项
func Entries() []*Entry {
return defaultWheel.Entries()
}
// 在Job方法中调用停止当前运行的Job
func ExitJob() {
panic(gPANIC_EXIT)
}

View File

@ -4,16 +4,17 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimec_test
package gtimew_test
import (
"gitee.com/johng/gf/g/os/gtimec"
"gitee.com/johng/gf/g/os/gtimew"
"testing"
)
func Benchmark_Add(b *testing.B) {
for i := 0; i < b.N; i++ {
gtimec.Add(1, func() {
// 基准测试的时候不能设置为1秒否则大量的任务会崩掉系统
gtimew.Add(100000, func() {
})
}

View File

@ -4,7 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimec
package gtimew
import (
"gitee.com/johng/gf/g/container/gtype"
@ -15,13 +15,16 @@ import (
type Entry struct {
mode *gtype.Int // 任务运行模式(0: normal; 1: singleton; 2: once)
status *gtype.Int // 循环任务状态(0: ready; 1: running; -1: stopped)
Job func() // 注册循环任务方法
Job JobFunc // 注册循环任务方法
Create int64 // 创建时间戳(秒)
Interval int64 // 运行间隔(秒)
}
// 任务执行方法
type JobFunc func()
// 创建循环任务
func newEntry(interval int, job func(), mode int) *Entry {
func newEntry(interval int, job JobFunc, mode int) *Entry {
return &Entry {
mode : gtype.NewInt(mode),
status : gtype.NewInt(),
@ -31,6 +34,11 @@ func newEntry(interval int, job func(), mode int) *Entry {
}
}
// 获取任务运行模式
func (entry *Entry) Mode() int {
return entry.mode.Val()
}
// 设置任务运行模式(0: normal; 1: singleton; 2: once)
func (entry *Entry) SetMode(mode int) {
entry.mode.Set(mode)
@ -52,7 +60,7 @@ func (entry *Entry) Stop() {
}
// 给定时间是否满足当前循环任务运行间隔
func (entry *Entry) meet(t time.Time) bool {
func (entry *Entry) Meet(t time.Time) bool {
diff := t.Unix() - entry.Create
if diff > 0 {
return diff%entry.Interval == 0

View File

@ -4,39 +4,36 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimec
package gtimew
import (
"container/list"
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/glog"
"time"
)
// 延迟添加循环任务delay参数单位为秒
func (c *Circle) startLoop() {
func (w *Wheel) startLoop() {
go func() {
for c.status.Val() != STATUS_CLOSED {
for w.status.Val() != STATUS_CLOSED {
time.Sleep(time.Second)
glog.Println("hello")
if c.status.Val() == STATUS_RUNNING {
go c.checkEntries(time.Now())
if w.status.Val() == STATUS_RUNNING {
go w.checkEntries(time.Now())
}
}
}()
}
// 遍历检查可执行循环任务,并异步执行
func (c *Circle) checkEntries(t time.Time) {
removeArray := garray.NewArray(0, 0, false)
c.entries.RLockFunc(func(l *list.List) {
func (w *Wheel) checkEntries(t time.Time) {
w.entries.RLockFunc(func(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
entry := e.Value.(*Entry)
if entry.meet(t) {
if entry.Meet(t) {
// 是否已命令停止运行
if entry.status.Val() == STATUS_CLOSED {
continue
}
// 判断任务的运行模式
switch entry.mode.Val() {
// 是否只允许单例运行
case MODE_SINGLETON:
@ -48,25 +45,28 @@ func (c *Circle) checkEntries(t time.Time) {
if entry.status.Set(STATUS_CLOSED) == STATUS_CLOSED {
continue
}
removeArray.Append(e)
}
// 执行异步运行
//go func() {
// //defer func() {
// // if entry.status.Val() != STATUS_CLOSED {
// // entry.status.Set(STATUS_READY)
// // }
// //}()
// //entry.Job()
//}()
go func(element *list.Element) {
defer func() {
if err := recover(); err != nil {
if err == gPANIC_EXIT {
entry.status.Set(STATUS_CLOSED)
} else {
panic(err)
}
}
if entry.status.Val() != STATUS_CLOSED {
entry.status.Set(STATUS_READY)
} else {
// 异步删除,不受锁机制的影响
w.entries.Remove(element)
}
}()
entry.Job()
}(e)
}
}
})
if removeArray.Len() > 0 {
c.entries.LockFunc(func(l *list.List) {
for _, v := range removeArray.Slice() {
l.Remove(v.(*list.Element))
}
})
}
}

View File

@ -0,0 +1,150 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 包方法操作
package gtimew_test
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestWheel_Add_Close(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
entry1 := wheel.Add(1, func() {
array.Append(1)
})
entry2 := wheel.Add(1, func() {
array.Append(1)
})
entry3 := wheel.Add(2, func() {
array.Append(1)
})
gtest.AssertNE(entry1, nil)
gtest.AssertNE(entry2, nil)
gtest.AssertNE(entry3, nil)
gtest.Assert(len(wheel.Entries()), 3)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 5)
wheel.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}
func TestWheel_Singlton(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
entry := wheel.AddSingleton(1, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
gtest.AssertNE(entry, nil)
gtest.Assert(len(wheel.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_Once(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
entry1 := wheel.AddOnce(1, func() {
array.Append(1)
})
entry2 := wheel.AddOnce(1, func() {
array.Append(1)
})
gtest.AssertNE(entry1, nil)
gtest.AssertNE(entry2, nil)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
wheel.Close()
time.Sleep(1100*time.Millisecond)
fixedLength := array.Len()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), fixedLength)
})
}
func TestWheel_DelayAdd(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
wheel.DelayAdd(1, 1, func() {})
gtest.Assert(len(wheel.Entries()), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(len(wheel.Entries()), 1)
})
}
func TestWheel_DelayAdd_Singleton(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
wheel.DelayAddSingleton(1, 1, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
gtest.Assert(len(wheel.Entries()), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(len(wheel.Entries()), 1)
gtest.Assert(array.Len(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_DelayAdd_Once(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
wheel.DelayAddOnce(1, 1, func() {
array.Append(1)
})
gtest.Assert(len(wheel.Entries()), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(len(wheel.Entries()), 1)
gtest.Assert(array.Len(), 0)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
})
}
func TestWheel_ExitJob(t *testing.T) {
gtest.Case(func() {
wheel := gtimew.New()
array := garray.New(0, 0)
wheel.Add(1, func() {
array.Append(1)
gtimew.ExitJob()
})
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
gtest.Assert(len(wheel.Entries()), 0)
})
}

View File

@ -0,0 +1,68 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Entry操作
package gtimew_test
import (
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/os/gtimew"
"gitee.com/johng/gf/g/util/gtest"
"testing"
"time"
)
func TestWheel_Entry_Operation(t *testing.T) {
wheel := gtimew.New()
array := garray.New(0, 0)
entry := wheel.Add(1, func() {
array.Append(1)
})
gtest.AssertNE(entry, nil)
gtest.Assert(len(wheel.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
entry.Stop()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
entry.Start()
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 2)
}
func TestWheel_Entry_Singlton(t *testing.T) {
wheel := gtimew.New()
array := garray.New(0, 0)
entry := wheel.Add(1, func() {
array.Append(1)
time.Sleep(10*time.Second)
})
entry.SetMode(gtimew.MODE_SINGLETON)
gtest.AssertNE(entry, nil)
gtest.Assert(len(wheel.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
}
func TestWheel_Entry_Once(t *testing.T) {
wheel := gtimew.New()
array := garray.New(0, 0)
entry := wheel.Add(1, func() {
array.Append(1)
})
entry.SetMode(gtimew.MODE_ONCE)
gtest.AssertNE(entry, nil)
gtest.Assert(len(wheel.Entries()), 1)
time.Sleep(1100*time.Millisecond)
gtest.Assert(array.Len(), 1)
gtest.Assert(len(wheel.Entries()), 0)
}

105
g/os/gtimew/gtimew_wheel.go Normal file
View File

@ -0,0 +1,105 @@
// Copyright 2019 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gtimew
import (
"container/list"
"gitee.com/johng/gf/g/container/garray"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gtype"
"time"
)
// 循环任务管理对象
type Wheel struct {
status *gtype.Int // 循环任务状态(0: 未执行; 1: 运行中; -1:删除关闭)
entries *glist.List // 所有的循环任务项
}
// 创建自定义的循环任务管理对象
func New() *Wheel {
wheel := &Wheel {
status : gtype.NewInt(STATUS_RUNNING),
entries : glist.New(),
}
wheel.startLoop()
return wheel
}
// 添加循环任务
func (w *Wheel) Add(interval int, job JobFunc) *Entry {
entry := newEntry(interval, job, MODE_NORMAL)
w.entries.PushBack(entry)
return entry
}
// 添加单例运行循环任务
func (w *Wheel) AddSingleton(interval int, job JobFunc) *Entry {
entry := newEntry(interval, job, MODE_SINGLETON)
w.entries.PushBack(entry)
return entry
}
// 添加只运行一次的循环任务
func (w *Wheel) AddOnce(interval int, job JobFunc) *Entry {
entry := newEntry(interval, job, MODE_ONCE)
w.entries.PushBack(entry)
return entry
}
// 延迟添加循环任务delay参数单位为秒
func (w *Wheel) DelayAdd(delay int, interval int, job JobFunc) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
w.Add(interval, job)
}()
}
// 延迟添加单例循环任务delay参数单位为秒
func (w *Wheel) DelayAddSingleton(delay int, interval int, job JobFunc) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
w.AddSingleton(interval, job)
}()
}
// 延迟添加只运行一次的循环任务delay参数单位为秒
func (w *Wheel) DelayAddOnce(delay int, interval int, job JobFunc) {
go func() {
time.Sleep(time.Duration(delay)*time.Second)
w.AddOnce(interval, job)
}()
}
// 关闭循环任务
func (w *Wheel) Close() {
w.status.Set(STATUS_CLOSED)
}
// 获取所有已注册的循环任务项(按照注册时间从小到大进行排序)
func (w *Wheel) Entries() []*Entry {
array := garray.NewSortedArray(w.entries.Len(), func(v1, v2 interface{}) int {
entry1 := v1.(*Entry)
entry2 := v2.(*Entry)
if entry1.Create > entry2.Create {
return 1
}
return -1
}, false)
w.entries.RLockFunc(func(l *list.List) {
for e := l.Front(); e != nil; e = e.Next() {
array.Add(e.Value.(*Entry))
}
})
entries := make([]*Entry, array.Len())
array.RLockFunc(func(array []interface{}) {
for k, v := range array {
entries[k] = v.(*Entry)
}
})
return entries
}

View File

@ -4,7 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// Package gtest provides useful test utils/测试模块.
// Package gtest provides simple and useful test utils/测试模块.
package gtest
import (
@ -15,6 +15,17 @@ import (
"reflect"
)
// 封装一个测试用例
func Case(f func()) {
defer func() {
if err := recover(); err != nil {
glog.To(os.Stderr).Println(err)
glog.Header(false).PrintBacktrace(4)
}
}()
f()
}
// 断言判断, 相等
func Assert(value, expect interface{}) {
rv := reflect.ValueOf(value)
@ -24,9 +35,7 @@ func Assert(value, expect interface{}) {
}
}
if value != expect {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v == %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v == %v`, value, expect))
}
}
@ -39,9 +48,7 @@ func AssertEQ(value, expect interface{}) {
}
}
if value != expect {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v == %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v == %v`, value, expect))
}
}
@ -54,9 +61,7 @@ func AssertNE(value, expect interface{}) {
}
}
if value == expect {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v != %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v != %v`, value, expect))
}
}
@ -77,9 +82,7 @@ func AssertGT(value, expect interface{}) {
passed = gconv.Float64(value) > gconv.Float64(expect)
}
if !passed {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v > %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v > %v`, value, expect))
}
}
@ -100,9 +103,7 @@ func AssertGTE(value, expect interface{}) {
passed = gconv.Float64(value) >= gconv.Float64(expect)
}
if !passed {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v >= %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v >= %v`, value, expect))
}
}
@ -123,9 +124,7 @@ func AssertLT(value, expect interface{}) {
passed = gconv.Float64(value) < gconv.Float64(expect)
}
if !passed {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v < %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v < %v`, value, expect))
}
}
@ -146,9 +145,7 @@ func AssertLTE(value, expect interface{}) {
passed = gconv.Float64(value) <= gconv.Float64(expect)
}
if !passed {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v <= %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v <= %v`, value, expect))
}
}
@ -166,9 +163,7 @@ func AssertIN(value, expect interface{}) {
}
}
if !passed {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v IN %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v IN %v`, value, expect))
}
}
@ -185,9 +180,7 @@ func AssertNI(value, expect interface{}) {
}
}
if passed {
glog.To(os.Stderr).Printfln(`[ASSERT] EXPECT %v NOT IN %v`, value, expect)
glog.Header(false).PrintBacktrace(1)
os.Exit(1)
panic(fmt.Sprintf(`[ASSERT] EXPECT %v NOT IN %v`, value, expect))
}
}