From b720cb59872bfe753c963fe968d0e40bef757ef7 Mon Sep 17 00:00:00 2001 From: John Date: Fri, 22 Dec 2017 17:32:18 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9Egmq=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=A8=A1=E5=9D=97=EF=BC=8C=E5=BC=80=E5=8F=91?= =?UTF-8?q?=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/database/gmq/gmq.go | 81 ++++++++++++++++ g/database/gmq/gmq_group.go | 187 ++++++++++++++++++++++++++++++++++++ geg/database/gmq/gmq.go | 23 +++++ geg/other/test.go | 8 +- 4 files changed, 295 insertions(+), 4 deletions(-) create mode 100644 g/database/gmq/gmq.go create mode 100644 g/database/gmq/gmq_group.go create mode 100644 geg/database/gmq/gmq.go diff --git a/g/database/gmq/gmq.go b/g/database/gmq/gmq.go new file mode 100644 index 000000000..04dd24075 --- /dev/null +++ b/g/database/gmq/gmq.go @@ -0,0 +1,81 @@ +// Go语言实现的高性能消息队列。 +// 每个xxx.mq消息文件存储10万条消息,顶部为消息索引域,底部为数据存储域。 +// 顶部索引域:消息状态(1byte) 数据开始位置(40bit,1TB) 数据长度(24bit, 16MB) +// 底部数据域:[压缩消息数据](变长) + +package gmq + +import ( + "errors" + "gitee.com/johng/gf/g/container/gmap" + "gitee.com/johng/gf/g/os/gfile" + "sync" +) + +const ( + gDEAFULT_MQGROUP_NAME = "default" // 默认的队列分组名称 + gMQFILE_MAX_COUNT = 100000 // 每个队列文件存储的消息条数上限(不能随便该,和数据结构设计有关系) + gMQFILE_INDEX_ITEM_SIZE = 9 // 每个队列文件的索引域大小 + gMQFILEPOOL_TIMEOUT = 60 // 消息队列文件指针池过期时间 + gMQFILE_INDEX_LENGTH = gMQFILE_MAX_COUNT*gMQFILE_INDEX_ITEM_SIZE // 消息队列文件索引域大小 +) + +// 消息队列管理对象 +type MQ struct { + path string // 消息队列数据文件存放目录(绝对路径) + groups *gmap.StringInterfaceMap // 所有的消息队列分类 +} + +// 消息队列分类管理对象 +type MQGroup struct { + mu sync.RWMutex + minid uint64 // 队列当前最小id + maxid uint64 // 队列当前最大id + path string // 该分类的消息队列数据文件存放目录(绝对路径) + name string // 分组名称(命名规则和文件名一致,因为要生成对应的目录) +} + +// 消息队列遍历器 +type MQIterator struct { + group *MQGroup // 遍历的分类 + id uint64 // 当前遍历的消息id +} + +// 创建消息队列管理对象 +func New(path string) (*MQ, error) { + if !gfile.Exists(path) { + if err := gfile.Mkdir(path); err != nil { + return nil, errors.New("creating mq folder failed: " + err.Error()) + } + } + if !gfile.IsWritable(path) || !gfile.IsReadable(path) { + return nil, errors.New("permission denied to mq folder: " + path) + } + mq := &MQ { + path : path, + groups : gmap.NewStringInterfaceMap(), + } + return mq, nil +} + +// 获取或者创建一个消息队列分类 +func (mq *MQ) Group(name string) *MQGroup { + if result := mq.groups.Get(name); result != nil { + return result.(*MQGroup) + } + path := mq.path + gfile.Separator + name + mqg := &MQGroup {path : path, name : name} + mqg.init() + mq.groups.Set(name, mqg) + return mqg +} + +// 向默认分类队列写入消息 +func (mq *MQ) Push(msg []byte) (uint64, error) { + return mq.Group(gDEAFULT_MQGROUP_NAME).Push(msg) +} + +// 向默认分类队列头获取消息 +func (mq *MQ) Pop() []byte { + return mq.Group(gDEAFULT_MQGROUP_NAME).Pop() +} \ No newline at end of file diff --git a/g/database/gmq/gmq_group.go b/g/database/gmq/gmq_group.go new file mode 100644 index 000000000..a9dfdede7 --- /dev/null +++ b/g/database/gmq/gmq_group.go @@ -0,0 +1,187 @@ +package gmq + +import ( + "sync/atomic" + "os" + "gitee.com/johng/gf/g/os/gfile" + "strconv" + "gitee.com/johng/gf/g/os/gfilepool" + "gitee.com/johng/gf/g/encoding/gbinary" + "strings" + "bytes" + "math" + "sort" +) + +// 根据需要写入的id获取对应的文件指针 +func (mqg *MQGroup) getFileById(id uint64) (*os.File, error) { + fnum := int(id/uint64(gMQFILE_MAX_COUNT)) + path := mqg.path + gfile.Separator + strconv.Itoa(fnum) + ".mq" + if file, err := gfilepool.OpenWithPool(path, os.O_RDWR|os.O_CREATE, gMQFILEPOOL_TIMEOUT); err == nil { + return file.File(), nil + } else { + return nil, err + } +} + +// 根据消息id计算索引在队列文件中的偏移量 +func (mqg *MQGroup) getIndexOffsetById(id uint64) int64 { + return int64(id%gMQFILE_MAX_COUNT)*gMQFILE_INDEX_ITEM_SIZE +} + +// 获取当前队列分类的最大id +func (mqg *MQGroup) init() { + minid := uint64(0) + maxid := uint64(0) + fnums := make([]uint64, 0) + files := gfile.ScanDir(mqg.path) + // 查找最大编号的队列文件 + for _, name := range files { + if n, err := strconv.ParseUint(strings.Split(name, ".")[0], 10, 64); err == nil { + fnums = append(fnums, n) + } + } + sort.Slice(fnums, func(i, j int) bool { return fnums[i] < fnums[j] }) + if len(fnums) == 0 { + return + } + // 查找当前队列文件的消息数量(最大id) + if file, err := mqg.getFileById(uint64(fnums[len(fnums) - 1])*gMQFILE_MAX_COUNT); err == nil { + if ixbuffer := gfile.GetBinContentByTwoOffsets(file, 0, gMQFILE_INDEX_LENGTH); ixbuffer != nil { + zerob := make([]byte, gMQFILE_INDEX_ITEM_SIZE) + for i := 0; i < gMQFILE_INDEX_LENGTH; i += gMQFILE_INDEX_ITEM_SIZE { + maxid = uint64(fnums[len(fnums) - 1]*gMQFILE_MAX_COUNT) + if bytes.Compare(zerob, ixbuffer[i : i + gMQFILE_INDEX_ITEM_SIZE]) == 0 { + if i > 0 { + maxid = uint64(fnums[len(fnums) - 1]*gMQFILE_MAX_COUNT) + uint64((i - gMQFILE_INDEX_ITEM_SIZE)/gMQFILE_INDEX_ITEM_SIZE) + } + if maxid != 0 { + break + } + } + + } + } + } + // 查找使用的最小id + for _, fnum := range fnums { + if file, err := mqg.getFileById(uint64(fnum)*gMQFILE_MAX_COUNT); err == nil { + if ixbuffer := gfile.GetBinContentByTwoOffsets(file, 0, gMQFILE_INDEX_LENGTH); ixbuffer != nil { + for i := 0; i < gMQFILE_INDEX_LENGTH; i += gMQFILE_INDEX_ITEM_SIZE { + status := gbinary.DecodeToInt8(ixbuffer[i : i + 1]) + if status == 1 { + minid = uint64(fnum*gMQFILE_MAX_COUNT) + uint64((i)/gMQFILE_INDEX_ITEM_SIZE) + if minid != 0 { + break + } + } + } + } + } + if minid != uint64(math.MaxUint64) { + break + } + } + mqg.minid = minid + mqg.maxid = maxid +} + +// 获取队列的总数 +func (mqg *MQGroup) Length() uint64 { + if atomic.LoadUint64(&mqg.minid) > 0 { + return atomic.LoadUint64(&mqg.maxid) - atomic.LoadUint64(&mqg.minid) + 1 + } + return 0 +} + +// 添加队列消息 +func (mqg *MQGroup) Push(msg []byte) (uint64, error) { + // 锁定队列文件 + mqg.mu.Lock() + defer mqg.mu.Unlock() + + id := mqg.maxid + 1 + file, err := mqg.getFileById(id) + if err != nil { + return 0, err + } + ixoffset := mqg.getIndexOffsetById(id) + + + // 消息数据写到文件末尾 + dataOffset, err := file.Seek(0, 2) + if err != nil { + return 0, err + } + if dataOffset < gMQFILE_INDEX_LENGTH { + dataOffset = gMQFILE_INDEX_LENGTH + // 文件需要初始化,索引域初始化为0 + if _, err := file.WriteAt(make([]byte, gMQFILE_INDEX_LENGTH), 0); err != nil { + return 0, err + } + } + if _, err := file.WriteAt(msg, dataOffset); err != nil { + return 0, err + } + // 数据写入成功后再写入索引域 + bits := make([]gbinary.Bit, 0) + bits = gbinary.EncodeBits(bits, uint(dataOffset), 40) + bits = gbinary.EncodeBits(bits, uint(len(msg)), 24) + indexb := append(gbinary.EncodeInt8(1), gbinary.EncodeBitsToBytes(bits)...) + if _, err := file.WriteAt(indexb, ixoffset); err != nil { + return 0, err + } + // 执行成功之后最大id才会递增 + mqg.maxid++ + return id, nil +} + +// POP +func (mqg *MQGroup) Pop() []byte { + mqg.mu.Lock() + defer mqg.mu.Unlock() + minid := mqg.minid + if minid == 0 { + minid = 1 + } + if buffer := mqg.get(minid); buffer != nil { + if mqg.remove(minid) == nil { + mqg.minid = minid + 1 + return buffer + } + } + return nil +} + +// 根据消息id查询消息 +func (mqg *MQGroup) get(id uint64) []byte { + file, err := mqg.getFileById(id) + if err != nil { + return nil + } + offset := mqg.getIndexOffsetById(id) + if ixbuffer := gfile.GetBinContentByTwoOffsets(file, offset, offset + gMQFILE_INDEX_ITEM_SIZE); ixbuffer != nil { + status := gbinary.DecodeToInt8(ixbuffer[0 : 1]) + if status > 0 { + bits := gbinary.DecodeBytesToBits(ixbuffer[1 : ]) + start := gbinary.DecodeBits(bits[0 : 40]) + size := gbinary.DecodeBits(bits[40 : ]) + end := start + size + return gfile.GetBinContentByTwoOffsets(file, int64(start), int64(end)) + } + } + return nil +} + +// 根据消息id删除消息 +func (mqg *MQGroup) remove(id uint64) error { + file, err := mqg.getFileById(id) + if err != nil { + return err + } + // 标记对应索引字段为删除(软删除) + if _, err := file.WriteAt([]byte{byte(0)}, mqg.getIndexOffsetById(id)); err != nil { + return err + } + return nil +} \ No newline at end of file diff --git a/geg/database/gmq/gmq.go b/geg/database/gmq/gmq.go new file mode 100644 index 000000000..be979edda --- /dev/null +++ b/geg/database/gmq/gmq.go @@ -0,0 +1,23 @@ +package main + +import ( + "gitee.com/johng/gf/g/database/gmq" + "fmt" +) + +func main() { + mq, err := gmq.New("/tmp/gmq") + if err != nil { + fmt.Println(err) + } + + //t1 := gtime.Microsecond() + //for i := 0; i < 10; i++ { + // mq.Group("test").Push([]byte("gmq_message_" + strconv.Itoa(i))) + //} + //fmt.Println("push cost:", gtime.Microsecond() - t1) + fmt.Println(string(mq.Group("test").Pop())) + fmt.Println("length", mq.Group("test").Length()) + //fmt.Println(mq.Group("test").Add([]byte("gmq_message"))) + //fmt.Println(mq.Group("test").Remove(1000)) +} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index c3830c5e1..c8cc19fe6 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -1,11 +1,11 @@ package main import ( + "sort" "fmt" - "gitee.com/johng/gf/g/frame/ginstance" ) func main() { - db := ginstance.Database() - list, _ := db.Table("test").Select() - fmt.Println(list[0]) + fnums := []uint64{0,12,2,4,5,5,2,uint64(10)} + sort.Slice(fnums, func(i, j int) bool { return fnums[i] < fnums[j] }) + fmt.Println(fnums) } \ No newline at end of file