From b9eead27e9436d948883209af92550eec6f163b6 Mon Sep 17 00:00:00 2001 From: John Date: Tue, 18 Sep 2018 00:01:10 +0800 Subject: [PATCH] =?UTF-8?q?gcache=E6=94=B9=E8=BF=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/container/gqueue/gqueue.go | 12 ++- g/net/ghttp/ghttp_server.go | 5 - g/os/gcache/gcache.go | 16 ++- g/os/gcache/gcache_cache.go | 19 +++- g/os/gcache/gcache_mem_cache.go | 175 ++++++++++++++++++-------------- g/os/gcache/gcache_test.go | 104 +++++++++++-------- geg/other/test.go | 12 ++- 7 files changed, 194 insertions(+), 149 deletions(-) diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index 090b33e0c..fddac93c7 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -61,8 +61,12 @@ func (q *Queue) startAsyncLoop() { case <- q.closeChan: return case <- q.events: - if v := q.list.PopFront(); v != nil { - q.queue <- v + for { + if v := q.list.PopFront(); v != nil { + q.queue <- v + } else { + break + } } } } @@ -74,9 +78,7 @@ func (q *Queue) Push(v interface{}) { q.queue <- v } else { q.list.PushBack(v) - if len(q.events) == 0 { - q.events <- struct{}{} - } + q.events <- struct{}{} } } diff --git a/g/net/ghttp/ghttp_server.go b/g/net/ghttp/ghttp_server.go index 3797e89f5..38c7609fd 100644 --- a/g/net/ghttp/ghttp_server.go +++ b/g/net/ghttp/ghttp_server.go @@ -44,8 +44,6 @@ const ( gDEFAULT_SERVER = "default" gDEFAULT_DOMAIN = "default" gDEFAULT_METHOD = "ALL" - gSERVE_CACHE_LRU_SIZE = 100000 // 服务回调函数缓存LRU大小 - gHOOKS_CACHE_LRU_SIZE = 100000 // 事件回调函数缓存LRU大小 gROUTE_REGISTER_HANDLER = 1 gROUTE_REGISTER_OBJECT = 2 gROUTE_REGISTER_CONTROLLER = 3 @@ -177,9 +175,6 @@ func GetServer(name...interface{}) (*Server) { closeQueue : gqueue.New(), logger : glog.New(), } - // 设置路由解析缓存上限,使用LRU进行缓存淘汰 - s.serveCache.SetCap(gSERVE_CACHE_LRU_SIZE) - s.hooksCache.SetCap(gHOOKS_CACHE_LRU_SIZE) for _, v := range strings.Split(gHTTP_METHODS, ",") { s.methodsMap[v] = struct{}{} } diff --git a/g/os/gcache/gcache.go b/g/os/gcache/gcache.go index 1164fdc67..18f9f4558 100644 --- a/g/os/gcache/gcache.go +++ b/g/os/gcache/gcache.go @@ -13,12 +13,8 @@ const ( ) // 全局缓存管理对象 -var cache = New() - -// 设置缓存池大小,内部依靠LRU算法进行缓存淘汰处理 -func SetCap(cap int) { - cache.cap.Set(cap) -} +//var cache = New() +var cache = (*Cache)(nil) // (使用全局KV缓存对象)设置kv缓存键值对,过期时间单位为**毫秒** func Set(key interface{}, value interface{}, expire int) { @@ -57,13 +53,13 @@ func Contains(key interface{}) bool { } // (使用全局KV缓存对象)删除指定键值对 -func Remove(key interface{}) { - cache.Remove(key) +func Remove(key interface{}) interface{} { + return cache.Remove(key) } // (使用全局KV缓存对象)批量删除指定键值对 -func BatchRemove(keys []interface{}) { - cache.BatchRemove(keys) +func BatchRemove(keys []interface{}) map[interface{}]interface{} { + return cache.BatchRemove(keys) } // 获得所有的键名,组成数组返回 diff --git a/g/os/gcache/gcache_cache.go b/g/os/gcache/gcache_cache.go index 4d423ff76..ecf5570ac 100644 --- a/g/os/gcache/gcache_cache.go +++ b/g/os/gcache/gcache_cache.go @@ -14,17 +14,26 @@ import ( // 缓存对象。 // 底层只有一个缓存对象,如果需要提高并发性能,可新增缓存对象无锁哈希表,用键名做固定分区。 type Cache struct { - memCache + *memCache } // Cache对象按照缓存键名首字母做了分组 -func New() *Cache { - return &Cache { - memCache : *newMemCache(), +func New(lruCap...int) *Cache { + c := &Cache { + memCache : newMemCache(lruCap...), } + go c.autoSyncLoop() + go c.autoClearLoop() + if c.cap > 0 { + go c.autoLruClearLoop() + } + return c } // 清空缓存中的所有数据 func (c *Cache) Clear() { - atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.memCache)), unsafe.Pointer(newMemCache())) + // 使用原子操作替换缓存对象 + old := atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.memCache)), unsafe.Pointer(newMemCache())) + // 关闭旧的缓存对象 + (*memCache)(old).Close() } \ No newline at end of file diff --git a/g/os/gcache/gcache_mem_cache.go b/g/os/gcache/gcache_mem_cache.go index 9769983db..87fc778e9 100644 --- a/g/os/gcache/gcache_mem_cache.go +++ b/g/os/gcache/gcache_mem_cache.go @@ -7,14 +7,13 @@ package gcache import ( - "time" "math" "gitee.com/johng/gf/g/container/gset" "gitee.com/johng/gf/g/os/gtime" "sync" - "gitee.com/johng/gf/g/container/gtype" - "fmt" "gitee.com/johng/gf/g/util/gconv" + "gitee.com/johng/gf/g/container/glist" + "time" ) // 缓存对象 @@ -23,11 +22,11 @@ type memCache struct { emu sync.RWMutex // ekmap锁(expire key map) smu sync.RWMutex // eksets锁(expire key sets) lru *memCacheLru // LRU缓存限制(只有限定池大小时才启用) - cap *gtype.Int // 控制缓存池大小,超过大小则按照LRU算法进行缓存过期处理(默认为0表示不进行限制) + cap int // 控制缓存池大小,超过大小则按照LRU算法进行缓存过期处理(默认为0表示不进行限制) data map[interface{}]memCacheItem // 缓存数据(所有的缓存数据存放哈希表) ekmap map[interface{}]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新),键值为10秒级时间戳 eksets map[int64]*gset.Set // 分组过期时间对应的键名列表(用于自动过期快速删除),键值为10秒级时间戳 - eventChan chan memCacheEvent // 异步处理队列 + eventQueue *glist.List // 异步处理队列 stopChan chan struct{} // 关闭时间通知 } @@ -43,33 +42,22 @@ type memCacheEvent struct { e int64 // 过期时间 } -const ( - // 这个数值不能太大,否则初始化会占用太多无意义的内存 - // 60W,这个数值是创始人的机器上支持基准测试的参考结果 - gEVENT_QUEUE_SIZE = 10000000 -) - // 创建底层的缓存对象 -func newMemCache() *memCache { +func newMemCache(lruCap...int) *memCache { c := &memCache { lru : newMemCacheLru(), - cap : gtype.NewInt(), data : make(map[interface{}]memCacheItem), ekmap : make(map[interface{}]int64), eksets : make(map[int64]*gset.Set), stopChan : make(chan struct{}), - eventChan : make(chan memCacheEvent, gEVENT_QUEUE_SIZE), + eventQueue : glist.New(), + } + if len(lruCap) > 0 { + c.cap = lruCap[0] } - go c.autoSyncLoop() - go c.autoClearLoop() return c } -// 设置缓存池大小,内部依靠LRU算法进行缓存淘汰处理 -func (c *memCache) SetCap(cap int) { - c.cap.Set(cap) -} - // 计算过期缓存的键名(将毫秒换算成秒的整数毫秒) func (c *memCache) makeExpireKey(expire int64) int64 { return int64(math.Ceil(float64(expire/10000) + 1)*10000) @@ -106,23 +94,43 @@ func (c *memCache) getOrNewExpireSet(expire int64) *gset.Set { // 设置kv缓存键值对,过期时间单位为毫秒,expire<=0表示不过期 func (c *memCache) Set(key interface{}, value interface{}, expire int) { - var e int64 - if expire != 0 { - e = gtime.Millisecond() + int64(expire) - } else { - e = gDEFAULT_MAX_EXPIRE - } + expireTimestamp := c.getInternalExpire(expire) c.dmu.Lock() - c.data[key] = memCacheItem{v : value, e : e} + c.data[key] = memCacheItem{v : value, e : expireTimestamp} c.dmu.Unlock() - c.eventChan <- memCacheEvent{k : key, e : e} + c.eventQueue.PushBack(memCacheEvent{k : key, e : expireTimestamp}) } +// 设置kv缓存键值对,内部会对键名的存在性使用写锁进行二次检索确认,如果存在则不再写入;返回键名对应的键值。 +// 在高并发下有用,防止数据写入的并发逻辑错误。 +func (c *memCache) doSetWithLockCheck(key interface{}, value interface{}, expire int) interface{} { + expireTimestamp := c.getInternalExpire(expire) + c.dmu.Lock() + if v, ok := c.data[key]; ok && !v.IsExpired() { + c.dmu.Unlock() + return v + } + c.data[key] = memCacheItem{v : value, e : expireTimestamp} + c.dmu.Unlock() + c.eventQueue.PushBack(memCacheEvent{k : key, e : expireTimestamp}) + return value +} + +// 根据给定expire参数计算内部使用的expire过期时间 +func (c *memCache) getInternalExpire(expire int) int64 { + if expire != 0 { + return gtime.Millisecond() + int64(expire) + } else { + return gDEFAULT_MAX_EXPIRE + } +} + + // 当键名不存在时写入,并返回true;否则返回false。 // 常用来做对并发性要求不高的内存锁。 func (c *memCache) SetIfNotExist(key interface{}, value interface{}, expire int) bool { if !c.Contains(key) { - c.Set(key, value, expire) + c.doSetWithLockCheck(key, value, expire) return true } return false @@ -130,17 +138,12 @@ func (c *memCache) SetIfNotExist(key interface{}, value interface{}, expire int) // 批量设置 func (c *memCache) BatchSet(data map[interface{}]interface{}, expire int) { - var e int64 - if expire != 0 { - e = gtime.Millisecond() + int64(expire) - } else { - e = gDEFAULT_MAX_EXPIRE - } + expireTimestamp := c.getInternalExpire(expire) for k, v := range data { c.dmu.Lock() - c.data[k] = memCacheItem{v: v, e: e} + c.data[k] = memCacheItem{v: v, e: expireTimestamp} c.dmu.Unlock() - c.eventChan <- memCacheEvent{k: k, e:e} + c.eventQueue.PushBack(memCacheEvent{k: k, e: expireTimestamp}) } } @@ -151,7 +154,7 @@ func (c *memCache) Get(key interface{}) interface{} { c.dmu.RUnlock() if ok && !item.IsExpired() { // LRU(Least Recently Used)操作记录 - if c.cap.Val() > 0 { + if c.cap > 0 { c.lru.Push(key) } return item.v @@ -162,8 +165,7 @@ func (c *memCache) Get(key interface{}) interface{} { // 当键名存在时返回其键值,否则写入指定的键值 func (c *memCache) GetOrSet(key interface{}, value interface{}, expire int) interface{} { if v := c.Get(key); v == nil { - c.Set(key, value, expire) - return value + return c.doSetWithLockCheck(key, value, expire) } else { return v } @@ -172,8 +174,9 @@ func (c *memCache) GetOrSet(key interface{}, value interface{}, expire int) inte // 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成 func (c *memCache) GetOrSetFunc(key interface{}, f func() interface{}, expire int) interface{} { if v := c.Get(key); v == nil { + // 可能存在多个goroutine被阻塞在这里,f可能是并发运行 v = f() - c.Set(key, v, expire) + c.doSetWithLockCheck(key, v, expire) return v } else { return v @@ -249,63 +252,79 @@ func (c *memCache) Size() int { // 删除缓存对象 func (c *memCache) Close() { close(c.stopChan) - close(c.eventChan) c.lru.Close() } // 数据自动同步循环 func (c *memCache) autoSyncLoop() { for { - if len(c.eventChan) > gEVENT_QUEUE_SIZE - 1000 { - fmt.Println("full") - } - item := <- c.eventChan - if item.k == nil { - break - } - // 添加该key到对应的过期集合中 - // 注意:这里不需要检查存在性, - // 因为在key过期的时候,会和原始的键值对中的过期时间做核对 - newe := c.makeExpireKey(item.e) - c.getOrNewExpireSet(newe).Add(item.k) - // 重新设置对应键名的过期时间 - c.emu.Lock() - c.ekmap[item.k] = newe - c.emu.Unlock() + select { + case <-c.stopChan: + return + default: + for { + v := c.eventQueue.PopFront() + if v == nil { + break + } + item := v.(memCacheEvent) + // 添加该key到对应的过期集合中 + // 注意:这里不需要检查存在性, + // 因为在key过期的时候,会和原始的键值对中的过期时间做核对 + newe := c.makeExpireKey(item.e) + c.getOrNewExpireSet(newe).Add(item.k) + // 重新设置对应键名的过期时间 + c.emu.Lock() + c.ekmap[item.k] = newe + c.emu.Unlock() + } + time.Sleep(time.Second) + } } } // LRU缓存淘汰处理+自动清理过期键值对 // 每隔10秒清除过去30秒的键值对数据 func (c *memCache) autoClearLoop() { + for { + select { + case <- c.stopChan: + return + default: + // 缓存过期处理 + ek := c.makeExpireKey(gtime.Millisecond()) + eks := []int64{ek - 10000, ek - 20000, ek - 30000} + for _, v := range eks { + if ekset := c.getExpireSet(v); ekset != nil { + ekset.Iterator(func(v interface{}) bool { + return c.clearByKey(v) + }) + } + // 数据处理完之后从集合中删除该时间段 + c.smu.Lock() + delete(c.eksets, v) + c.smu.Unlock() + } + time.Sleep(10*time.Second) + } + } +} + +// LRU缓存淘汰清理 +func (c *memCache) autoLruClearLoop() { for { select { - case <- c.stopChan: + case <-c.stopChan: return default: - // 缓存过期处理 - ek := c.makeExpireKey(gtime.Millisecond()) - eks := []int64{ek - 10000, ek - 20000, ek - 30000} - for _, v := range eks { - if ekset := c.getExpireSet(v); ekset != nil { - ekset.Iterator(func(v interface{}) bool { - return c.clearByKey(v) - }) - } - // 数据处理完之后从集合中删除该时间段 - c.smu.Lock() - delete(c.eksets, v) - c.smu.Unlock() - } - // LRU缓存淘汰处理 - if c.cap.Val() > 0 { - for i := c.Size() - c.cap.Val(); i > 0; i-- { + if c.cap > 0 { + for i := c.Size() - c.cap; i > 0; i-- { if s := c.lru.Pop(); s != "" { c.clearByKey(s, true) } } } - time.Sleep(10*time.Second) + time.Sleep(time.Second) } } } diff --git a/g/os/gcache/gcache_test.go b/g/os/gcache/gcache_test.go index 1f15aedd5..a168e2562 100644 --- a/g/os/gcache/gcache_test.go +++ b/g/os/gcache/gcache_test.go @@ -13,11 +13,11 @@ import ( "sync" ) - var ( - c = New() - mInt = make(map[int]int) - mMap = make(map[interface{}]interface{}) + c = New() + clru = New(10000) + mInt = make(map[int]int) + mMap = make(map[interface{}]interface{}) muInt = sync.RWMutex{} muMap = sync.RWMutex{} @@ -30,65 +30,83 @@ func Benchmark_CacheSet(b *testing.B) { } func Benchmark_CacheGet(b *testing.B) { - for i := 0; i < b.N; i++ { - c.Get(i) - } + for i := 0; i < b.N; i++ { + c.Get(i) + } } func Benchmark_CacheRemove(b *testing.B) { - for i := 0; i < b.N; i++ { - c.Remove(i) - } + for i := 0; i < b.N; i++ { + c.Remove(i) + } +} + +func Benchmark_CacheLruSet(b *testing.B) { + for i := 0; i < b.N; i++ { + clru.Set(i, i, 0) + } +} + +func Benchmark_CacheLruGet(b *testing.B) { + for i := 0; i < b.N; i++ { + clru.Get(i) + } +} + +func Benchmark_CacheLruRemove(b *testing.B) { + for i := 0; i < b.N; i++ { + clru.Remove(i) + } } func Benchmark_InterfaceMapWithLockSet(b *testing.B) { - for i := 0; i < b.N; i++ { - muMap.Lock() - mMap[i] = i - muMap.Unlock() - } + for i := 0; i < b.N; i++ { + muMap.Lock() + mMap[i] = i + muMap.Unlock() + } } func Benchmark_InterfaceMapWithLockGet(b *testing.B) { - for i := 0; i < b.N; i++ { - muMap.RLock() - if _, ok := mMap[i]; ok { + for i := 0; i < b.N; i++ { + muMap.RLock() + if _, ok := mMap[i]; ok { - } - muMap.RUnlock() - } + } + muMap.RUnlock() + } } func Benchmark_InterfaceMapWithLockRemove(b *testing.B) { - for i := 0; i < b.N; i++ { - muMap.Lock() - delete(mMap, i) - muMap.Unlock() - } + for i := 0; i < b.N; i++ { + muMap.Lock() + delete(mMap, i) + muMap.Unlock() + } } func Benchmark_IntMapWithLockWithLockSet(b *testing.B) { - for i := 0; i < b.N; i++ { - muInt.Lock() - mInt[i] = i - muInt.Unlock() - } + for i := 0; i < b.N; i++ { + muInt.Lock() + mInt[i] = i + muInt.Unlock() + } } func Benchmark_IntMapWithLockGet(b *testing.B) { - for i := 0; i < b.N; i++ { - muInt.RLock() - if _, ok := mInt[i]; ok { + for i := 0; i < b.N; i++ { + muInt.RLock() + if _, ok := mInt[i]; ok { - } - muInt.RUnlock() - } + } + muInt.RUnlock() + } } func Benchmark_IntMapWithLockRemove(b *testing.B) { - for i := 0; i < b.N; i++ { - muInt.Lock() - delete(mInt, i) - muInt.Unlock() - } -} \ No newline at end of file + for i := 0; i < b.N; i++ { + muInt.Lock() + delete(mInt, i) + muInt.Unlock() + } +} diff --git a/geg/other/test.go b/geg/other/test.go index 0a7d6c8ae..85a218ea3 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,10 +1,16 @@ package main import ( - "gitee.com/johng/gf/g" + "time" + "gitee.com/johng/gf/g/os/gcache" + "fmt" ) func main() { - g.Dump(1,2,3) - g.Dump(1,2,3) + c := gcache.New(1000) + c.Set(1, 1, 0) + c.Set(2, 2, 0) + c.Clear() + fmt.Println(c.Size()) + time.Sleep(time.Second) } \ No newline at end of file