diff --git a/g/database/gmq/gmq.go b/g/database/gmq/gmq.go deleted file mode 100644 index 42a53f0d2..000000000 --- a/g/database/gmq/gmq.go +++ /dev/null @@ -1,91 +0,0 @@ -// Go语言实现的高性能消息队列。 -// 每个xxx.mq消息文件存储10万条消息,顶部为消息索引域,底部为数据存储域。 -// 顶部索引域:消息状态(1byte) 数据开始位置(40bit,1TB) 数据长度(24bit, 16MB) -// 底部数据域:[消息数据](变长) - -// @todo 未完成开发测试,暂时无法使用 - -package gmq - -import ( - "errors" - "gitee.com/johng/gf/g/container/gmap" - "gitee.com/johng/gf/g/os/gfile" - "sync" -) - -const ( - gDEAFULT_MQGROUP_NAME = "default" // 默认的队列分组名称 - gMQFILE_MAX_COUNT = 100000 // 每个队列文件存储的消息条数上限(不能随便改,和数据结构设计有关系) - gMQFILE_INDEX_ITEM_SIZE = 9 // 每个队列文件的索引域大小(byte) - gMQFILEPOOL_TIMEOUT = 60 // 消息队列文件指针池过期时间 - gMQFILE_AUTO_CLEAN_TIMEOUT = 10 // 自动清理过期队列文件的间隔(秒) - gMQFILE_INDEX_LENGTH = gMQFILE_MAX_COUNT*gMQFILE_INDEX_ITEM_SIZE // 消息队列文件索引域固定大小 -) - -// 消息队列管理对象 -type MQ struct { - path string // 消息队列数据文件存放目录(绝对路径) - groups *gmap.StringInterfaceMap // 所有的消息队列分类 -} - -// 消息队列分类管理对象 -type MQGroup struct { - mu sync.RWMutex - minid uint64 // 队列当前最小id - maxid uint64 // 队列当前最大id - path string // 该分类的消息队列数据文件存放目录(绝对路径) - name string // 分组名称(命名规则和文件名一致,因为要生成对应的目录) - closed bool // 是否关闭 -} - -// 创建消息队列管理对象 -func New(path string) (*MQ, error) { - if !gfile.Exists(path) { - if err := gfile.Mkdir(path); err != nil { - return nil, errors.New("creating mq folder failed: " + err.Error()) - } - } - if !gfile.IsWritable(path) || !gfile.IsReadable(path) { - return nil, errors.New("permission denied to mq folder: " + path) - } - mq := &MQ { - path : path, - groups : gmap.NewStringInterfaceMap(), - } - return mq, nil -} - -// 获取或者创建一个消息队列分类 -func (mq *MQ) Group(name string) *MQGroup { - if result := mq.groups.Get(name); result != nil { - return result.(*MQGroup) - } - path := mq.path + gfile.Separator + name - mqg := &MQGroup {path : path, name : name} - mqg.init() - mq.groups.Set(name, mqg) - return mqg -} - -// 关闭分类队列 -func (mq *MQ) CloseGroup(name string) { - mq.Group(name).Close() -} - -// 关闭所有队列 -func (mq *MQ) Close() { - for _, v := range *mq.groups.Clone() { - v.(*MQGroup).Close() - } -} - -// 向默认分类队列写入消息 -func (mq *MQ) Push(msg []byte) error { - return mq.Group(gDEAFULT_MQGROUP_NAME).Push(msg) -} - -// 向默认分类队列头获取消息 -func (mq *MQ) Pop() []byte { - return mq.Group(gDEAFULT_MQGROUP_NAME).Pop() -} \ No newline at end of file diff --git a/g/database/gmq/gmq_auto.go b/g/database/gmq/gmq_auto.go deleted file mode 100644 index 3f8c6e3f0..000000000 --- a/g/database/gmq/gmq_auto.go +++ /dev/null @@ -1,30 +0,0 @@ -package gmq - -import ( - "time" - "sync/atomic" - "gitee.com/johng/gf/g/os/gfile" -) - -// 自动清理过期的队列文件 -func (mqg *MQGroup) startAutoClean() { - go func() { - minid := atomic.LoadUint64(&mqg.minid) - for !mqg.isClosed() { - id := atomic.LoadUint64(&mqg.minid) - if id != minid { - // 多个队列文件 - if id - minid >= gMQFILE_MAX_COUNT { - gfile.Remove(mqg.getFilePathById(minid)) - minid = id - } - // 数据已全部使用完 - if id == atomic.LoadUint64(&mqg.maxid) { - gfile.Remove(mqg.getFilePathById(id)) - minid = id - } - } - time.Sleep(gMQFILE_AUTO_CLEAN_TIMEOUT*time.Second) - } - }() -} \ No newline at end of file diff --git a/g/database/gmq/gmq_group.go b/g/database/gmq/gmq_group.go deleted file mode 100644 index a468d2dcf..000000000 --- a/g/database/gmq/gmq_group.go +++ /dev/null @@ -1,223 +0,0 @@ -package gmq - -import ( - "sync/atomic" - "os" - "gitee.com/johng/gf/g/os/gfile" - "strconv" - "gitee.com/johng/gf/g/os/gfilepool" - "gitee.com/johng/gf/g/encoding/gbinary" - "strings" - "bytes" - "math" - "sort" -) - -// 根据需要写入的id获取对应的文件指针 -func (mqg *MQGroup) getFileById(id uint64) (*os.File, error) { - path := mqg.getFilePathById(id) - if file, err := gfilepool.OpenWithPool(path, os.O_RDWR|os.O_CREATE, gMQFILEPOOL_TIMEOUT); err == nil { - return file.File(), nil - } else { - return nil, err - } -} - -// 根据id获取对应的队列文件的绝对路径 -func (mqg *MQGroup) getFilePathById(id uint64) string { - fnum := int(id/uint64(gMQFILE_MAX_COUNT)) - path := mqg.path + gfile.Separator + strconv.Itoa(fnum) - return path -} - -// 根据消息id计算索引在队列文件中的偏移量 -func (mqg *MQGroup) getIndexOffsetById(id uint64) int64 { - return int64(id%gMQFILE_MAX_COUNT)*gMQFILE_INDEX_ITEM_SIZE -} - -// 初始化队列分类,获取当前队列分类的最小id及最大id -func (mqg *MQGroup) init() { - minid := uint64(0) - maxid := uint64(0) - fnums := make([]uint64, 0) - files := gfile.ScanDir(mqg.path) - // 查找最大编号的队列文件 - for _, name := range files { - if n, err := strconv.ParseUint(strings.Split(name, ".")[0], 10, 64); err == nil { - fnums = append(fnums, n) - } - } - sort.Slice(fnums, func(i, j int) bool { return fnums[i] < fnums[j] }) - if len(fnums) == 0 { - return - } - // 查找当前队列文件的消息数量(最大id) - if file, err := mqg.getFileById(uint64(fnums[len(fnums) - 1])*gMQFILE_MAX_COUNT); err == nil { - if ixbuffer := gfile.GetBinContentByTwoOffsets(file, 0, gMQFILE_INDEX_LENGTH); ixbuffer != nil { - zerob := make([]byte, gMQFILE_INDEX_ITEM_SIZE) - for i := 0; i < gMQFILE_INDEX_LENGTH; i += gMQFILE_INDEX_ITEM_SIZE { - maxid = uint64(fnums[len(fnums) - 1]*gMQFILE_MAX_COUNT) - if bytes.Compare(zerob, ixbuffer[i : i + gMQFILE_INDEX_ITEM_SIZE]) == 0 { - if i > 0 { - maxid = uint64(fnums[len(fnums) - 1]*gMQFILE_MAX_COUNT) + uint64((i - gMQFILE_INDEX_ITEM_SIZE)/gMQFILE_INDEX_ITEM_SIZE) - } - if maxid != 0 { - break - } - } - - } - } - } - // 查找未使用的最小id - for _, fnum := range fnums { - if file, err := mqg.getFileById(uint64(fnum)*gMQFILE_MAX_COUNT); err == nil { - if ixbuffer := gfile.GetBinContentByTwoOffsets(file, 0, gMQFILE_INDEX_LENGTH); ixbuffer != nil { - for i := 0; i < gMQFILE_INDEX_LENGTH; i += gMQFILE_INDEX_ITEM_SIZE { - status := gbinary.DecodeToInt8(ixbuffer[i : i + 1]) - if status == 1 { - minid = uint64(fnum*gMQFILE_MAX_COUNT) + uint64((i)/gMQFILE_INDEX_ITEM_SIZE) - if minid != 0 { - break - } - } - } - } - } - if minid != uint64(math.MaxUint64) { - break - } - } - mqg.minid = minid - mqg.maxid = maxid - mqg.startAutoClean() -} - -func (mqg *MQGroup) getMinId() uint64 { - return atomic.LoadUint64(&mqg.minid) -} - -func (mqg *MQGroup) setMinId(id uint64) { - atomic.StoreUint64(&mqg.minid, id) -} - -func (mqg *MQGroup) getMaxId() uint64 { - return atomic.LoadUint64(&mqg.maxid) -} - -func (mqg *MQGroup) setMaxId(id uint64) { - atomic.StoreUint64(&mqg.maxid, id) -} - -// 获取队列的总数 -func (mqg *MQGroup) Length() uint64 { - if mqg.getMinId() > 0 { - return mqg.getMaxId() - mqg.getMinId() + 1 - } - return 0 -} - -// Push -func (mqg *MQGroup) Push(msg []byte) error { - // 锁定队列文件 - mqg.mu.Lock() - defer mqg.mu.Unlock() - - id := mqg.maxid + 1 - file, err := mqg.getFileById(id) - if err != nil { - return err - } - ixoffset := mqg.getIndexOffsetById(id) - - // 消息数据写到文件末尾 - dataOffset, err := file.Seek(0, 2) - if err != nil { - return err - } - if dataOffset < gMQFILE_INDEX_LENGTH { - dataOffset = gMQFILE_INDEX_LENGTH - // 文件需要初始化,索引域初始化为0 - if _, err := file.WriteAt(make([]byte, gMQFILE_INDEX_LENGTH), 0); err != nil { - return err - } - } - if _, err := file.WriteAt(msg, dataOffset); err != nil { - return err - } - // 数据写入成功后再写入索引域 - bits := make([]gbinary.Bit, 0) - bits = gbinary.EncodeBits(bits, uint(dataOffset), 40) - bits = gbinary.EncodeBits(bits, uint(len(msg)), 24) - indexb := append(gbinary.EncodeInt8(1), gbinary.EncodeBitsToBytes(bits)...) - if _, err := file.WriteAt(indexb, ixoffset); err != nil { - return err - } - // 执行成功之后最大id才会递增 - mqg.maxid++ - return nil -} - -// POP,队列没有数据返回nil -func (mqg *MQGroup) Pop() []byte { - mqg.mu.Lock() - defer mqg.mu.Unlock() - // 没有数据可获取 - if mqg.minid == 0 || mqg.minid == mqg.maxid { - return nil - } - if buffer := mqg.get(mqg.minid); buffer != nil { - if mqg.remove(mqg.minid) == nil { - mqg.minid++ - return buffer - } - } - return nil -} - -// 根据消息id查询消息 -func (mqg *MQGroup) get(id uint64) []byte { - file, err := mqg.getFileById(id) - if err != nil { - return nil - } - offset := mqg.getIndexOffsetById(id) - if ixbuffer := gfile.GetBinContentByTwoOffsets(file, offset, offset + gMQFILE_INDEX_ITEM_SIZE); ixbuffer != nil { - status := gbinary.DecodeToInt8(ixbuffer[0 : 1]) - if status > 0 { - bits := gbinary.DecodeBytesToBits(ixbuffer[1 : ]) - start := gbinary.DecodeBits(bits[0 : 40]) - size := gbinary.DecodeBits(bits[40 : ]) - end := start + size - return gfile.GetBinContentByTwoOffsets(file, int64(start), int64(end)) - } - } - return nil -} - -// 根据消息id删除消息 -func (mqg *MQGroup) remove(id uint64) error { - file, err := mqg.getFileById(id) - if err != nil { - return err - } - // 标记对应索引字段为删除(软删除) - if _, err := file.WriteAt([]byte{byte(0)}, mqg.getIndexOffsetById(id)); err != nil { - return err - } - return nil -} - -// 关闭队列分类,自动回收资源 -func (mqg *MQGroup) Close() { - mqg.mu.Lock() - defer mqg.mu.Unlock() - mqg.closed = true -} - -// 队列分类是否已关闭 -func (mqg *MQGroup) isClosed() bool { - mqg.mu.RLock() - defer mqg.mu.RUnlock() - return mqg.closed -} \ No newline at end of file diff --git a/geg/other/test.go b/geg/other/test.go index 8688185dd..cee21fbac 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,23 +2,10 @@ package main import ( "fmt" - "reflect" + "math" ) -type Test struct { - name string -} - -func (t *Test) Show(s string) { - fmt.Println(t.name) - fmt.Println(s) -} - -type F func(string) func main() { - t := &Test{} - t.name = "john" - reflect.ValueOf(t).Method(0).Interface().(F)("123") - + fmt.Printf("given id not match current maxid [%d]", int(math.MaxInt64)) } \ No newline at end of file