diff --git a/g/os/gfpool/gfpool.go b/g/os/gfpool/gfpool.go index 2f1544f2a..9943b6e51 100644 --- a/g/os/gfpool/gfpool.go +++ b/g/os/gfpool/gfpool.go @@ -12,7 +12,7 @@ import ( "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gpool" "gitee.com/johng/gf/g/container/gtype" - "gitee.com/johng/gf/third/github.com/fsnotify/fsnotify" + "gitee.com/johng/gf/g/os/gfsnotify" "os" "sync" ) @@ -21,8 +21,7 @@ import ( type Pool struct { id *gtype.Int // 指针池ID,用以识别指针池是否重建 pool *gpool.Pool // 底层对象池 - inited *gtype.Bool // 是否初始化(在执行第一次File方法后初始化) - watcher *fsnotify.Watcher // 文件监控对象 + inited *gtype.Bool // 是否初始化(在执行第一次File方法后初始化,主要用于监听的添加,但是只能添加一次) closeChan chan struct{} // 关闭事件 expire int // 过期时间 } @@ -48,16 +47,9 @@ func Open(path string, flag int, perm os.FileMode, expire...int) (file *File, er fpExpire = expire[0] } pool := pools.GetOrSetFuncLock(fmt.Sprintf("%s&%d&%d&%d", path, flag, expire, perm), func() interface{} { - if p, e := New(path, flag, perm, fpExpire); e == nil { - return p - } else { - err = e - } - return nil + return New(path, flag, perm, fpExpire) }).(*Pool) - if pool == nil { - return nil, err - } + return pool.File() } @@ -67,7 +59,7 @@ func OpenFile(path string, flag int, perm os.FileMode, expire...int) (file *File // 创建一个文件指针池,expire = 0表示不过期,expire < 0表示使用完立即回收,expire > 0表示超时回收,默认值为0不过期 // 过期时间单位:毫秒 -func New(path string, flag int, perm os.FileMode, expire...int) (*Pool, error) { +func New(path string, flag int, perm os.FileMode, expire...int) *Pool { fpExpire := 0 if len(expire) > 0 { fpExpire = expire[0] @@ -79,12 +71,7 @@ func New(path string, flag int, perm os.FileMode, expire...int) (*Pool, error) { closeChan : make(chan struct{}), } p.pool = newFilePool(p, path, flag, perm, fpExpire) - if watcher, err := fsnotify.NewWatcher(); err == nil { - p.watcher = watcher - } else { - return nil, err - } - return p, nil + return p } // 创建文件指针池 @@ -142,31 +129,18 @@ func (p *Pool) File() (*File, error) { } if !p.inited.Set(true) { - if err := p.watcher.Add(f.path); err != nil { - p.inited.Set(false) - } - go func() { - for { - select { - // 关闭事件 - case <- p.closeChan: - return - - // 监听事件 - case ev := <- p.watcher.Events: - // 如果文件被删除或者重命名,立即重建指针池 - if ev.Op & fsnotify.Remove == fsnotify.Remove || ev.Op & fsnotify.Rename == fsnotify.Rename { - // 原有的指针都不要了 - p.id.Add(1) - // Clear相当于重建指针池 - p.pool.Clear() - // 为保证原子操作,但又不想加锁, - // 这里再执行一次原子Add,将在两次Add中间可能分配出去的文件指针丢弃掉 - p.id.Add(1) - } - } + gfsnotify.Add(f.path, func(event *gfsnotify.Event) { + // 如果文件被删除或者重命名,立即重建指针池 + if event.IsRemove() || event.IsRename() { + // 原有的指针都不要了 + p.id.Add(1) + // Clear相当于重建指针池 + p.pool.Clear() + // 为保证原子操作,但又不想加锁, + // 这里再执行一次原子Add,将在两次Add中间可能分配出去的文件指针丢弃掉 + p.id.Add(1) } - }() + }, false) } return f, nil } diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index 10cc71f6f..617e654ba 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -9,14 +9,11 @@ package gfsnotify import ( - "errors" - "gitee.com/johng/gf/g/os/glog" - "gitee.com/johng/gf/third/github.com/fsnotify/fsnotify" - "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/g/container/gmap" - "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gqueue" - "fmt" + "gitee.com/johng/gf/g/encoding/ghash" + "gitee.com/johng/gf/g/os/gcache" + "gitee.com/johng/gf/third/github.com/fsnotify/fsnotify" ) // 监听管理对象 @@ -25,18 +22,21 @@ type Watcher struct { events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件 closeChan chan struct{} // 关闭事件 callbacks *gmap.StringInterfaceMap // 监听的回调函数 + cache *gcache.Cache // 缓存对象,用于事件重复过滤 } // 监听事件对象 type Event struct { - Path string // 文件绝对路径 - Op Op // 触发监听的文件操作 - Watcher *Watcher // 时间对应的监听对象 + event fsnotify.Event // 底层事件对象 + Path string // 文件绝对路径 + Op Op // 触发监听的文件操作 + Watcher *Watcher // 事件对应的监听对象 } // 按位进行识别的操作集合 type Op uint32 +// 必须放到一个const分组里面 const ( CREATE Op = 1 << iota WRITE @@ -45,13 +45,30 @@ const ( CHMOD ) -// 全局监听对象,方便应用端调用 -var watcher, _ = New() +const ( + REPEAT_EVENT_FILTER_INTERVAL = 1 // (毫秒)重复事件过滤间隔 + DEFAULT_WATCHER_COUNT = 4 // 默认创建的监控对象数量(使用哈希取模) +) -// 创建监听管理对象 +// 全局监听对象,方便应用端调用 +var watchers = make([]*Watcher, DEFAULT_WATCHER_COUNT) + +// 包初始化,创建8个watcher对象,用于包默认管理监听 +func init() { + for i := 0; i < DEFAULT_WATCHER_COUNT; i++ { + if w, err := New(); err == nil { + watchers[i] = w + } else { + panic(err) + } + } +} + +// 创建监听管理对象,主要注意的是创建监听对象会占用系统的inotify句柄数量,受到 fs.inotify.max_user_instances 的限制 func New() (*Watcher, error) { if watch, err := fsnotify.NewWatcher(); err == nil { w := &Watcher { + cache : gcache.New(), watcher : watch, events : gqueue.New(), closeChan : make(chan struct{}), @@ -67,165 +84,15 @@ func New() (*Watcher, error) { // 添加对指定文件/目录的监听,并给定回调函数;如果给定的是一个目录,默认递归监控。 func Add(path string, callback func(event *Event), recursive...bool) error { - if watcher == nil { - return errors.New("global watcher creating failed") - } - return watcher.Add(path, callback, recursive...) + return getWatcherByPath(path).Add(path, callback, recursive...) } // 移除监听,默认递归删除。 func Remove(path string) error { - if watcher == nil { - return errors.New("global watcher creating failed") - } - return watcher.Remove(path) + return getWatcherByPath(path).Remove(path) } -// 关闭监听管理对象 -func (w *Watcher) Close() { - w.watcher.Close() - w.events.Close() - close(w.closeChan) +// 根据path计算对应的watcher对象 +func getWatcherByPath(path string) *Watcher { + return watchers[ghash.BKDRHash([]byte(path)) % DEFAULT_WATCHER_COUNT] } - -// 添加对指定文件/目录的监听,并给定回调函数 -func (w *Watcher) addWatch(path string, callback func(event *Event)) error { - // 这里统一转换为当前系统的绝对路径,便于统一监控文件名称 - t := gfile.RealPath(path) - if t == "" { - return errors.New(fmt.Sprintf(`"%s" does not exist`, path)) - } - path = t - // 注册回调函数 - w.callbacks.LockFunc(func(m map[string]interface{}) { - var result interface{} - if v, ok := m[path]; !ok { - result = glist.New() - m[path] = result - } else { - result = v - } - result.(*glist.List).PushBack(callback) - }) - // 添加底层监听 - w.watcher.Add(path) - return nil -} - -// 添加监控,path参数支持文件或者目录路径,recursive为非必需参数,默认为递归添加监控(当path为目录时) -func (w *Watcher) Add(path string, callback func(event *Event), recursive...bool) error { - if gfile.IsDir(path) && (len(recursive) == 0 || recursive[0]) { - paths, _ := gfile.ScanDir(path, "*", true) - list := []string{path} - list = append(list, paths...) - for _, v := range list { - if err := w.addWatch(v, callback); err != nil { - return err - } - } - return nil - } else { - return w.addWatch(path, callback) - } -} - - -// 移除监听 -func (w *Watcher) removeWatch(path string) error { - w.callbacks.Remove(path) - return w.watcher.Remove(path) -} - -// 递归移除监听 -func (w *Watcher) Remove(path string) error { - if gfile.IsDir(path) { - paths, _ := gfile.ScanDir(path, "*", true) - list := []string{path} - list = append(list, paths...) - for _, v := range list { - if err := w.removeWatch(v); err != nil { - return err - } - } - return nil - } else { - return w.removeWatch(path) - } -} - -// 监听循环 -func (w *Watcher) startWatchLoop() { - go func() { - for { - select { - // 关闭事件 - case <- w.closeChan: - return - - // 监听事件 - case ev := <- w.watcher.Events: - //glog.Debug("gfsnotify: watch loop", ev) - w.events.Push(&Event{ - Path : ev.Name, - Op : Op(ev.Op), - }) - - case err := <- w.watcher.Errors: - glog.Error("error : ", err); - } - } - }() -} - -// 检索给定path的回调方法**列表** -func (w *Watcher) getCallbacks(path string) *glist.List { - for path != "/" { - if l := w.callbacks.Get(path); l != nil { - return l.(*glist.List) - } else { - path = gfile.Dir(path) - } - } - return nil -} - -// 事件循环 -func (w *Watcher) startEventLoop() { - go func() { - for { - if v := w.events.Pop(); v != nil { - //glog.Debug("gfsnotidy: event loop", v) - event := v.(*Event) - if event.IsRemove() { - if gfile.Exists(event.Path) { - // 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”, - // 并重新添加监控(底层fsnotify会自动删除掉监控,这里重新添加回去) - w.watcher.Add(event.Path) - // 修改事件操作为重命名(相当于重命名为自身名称,最终名称没变) - event.Op = RENAME - } else { - // 如果是真实删除,那么递归删除监控信息 - w.Remove(event.Path) - } - } - //glog.Debug("gfsnotidy: event loop callbacks", v) - callbacks := w.getCallbacks(event.Path) - // 如果创建了新的目录,那么将这个目录递归添加到监控中 - if event.IsCreate() && gfile.IsDir(event.Path) { - for _, callback := range callbacks.FrontAll() { - w.Add(event.Path, callback.(func(event *Event))) - } - } - if callbacks != nil { - go func(callbacks *glist.List) { - for _, callback := range callbacks.FrontAll() { - callback.(func(event *Event))(event) - } - }(callbacks) - } - } else { - break - } - } - }() -} \ No newline at end of file diff --git a/g/os/gfsnotify/gfsnotify_event.go b/g/os/gfsnotify/gfsnotify_event.go index 4f3352224..cb9eb9cbc 100644 --- a/g/os/gfsnotify/gfsnotify_event.go +++ b/g/os/gfsnotify/gfsnotify_event.go @@ -6,9 +6,13 @@ package gfsnotify +func (e *Event) String() string { + return e.event.String() +} + // 文件/目录创建 func (e *Event) IsCreate() bool { - return e.Op & CREATE == CREATE + return e.Op == 1 || e.Op & CREATE == CREATE } // 文件/目录修改 diff --git a/g/os/gfsnotify/gfsnotify_filefunc.go b/g/os/gfsnotify/gfsnotify_filefunc.go new file mode 100644 index 000000000..876ab0bb4 --- /dev/null +++ b/g/os/gfsnotify/gfsnotify_filefunc.go @@ -0,0 +1,96 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// ThIs Source Code Form Is subject to the terms of the MIT License. +// If a copy of the MIT was not dIstributed with thIs file, +// You can obtain one at https://gitee.com/johng/gf. + +package gfsnotify + +import ( + "fmt" + "os" + "path/filepath" + "sort" + "strings" +) + +// 获取指定文件路径的目录地址绝对路径 +func fileDir(path string) string { + return filepath.Dir(path) +} + +// 将所给定的路径转换为绝对路径 +// 并判断文件路径是否存在,如果文件不存在,那么返回空字符串 +func fileRealPath(path string) string { + p, err := filepath.Abs(path) + if err != nil { + return "" + } + if !fileExists(p) { + return "" + } + return p +} + +// 判断所给路径文件/文件夹是否存在 +func fileExists(path string) bool { + if _, err := os.Stat(path); !os.IsNotExist(err) { + return true + } + return false +} + +// 判断所给路径是否为文件夹 +func fileIsDir(path string) bool { + s, err := os.Stat(path) + if err != nil { + return false + } + return s.IsDir() +} + +// 打开目录,并返回其下一级文件列表(绝对路径),按照文件名称大小写进行排序,支持目录递归遍历。 +func fileScanDir(path string, pattern string, recursive ... bool) ([]string, error) { + list, err := fileDoScanDir(path, pattern, recursive...) + if err != nil { + return nil, err + } + if len(list) > 0 { + sort.Strings(list) + } + return list, nil +} + +// 内部检索目录方法,支持递归,返回没有排序的文件绝对路径列表结果。 +// pattern参数支持多个文件名称模式匹配,使用','符号分隔多个模式。 +func fileDoScanDir(path string, pattern string, recursive ... bool) ([]string, error) { + var list []string + // 打开目录 + dfile, err := os.Open(path) + if err != nil { + return nil, err + } + defer dfile.Close() + // 读取目录下的文件列表 + names, err := dfile.Readdirnames(-1) + if err != nil { + return nil, err + } + // 是否递归遍历 + for _, name := range names { + path := fmt.Sprintf("%s%s%s", path, string(filepath.Separator), name) + if fileIsDir(path) && len(recursive) > 0 && recursive[0] { + array, _ := fileDoScanDir(path, pattern, true) + if len(array) > 0 { + list = append(list, array...) + } + } + // 满足pattern才加入结果列表 + for _, p := range strings.Split(pattern, ",") { + if match, err := filepath.Match(strings.TrimSpace(p), name); err == nil && match { + list = append(list, path) + } + } + } + return list, nil +} \ No newline at end of file diff --git a/g/os/gfsnotify/gfsnotify_watcher.go b/g/os/gfsnotify/gfsnotify_watcher.go new file mode 100644 index 000000000..9cb15f447 --- /dev/null +++ b/g/os/gfsnotify/gfsnotify_watcher.go @@ -0,0 +1,165 @@ +// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +package gfsnotify + +import ( + "errors" + "fmt" + "gitee.com/johng/gf/g/container/glist" +) + +// 关闭监听管理对象 +func (w *Watcher) Close() { + w.watcher.Close() + w.events.Close() + close(w.closeChan) +} + +// 添加对指定文件/目录的监听,并给定回调函数 +func (w *Watcher) addWatch(path string, callback func(event *Event)) error { + // 这里统一转换为当前系统的绝对路径,便于统一监控文件名称 + t := fileRealPath(path) + if t == "" { + return errors.New(fmt.Sprintf(`"%s" does not exist`, path)) + } + path = t + // 注册回调函数 + w.callbacks.LockFunc(func(m map[string]interface{}) { + var result interface{} + if v, ok := m[path]; !ok { + result = glist.New() + m[path] = result + } else { + result = v + } + result.(*glist.List).PushBack(callback) + }) + // 添加底层监听 + w.watcher.Add(path) + return nil +} + +// 添加监控,path参数支持文件或者目录路径,recursive为非必需参数,默认为递归添加监控(当path为目录时) +func (w *Watcher) Add(path string, callback func(event *Event), recursive...bool) error { + if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) { + paths, _ := fileScanDir(path, "*", true) + list := []string{path} + list = append(list, paths...) + for _, v := range list { + if err := w.addWatch(v, callback); err != nil { + return err + } + } + return nil + } else { + return w.addWatch(path, callback) + } +} + + +// 移除监听 +func (w *Watcher) removeWatch(path string) error { + w.callbacks.Remove(path) + return w.watcher.Remove(path) +} + +// 递归移除监听 +func (w *Watcher) Remove(path string) error { + if fileIsDir(path) { + paths, _ := fileScanDir(path, "*", true) + list := []string{path} + list = append(list, paths...) + for _, v := range list { + if err := w.removeWatch(v); err != nil { + return err + } + } + return nil + } else { + return w.removeWatch(path) + } +} + +// 监听循环 +func (w *Watcher) startWatchLoop() { + go func() { + for { + select { + // 关闭事件 + case <- w.closeChan: + return + + // 监听事件 + case ev := <- w.watcher.Events: + key := ev.String() + if !w.cache.Contains(key) { + w.cache.Set(key, struct {}{}, REPEAT_EVENT_FILTER_INTERVAL) + w.events.Push(&Event{ + event : ev, + Path : ev.Name, + Op : Op(ev.Op), + Watcher : w, + }) + } + + case err := <- w.watcher.Errors: + panic("error : " + err.Error()); + } + } + }() +} + +// 检索给定path的回调方法**列表** +func (w *Watcher) getCallbacks(path string) *glist.List { + for path != "/" { + if l := w.callbacks.Get(path); l != nil { + return l.(*glist.List) + } else { + path = fileDir(path) + } + } + return nil +} + +// 事件循环 +func (w *Watcher) startEventLoop() { + go func() { + for { + if v := w.events.Pop(); v != nil { + event := v.(*Event) + if event.IsRemove() { + if fileExists(event.Path) { + // 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”, + // 并重新添加监控(底层fsnotify会自动删除掉监控,这里重新添加回去) + w.watcher.Add(event.Path) + // 修改事件操作为重命名(相当于重命名为自身名称,最终名称没变) + event.Op = RENAME + } else { + // 如果是真实删除,那么递归删除监控信息 + w.Remove(event.Path) + } + } + callbacks := w.getCallbacks(event.Path) + // 如果创建了新的目录,那么将这个目录递归添加到监控中 + if event.IsCreate() && fileIsDir(event.Path) { + for _, callback := range callbacks.FrontAll() { + w.Add(event.Path, callback.(func(event *Event))) + } + } + if callbacks != nil { + go func(callbacks *glist.List) { + for _, callback := range callbacks.FrontAll() { + callback.(func(event *Event))(event) + } + }(callbacks) + } + } else { + break + } + } + }() +} \ No newline at end of file diff --git a/geg/os/gfpool/gfpool.go b/geg/os/gfpool/gfpool.go index fd2b2315c..c960e9c2f 100644 --- a/geg/os/gfpool/gfpool.go +++ b/geg/os/gfpool/gfpool.go @@ -12,6 +12,7 @@ func main() { time.Sleep(time.Second) if f, err := gfpool.Open("/home/john/temp/log.log", os.O_RDONLY, 0666, 60000000*1000); err == nil { fmt.Println(f.Name()) + f.Close() } else { fmt.Println(err) } diff --git a/geg/os/gfsnotify/gfsnotify.go b/geg/os/gfsnotify/gfsnotify.go index 4150aed1b..4683b9d70 100644 --- a/geg/os/gfsnotify/gfsnotify.go +++ b/geg/os/gfsnotify/gfsnotify.go @@ -1,30 +1,31 @@ package main import ( - "log" "gitee.com/johng/gf/g/os/gfsnotify" + "gitee.com/johng/gf/g/os/glog" ) func main() { err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) { if event.IsCreate() { - log.Println("创建文件 : ", event.Path) + glog.Println("创建文件 : ", event.Path) } if event.IsWrite() { - log.Println("写入文件 : ", event.Path) + glog.Println("写入文件 : ", event.Path) } if event.IsRemove() { - log.Println("删除文件 : ", event.Path) + glog.Println("删除文件 : ", event.Path) } if event.IsRename() { - log.Println("重命名文件 : ", event.Path) + glog.Println("重命名文件 : ", event.Path) } if event.IsChmod() { - log.Println("修改权限 : ", event.Path) + glog.Println("修改权限 : ", event.Path) } + glog.Println(event) }) if err != nil { - log.Fatalln(err) + glog.Fatalln(err) } else { select {} } diff --git a/geg/other/test.go b/geg/other/test.go index 31222b2fa..7dab9d6fe 100644 --- a/geg/other/test.go +++ b/geg/other/test.go @@ -2,16 +2,27 @@ package main import ( "fmt" - "gitee.com/johng/gf/g/os/gtime" + "gitee.com/johng/gf/g/os/gfile" "gitee.com/johng/gf/third/github.com/fsnotify/fsnotify" + "os" ) func main() { if w, err := fsnotify.NewWatcher(); err != nil { fmt.Println(err) } else { - fmt.Println(gtime.Now().String()) - w.Add("/tmp/test") + index := 0 + if array, err := gfile.ScanDir("/home/john", "*", true); err == nil { + for _, path := range array { + index++ + if err := w.Add(path); err != nil { + fmt.Println(err) + os.Exit(1) + } else { + fmt.Println(index) + } + } + } } } \ No newline at end of file diff --git a/geg/other/test2.go b/geg/other/test2.go index eac8f2d78..488908146 100644 --- a/geg/other/test2.go +++ b/geg/other/test2.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "gitee.com/johng/gf/g/container/garray" ) @@ -8,4 +9,5 @@ func main() { a := garray.NewSortedIntArray(0) a.Add(1) a.Remove(0) + fmt.Println(a.Len()) } \ No newline at end of file