diff --git a/g/database/gmq/gmq.go b/g/database/gmq/gmq.go index 04dd24075..42a53f0d2 100644 --- a/g/database/gmq/gmq.go +++ b/g/database/gmq/gmq.go @@ -1,7 +1,9 @@ // Go语言实现的高性能消息队列。 // 每个xxx.mq消息文件存储10万条消息,顶部为消息索引域,底部为数据存储域。 // 顶部索引域:消息状态(1byte) 数据开始位置(40bit,1TB) 数据长度(24bit, 16MB) -// 底部数据域:[压缩消息数据](变长) +// 底部数据域:[消息数据](变长) + +// @todo 未完成开发测试,暂时无法使用 package gmq @@ -13,11 +15,12 @@ import ( ) const ( - gDEAFULT_MQGROUP_NAME = "default" // 默认的队列分组名称 - gMQFILE_MAX_COUNT = 100000 // 每个队列文件存储的消息条数上限(不能随便该,和数据结构设计有关系) - gMQFILE_INDEX_ITEM_SIZE = 9 // 每个队列文件的索引域大小 - gMQFILEPOOL_TIMEOUT = 60 // 消息队列文件指针池过期时间 - gMQFILE_INDEX_LENGTH = gMQFILE_MAX_COUNT*gMQFILE_INDEX_ITEM_SIZE // 消息队列文件索引域大小 + gDEAFULT_MQGROUP_NAME = "default" // 默认的队列分组名称 + gMQFILE_MAX_COUNT = 100000 // 每个队列文件存储的消息条数上限(不能随便改,和数据结构设计有关系) + gMQFILE_INDEX_ITEM_SIZE = 9 // 每个队列文件的索引域大小(byte) + gMQFILEPOOL_TIMEOUT = 60 // 消息队列文件指针池过期时间 + gMQFILE_AUTO_CLEAN_TIMEOUT = 10 // 自动清理过期队列文件的间隔(秒) + gMQFILE_INDEX_LENGTH = gMQFILE_MAX_COUNT*gMQFILE_INDEX_ITEM_SIZE // 消息队列文件索引域固定大小 ) // 消息队列管理对象 @@ -28,17 +31,12 @@ type MQ struct { // 消息队列分类管理对象 type MQGroup struct { - mu sync.RWMutex - minid uint64 // 队列当前最小id - maxid uint64 // 队列当前最大id - path string // 该分类的消息队列数据文件存放目录(绝对路径) - name string // 分组名称(命名规则和文件名一致,因为要生成对应的目录) -} - -// 消息队列遍历器 -type MQIterator struct { - group *MQGroup // 遍历的分类 - id uint64 // 当前遍历的消息id + mu sync.RWMutex + minid uint64 // 队列当前最小id + maxid uint64 // 队列当前最大id + path string // 该分类的消息队列数据文件存放目录(绝对路径) + name string // 分组名称(命名规则和文件名一致,因为要生成对应的目录) + closed bool // 是否关闭 } // 创建消息队列管理对象 @@ -70,8 +68,20 @@ func (mq *MQ) Group(name string) *MQGroup { return mqg } +// 关闭分类队列 +func (mq *MQ) CloseGroup(name string) { + mq.Group(name).Close() +} + +// 关闭所有队列 +func (mq *MQ) Close() { + for _, v := range *mq.groups.Clone() { + v.(*MQGroup).Close() + } +} + // 向默认分类队列写入消息 -func (mq *MQ) Push(msg []byte) (uint64, error) { +func (mq *MQ) Push(msg []byte) error { return mq.Group(gDEAFULT_MQGROUP_NAME).Push(msg) } diff --git a/g/database/gmq/gmq_auto.go b/g/database/gmq/gmq_auto.go new file mode 100644 index 000000000..3f8c6e3f0 --- /dev/null +++ b/g/database/gmq/gmq_auto.go @@ -0,0 +1,30 @@ +package gmq + +import ( + "time" + "sync/atomic" + "gitee.com/johng/gf/g/os/gfile" +) + +// 自动清理过期的队列文件 +func (mqg *MQGroup) startAutoClean() { + go func() { + minid := atomic.LoadUint64(&mqg.minid) + for !mqg.isClosed() { + id := atomic.LoadUint64(&mqg.minid) + if id != minid { + // 多个队列文件 + if id - minid >= gMQFILE_MAX_COUNT { + gfile.Remove(mqg.getFilePathById(minid)) + minid = id + } + // 数据已全部使用完 + if id == atomic.LoadUint64(&mqg.maxid) { + gfile.Remove(mqg.getFilePathById(id)) + minid = id + } + } + time.Sleep(gMQFILE_AUTO_CLEAN_TIMEOUT*time.Second) + } + }() +} \ No newline at end of file diff --git a/g/database/gmq/gmq_group.go b/g/database/gmq/gmq_group.go index a9dfdede7..a468d2dcf 100644 --- a/g/database/gmq/gmq_group.go +++ b/g/database/gmq/gmq_group.go @@ -15,8 +15,7 @@ import ( // 根据需要写入的id获取对应的文件指针 func (mqg *MQGroup) getFileById(id uint64) (*os.File, error) { - fnum := int(id/uint64(gMQFILE_MAX_COUNT)) - path := mqg.path + gfile.Separator + strconv.Itoa(fnum) + ".mq" + path := mqg.getFilePathById(id) if file, err := gfilepool.OpenWithPool(path, os.O_RDWR|os.O_CREATE, gMQFILEPOOL_TIMEOUT); err == nil { return file.File(), nil } else { @@ -24,12 +23,19 @@ func (mqg *MQGroup) getFileById(id uint64) (*os.File, error) { } } +// 根据id获取对应的队列文件的绝对路径 +func (mqg *MQGroup) getFilePathById(id uint64) string { + fnum := int(id/uint64(gMQFILE_MAX_COUNT)) + path := mqg.path + gfile.Separator + strconv.Itoa(fnum) + return path +} + // 根据消息id计算索引在队列文件中的偏移量 func (mqg *MQGroup) getIndexOffsetById(id uint64) int64 { return int64(id%gMQFILE_MAX_COUNT)*gMQFILE_INDEX_ITEM_SIZE } -// 获取当前队列分类的最大id +// 初始化队列分类,获取当前队列分类的最小id及最大id func (mqg *MQGroup) init() { minid := uint64(0) maxid := uint64(0) @@ -63,7 +69,7 @@ func (mqg *MQGroup) init() { } } } - // 查找使用的最小id + // 查找未使用的最小id for _, fnum := range fnums { if file, err := mqg.getFileById(uint64(fnum)*gMQFILE_MAX_COUNT); err == nil { if ixbuffer := gfile.GetBinContentByTwoOffsets(file, 0, gMQFILE_INDEX_LENGTH); ixbuffer != nil { @@ -84,18 +90,35 @@ func (mqg *MQGroup) init() { } mqg.minid = minid mqg.maxid = maxid + mqg.startAutoClean() +} + +func (mqg *MQGroup) getMinId() uint64 { + return atomic.LoadUint64(&mqg.minid) +} + +func (mqg *MQGroup) setMinId(id uint64) { + atomic.StoreUint64(&mqg.minid, id) +} + +func (mqg *MQGroup) getMaxId() uint64 { + return atomic.LoadUint64(&mqg.maxid) +} + +func (mqg *MQGroup) setMaxId(id uint64) { + atomic.StoreUint64(&mqg.maxid, id) } // 获取队列的总数 func (mqg *MQGroup) Length() uint64 { - if atomic.LoadUint64(&mqg.minid) > 0 { - return atomic.LoadUint64(&mqg.maxid) - atomic.LoadUint64(&mqg.minid) + 1 + if mqg.getMinId() > 0 { + return mqg.getMaxId() - mqg.getMinId() + 1 } return 0 } -// 添加队列消息 -func (mqg *MQGroup) Push(msg []byte) (uint64, error) { +// Push +func (mqg *MQGroup) Push(msg []byte) error { // 锁定队列文件 mqg.mu.Lock() defer mqg.mu.Unlock() @@ -103,25 +126,24 @@ func (mqg *MQGroup) Push(msg []byte) (uint64, error) { id := mqg.maxid + 1 file, err := mqg.getFileById(id) if err != nil { - return 0, err + return err } ixoffset := mqg.getIndexOffsetById(id) - // 消息数据写到文件末尾 dataOffset, err := file.Seek(0, 2) if err != nil { - return 0, err + return err } if dataOffset < gMQFILE_INDEX_LENGTH { dataOffset = gMQFILE_INDEX_LENGTH // 文件需要初始化,索引域初始化为0 if _, err := file.WriteAt(make([]byte, gMQFILE_INDEX_LENGTH), 0); err != nil { - return 0, err + return err } } if _, err := file.WriteAt(msg, dataOffset); err != nil { - return 0, err + return err } // 数据写入成功后再写入索引域 bits := make([]gbinary.Bit, 0) @@ -129,24 +151,24 @@ func (mqg *MQGroup) Push(msg []byte) (uint64, error) { bits = gbinary.EncodeBits(bits, uint(len(msg)), 24) indexb := append(gbinary.EncodeInt8(1), gbinary.EncodeBitsToBytes(bits)...) if _, err := file.WriteAt(indexb, ixoffset); err != nil { - return 0, err + return err } // 执行成功之后最大id才会递增 mqg.maxid++ - return id, nil + return nil } -// POP +// POP,队列没有数据返回nil func (mqg *MQGroup) Pop() []byte { mqg.mu.Lock() defer mqg.mu.Unlock() - minid := mqg.minid - if minid == 0 { - minid = 1 + // 没有数据可获取 + if mqg.minid == 0 || mqg.minid == mqg.maxid { + return nil } - if buffer := mqg.get(minid); buffer != nil { - if mqg.remove(minid) == nil { - mqg.minid = minid + 1 + if buffer := mqg.get(mqg.minid); buffer != nil { + if mqg.remove(mqg.minid) == nil { + mqg.minid++ return buffer } } @@ -184,4 +206,18 @@ func (mqg *MQGroup) remove(id uint64) error { return err } return nil +} + +// 关闭队列分类,自动回收资源 +func (mqg *MQGroup) Close() { + mqg.mu.Lock() + defer mqg.mu.Unlock() + mqg.closed = true +} + +// 队列分类是否已关闭 +func (mqg *MQGroup) isClosed() bool { + mqg.mu.RLock() + defer mqg.mu.RUnlock() + return mqg.closed } \ No newline at end of file