gmq开发,暂停

This commit is contained in:
John
2017-12-25 16:23:05 +08:00
parent 4af5a2e7a3
commit 7276970adc
3 changed files with 116 additions and 40 deletions

View File

@ -1,7 +1,9 @@
// Go语言实现的高性能消息队列。
// 每个xxx.mq消息文件存储10万条消息顶部为消息索引域底部为数据存储域。
// 顶部索引域:消息状态(1byte) 数据开始位置(40bit,1TB) 数据长度(24bit, 16MB)
// 底部数据域:[压缩消息数据](变长)
// 底部数据域:[消息数据](变长)
// @todo 未完成开发测试,暂时无法使用
package gmq
@ -13,11 +15,12 @@ import (
)
const (
gDEAFULT_MQGROUP_NAME = "default" // 默认的队列分组名称
gMQFILE_MAX_COUNT = 100000 // 每个队列文件存储的消息条数上限(不能随便,和数据结构设计有关系)
gMQFILE_INDEX_ITEM_SIZE = 9 // 每个队列文件的索引域大小
gMQFILEPOOL_TIMEOUT = 60 // 消息队列文件指针池过期时间
gMQFILE_INDEX_LENGTH = gMQFILE_MAX_COUNT*gMQFILE_INDEX_ITEM_SIZE // 消息队列文件索引域大小
gDEAFULT_MQGROUP_NAME = "default" // 默认的队列分组名称
gMQFILE_MAX_COUNT = 100000 // 每个队列文件存储的消息条数上限(不能随便,和数据结构设计有关系)
gMQFILE_INDEX_ITEM_SIZE = 9 // 每个队列文件的索引域大小(byte)
gMQFILEPOOL_TIMEOUT = 60 // 消息队列文件指针池过期时间
gMQFILE_AUTO_CLEAN_TIMEOUT = 10 // 自动清理过期队列文件的间隔(秒)
gMQFILE_INDEX_LENGTH = gMQFILE_MAX_COUNT*gMQFILE_INDEX_ITEM_SIZE // 消息队列文件索引域固定大小
)
// 消息队列管理对象
@ -28,17 +31,12 @@ type MQ struct {
// 消息队列分类管理对象
type MQGroup struct {
mu sync.RWMutex
minid uint64 // 队列当前最小id
maxid uint64 // 队列当前最大id
path string // 该分类的消息队列数据文件存放目录(绝对路径)
name string // 分组名称(命名规则和文件名一致,因为要生成对应的目录)
}
// 消息队列遍历器
type MQIterator struct {
group *MQGroup // 遍历的分类
id uint64 // 当前遍历的消息id
mu sync.RWMutex
minid uint64 // 队列当前最小id
maxid uint64 // 队列当前最大id
path string // 该分类的消息队列数据文件存放目录(绝对路径)
name string // 分组名称(命名规则和文件名一致,因为要生成对应的目录)
closed bool // 是否关闭
}
// 创建消息队列管理对象
@ -70,8 +68,20 @@ func (mq *MQ) Group(name string) *MQGroup {
return mqg
}
// 关闭分类队列
func (mq *MQ) CloseGroup(name string) {
mq.Group(name).Close()
}
// 关闭所有队列
func (mq *MQ) Close() {
for _, v := range *mq.groups.Clone() {
v.(*MQGroup).Close()
}
}
// 向默认分类队列写入消息
func (mq *MQ) Push(msg []byte) (uint64, error) {
func (mq *MQ) Push(msg []byte) error {
return mq.Group(gDEAFULT_MQGROUP_NAME).Push(msg)
}

View File

@ -0,0 +1,30 @@
package gmq
import (
"time"
"sync/atomic"
"gitee.com/johng/gf/g/os/gfile"
)
// 自动清理过期的队列文件
func (mqg *MQGroup) startAutoClean() {
go func() {
minid := atomic.LoadUint64(&mqg.minid)
for !mqg.isClosed() {
id := atomic.LoadUint64(&mqg.minid)
if id != minid {
// 多个队列文件
if id - minid >= gMQFILE_MAX_COUNT {
gfile.Remove(mqg.getFilePathById(minid))
minid = id
}
// 数据已全部使用完
if id == atomic.LoadUint64(&mqg.maxid) {
gfile.Remove(mqg.getFilePathById(id))
minid = id
}
}
time.Sleep(gMQFILE_AUTO_CLEAN_TIMEOUT*time.Second)
}
}()
}

View File

@ -15,8 +15,7 @@ import (
// 根据需要写入的id获取对应的文件指针
func (mqg *MQGroup) getFileById(id uint64) (*os.File, error) {
fnum := int(id/uint64(gMQFILE_MAX_COUNT))
path := mqg.path + gfile.Separator + strconv.Itoa(fnum) + ".mq"
path := mqg.getFilePathById(id)
if file, err := gfilepool.OpenWithPool(path, os.O_RDWR|os.O_CREATE, gMQFILEPOOL_TIMEOUT); err == nil {
return file.File(), nil
} else {
@ -24,12 +23,19 @@ func (mqg *MQGroup) getFileById(id uint64) (*os.File, error) {
}
}
// 根据id获取对应的队列文件的绝对路径
func (mqg *MQGroup) getFilePathById(id uint64) string {
fnum := int(id/uint64(gMQFILE_MAX_COUNT))
path := mqg.path + gfile.Separator + strconv.Itoa(fnum)
return path
}
// 根据消息id计算索引在队列文件中的偏移量
func (mqg *MQGroup) getIndexOffsetById(id uint64) int64 {
return int64(id%gMQFILE_MAX_COUNT)*gMQFILE_INDEX_ITEM_SIZE
}
// 获取当前队列分类的最大id
// 初始化队列分类,获取当前队列分类的最小id及最大id
func (mqg *MQGroup) init() {
minid := uint64(0)
maxid := uint64(0)
@ -63,7 +69,7 @@ func (mqg *MQGroup) init() {
}
}
}
// 查找使用的最小id
// 查找使用的最小id
for _, fnum := range fnums {
if file, err := mqg.getFileById(uint64(fnum)*gMQFILE_MAX_COUNT); err == nil {
if ixbuffer := gfile.GetBinContentByTwoOffsets(file, 0, gMQFILE_INDEX_LENGTH); ixbuffer != nil {
@ -84,18 +90,35 @@ func (mqg *MQGroup) init() {
}
mqg.minid = minid
mqg.maxid = maxid
mqg.startAutoClean()
}
func (mqg *MQGroup) getMinId() uint64 {
return atomic.LoadUint64(&mqg.minid)
}
func (mqg *MQGroup) setMinId(id uint64) {
atomic.StoreUint64(&mqg.minid, id)
}
func (mqg *MQGroup) getMaxId() uint64 {
return atomic.LoadUint64(&mqg.maxid)
}
func (mqg *MQGroup) setMaxId(id uint64) {
atomic.StoreUint64(&mqg.maxid, id)
}
// 获取队列的总数
func (mqg *MQGroup) Length() uint64 {
if atomic.LoadUint64(&mqg.minid) > 0 {
return atomic.LoadUint64(&mqg.maxid) - atomic.LoadUint64(&mqg.minid) + 1
if mqg.getMinId() > 0 {
return mqg.getMaxId() - mqg.getMinId() + 1
}
return 0
}
// 添加队列消息
func (mqg *MQGroup) Push(msg []byte) (uint64, error) {
// Push
func (mqg *MQGroup) Push(msg []byte) error {
// 锁定队列文件
mqg.mu.Lock()
defer mqg.mu.Unlock()
@ -103,25 +126,24 @@ func (mqg *MQGroup) Push(msg []byte) (uint64, error) {
id := mqg.maxid + 1
file, err := mqg.getFileById(id)
if err != nil {
return 0, err
return err
}
ixoffset := mqg.getIndexOffsetById(id)
// 消息数据写到文件末尾
dataOffset, err := file.Seek(0, 2)
if err != nil {
return 0, err
return err
}
if dataOffset < gMQFILE_INDEX_LENGTH {
dataOffset = gMQFILE_INDEX_LENGTH
// 文件需要初始化索引域初始化为0
if _, err := file.WriteAt(make([]byte, gMQFILE_INDEX_LENGTH), 0); err != nil {
return 0, err
return err
}
}
if _, err := file.WriteAt(msg, dataOffset); err != nil {
return 0, err
return err
}
// 数据写入成功后再写入索引域
bits := make([]gbinary.Bit, 0)
@ -129,24 +151,24 @@ func (mqg *MQGroup) Push(msg []byte) (uint64, error) {
bits = gbinary.EncodeBits(bits, uint(len(msg)), 24)
indexb := append(gbinary.EncodeInt8(1), gbinary.EncodeBitsToBytes(bits)...)
if _, err := file.WriteAt(indexb, ixoffset); err != nil {
return 0, err
return err
}
// 执行成功之后最大id才会递增
mqg.maxid++
return id, nil
return nil
}
// POP
// POP队列没有数据返回nil
func (mqg *MQGroup) Pop() []byte {
mqg.mu.Lock()
defer mqg.mu.Unlock()
minid := mqg.minid
if minid == 0 {
minid = 1
// 没有数据可获取
if mqg.minid == 0 || mqg.minid == mqg.maxid {
return nil
}
if buffer := mqg.get(minid); buffer != nil {
if mqg.remove(minid) == nil {
mqg.minid = minid + 1
if buffer := mqg.get(mqg.minid); buffer != nil {
if mqg.remove(mqg.minid) == nil {
mqg.minid++
return buffer
}
}
@ -184,4 +206,18 @@ func (mqg *MQGroup) remove(id uint64) error {
return err
}
return nil
}
// 关闭队列分类,自动回收资源
func (mqg *MQGroup) Close() {
mqg.mu.Lock()
defer mqg.mu.Unlock()
mqg.closed = true
}
// 队列分类是否已关闭
func (mqg *MQGroup) isClosed() bool {
mqg.mu.RLock()
defer mqg.mu.RUnlock()
return mqg.closed
}