新增gmlock内存锁组件,改进glog日志写文件时的文件锁机制

This commit is contained in:
John
2018-08-30 00:00:15 +08:00
parent 050d7a868f
commit 5a0b447d35
14 changed files with 578 additions and 90 deletions

View File

@ -28,15 +28,15 @@ type Cache struct {
smu sync.RWMutex // eksets锁
lru *_Lru // LRU缓存限制(只有限定池大小时才启用)
cap *gtype.Int // 控制缓存池大小超过大小则按照LRU算法进行缓存过期处理(默认为0表示不进行限制)
data map[string]_CacheItem // 缓存数据(所有的缓存数据存放哈希表)
ekmap map[string]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新)
eksets map[int64]*gset.StringSet // 分组过期时间对应的键名列表(用于自动过期快速删除)
data map[string]CacheItem // 缓存数据(所有的缓存数据存放哈希表)
ekmap map[string]int64 // 键名对应的分组过期时间(用于相同键名过期时间快速更新),键值为秒级时间戳
eksets map[int64]*gset.StringSet // 分组过期时间对应的键名列表(用于自动过期快速删除),键值为秒级时间戳
eventQueue *gqueue.Queue // 异步处理队列
stopEvents chan struct{} // 关闭时间通知
}
// 缓存数据项
type _CacheItem struct {
type CacheItem struct {
v interface{} // 缓存键值
e int64 // 过期时间
}
@ -48,14 +48,14 @@ type _EventItem struct {
}
// 全局缓存管理对象
var cache *Cache = New()
var cache = New()
// Cache对象按照缓存键名首字母做了分组
func New() *Cache {
c := &Cache {
lru : newLru(),
cap : gtype.NewInt(),
data : make(map[string]_CacheItem),
data : make(map[string]CacheItem),
ekmap : make(map[string]int64),
eksets : make(map[int64]*gset.StringSet),
eventQueue : gqueue.New(),
@ -96,20 +96,6 @@ func BatchRemove(keys []string) {
cache.BatchRemove(keys)
}
// 基于内存缓存的锁锁成功返回true失败返回false当失败时表示有其他的锁存在
func Lock(key string, expire int) bool {
if v := cache.Get(key); v != nil {
return false
}
cache.Set(key, struct {}{}, expire)
return true
}
// 解除基于内存缓存的锁
func Unlock(key string) {
cache.Remove(key)
}
// 获得所有的键名,组成字符串数组返回
func Keys() []string {
return cache.Keys()
@ -168,7 +154,7 @@ func (c *Cache) Set(key string, value interface{}, expire int) {
e = gDEFAULT_MAX_EXPIRE
}
c.dmu.Lock()
c.data[key] = _CacheItem{v : value, e : e}
c.data[key] = CacheItem{v : value, e : e}
c.dmu.Unlock()
c.eventQueue.PushBack(_EventItem{k : key, e : e})
}
@ -183,35 +169,19 @@ func (c *Cache) BatchSet(data map[string]interface{}, expire int) {
}
for k, v := range data {
c.dmu.Lock()
c.data[k] = _CacheItem{v: v, e: e}
c.data[k] = CacheItem{v: v, e: e}
c.dmu.Unlock()
c.eventQueue.PushBack(_EventItem{k: k, e:e})
}
}
// 基于内存缓存的锁锁成功返回true失败返回false当失败时表示有其他的锁存在
func (c *Cache) Lock(key string, expire int) bool {
if v := c.Get(key); v != nil {
return false
}
c.Set(key, struct {}{}, expire)
return true
}
// 解除基于内存缓存的锁
func (c *Cache) Unlock(key string) {
c.Remove(key)
}
// 获取指定键名的值
func (c *Cache) Get(key string) interface{} {
c.dmu.RLock()
item, ok := c.data[key]
c.dmu.RUnlock()
if ok {
if item.e > gtime.Millisecond() {
return item.v
}
if ok && !item.IsExpired() {
return item.v
}
return nil
}
@ -225,7 +195,7 @@ func (c *Cache) Remove(key string) {
func (c *Cache) BatchRemove(keys []string) {
for _, key := range keys {
c.dmu.Lock()
c.data[key] = _CacheItem{v: nil, e: -1000}
c.data[key] = CacheItem{v: nil, e: -1000}
c.dmu.Unlock()
c.eventQueue.PushBack(_EventItem{k: key, e: -1000})
}
@ -336,7 +306,13 @@ func (c *Cache) autoClearLoop() {
func (c *Cache) clearByKey(key string) bool {
// 删除缓存数据
c.dmu.Lock()
delete(c.data, key)
// 为防止删除时正好该key正在进行写操作因此在内部需要使用写锁再进行一次确认
if item, ok := c.data[key]; ok && item.IsExpired() {
delete(c.data, key)
} else {
c.dmu.Unlock()
return true
}
c.dmu.Unlock()
// 删除异步处理数据项,并更新缓存的内存使用大小记录值

View File

@ -0,0 +1,17 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gcache
import "gitee.com/johng/gf/g/os/gtime"
// 判断缓存项是否已过期
func (item *CacheItem) IsExpired() bool {
if item.e > gtime.Millisecond() {
return false
}
return true
}

View File

@ -15,14 +15,14 @@ import (
)
type Logger struct {
mu sync.RWMutex
pr *Logger // 父级Logger
io io.Writer // 日志内容写入的IO接口
path *gtype.String // 日志写入的目录路径
debug *gtype.Bool // 是否允许输出DEBUG信息
btSkip *gtype.Int // 错误产生时的backtrace回调信息skip条数
btEnabled *gtype.Bool // 是否当打印错误时同时开启backtrace打印
allowMulti *gtype.Bool // 控制台打印开关,当输出到文件时也同时打印到终端
mu sync.RWMutex
pr *Logger // 父级Logger
io io.Writer // 日志内容写入的IO接口
path *gtype.String // 日志写入的目录路径
debug *gtype.Bool // 是否允许输出DEBUG信息
btSkip *gtype.Int // 错误产生时的backtrace回调信息skip条数
btEnabled *gtype.Bool // 是否当打印错误时同时开启backtrace打印
alsoStdPrint *gtype.Bool // 控制台打印开关,当输出到文件/自定义输出时也同时打印到终端
}
var (

View File

@ -3,6 +3,7 @@
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// @author john, zseeker
package glog
@ -19,6 +20,7 @@ import (
"gitee.com/johng/gf/g/util/gregex"
"gitee.com/johng/gf/g/os/gfilepool"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/g/os/gmlock"
)
const (
@ -41,25 +43,25 @@ func init() {
// 新建自定义的日志操作对象
func New() *Logger {
return &Logger {
io : nil,
path : gtype.NewString(),
debug : gtype.NewBool(true),
btSkip : gtype.NewInt(),
btEnabled : gtype.NewBool(true),
allowMulti : gtype.NewBool(true),
io : nil,
path : gtype.NewString(),
debug : gtype.NewBool(true),
btSkip : gtype.NewInt(),
btEnabled : gtype.NewBool(true),
alsoStdPrint : gtype.NewBool(true),
}
}
// Logger深拷贝
func (l *Logger) Clone() *Logger {
return &Logger {
pr : l,
io : l.GetIO(),
path : l.path.Clone(),
debug : l.debug.Clone(),
btSkip : l.btSkip.Clone(),
btEnabled : l.btEnabled.Clone(),
allowMulti : l.allowMulti.Clone(),
pr : l,
io : l.GetIO(),
path : l.path.Clone(),
debug : l.debug.Clone(),
btSkip : l.btSkip.Clone(),
btEnabled : l.btEnabled.Clone(),
alsoStdPrint : l.alsoStdPrint.Clone(),
}
}
@ -130,30 +132,30 @@ func (l *Logger) SetPath(path string) error {
// @author zseeker
// @date 2018-05-24
func (l *Logger) SetStdPrint(enabled bool) {
l.allowMulti.Set(enabled)
l.alsoStdPrint.Set(enabled)
}
// 这里的写锁保证统一时刻只会写入一行日志,防止串日志的情况
func (l *Logger) print(def io.Writer, s string) {
func (l *Logger) print(std io.Writer, s string) {
// 优先使用自定义的IO输出
str := l.format(s)
writer := l.GetIO()
if writer == nil {
// 如果设置的IO为空那么其次判断是否有文件输出设置
// 内部使用了内存锁保证在glog中对同一个日志文件的并发写入不会串日志
if f := l.getFileByPool(); f != nil {
writer = f
// 如果有文件设置那么需要判断是否同时输出到文件和终端
// @author zseeker
if l.allowMulti.Val() {
writer = io.MultiWriter(writer, def)
}
defer f.Close()
} else {
writer = def
key := l.path.Val()
gmlock.Lock(key)
io.WriteString(f, str)
gmlock.Unlock(key)
}
} else {
io.WriteString(writer, str)
}
if l.alsoStdPrint.Val() {
io.WriteString(std, str)
}
l.mu.Lock()
fmt.Fprint(writer, l.format(s))
l.mu.Unlock()
}
// 核心打印数据方法(标准输出)

40
g/os/gmlock/gmlock.go Normal file
View File

@ -0,0 +1,40 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
// 内存锁.
package gmlock
var locker = New()
// 内存写锁如果锁成功返回true失败则返回false;过期时间单位为毫秒默认为0表示不过期
func TryLock(key string, expire...int) bool {
return locker.TryLock(key, expire...)
}
// 内存写锁锁成功返回true失败时阻塞当失败时表示有其他写锁存在;过期时间单位为毫秒默认为0表示不过期
func Lock(key string, expire...int) {
locker.Lock(key, expire...)
}
// 解除基于内存锁的写锁
func Unlock(key string) {
locker.Unlock(key)
}
// 内存读锁如果锁成功返回true失败则返回false;过期时间单位为毫秒默认为0表示不过期
func TryRLock(key string, expire...int) bool {
return locker.TryRLock(key, expire...)
}
// 内存写锁锁成功返回true失败时阻塞当失败时表示有写锁存在;过期时间单位为毫秒默认为0表示不过期
func RLock(key string, expire...int) {
locker.RLock(key, expire...)
}
// 解除基于内存锁的读锁
func RUnlock(key string) {
locker.RUnlock(key)
}

View File

@ -0,0 +1,125 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gmlock
import (
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/os/gtime"
"time"
)
// 内存锁管理对象
type Locker struct {
m *gmap.StringInterfaceMap
}
// 创建一把内存锁使用的底层RWLocker
func New() *Locker {
return &Locker{
m : gmap.NewStringInterfaceMap(),
}
}
// 内存写锁如果锁成功返回true失败则返回false;过期时间单位为毫秒默认为0表示不过期
func (l *Locker) TryLock(key string, expire...int) bool {
return l.doLock(key, l.getExpire(expire...), true)
}
// 内存写锁锁成功返回true失败时阻塞当失败时表示有其他写锁存在;过期时间单位为毫秒默认为0表示不过期
func (l *Locker) Lock(key string, expire...int) {
l.doLock(key, l.getExpire(expire...), false)
}
// 解除基于内存锁的写锁
func (l *Locker) Unlock(key string) {
if v := l.m.Get(key); v != nil {
v.(*Mutex).Unlock()
}
}
// 内存读锁如果锁成功返回true失败则返回false;过期时间单位为毫秒默认为0表示不过期
func (l *Locker) TryRLock(key string, expire...int) bool {
return l.doRLock(key, l.getExpire(expire...), true)
}
// 内存写锁锁成功返回true失败时阻塞当失败时表示有写锁存在;过期时间单位为毫秒默认为0表示不过期
func (l *Locker) RLock(key string, expire...int) {
l.doRLock(key, l.getExpire(expire...), false)
}
// 解除基于内存锁的读锁
func (l *Locker) RUnlock(key string) {
if v := l.m.Get(key); v != nil {
v.(*Mutex).RUnlock()
}
}
// 获得过期时间没有设置时默认为0不过期
func (l *Locker) getExpire(expire...int) int {
e := 0
if len(expire) > 0 {
e = expire[0]
}
return e
}
// 内存写锁当try==true时如果锁成功返回true失败则返回falsetry==false时成功时立即返回否则阻塞等待
func (l *Locker) doLock(key string, expire int, try bool) bool {
mu := l.getOrNewMutex(key)
ok := true
if try {
ok = mu.TryLock()
} else {
mu.Lock()
}
if ok && expire > 0 {
// 异步goroutine计时处理
gtime.SetTimeout(time.Duration(expire)*time.Millisecond, func() {
mu.Unlock()
})
}
return ok
}
// 内存读锁当try==true时如果锁成功返回true失败则返回falsetry==false时成功时立即返回否则阻塞等待
func (l *Locker) doRLock(key string, expire int, try bool) bool {
mu := l.getOrNewMutex(key)
ok := true
if try {
ok = mu.TryRLock()
} else {
mu.RLock()
}
if ok && expire > 0 {
// 异步goroutine计时处理
gtime.SetTimeout(time.Duration(expire)*time.Millisecond, func() {
mu.RUnlock()
})
}
return ok
}
// 根据指定key查询或者创建新的Mutex
func (l *Locker) getOrNewMutex(key string) (mu *Mutex) {
// 优先进行读取检查,提高读取效率
if v := l.m.Get(key); v != nil {
mu = v.(*Mutex)
}
if mu == nil {
l.m.LockFunc(func(m map[string]interface{}) {
// 这里必须再进行一次查询上面那一次使用的是读锁支持并发效率高这里面的是写锁只支持1个goroutine操作
if v, ok := m[key]; ok {
mu = v.(*Mutex)
}
if mu == nil {
mu = NewMutex()
m[key] = mu
}
})
}
return
}

View File

@ -0,0 +1,71 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gmlock
import (
"sync"
"gitee.com/johng/gf/g/container/gtype"
)
// 互斥锁对象
type Mutex struct {
mu sync.RWMutex
rcount *gtype.Int // RLock次数
wcount *gtype.Int // Lock次数
}
// 创建一把内存锁使用的底层RWMutex
func NewMutex() *Mutex {
return &Mutex{
rcount : gtype.NewInt(),
wcount : gtype.NewInt(),
}
}
// 不阻塞Lock
func (l *Mutex) TryLock() bool {
if l.wcount.Val() == 0 && l.rcount.Val() == 0 {
l.Lock()
return true
}
return false
}
func (l *Mutex) Lock() {
l.wcount.Add(1)
l.mu.Lock()
}
// 安全的Unlock
func (l *Mutex) Unlock() {
if l.wcount.Val() > 0 {
l.mu.Unlock()
l.wcount.Add(-1)
}
}
// 不阻塞RLock
func (l *Mutex) TryRLock() bool {
if l.wcount.Val() == 0 {
l.RLock()
return true
}
return false
}
func (l *Mutex) RLock() {
l.rcount.Add(1)
l.mu.RLock()
}
// 安全的RUnlock
func (l *Mutex) RUnlock() {
if l.rcount.Val() > 0 {
l.mu.RUnlock()
l.rcount.Add(-1)
}
}

View File

@ -3,6 +3,7 @@ package main
import (
"gitee.com/johng/gf/g/database/gdb"
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/os/glog"
)
func main() {
@ -10,7 +11,7 @@ func main() {
Host: "127.0.0.1",
Port: "3306",
User: "root",
Pass: "123456",
Pass: "8692651",
Name: "test",
Type: "mysql",
Role: "master",
@ -20,7 +21,7 @@ func main() {
if err != nil {
panic(err)
}
glog.SetPath("/tmp")
db.SetDebug(true)
// 执行3条SQL查询
for i := 1; i <= 3; i++ {

25
geg/os/gmlock/locker1.go Normal file
View File

@ -0,0 +1,25 @@
package main
import (
"time"
"sync"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gmlock"
)
// 内存锁基本使用
func main() {
key := "lock"
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
gmlock.Lock(key)
glog.Println(i)
time.Sleep(time.Second)
gmlock.Unlock(key)
wg.Done()
}(i)
}
wg.Wait()
}

22
geg/os/gmlock/locker2.go Normal file
View File

@ -0,0 +1,22 @@
package main
import (
"sync"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/g/os/gmlock"
)
// 内存锁 - 给定过期时间
func main() {
key := "lock"
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
gmlock.Lock(key, 1000)
glog.Println(i)
wg.Done()
}(i)
}
wg.Wait()
}

28
geg/os/gmlock/locker3.go Normal file
View File

@ -0,0 +1,28 @@
package main
import (
"sync"
"gitee.com/johng/gf/g/os/glog"
"time"
"gitee.com/johng/gf/g/os/gmlock"
)
// 内存锁 - TryLock
func main() {
key := "lock"
wg := sync.WaitGroup{}
for i := 0; i < 10; i++ {
wg.Add(1)
go func(i int) {
if gmlock.TryLock(key) {
glog.Println(i)
time.Sleep(time.Second)
gmlock.Unlock(key)
} else {
glog.Println(false)
}
wg.Done()
}(i)
}
wg.Wait()
}

View File

@ -0,0 +1,95 @@
package main
import (
"gitee.com/johng/gf/g/os/gmlock"
"sync"
"fmt"
"time"
"math/rand"
)
// 测试Locker是否会产生死锁
func main() {
l := gmlock.New()
wg := sync.WaitGroup{}
key := "test"
event := make(chan int)
number := 100000
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
l.Lock(key)
//fmt.Println("get lock")
l.Unlock(key)
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
l.RLock(key)
//fmt.Println("get rlock")
l.RUnlock(key)
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if l.TryLock(key) {
//fmt.Println("get lock")
l.Unlock(key)
}
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if l.TryRLock(key) {
//fmt.Println("get rlock")
l.RUnlock(key)
}
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if l.TryLock(key) {
// 模拟业务逻辑的随机处理间隔
time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
l.Unlock(key)
}
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if l.TryRLock(key) {
// 模拟业务逻辑的随机处理间隔
time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
l.RUnlock(key)
}
wg.Done()
}()
}
// 使用chan作为事件发送测试指令让所有的goroutine同时执行
close(event)
wg.Wait()
fmt.Println("done!")
}

View File

@ -0,0 +1,94 @@
package main
import (
"gitee.com/johng/gf/g/os/gmlock"
"sync"
"fmt"
"time"
"math/rand"
)
// 测试是否会产生死锁
func main() {
mu := gmlock.NewMutex()
wg := sync.WaitGroup{}
event := make(chan int)
number := 100000
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
mu.Lock()
//fmt.Println("get lock")
mu.Unlock()
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
mu.RLock()
//fmt.Println("get rlock")
mu.RUnlock()
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if mu.TryLock() {
//fmt.Println("get lock")
mu.Unlock()
}
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if mu.TryRLock() {
//fmt.Println("get rlock")
mu.RUnlock()
}
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if mu.TryLock() {
// 模拟业务逻辑的随机处理间隔
time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
mu.Unlock()
}
wg.Done()
}()
}
for i := 0; i < number; i++ {
wg.Add(1)
go func() {
<- event
if mu.TryRLock() {
// 模拟业务逻辑的随机处理间隔
time.Sleep(time.Duration(rand.Intn(100))*time.Millisecond)
mu.RUnlock()
}
wg.Done()
}()
}
// 使用chan作为事件发送测试指令让所有的goroutine同时执行
close(event)
wg.Wait()
fmt.Println("done!")
}

View File

@ -1,19 +1,11 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
"fmt"
"sync"
)
func main() {
s := g.Server()
s.BindHandler("/admin", func(r *ghttp.Request) {
fmt.Println("admin")
})
s.BindHandler("/admin/", func(r *ghttp.Request) {
fmt.Println("admin/")
})
s.SetPort(8199)
s.Run()
mu := sync.RWMutex{}
mu.RLocker()
mu.Lock()
}