改进gfsnotify模块,去掉与gfile模块的依赖;改进gfpool文件指针池,加上对gfsnotify模块的依赖,使用全局文件监控对象

This commit is contained in:
john
2018-11-03 17:50:00 +08:00
parent 464b6ed7ec
commit 276bae7ef1
9 changed files with 342 additions and 221 deletions

View File

@ -12,7 +12,7 @@ import (
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gpool"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/third/github.com/fsnotify/fsnotify"
"gitee.com/johng/gf/g/os/gfsnotify"
"os"
"sync"
)
@ -21,8 +21,7 @@ import (
type Pool struct {
id *gtype.Int // 指针池ID用以识别指针池是否重建
pool *gpool.Pool // 底层对象池
inited *gtype.Bool // 是否初始化(在执行第一次File方法后初始化)
watcher *fsnotify.Watcher // 文件监控对象
inited *gtype.Bool // 是否初始化(在执行第一次File方法后初始化,主要用于监听的添加,但是只能添加一次)
closeChan chan struct{} // 关闭事件
expire int // 过期时间
}
@ -48,16 +47,9 @@ func Open(path string, flag int, perm os.FileMode, expire...int) (file *File, er
fpExpire = expire[0]
}
pool := pools.GetOrSetFuncLock(fmt.Sprintf("%s&%d&%d&%d", path, flag, expire, perm), func() interface{} {
if p, e := New(path, flag, perm, fpExpire); e == nil {
return p
} else {
err = e
}
return nil
return New(path, flag, perm, fpExpire)
}).(*Pool)
if pool == nil {
return nil, err
}
return pool.File()
}
@ -67,7 +59,7 @@ func OpenFile(path string, flag int, perm os.FileMode, expire...int) (file *File
// 创建一个文件指针池expire = 0表示不过期expire < 0表示使用完立即回收expire > 0表示超时回收默认值为0不过期
// 过期时间单位:毫秒
func New(path string, flag int, perm os.FileMode, expire...int) (*Pool, error) {
func New(path string, flag int, perm os.FileMode, expire...int) *Pool {
fpExpire := 0
if len(expire) > 0 {
fpExpire = expire[0]
@ -79,12 +71,7 @@ func New(path string, flag int, perm os.FileMode, expire...int) (*Pool, error) {
closeChan : make(chan struct{}),
}
p.pool = newFilePool(p, path, flag, perm, fpExpire)
if watcher, err := fsnotify.NewWatcher(); err == nil {
p.watcher = watcher
} else {
return nil, err
}
return p, nil
return p
}
// 创建文件指针池
@ -142,31 +129,18 @@ func (p *Pool) File() (*File, error) {
}
if !p.inited.Set(true) {
if err := p.watcher.Add(f.path); err != nil {
p.inited.Set(false)
}
go func() {
for {
select {
// 关闭事件
case <- p.closeChan:
return
// 监听事件
case ev := <- p.watcher.Events:
// 如果文件被删除或者重命名,立即重建指针池
if ev.Op & fsnotify.Remove == fsnotify.Remove || ev.Op & fsnotify.Rename == fsnotify.Rename {
// 原有的指针都不要了
p.id.Add(1)
// Clear相当于重建指针池
p.pool.Clear()
// 为保证原子操作,但又不想加锁,
// 这里再执行一次原子Add将在两次Add中间可能分配出去的文件指针丢弃掉
p.id.Add(1)
}
}
gfsnotify.Add(f.path, func(event *gfsnotify.Event) {
// 如果文件被删除或者重命名,立即重建指针池
if event.IsRemove() || event.IsRename() {
// 原有的指针都不要了
p.id.Add(1)
// Clear相当于重建指针池
p.pool.Clear()
// 为保证原子操作,但又不想加锁,
// 这里再执行一次原子Add将在两次Add中间可能分配出去的文件指针丢弃掉
p.id.Add(1)
}
}()
}, false)
}
return f, nil
}

View File

@ -9,14 +9,11 @@
package gfsnotify
import (
"errors"
"gitee.com/johng/gf/g/os/glog"
"gitee.com/johng/gf/third/github.com/fsnotify/fsnotify"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/glist"
"gitee.com/johng/gf/g/container/gqueue"
"fmt"
"gitee.com/johng/gf/g/encoding/ghash"
"gitee.com/johng/gf/g/os/gcache"
"gitee.com/johng/gf/third/github.com/fsnotify/fsnotify"
)
// 监听管理对象
@ -25,18 +22,21 @@ type Watcher struct {
events *gqueue.Queue // 过滤后的事件通知,不会出现重复事件
closeChan chan struct{} // 关闭事件
callbacks *gmap.StringInterfaceMap // 监听的回调函数
cache *gcache.Cache // 缓存对象,用于事件重复过滤
}
// 监听事件对象
type Event struct {
Path string // 文件绝对路径
Op Op // 触发监听的文件操作
Watcher *Watcher // 时间对应的监听对象
event fsnotify.Event // 底层事件对象
Path string // 文件绝对路径
Op Op // 触发监听的文件操作
Watcher *Watcher // 事件对应的监听对象
}
// 按位进行识别的操作集合
type Op uint32
// 必须放到一个const分组里面
const (
CREATE Op = 1 << iota
WRITE
@ -45,13 +45,30 @@ const (
CHMOD
)
// 全局监听对象,方便应用端调用
var watcher, _ = New()
const (
REPEAT_EVENT_FILTER_INTERVAL = 1 // (毫秒)重复事件过滤间隔
DEFAULT_WATCHER_COUNT = 4 // 默认创建的监控对象数量(使用哈希取模)
)
// 创建监听管理对象
// 全局监听对象,方便应用端调用
var watchers = make([]*Watcher, DEFAULT_WATCHER_COUNT)
// 包初始化创建8个watcher对象用于包默认管理监听
func init() {
for i := 0; i < DEFAULT_WATCHER_COUNT; i++ {
if w, err := New(); err == nil {
watchers[i] = w
} else {
panic(err)
}
}
}
// 创建监听管理对象主要注意的是创建监听对象会占用系统的inotify句柄数量受到 fs.inotify.max_user_instances 的限制
func New() (*Watcher, error) {
if watch, err := fsnotify.NewWatcher(); err == nil {
w := &Watcher {
cache : gcache.New(),
watcher : watch,
events : gqueue.New(),
closeChan : make(chan struct{}),
@ -67,165 +84,15 @@ func New() (*Watcher, error) {
// 添加对指定文件/目录的监听,并给定回调函数;如果给定的是一个目录,默认递归监控。
func Add(path string, callback func(event *Event), recursive...bool) error {
if watcher == nil {
return errors.New("global watcher creating failed")
}
return watcher.Add(path, callback, recursive...)
return getWatcherByPath(path).Add(path, callback, recursive...)
}
// 移除监听,默认递归删除。
func Remove(path string) error {
if watcher == nil {
return errors.New("global watcher creating failed")
}
return watcher.Remove(path)
return getWatcherByPath(path).Remove(path)
}
// 关闭监听管理对象
func (w *Watcher) Close() {
w.watcher.Close()
w.events.Close()
close(w.closeChan)
// 根据path计算对应的watcher对象
func getWatcherByPath(path string) *Watcher {
return watchers[ghash.BKDRHash([]byte(path)) % DEFAULT_WATCHER_COUNT]
}
// 添加对指定文件/目录的监听,并给定回调函数
func (w *Watcher) addWatch(path string, callback func(event *Event)) error {
// 这里统一转换为当前系统的绝对路径,便于统一监控文件名称
t := gfile.RealPath(path)
if t == "" {
return errors.New(fmt.Sprintf(`"%s" does not exist`, path))
}
path = t
// 注册回调函数
w.callbacks.LockFunc(func(m map[string]interface{}) {
var result interface{}
if v, ok := m[path]; !ok {
result = glist.New()
m[path] = result
} else {
result = v
}
result.(*glist.List).PushBack(callback)
})
// 添加底层监听
w.watcher.Add(path)
return nil
}
// 添加监控path参数支持文件或者目录路径recursive为非必需参数默认为递归添加监控(当path为目录时)
func (w *Watcher) Add(path string, callback func(event *Event), recursive...bool) error {
if gfile.IsDir(path) && (len(recursive) == 0 || recursive[0]) {
paths, _ := gfile.ScanDir(path, "*", true)
list := []string{path}
list = append(list, paths...)
for _, v := range list {
if err := w.addWatch(v, callback); err != nil {
return err
}
}
return nil
} else {
return w.addWatch(path, callback)
}
}
// 移除监听
func (w *Watcher) removeWatch(path string) error {
w.callbacks.Remove(path)
return w.watcher.Remove(path)
}
// 递归移除监听
func (w *Watcher) Remove(path string) error {
if gfile.IsDir(path) {
paths, _ := gfile.ScanDir(path, "*", true)
list := []string{path}
list = append(list, paths...)
for _, v := range list {
if err := w.removeWatch(v); err != nil {
return err
}
}
return nil
} else {
return w.removeWatch(path)
}
}
// 监听循环
func (w *Watcher) startWatchLoop() {
go func() {
for {
select {
// 关闭事件
case <- w.closeChan:
return
// 监听事件
case ev := <- w.watcher.Events:
//glog.Debug("gfsnotify: watch loop", ev)
w.events.Push(&Event{
Path : ev.Name,
Op : Op(ev.Op),
})
case err := <- w.watcher.Errors:
glog.Error("error : ", err);
}
}
}()
}
// 检索给定path的回调方法**列表**
func (w *Watcher) getCallbacks(path string) *glist.List {
for path != "/" {
if l := w.callbacks.Get(path); l != nil {
return l.(*glist.List)
} else {
path = gfile.Dir(path)
}
}
return nil
}
// 事件循环
func (w *Watcher) startEventLoop() {
go func() {
for {
if v := w.events.Pop(); v != nil {
//glog.Debug("gfsnotidy: event loop", v)
event := v.(*Event)
if event.IsRemove() {
if gfile.Exists(event.Path) {
// 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,
// 并重新添加监控(底层fsnotify会自动删除掉监控这里重新添加回去)
w.watcher.Add(event.Path)
// 修改事件操作为重命名(相当于重命名为自身名称,最终名称没变)
event.Op = RENAME
} else {
// 如果是真实删除,那么递归删除监控信息
w.Remove(event.Path)
}
}
//glog.Debug("gfsnotidy: event loop callbacks", v)
callbacks := w.getCallbacks(event.Path)
// 如果创建了新的目录,那么将这个目录递归添加到监控中
if event.IsCreate() && gfile.IsDir(event.Path) {
for _, callback := range callbacks.FrontAll() {
w.Add(event.Path, callback.(func(event *Event)))
}
}
if callbacks != nil {
go func(callbacks *glist.List) {
for _, callback := range callbacks.FrontAll() {
callback.(func(event *Event))(event)
}
}(callbacks)
}
} else {
break
}
}
}()
}

View File

@ -6,9 +6,13 @@
package gfsnotify
func (e *Event) String() string {
return e.event.String()
}
// 文件/目录创建
func (e *Event) IsCreate() bool {
return e.Op & CREATE == CREATE
return e.Op == 1 || e.Op & CREATE == CREATE
}
// 文件/目录修改

View File

@ -0,0 +1,96 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// ThIs Source Code Form Is subject to the terms of the MIT License.
// If a copy of the MIT was not dIstributed with thIs file,
// You can obtain one at https://gitee.com/johng/gf.
package gfsnotify
import (
"fmt"
"os"
"path/filepath"
"sort"
"strings"
)
// 获取指定文件路径的目录地址绝对路径
func fileDir(path string) string {
return filepath.Dir(path)
}
// 将所给定的路径转换为绝对路径
// 并判断文件路径是否存在,如果文件不存在,那么返回空字符串
func fileRealPath(path string) string {
p, err := filepath.Abs(path)
if err != nil {
return ""
}
if !fileExists(p) {
return ""
}
return p
}
// 判断所给路径文件/文件夹是否存在
func fileExists(path string) bool {
if _, err := os.Stat(path); !os.IsNotExist(err) {
return true
}
return false
}
// 判断所给路径是否为文件夹
func fileIsDir(path string) bool {
s, err := os.Stat(path)
if err != nil {
return false
}
return s.IsDir()
}
// 打开目录,并返回其下一级文件列表(绝对路径),按照文件名称大小写进行排序,支持目录递归遍历。
func fileScanDir(path string, pattern string, recursive ... bool) ([]string, error) {
list, err := fileDoScanDir(path, pattern, recursive...)
if err != nil {
return nil, err
}
if len(list) > 0 {
sort.Strings(list)
}
return list, nil
}
// 内部检索目录方法,支持递归,返回没有排序的文件绝对路径列表结果。
// pattern参数支持多个文件名称模式匹配使用','符号分隔多个模式。
func fileDoScanDir(path string, pattern string, recursive ... bool) ([]string, error) {
var list []string
// 打开目录
dfile, err := os.Open(path)
if err != nil {
return nil, err
}
defer dfile.Close()
// 读取目录下的文件列表
names, err := dfile.Readdirnames(-1)
if err != nil {
return nil, err
}
// 是否递归遍历
for _, name := range names {
path := fmt.Sprintf("%s%s%s", path, string(filepath.Separator), name)
if fileIsDir(path) && len(recursive) > 0 && recursive[0] {
array, _ := fileDoScanDir(path, pattern, true)
if len(array) > 0 {
list = append(list, array...)
}
}
// 满足pattern才加入结果列表
for _, p := range strings.Split(pattern, ",") {
if match, err := filepath.Match(strings.TrimSpace(p), name); err == nil && match {
list = append(list, path)
}
}
}
return list, nil
}

View File

@ -0,0 +1,165 @@
// Copyright 2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved.
//
// This Source Code Form is subject to the terms of the MIT License.
// If a copy of the MIT was not distributed with this file,
// You can obtain one at https://gitee.com/johng/gf.
package gfsnotify
import (
"errors"
"fmt"
"gitee.com/johng/gf/g/container/glist"
)
// 关闭监听管理对象
func (w *Watcher) Close() {
w.watcher.Close()
w.events.Close()
close(w.closeChan)
}
// 添加对指定文件/目录的监听,并给定回调函数
func (w *Watcher) addWatch(path string, callback func(event *Event)) error {
// 这里统一转换为当前系统的绝对路径,便于统一监控文件名称
t := fileRealPath(path)
if t == "" {
return errors.New(fmt.Sprintf(`"%s" does not exist`, path))
}
path = t
// 注册回调函数
w.callbacks.LockFunc(func(m map[string]interface{}) {
var result interface{}
if v, ok := m[path]; !ok {
result = glist.New()
m[path] = result
} else {
result = v
}
result.(*glist.List).PushBack(callback)
})
// 添加底层监听
w.watcher.Add(path)
return nil
}
// 添加监控path参数支持文件或者目录路径recursive为非必需参数默认为递归添加监控(当path为目录时)
func (w *Watcher) Add(path string, callback func(event *Event), recursive...bool) error {
if fileIsDir(path) && (len(recursive) == 0 || recursive[0]) {
paths, _ := fileScanDir(path, "*", true)
list := []string{path}
list = append(list, paths...)
for _, v := range list {
if err := w.addWatch(v, callback); err != nil {
return err
}
}
return nil
} else {
return w.addWatch(path, callback)
}
}
// 移除监听
func (w *Watcher) removeWatch(path string) error {
w.callbacks.Remove(path)
return w.watcher.Remove(path)
}
// 递归移除监听
func (w *Watcher) Remove(path string) error {
if fileIsDir(path) {
paths, _ := fileScanDir(path, "*", true)
list := []string{path}
list = append(list, paths...)
for _, v := range list {
if err := w.removeWatch(v); err != nil {
return err
}
}
return nil
} else {
return w.removeWatch(path)
}
}
// 监听循环
func (w *Watcher) startWatchLoop() {
go func() {
for {
select {
// 关闭事件
case <- w.closeChan:
return
// 监听事件
case ev := <- w.watcher.Events:
key := ev.String()
if !w.cache.Contains(key) {
w.cache.Set(key, struct {}{}, REPEAT_EVENT_FILTER_INTERVAL)
w.events.Push(&Event{
event : ev,
Path : ev.Name,
Op : Op(ev.Op),
Watcher : w,
})
}
case err := <- w.watcher.Errors:
panic("error : " + err.Error());
}
}
}()
}
// 检索给定path的回调方法**列表**
func (w *Watcher) getCallbacks(path string) *glist.List {
for path != "/" {
if l := w.callbacks.Get(path); l != nil {
return l.(*glist.List)
} else {
path = fileDir(path)
}
}
return nil
}
// 事件循环
func (w *Watcher) startEventLoop() {
go func() {
for {
if v := w.events.Pop(); v != nil {
event := v.(*Event)
if event.IsRemove() {
if fileExists(event.Path) {
// 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,
// 并重新添加监控(底层fsnotify会自动删除掉监控这里重新添加回去)
w.watcher.Add(event.Path)
// 修改事件操作为重命名(相当于重命名为自身名称,最终名称没变)
event.Op = RENAME
} else {
// 如果是真实删除,那么递归删除监控信息
w.Remove(event.Path)
}
}
callbacks := w.getCallbacks(event.Path)
// 如果创建了新的目录,那么将这个目录递归添加到监控中
if event.IsCreate() && fileIsDir(event.Path) {
for _, callback := range callbacks.FrontAll() {
w.Add(event.Path, callback.(func(event *Event)))
}
}
if callbacks != nil {
go func(callbacks *glist.List) {
for _, callback := range callbacks.FrontAll() {
callback.(func(event *Event))(event)
}
}(callbacks)
}
} else {
break
}
}
}()
}

View File

@ -12,6 +12,7 @@ func main() {
time.Sleep(time.Second)
if f, err := gfpool.Open("/home/john/temp/log.log", os.O_RDONLY, 0666, 60000000*1000); err == nil {
fmt.Println(f.Name())
f.Close()
} else {
fmt.Println(err)
}

View File

@ -1,30 +1,31 @@
package main
import (
"log"
"gitee.com/johng/gf/g/os/gfsnotify"
"gitee.com/johng/gf/g/os/glog"
)
func main() {
err := gfsnotify.Add("/home/john/temp", func(event *gfsnotify.Event) {
if event.IsCreate() {
log.Println("创建文件 : ", event.Path)
glog.Println("创建文件 : ", event.Path)
}
if event.IsWrite() {
log.Println("写入文件 : ", event.Path)
glog.Println("写入文件 : ", event.Path)
}
if event.IsRemove() {
log.Println("删除文件 : ", event.Path)
glog.Println("删除文件 : ", event.Path)
}
if event.IsRename() {
log.Println("重命名文件 : ", event.Path)
glog.Println("重命名文件 : ", event.Path)
}
if event.IsChmod() {
log.Println("修改权限 : ", event.Path)
glog.Println("修改权限 : ", event.Path)
}
glog.Println(event)
})
if err != nil {
log.Fatalln(err)
glog.Fatalln(err)
} else {
select {}
}

View File

@ -2,16 +2,27 @@ package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gtime"
"gitee.com/johng/gf/g/os/gfile"
"gitee.com/johng/gf/third/github.com/fsnotify/fsnotify"
"os"
)
func main() {
if w, err := fsnotify.NewWatcher(); err != nil {
fmt.Println(err)
} else {
fmt.Println(gtime.Now().String())
w.Add("/tmp/test")
index := 0
if array, err := gfile.ScanDir("/home/john", "*", true); err == nil {
for _, path := range array {
index++
if err := w.Add(path); err != nil {
fmt.Println(err)
os.Exit(1)
} else {
fmt.Println(index)
}
}
}
}
}

View File

@ -1,6 +1,7 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/container/garray"
)
@ -8,4 +9,5 @@ func main() {
a := garray.NewSortedIntArray(0)
a.Add(1)
a.Remove(0)
fmt.Println(a.Len())
}