改进gcache/gqueue,提高执行效率,并完善相应的基准测试用例

This commit is contained in:
John
2018-09-15 16:40:13 +08:00
parent 68cd27557e
commit 4f3d9f0dcc
26 changed files with 738 additions and 554 deletions

View File

@ -4,7 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// go test *.go -bench=".*"
// go test *.go -bench=".*" -benchmem
package glist

View File

@ -4,7 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// go test *.go -bench=".*"
// go test *.go -bench=".*" -benchmem
package gmap

View File

@ -6,130 +6,87 @@
// 并发安全的动态队列.
// 特点:
// 1、队列初始化速度快
// 2、可以向队头/队尾进行Push/Pop操作
// 1、动态队列初始化速度快;
// 2、动态的队列大小(不限大小)
// 3、取数据时如果队列为空那么会阻塞等待
package gqueue
import (
"gitee.com/johng/gf/g/container/glist"
"math"
"sync"
"errors"
"container/list"
"gitee.com/johng/gf/g/container/gtype"
)
// 0、这是一个先进先出的队列(chan <-- list)
// 1、当创建Queue对象时限定大小那么等同于一个同步的chan并发安全队列
// 2、不限制大小时list链表用以存储数据临时chan负责为客户端读取数据当从chan获取数据时list往chan中不停补充数据
// 3、由于功能主体是chan那么操作仍然像chan那样具有阻塞效果
type Queue struct {
mu sync.RWMutex // 用于队列并发安全处理
list *list.List // 数据队列
limit int // 队列限制大小
limits chan struct{} // 用于队列写入限制
events chan struct{} // 用于队列出列限制
closed *gtype.Bool // 队列是否关闭
limit int // 队列限制大小
queue chan interface{} // 用于队列写入限制
list *glist.List // 数据链表
events chan struct{} // 当不限制队列大小时的写入事件chan
}
const (
// 默认临时队列大小,注意是临时的
gDEFAULT_QUEUE_SIZE = 10000
)
// 队列大小为非必须参数,默认不限制
func New(limit...int) *Queue {
size := 0
size := gDEFAULT_QUEUE_SIZE
if len(limit) > 0 {
size = limit[0]
}
return &Queue {
list : list.New(),
q := &Queue {
list : glist.New(),
limit : size,
limits : make(chan struct{}, size),
queue : make(chan interface{}, size),
events : make(chan struct{}, math.MaxInt32),
closed : gtype.NewBool(),
}
go q.startAsyncLoop()
return q
}
// 将数据压入队列, 队尾
func (q *Queue) PushBack(v interface{}) error {
if q.closed.Val() {
return errors.New("closed")
// 异步list->chan同步队列
func (q *Queue) startAsyncLoop() {
for {
<- q.events
if v := q.list.PopFront(); v != nil {
q.queue <- v
} else {
if q.list.Len() == 0 {
break
}
}
}
if q.limit > 0 {
q.limits <- struct{}{}
}
q.mu.Lock()
q.list.PushBack(v)
q.mu.Unlock()
if q.limit == 0 {
q.events <- struct{}{}
}
return nil
}
// 将数据压入队列, 队头
func (q *Queue) PushFront(v interface{}) error {
if q.closed.Val() {
return errors.New("closed")
}
// 限制队列大小使用channel进行阻塞限制
func (q *Queue) Push(v interface{}) {
if q.limit > 0 {
q.limits <- struct{}{}
}
q.mu.Lock()
q.list.PushFront(v)
q.mu.Unlock()
if q.limit == 0 {
q.queue <- v
} else {
q.list.PushBack(v)
q.events <- struct{}{}
}
return nil
}
// 从队头先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopFront() interface{} {
if q.closed.Val() {
return nil
}
if q.limit > 0 {
<- q.limits
} else {
<- q.events
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil
}
// 从队尾先进先出地从队列取出一项数据,当没有数据可获取时,阻塞等待
func (q *Queue) PopBack() interface{} {
if q.closed.Val() {
return nil
}
if q.limit > 0 {
<- q.limits
} else {
<- q.events
}
q.mu.Lock()
if elem := q.list.Front(); elem != nil {
item := q.list.Remove(elem)
q.mu.Unlock()
return item
}
q.mu.Unlock()
return nil
// 从队头先进先出地从队列取出一项数据
func (q *Queue) Pop() interface{} {
return <- q.queue
}
// 关闭队列(通知所有通过Pop*阻塞的协程退出)
func (q *Queue) Close() {
if !q.closed.Val() {
q.closed.Set(true)
close(q.limits)
close(q.events)
}
q.list.RemoveAll()
close(q.queue)
close(q.events)
}
// 获取当前队列大小
func (q *Queue) Size() int {
return len(q.events)
return len(q.queue) + q.list.Len()
}

View File

@ -4,7 +4,7 @@
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// go test *.go -bench=".*"
// go test *.go -bench=".*" -benchmem
package gqueue_test
@ -13,39 +13,37 @@ import (
"gitee.com/johng/gf/g/container/gqueue"
)
var length = 10000000
var q = gqueue.New(length)
var c = make(chan int, length)
var length = 10000000
var qstatic = gqueue.New(length)
var qdynamic = gqueue.New()
var cany = make(chan interface{}, length)
var cint = make(chan int, length)
func BenchmarkGqueueNew1000W(b *testing.B) {
func Benchmark_GqueueStaticPushAndPop(b *testing.B) {
for i := 0; i < b.N; i++ {
gqueue.New(length)
qstatic.Push(i)
qstatic.Pop()
}
}
func BenchmarkChannelNew1000W(b *testing.B) {
func Benchmark_GqueueDynamicPushAndPop(b *testing.B) {
for i := 0; i < b.N; i++ {
c = make(chan int, length)
qdynamic.Push(i)
qdynamic.Pop()
}
}
func BenchmarkGqueuePush(b *testing.B) {
func Benchmark_ChannelInterfacePushAndPop(b *testing.B) {
for i := 0; i < b.N; i++ {
q.PushBack(i)
cany <- i
<- cany
}
}
func BenchmarkGqueuePushAndPop(b *testing.B) {
func Benchmark_ChannelIntPushAndPop(b *testing.B) {
for i := 0; i < b.N; i++ {
q.PushBack(i)
q.PopFront()
}
}
func BenchmarkChannelPushAndPop(b *testing.B) {
for i := 0; i < b.N; i++ {
c <- i
<- c
cint <- i
<- cint
}
}

View File

@ -213,7 +213,7 @@ func (r *Response) OutputBuffer() {
// 获取输出到客户端的数据大小
func (r *Response) ContentSize() int {
if r.length > 0 {
if r.Status == http.StatusOK && r.length > 0 {
return r.length
}
if length := r.Header().Get("Content-Length"); length != "" {

View File

@ -49,7 +49,7 @@ func (s *Server)handleRequest(w http.ResponseWriter, r *http.Request) {
s.handleErrorLog(e, request)
}
// 将Request对象指针丢到队列中异步关闭
s.closeQueue.PushBack(request)
s.closeQueue.Push(request)
}()
// 优先执行静态文件检索
@ -196,7 +196,7 @@ func (s *Server)listDir(r *Request, f http.File) {
func (s *Server) startCloseQueueLoop() {
go func() {
for {
if v := s.closeQueue.PopFront(); v != nil {
if v := s.closeQueue.Pop(); v != nil {
r := v.(*Request)
s.callHookHandler(HOOK_BEFORE_CLOSE, r)
// 更新Session会话超时时间

View File

@ -7,121 +7,75 @@
// 单进程高速缓存.
package gcache
import (
"sync"
"time"
"math"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/container/gset"
"gitee.com/johng/gf/g/container/gqueue"
"gitee.com/johng/gf/g/container/gtype"
)
const (
gDEFAULT_MAX_EXPIRE = 9223372036854 // 当数据不过期时默认设置的过期属性值相当于math.MaxInt64/1000000
// 当数据不过期时默认设置的过期属性值相当于math.MaxInt64/1000000
gDEFAULT_MAX_EXPIRE = 9223372036854
)
// 缓存对象
type Cache struct {
dmu sync.RWMutex // data锁
emu sync.RWMutex // ekmap锁
smu sync.RWMutex // eksets锁
lru *_Lru // LRU缓存限制(只有限定池大小时才启用)
cap *gtype.Int // 控制缓存池大小超过大小则按照LRU算法进行缓存过期处理(默认为0表示不进行限制)
data map[string]CacheItem // 缓存数据(所有的缓存数据存放哈希表)
ekmap map[string]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新),键值为秒级时间戳
eksets map[int64]*gset.StringSet // 分组过期时间对应的键名列表(用于自动过期快速删除),键值为秒级时间戳
eventQueue *gqueue.Queue // 异步处理队列
stopEvents chan struct{} // 关闭时间通知
}
// 缓存数据项
type CacheItem struct {
v interface{} // 缓存键值
e int64 // 过期时间
}
// 异步队列数据项
type _EventItem struct {
k string // 键名
e int64 // 过期时间
}
// 全局缓存管理对象
var cache = New()
// Cache对象按照缓存键名首字母做了分组
func New() *Cache {
c := &Cache {
lru : newLru(),
cap : gtype.NewInt(),
data : make(map[string]CacheItem),
ekmap : make(map[string]int64),
eksets : make(map[int64]*gset.StringSet),
eventQueue : gqueue.New(),
stopEvents : make(chan struct{}, 2),
}
go c.autoSyncLoop()
go c.autoClearLoop()
return c
}
// 设置缓存池大小内部依靠LRU算法进行缓存淘汰处理
func SetCap(cap int) {
cache.cap.Set(cap)
}
// (使用全局KV缓存对象)设置kv缓存键值对过期时间单位为**毫秒**
func Set(key string, value interface{}, expire int) {
func Set(key interface{}, value interface{}, expire int) {
cache.Set(key, value, expire)
}
// 当键名不存在时写入并返回true否则返回false。
// 常用来做对并发性要求不高的内存锁。
func SetIfNotExist(key string, value interface{}, expire int) bool {
func SetIfNotExist(key interface{}, value interface{}, expire int) bool {
return cache.SetIfNotExist(key, value, expire)
}
// (使用全局KV缓存对象)批量设置kv缓存键值对过期时间单位为**毫秒**
func BatchSet(data map[string]interface{}, expire int) {
func BatchSet(data map[interface{}]interface{}, expire int) {
cache.BatchSet(data, expire)
}
// (使用全局KV缓存对象)获取指定键名的值
func Get(key string) interface{} {
func Get(key interface{}) interface{} {
return cache.Get(key)
}
// 当键名存在时返回其键值,否则写入指定的键值
func GetOrSet(key string, value interface{}, expire int) interface{} {
func GetOrSet(key interface{}, value interface{}, expire int) interface{} {
return cache.GetOrSet(key, value, expire)
}
// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成
func GetOrSetFunc(key string, f func() interface{}, expire int) interface{} {
func GetOrSetFunc(key interface{}, f func() interface{}, expire int) interface{} {
return cache.GetOrSetFunc(key, f, expire)
}
// 是否存在指定的键名true表示存在false表示不存在。
func Contains(key string) bool {
func Contains(key interface{}) bool {
return cache.Contains(key)
}
// (使用全局KV缓存对象)删除指定键值对
func Remove(key string) {
func Remove(key interface{}) {
cache.Remove(key)
}
// (使用全局KV缓存对象)批量删除指定键值对
func BatchRemove(keys []string) {
func BatchRemove(keys []interface{}) {
cache.BatchRemove(keys)
}
// 获得所有的键名,组成字符串数组返回
func Keys() []string {
// 获得所有的键名,组成数组返回
func Keys() []interface{} {
return cache.Keys()
}
// 获得所有的键名,组成字符串数组返回
func KeyStrings() []string {
return cache.KeyStrings()
}
// 获得所有的值,组成数组返回
func Values() []interface{} {
return cache.Values()
@ -131,254 +85,3 @@ func Values() []interface{} {
func Size() int {
return cache.Size()
}
// 设置缓存池大小内部依靠LRU算法进行缓存淘汰处理
func (c *Cache) SetCap(cap int) {
c.cap.Set(cap)
}
// 计算过期缓存的键名(将毫秒换算成秒的整数毫秒)
func (c *Cache) makeExpireKey(expire int64) int64 {
return int64(math.Ceil(float64(expire/1000) + 1)*1000)
}
// 获取一个过期键名存放Set,如果没有则返回nil
func (c *Cache) getExpireSet(expire int64) *gset.StringSet {
c.smu.RLock()
if ekset, ok := c.eksets[expire]; ok {
c.smu.RUnlock()
return ekset
}
c.smu.RUnlock()
return nil
}
// 获取或者创建一个过期键名存放Set(由于是异步单线程执行因此不会出现创建set时的覆盖问题)
func (c *Cache) getOrNewExpireSet(expire int64) *gset.StringSet {
if ekset := c.getExpireSet(expire); ekset == nil {
set := gset.NewStringSet()
c.smu.Lock()
c.eksets[expire] = set
c.smu.Unlock()
return set
} else {
return ekset
}
}
// 设置kv缓存键值对过期时间单位为毫秒expire<=0表示不过期
func (c *Cache) Set(key string, value interface{}, expire int) {
var e int64
if expire != 0 {
e = gtime.Millisecond() + int64(expire)
} else {
e = gDEFAULT_MAX_EXPIRE
}
c.dmu.Lock()
c.data[key] = CacheItem{v : value, e : e}
c.dmu.Unlock()
c.eventQueue.PushBack(_EventItem{k : key, e : e})
}
// 当键名不存在时写入并返回true否则返回false。
// 常用来做对并发性要求不高的内存锁。
func (c *Cache) SetIfNotExist(key string, value interface{}, expire int) bool {
if !c.Contains(key) {
c.Set(key, value, expire)
return true
}
return false
}
// 批量设置
func (c *Cache) BatchSet(data map[string]interface{}, expire int) {
var e int64
if expire != 0 {
e = gtime.Millisecond() + int64(expire)
} else {
e = gDEFAULT_MAX_EXPIRE
}
for k, v := range data {
c.dmu.Lock()
c.data[k] = CacheItem{v: v, e: e}
c.dmu.Unlock()
c.eventQueue.PushBack(_EventItem{k: k, e:e})
}
}
// 获取指定键名的值
func (c *Cache) Get(key string) interface{} {
c.dmu.RLock()
item, ok := c.data[key]
c.dmu.RUnlock()
if ok && !item.IsExpired() {
return item.v
}
return nil
}
// 当键名存在时返回其键值,否则写入指定的键值
func (c *Cache) GetOrSet(key string, value interface{}, expire int) interface{} {
if v := c.Get(key); v == nil {
c.Set(key, value, expire)
return value
} else {
return v
}
}
// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成
func (c *Cache) GetOrSetFunc(key string, f func() interface{}, expire int) interface{} {
if v := c.Get(key); v == nil {
v = f()
c.Set(key, v, expire)
return v
} else {
return v
}
}
// 是否存在指定的键名true表示存在false表示不存在。
func (c *Cache) Contains(key string) bool {
return c.Get(key) != nil
}
// 删除指定键值对
func (c *Cache) Remove(key string) {
c.Set(key, nil, -1000)
}
// 批量删除键值对
func (c *Cache) BatchRemove(keys []string) {
for _, key := range keys {
c.dmu.Lock()
c.data[key] = CacheItem{v: nil, e: -1000}
c.dmu.Unlock()
c.eventQueue.PushBack(_EventItem{k: key, e: -1000})
}
}
// 获得所有的键名,组成字符串数组返回
func (c *Cache) Keys() []string {
keys := make([]string, 0)
c.dmu.RLock()
for k, _ := range c.data {
keys = append(keys, k)
}
c.dmu.RUnlock()
return keys
}
// 获得所有的值,组成数组返回
func (c *Cache) Values() []interface{} {
values := make([]interface{}, 0)
c.dmu.RLock()
for _, v := range c.data {
values = append(values, v)
}
c.dmu.RUnlock()
return values
}
// 获得缓存对象的键值对数量
func (c *Cache) Size() int {
c.dmu.RLock()
length := len(c.data)
c.dmu.RUnlock()
return length
}
// 删除缓存对象
func (c *Cache) Close() {
c.stopEvents <- struct{}{}
c.eventQueue.Close()
c.lru.Close()
}
// 数据自动同步循环
func (c *Cache) autoSyncLoop() {
for {
if r := c.eventQueue.PopFront(); r != nil {
item := r.(_EventItem)
newe := c.makeExpireKey(item.e)
// 查询键名是否已经存在过期时间
c.emu.RLock()
olde, ok := c.ekmap[item.k];
c.emu.RUnlock()
// 是否需要删除旧的过期时间map中对应的键名
if ok && newe != olde {
if ekset := c.getExpireSet(olde); ekset != nil {
ekset.Remove(item.k)
}
}
c.getOrNewExpireSet(newe).Add(item.k)
// 重新设置对应键名的过期时间
c.emu.Lock()
c.ekmap[item.k] = newe
c.emu.Unlock()
// LRU操作记录(只有新增和修改操作才会记录到LRU管理对象中删除不会)
if newe >= olde && c.cap.Val() > 0 {
c.lru.Push(item.k)
}
} else {
return
}
}
}
// LRU缓存淘汰处理+自动清理过期键值对
// 每隔1秒清除过去3秒的键值对数据
func (c *Cache) autoClearLoop() {
for {
select {
case <- c.stopEvents:
return
default:
// 缓存过期处理
ek := c.makeExpireKey(gtime.Millisecond())
eks := []int64{ek - 2000, ek - 3000, ek - 4000}
for _, v := range eks {
if ekset := c.getExpireSet(v); ekset != nil {
ekset.Iterator(c.clearByKey)
}
// 删除异步处理键名set
c.smu.Lock()
delete(c.eksets, v)
c.smu.Unlock()
}
// LRU缓存淘汰处理
if c.cap.Val() > 0 {
for i := c.Size() - c.cap.Val(); i > 0; i-- {
if s := c.lru.Pop(); s != "" {
c.clearByKey(s)
}
}
}
time.Sleep(time.Second)
}
}
}
// 删除对应键名的缓存数据
func (c *Cache) clearByKey(key string) bool {
// 删除缓存数据
c.dmu.Lock()
// 为防止删除时正好该key正在进行写操作因此在内部需要使用写锁再进行一次确认
if item, ok := c.data[key]; ok && item.IsExpired() {
delete(c.data, key)
} else {
c.dmu.Unlock()
return true
}
c.dmu.Unlock()
// 删除异步处理数据项,并更新缓存的内存使用大小记录值
c.emu.Lock()
delete(c.ekmap, key)
c.emu.Unlock()
// 删除LRU管理对象中指定键名
c.lru.Remove(key)
return true
}

View File

@ -0,0 +1,30 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gcache
import (
"sync/atomic"
"unsafe"
)
// 缓存对象。
// 底层只有一个缓存对象,如果需要提高并发性能,可新增缓存对象无锁哈希表,用键名做固定分区。
type Cache struct {
*memCache // 底层缓存对象
}
// Cache对象按照缓存键名首字母做了分组
func New() *Cache {
return &Cache {
memCache : newMemCache(),
}
}
// 清空缓存中的所有数据
func (c *Cache) Clear() {
atomic.SwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.memCache)), unsafe.Pointer(newMemCache()))
}

View File

@ -0,0 +1,322 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gcache
import (
"time"
"math"
"gitee.com/johng/gf/g/container/gset"
"gitee.com/johng/gf/g/os/gtime"
"sync"
"gitee.com/johng/gf/g/container/gtype"
"fmt"
"gitee.com/johng/gf/g/util/gconv"
)
// 缓存对象
type memCache struct {
dmu sync.RWMutex // data锁(自定义锁的目的是除去键值的断言转换造成的性能损耗)
emu sync.RWMutex // ekmap锁(expire key map)
smu sync.RWMutex // eksets锁(expire key sets)
lru *memCacheLru // LRU缓存限制(只有限定池大小时才启用)
cap *gtype.Int // 控制缓存池大小超过大小则按照LRU算法进行缓存过期处理(默认为0表示不进行限制)
data map[interface{}]memCacheItem // 缓存数据(所有的缓存数据存放哈希表)
ekmap map[interface{}]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新)键值为10秒级时间戳
eksets map[int64]*gset.Set // 分组过期时间对应的键名列表(用于自动过期快速删除)键值为10秒级时间戳
eventChan chan memCacheEvent // 异步处理队列
stopChan chan struct{} // 关闭时间通知
}
// 缓存数据项
type memCacheItem struct {
v interface{} // 缓存键值
e int64 // 过期时间
}
// 异步队列数据项
type memCacheEvent struct {
k interface{} // 键名
e int64 // 过期时间
}
const (
// 这个数值不能太大,否则初始化会占用太多无意义的内存
// 60W这个数值是创始人的机器上支持基准测试的参考结果
gEVENT_QUEUE_SIZE = 600000
)
// 创建底层的缓存对象
func newMemCache() *memCache {
c := &memCache {
lru : newMemCacheLru(),
cap : gtype.NewInt(),
data : make(map[interface{}]memCacheItem),
ekmap : make(map[interface{}]int64),
eksets : make(map[int64]*gset.Set),
stopChan : make(chan struct{}),
eventChan : make(chan memCacheEvent, gEVENT_QUEUE_SIZE),
}
go c.autoSyncLoop()
go c.autoClearLoop()
//time.Sleep(time.Hour)
return c
}
// 设置缓存池大小内部依靠LRU算法进行缓存淘汰处理
func (c *memCache) SetCap(cap int) {
c.cap.Set(cap)
}
// 计算过期缓存的键名(将毫秒换算成秒的整数毫秒)
func (c *memCache) makeExpireKey(expire int64) int64 {
return int64(math.Ceil(float64(expire/10000) + 1)*10000)
}
// 获取一个过期键名存放Set,如果没有则返回nil
func (c *memCache) getExpireSet(expire int64) *gset.Set {
c.smu.RLock()
if ekset, ok := c.eksets[expire]; ok {
c.smu.RUnlock()
return ekset
}
c.smu.RUnlock()
return nil
}
// 获取或者创建一个过期键名存放Set(由于是异步单线程执行因此不会出现创建set时的覆盖问题)
func (c *memCache) getOrNewExpireSet(expire int64) *gset.Set {
if ekset := c.getExpireSet(expire); ekset == nil {
set := gset.New()
c.smu.Lock()
c.eksets[expire] = set
c.smu.Unlock()
return set
} else {
return ekset
}
}
// 设置kv缓存键值对过期时间单位为毫秒expire<=0表示不过期
func (c *memCache) Set(key interface{}, value interface{}, expire int) {
var e int64
if expire != 0 {
e = gtime.Millisecond() + int64(expire)
} else {
e = gDEFAULT_MAX_EXPIRE
}
c.dmu.Lock()
c.data[key] = memCacheItem{v : value, e : e}
c.dmu.Unlock()
c.eventChan <- memCacheEvent{k : key, e : e}
}
// 当键名不存在时写入并返回true否则返回false。
// 常用来做对并发性要求不高的内存锁。
func (c *memCache) SetIfNotExist(key interface{}, value interface{}, expire int) bool {
if !c.Contains(key) {
c.Set(key, value, expire)
return true
}
return false
}
// 批量设置
func (c *memCache) BatchSet(data map[interface{}]interface{}, expire int) {
var e int64
if expire != 0 {
e = gtime.Millisecond() + int64(expire)
} else {
e = gDEFAULT_MAX_EXPIRE
}
for k, v := range data {
c.dmu.Lock()
c.data[k] = memCacheItem{v: v, e: e}
c.dmu.Unlock()
c.eventChan <- memCacheEvent{k: k, e:e}
}
}
// 获取指定键名的值
func (c *memCache) Get(key interface{}) interface{} {
c.dmu.RLock()
item, ok := c.data[key]
c.dmu.RUnlock()
if ok && !item.IsExpired() {
return item.v
}
return nil
}
// 当键名存在时返回其键值,否则写入指定的键值
func (c *memCache) GetOrSet(key interface{}, value interface{}, expire int) interface{} {
if v := c.Get(key); v == nil {
c.Set(key, value, expire)
return value
} else {
return v
}
}
// 当键名存在时返回其键值,否则写入指定的键值,键值由指定的函数生成
func (c *memCache) GetOrSetFunc(key interface{}, f func() interface{}, expire int) interface{} {
if v := c.Get(key); v == nil {
v = f()
c.Set(key, v, expire)
return v
} else {
return v
}
}
// 是否存在指定的键名true表示存在false表示不存在。
func (c *memCache) Contains(key interface{}) bool {
return c.Get(key) != nil
}
// 删除指定键值对
func (c *memCache) Remove(key interface{}) {
c.dmu.Lock()
delete(c.data, key)
c.dmu.Unlock()
}
// 批量删除键值对
func (c *memCache) BatchRemove(keys []interface{}) {
for _, key := range keys {
c.dmu.Lock()
delete(c.data, key)
c.dmu.Unlock()
}
}
// 获得所有的键名,组成数组返回
func (c *memCache) Keys() []interface{} {
keys := make([]interface{}, 0)
c.dmu.RLock()
for k, v := range c.data {
if !v.IsExpired() {
keys = append(keys, k)
}
}
c.dmu.RUnlock()
return keys
}
// 获得所有的键名,组成字符串数组返回
func (c *memCache) KeyStrings() []string {
return gconv.Strings(c.Keys())
}
// 获得所有的值,组成数组返回
func (c *memCache) Values() []interface{} {
values := make([]interface{}, 0)
c.dmu.RLock()
for _, v := range c.data {
if !v.IsExpired() {
values = append(values, v.v)
}
}
c.dmu.RUnlock()
return values
}
// 获得缓存对象的键值对数量
func (c *memCache) Size() int {
c.dmu.RLock()
length := len(c.data)
c.dmu.RUnlock()
return length
}
// 删除缓存对象
func (c *memCache) Close() {
close(c.stopChan)
close(c.eventChan)
c.lru.Close()
}
// 数据自动同步循环
func (c *memCache) autoSyncLoop() {
for {
if len(c.eventChan) > gEVENT_QUEUE_SIZE - 1000 {
fmt.Println("full")
}
item := <- c.eventChan
if item.k == nil {
break
}
// 添加该key到对应的过期集合中
// 注意:这里不需要检查存在性,
// 因为在key过期的时候会和原始的键值对中的过期时间做核对
newe := c.makeExpireKey(item.e)
c.getOrNewExpireSet(newe).Add(item.k)
// 重新设置对应键名的过期时间
c.emu.Lock()
c.ekmap[item.k] = newe
c.emu.Unlock()
// LRU操作记录(只有新增和修改操作才会记录到LRU管理对象中删除不会)
if c.cap.Val() > 0 {
c.lru.Push(item.k)
}
}
}
// LRU缓存淘汰处理+自动清理过期键值对
// 每隔10秒清除过去30秒的键值对数据
func (c *memCache) autoClearLoop() {
for {
select {
case <- c.stopChan:
return
default:
// 缓存过期处理
ek := c.makeExpireKey(gtime.Millisecond())
eks := []int64{ek - 10000, ek - 20000, ek - 30000}
for _, v := range eks {
if ekset := c.getExpireSet(v); ekset != nil {
ekset.Iterator(func(v interface{}) bool {
return c.clearByKey(v)
})
}
// 数据处理完之后从集合中删除该时间段
c.smu.Lock()
delete(c.eksets, v)
c.smu.Unlock()
}
// LRU缓存淘汰处理
if c.cap.Val() > 0 {
for i := c.Size() - c.cap.Val(); i > 0; i-- {
if s := c.lru.Pop(); s != "" {
c.clearByKey(s, true)
}
}
}
time.Sleep(10*time.Second)
}
}
}
// 删除对应键名的缓存数据
func (c *memCache) clearByKey(key interface{}, force...bool) bool {
// 删除缓存数据
c.dmu.Lock()
// 删除核对,真正的过期才删除
if item, ok := c.data[key]; (ok && item.IsExpired()) || (len(force) > 0 && force[0]) {
delete(c.data, key)
}
c.dmu.Unlock()
// 删除异步处理数据项
c.emu.Lock()
delete(c.ekmap, key)
c.emu.Unlock()
// 删除LRU管理对象中指定键名
c.lru.Remove(key)
return true
}

View File

@ -9,7 +9,7 @@ package gcache
import "gitee.com/johng/gf/g/os/gtime"
// 判断缓存项是否已过期
func (item *CacheItem) IsExpired() bool {
func (item *memCacheItem) IsExpired() bool {
if item.e > gtime.Millisecond() {
return false
}

View File

@ -16,16 +16,17 @@ import (
)
// LRU算法实现对象底层双向链表使用了标准库的list.List
type _Lru struct {
list *glist.List
data *gmap.StringInterfaceMap
queue *gqueue.Queue
type memCacheLru struct {
data *gmap.Map // 记录键名与链表中的位置项指针
list *glist.List // 键名历史记录链表
queue *gqueue.Queue // 事件队列
}
func newLru() *_Lru {
lru := &_Lru {
// 创建LRU管理对象
func newMemCacheLru() *memCacheLru {
lru := &memCacheLru {
list : glist.New(),
data : gmap.NewStringInterfaceMap(),
data : gmap.New(),
queue : gqueue.New(),
}
go lru.StartAutoLoop()
@ -33,50 +34,49 @@ func newLru() *_Lru {
}
// 关闭LRU对象
func (lru *_Lru) Close() {
func (lru *memCacheLru) Close() {
lru.queue.Close()
}
// 删除指定数据项
func (lru *_Lru) Remove(key string) {
func (lru *memCacheLru) Remove(key interface{}) {
if v := lru.data.Get(key); v != nil {
lru.data.Remove(key)
lru.list.Remove(v.(*list.Element))
}
}
// 添加LRU数据项
func (lru *_Lru) Push(key string) {
lru.queue.PushBack(key)
func (lru *memCacheLru) Push(key interface{}) {
lru.queue.Push(key)
}
// 从链表尾删除LRU数据项并返回对应数据
func (lru *_Lru) Pop() string {
func (lru *memCacheLru) Pop() interface{} {
if v := lru.list.PopBack(); v != nil {
s := v.(string)
lru.data.Remove(s)
return s
lru.data.Remove(v)
return v
}
return ""
return nil
}
// 从链表头打印LRU链表值
func (lru *_Lru) Print() {
func (lru *memCacheLru) Print() {
for _, v := range lru.list.FrontAll() {
fmt.Printf("%s ", v.(string))
fmt.Printf("%v ", v)
}
}
// 异步执行协程
func (lru *_Lru) StartAutoLoop() {
func (lru *memCacheLru) StartAutoLoop() {
for {
if v := lru.queue.PopFront(); v != nil {
s := v.(string)
if v := lru.queue.Pop(); v != nil {
// 删除对应链表项
if v := lru.data.Get(s); v != nil {
if v := lru.data.Get(v); v != nil {
lru.list.Remove(v.(*list.Element))
}
// 将数据插入到链表头,并生成新的链表项
lru.data.Set(s, lru.list.PushFront(s))
// 将数据插入到链表头,并记录对应的链表项到哈希表中,便于检索
lru.data.Set(v, lru.list.PushFront(v))
} else {
break
}

View File

@ -10,24 +10,85 @@ package gcache
import (
"testing"
"strconv"
"sync"
)
func BenchmarkSet(b *testing.B) {
var (
c = New()
mInt = make(map[int]int)
mMap = make(map[interface{}]interface{})
muInt = sync.RWMutex{}
muMap = sync.RWMutex{}
)
func Benchmark_CacheSet(b *testing.B) {
for i := 0; i < b.N; i++ {
Set(strconv.Itoa(i), strconv.Itoa(i), 0)
c.Set(i, i, 0)
}
}
func BenchmarkGet(b *testing.B) {
for i := 0; i < b.N; i++ {
Get(strconv.Itoa(i))
}
func Benchmark_CacheGet(b *testing.B) {
for i := 0; i < b.N; i++ {
c.Get(i)
}
}
func BenchmarkRemove(b *testing.B) {
for i := 0; i < b.N; i++ {
Remove(strconv.Itoa(i))
}
func Benchmark_CacheRemove(b *testing.B) {
for i := 0; i < b.N; i++ {
c.Remove(i)
}
}
func Benchmark_InterfaceMapWithLockSet(b *testing.B) {
for i := 0; i < b.N; i++ {
muMap.Lock()
mMap[i] = i
muMap.Unlock()
}
}
func Benchmark_InterfaceMapWithLockGet(b *testing.B) {
for i := 0; i < b.N; i++ {
muMap.RLock()
if _, ok := mMap[i]; ok {
}
muMap.RUnlock()
}
}
func Benchmark_InterfaceMapWithLockRemove(b *testing.B) {
for i := 0; i < b.N; i++ {
muMap.Lock()
delete(mMap, i)
muMap.Unlock()
}
}
func Benchmark_IntMapWithLockWithLockSet(b *testing.B) {
for i := 0; i < b.N; i++ {
muInt.Lock()
mInt[i] = i
muInt.Unlock()
}
}
func Benchmark_IntMapWithLockGet(b *testing.B) {
for i := 0; i < b.N; i++ {
muInt.RLock()
if _, ok := mInt[i]; ok {
}
muInt.RUnlock()
}
}
func Benchmark_IntMapWithLockRemove(b *testing.B) {
for i := 0; i < b.N; i++ {
muInt.Lock()
delete(mInt, i)
muInt.Unlock()
}
}

View File

@ -23,6 +23,7 @@ import (
"gitee.com/johng/gf/g/util/gregex"
"gitee.com/johng/gf/g/container/gtype"
"sort"
"gitee.com/johng/gf/g/util/gconv"
)
// 封装了常用的文件操作方法如需更详细的文件控制请查看官方os包
@ -277,7 +278,7 @@ func IsWritable(path string) bool {
result := true
if IsDir(path) {
// 如果是目录,那么创建一个临时文件进行写入测试
tfile := strings.TrimRight(path, Separator) + Separator + string(time.Now().UnixNano())
tfile := strings.TrimRight(path, Separator) + Separator + gconv.String(time.Now().UnixNano())
err := Create(tfile)
if err != nil || !Exists(tfile){
result = false

View File

@ -162,7 +162,7 @@ func (w *Watcher) startWatchLoop() {
// 监听事件
case ev := <- w.watcher.Events:
w.events.PushBack(&Event{
w.events.Push(&Event{
Path : ev.Name,
Op : Op(ev.Op),
})
@ -190,7 +190,7 @@ func (w *Watcher) getCallbacks(path string) *glist.List {
func (w *Watcher) startEventLoop() {
go func() {
for {
if v := w.events.PopFront(); v != nil {
if v := w.events.Pop(); v != nil {
event := v.(*Event)
if event.IsRemove() {
if gfile.Exists(event.Path) {

View File

@ -12,7 +12,6 @@ import (
"io"
"time"
"fmt"
"errors"
"strings"
"runtime"
"strconv"
@ -115,18 +114,18 @@ func (l *Logger) getFilePointer() *os.File {
// 设置日志文件的存储目录路径
func (l *Logger) SetPath(path string) error {
// 检测目录权限
if !gfile.Exists(path) {
if err := gfile.Mkdir(path); err != nil {
fmt.Fprintln(os.Stderr, fmt.Sprintf(`glog mkdir "%s" failed: %s`, path, err.Error()))
return err
}
}
if !gfile.IsWritable(path) {
errstr := path + " is no writable for current user"
fmt.Fprintln(os.Stderr, errstr)
return errors.New(errstr)
}
//// 检测目录权限
//if !gfile.Exists(path) {
// if err := gfile.Mkdir(path); err != nil {
// fmt.Fprintln(os.Stderr, fmt.Sprintf(`glog mkdir "%s" failed: %s`, path, err.Error()))
// return err
// }
//}
//if !gfile.IsWritable(path) {
// errstr := path + " is no writable for current user"
// fmt.Fprintln(os.Stderr, errstr)
// return errors.New(errstr)
//}
l.path.Set(strings.TrimRight(path, gfile.Separator))
return nil
}
@ -148,14 +147,21 @@ func (l *Logger) print(std io.Writer, s string) {
defer f.Close()
key := l.path.Val()
gmlock.Lock(key)
io.WriteString(f, str)
_, err := io.WriteString(f, str)
gmlock.Unlock(key)
if err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
}
} else {
io.WriteString(writer, str)
if _, err := io.WriteString(writer, str); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
}
if l.alsoStdPrint.Val() {
io.WriteString(std, str)
if _, err := io.WriteString(std, str); err != nil {
fmt.Fprintln(os.Stderr, err.Error())
}
}
}

View File

@ -81,7 +81,7 @@ func tcpServiceHandler(conn *gtcp.Conn) {
result = []byte("ok")
for _, msg := range msgs {
if v := commReceiveQueues.Get(msg.Group); v != nil {
v.(*gqueue.Queue).PushBack(msg)
v.(*gqueue.Queue).Push(msg)
}
}
}
@ -154,7 +154,7 @@ func Receive(group...string) *Msg {
queue = v.(*gqueue.Queue)
}
if v := queue.PopFront(); v != nil {
if v := queue.Pop(); v != nil {
return v.(*Msg)
}
return nil

View File

@ -17,6 +17,7 @@ import (
"gitee.com/johng/gf/g/util/gconv"
"gitee.com/johng/gf/g/os/gspath"
"gitee.com/johng/gf/g/os/gfcache"
"gitee.com/johng/gf/g/encoding/ghtml"
)
// 视图对象
@ -68,6 +69,7 @@ func New(path string) *View {
}
view.SetDelimiters("{{", "}}")
// 内置方法
view.BindFunc("text", view.funcText)
view.BindFunc("include", view.funcInclude)
return view
}
@ -153,3 +155,8 @@ func (view *View) funcInclude(file string, data...map[string]interface{}) templa
return template.HTML(content)
}
// 模板内置方法text
func (view *View) funcText(html interface{}) string {
return ghtml.StripTags(gconv.String(html))
}

View File

@ -111,70 +111,75 @@ func String(i interface{}) string {
}
}
// 任意类型转换为[]int类型
func Ints(i interface{}) []int {
return nil
if i == nil {
return nil
}
if r, ok := i.([]int); ok {
return r
} else {
switch i.(type) {
case []int8, []int16, []int32, []int64,
[]uint, []uint8, []uint16, []uint32, []uint64,
[]bool, []string,
[]float32, []float64:
array := make([]int, len(r))
for k, v := range r {
array[k] = Int(v)
}
return array
}
}
return []int{Int(i)}
}
// 任意类型转换为[]uint类型
func Uints(i interface{}) []uint {
return nil
if i == nil {
return nil
}
if r, ok := i.([]uint); ok {
return r
} else {
switch i.(type) {
case []int, []int8, []int16, []int32, []int64, []uint8, []uint16, []uint32, []uint64,
[]bool, []string, []float32, []float64:
array := make([]uint, len(r))
for k, v := range r {
array[k] = Uint(v)
}
return array
}
}
return []uint{Uint(i)}
}
// 任意类型转换为[]string类型
func Strings(i interface{}) []string {
return nil
if i == nil {
return nil
}
if r, ok := i.([]string); ok {
return r
} else {
array := make([]string, 0)
switch i.(type) {
case []int, []int8, []int16, []int32, []int64,
[]uint, []uint8, []uint16, []uint32, []uint64,
[]bool, []float32, []float64:
array := make([]string, len(r))
for k, v := range r {
array[k] = String(v)
case []int:
for _, v := range i.([]int) {
array = append(array, String(v))
}
return array
case []int8:
for _, v := range i.([]int8) {
array = append(array, String(v))
}
case []int16:
for _, v := range i.([]int16) {
array = append(array, String(v))
}
case []int32:
for _, v := range i.([]int32) {
array = append(array, String(v))
}
case []int64:
for _, v := range i.([]int64) {
array = append(array, String(v))
}
case []uint:
for _, v := range i.([]uint) {
array = append(array, String(v))
}
case []uint8:
for _, v := range i.([]uint8) {
array = append(array, String(v))
}
case []uint16:
for _, v := range i.([]uint16) {
array = append(array, String(v))
}
case []uint32:
for _, v := range i.([]uint32) {
array = append(array, String(v))
}
case []uint64:
for _, v := range i.([]uint64) {
array = append(array, String(v))
}
case []bool:
for _, v := range i.([]bool) {
array = append(array, String(v))
}
case []float32:
for _, v := range i.([]float32) {
array = append(array, String(v))
}
case []float64:
for _, v := range i.([]float64) {
array = append(array, String(v))
}
case []interface{}:
for _, v := range i.([]interface{}) {
array = append(array, String(v))
}
}
if len(array) > 0 {
return array
}
}
return []string{fmt.Sprintf("%v", i)}

View File

@ -0,0 +1,39 @@
// 验证 map 的delete方法是否并发安全
package main
import (
"sync"
"time"
"fmt"
)
func main() {
// 创建一个初始化的map
m := make(map[int]int)
for i := 0; i < 10000; i++ {
m[i] = i
}
fmt.Println("map size:", len(m))
wg := sync.WaitGroup{}
ev := make(chan struct{}, 0)
// 创建10个并发的goroutine使用ev控制并发开始事件更容易模拟data race
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
<- ev
fmt.Println("start")
for i := 0; i < 10000; i++ {
delete(m, i)
}
wg.Done()
}()
}
time.Sleep(time.Second)
close(ev)
wg.Wait()
}

View File

@ -12,8 +12,8 @@ func main() {
// 数据生产者每隔1秒往队列写数据
gtime.SetInterval(time.Second, func() bool {
v := gtime.Now().String()
q.PushBack(v)
fmt.Println("PushBack:", v)
q.Push(v)
fmt.Println("Push:", v)
return true
})
@ -24,8 +24,8 @@ func main() {
// 消费者,不停读取队列数据并输出到终端
for {
if v := q.PopFront(); v != nil {
fmt.Println("PopFront:", v)
if v := q.Pop(); v != nil {
fmt.Println(" Pop:", v)
} else {
break
}

View File

@ -4,12 +4,11 @@ import (
"gitee.com/johng/gf/g/os/gcache"
"time"
"fmt"
"strconv"
)
func main() {
for i := 0; i < 10; i++ {
gcache.Set(strconv.Itoa(i), strconv.Itoa(i), 0)
gcache.Set(i, i, 0)
}
fmt.Println(gcache.Size())
fmt.Println(gcache.Keys())

19
geg/os/gcache/expire.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"gitee.com/johng/gf/g/os/gcache"
"fmt"
"time"
)
func main() {
gcache.Set("k1", "v1", 1000)
gcache.Set("k2", "v2", 2000)
fmt.Println(gcache.Keys())
fmt.Println(gcache.Values())
time.Sleep(1*time.Second)
fmt.Println(gcache.Keys())
fmt.Println(gcache.Values())
}

24
geg/os/gcache/lru.go Normal file
View File

@ -0,0 +1,24 @@
package main
import (
"gitee.com/johng/gf/g/os/gcache"
"time"
"fmt"
)
func main() {
// 设置LRU淘汰数量
gcache.SetCap(2)
// 10个元素
for i := 0; i < 10; i++ {
gcache.Set(i, i, 0)
}
fmt.Println(gcache.Size())
fmt.Println(gcache.Keys())
// 等待一定时间后(默认10秒检查一次),元素会被按照从旧到新的顺序进行淘汰
time.Sleep(11*time.Second)
fmt.Println(gcache.Size())
fmt.Println(gcache.Keys())
}

View File

@ -0,0 +1,10 @@
package main
import (
"gitee.com/johng/gf/g/os/gview"
"gitee.com/johng/gf/g/util/gutil"
)
func main() {
gutil.Dump(gview.ParseContent(`{{"<div>测试</div>去掉HTML标签<script>var v=1;</script>"|text}}`, nil))
}

View File

@ -1,17 +1,10 @@
package main
import (
"gitee.com/johng/gf/g/os/gfile"
"strings"
"gitee.com/johng/gf/g/os/gfcache"
"fmt"
"fmt"
"math"
)
func main() {
files, _ := gfile.ScanDir("/home/john/Workspace/med3-svr", "*", true)
for _, file := range files {
if strings.Index(gfcache.GetContents(file), "ENV") != -1 {
fmt.Println(file)
}
}
fmt.Println(int64(math.Ceil(float64(11111/10000) + 1)*10000))
}

10
geg/util/gconv/strings.go Normal file
View File

@ -0,0 +1,10 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/util/gconv"
)
func main() {
fmt.Println(gconv.Strings([]int{1,2,3}))
}