From 6fcf17140de7d2a91bc85c10b910fe16a752c375 Mon Sep 17 00:00:00 2001 From: john Date: Wed, 31 Oct 2018 19:00:28 +0800 Subject: [PATCH] =?UTF-8?q?gfpool=E6=94=B9=E8=BF=9B=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- g/container/gpool/gpool.go | 5 ++ g/os/gfpool/gfpool.go | 108 +++++++++++++++++++++--------- g/os/gfsnotify/gfsnotify.go | 11 +-- g/os/gfsnotify/gfsnotify_event.go | 2 +- geg/os/gfpool/gfpool.go | 19 ++++++ geg/other/test2.go | 26 +++---- 6 files changed, 119 insertions(+), 52 deletions(-) create mode 100644 geg/os/gfpool/gfpool.go diff --git a/g/container/gpool/gpool.go b/g/container/gpool/gpool.go index c3b6374a3..bbade019a 100644 --- a/g/container/gpool/gpool.go +++ b/g/container/gpool/gpool.go @@ -93,6 +93,11 @@ func (p *Pool) Close() { p.closed.Set(true) } +// 池是否已关闭 +func (p *Pool) IsClosed() bool { + return p.closed.Val() +} + // 超时检测循环 func (p *Pool) expireCheckingLoop() { for !p.closed.Val() { diff --git a/g/os/gfpool/gfpool.go b/g/os/gfpool/gfpool.go index 6bf38aef0..b31441a7f 100644 --- a/g/os/gfpool/gfpool.go +++ b/g/os/gfpool/gfpool.go @@ -8,16 +8,22 @@ package gfpool import ( - "os" - "sync" + "fmt" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/gpool" - "fmt" + "gitee.com/johng/gf/g/container/gtype" + "gitee.com/johng/gf/third/github.com/fsnotify/fsnotify" + "os" + "sync" ) // 文件指针池 type Pool struct { - pool *gpool.Pool // 底层对象池 + pool *gpool.Pool // 底层对象池 + inited *gtype.Bool // 是否初始化(在执行第一次File方法后初始化) + watcher *fsnotify.Watcher // 文件监控对象 + closeChan chan struct{} // 关闭事件 + expire int // 过期时间 } // 文件指针池指针 @@ -39,13 +45,9 @@ func Open(path string, flag int, perm os.FileMode, expire...int) (*File, error) if len(expire) > 0 { fpExpire = expire[0] } - key := fmt.Sprintf("%s&%d&%d&%d", path, flag, expire, perm) - result := pools.Get(key) - if result != nil { - return result.(*Pool).File() - } - pool := New(path, flag, perm, fpExpire) - pools.Set(key, pool) + pool := pools.GetOrSetFuncLock(fmt.Sprintf("%s&%d&%d&%d", path, flag, expire, perm), func() interface{} { + return New(path, flag, perm, fpExpire) + }).(*Pool) return pool.File() } @@ -60,8 +62,23 @@ func New(path string, flag int, perm os.FileMode, expire...int) *Pool { if len(expire) > 0 { fpExpire = expire[0] } - p := &Pool {} - p.pool = gpool.New(fpExpire, func() (interface{}, error) { + p := &Pool { + expire : fpExpire, + inited : gtype.NewBool(), + closeChan : make(chan struct{}), + } + p.pool = newFilePool(p, path, flag, perm, fpExpire) + if watcher, err := fsnotify.NewWatcher(); err == nil { + p.watcher = watcher + } else { + return nil + } + return p +} + +// 创建文件指针池 +func newFilePool(p *Pool, path string, flag int, perm os.FileMode, expire int) *gpool.Pool { + pool := gpool.New(expire, func() (interface{}, error) { file, err := os.OpenFile(path, flag, perm) if err != nil { return nil, err @@ -74,10 +91,10 @@ func New(path string, flag int, perm os.FileMode, expire...int) *Pool { path : path, }, nil }) - p.pool.SetExpireFunc(func(i interface{}) { + pool.SetExpireFunc(func(i interface{}) { i.(*File).File.Close() }) - return p + return pool } // 获得一个文件打开指针 @@ -87,29 +104,53 @@ func (p *Pool) File() (*File, error) { } else { f := v.(*File) if f.flag & os.O_CREATE > 0 { - if _, err := os.Stat(f.path); os.IsNotExist(err) { - if file, err := os.OpenFile(f.path, f.flag, f.perm); err != nil { - return nil, err - } else { - f.File = *file - } - } + if _, err := os.Stat(f.path); os.IsNotExist(err) { + if file, err := os.OpenFile(f.path, f.flag, f.perm); err != nil { + return nil, err + } else { + f.File = *file + } + } } if f.flag & os.O_TRUNC > 0 { - if stat, err := f.Stat(); err == nil { - if stat.Size() > 0 { - if err := f.Truncate(0); err != nil { - return nil, err - } - } - } + if stat, err := f.Stat(); err == nil { + if stat.Size() > 0 { + if err := f.Truncate(0); err != nil { + return nil, err + } + } + } } if f.flag & os.O_APPEND > 0 { - if _, err := f.Seek(0, 2); err != nil { - return nil, err - } + if _, err := f.Seek(0, 2); err != nil { + return nil, err + } } else { - f.Seek(0, 0) + f.Seek(0, 0) + } + + if !p.inited.Set(true) { + if err := p.watcher.Add(f.path); err != nil { + p.inited.Set(false) + } + go func() { + for { + select { + // 关闭事件 + case <- p.closeChan: + return + + // 监听事件 + case ev := <- p.watcher.Events: + // 如果文件被删除或者重命名,重建指针池 + if ev.Op & fsnotify.Remove == fsnotify.Remove || ev.Op & fsnotify.Rename == fsnotify.Rename { + p.pool.Close() + p.pool = newFilePool(p, f.Name(), f.flag, f.perm, f.pool.expire) + p.watcher.Remove(ev.Name) + } + } + } + }() } return f, nil } @@ -117,6 +158,7 @@ func (p *Pool) File() (*File, error) { // 关闭指针池(返回error是标准库io.ReadWriteCloser接口实现) func (p *Pool) Close() error { + close(p.closeChan) p.pool.Close() return nil } diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index 8c0263ccf..6ba1ecc7c 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -5,7 +5,7 @@ // You can obtain one at https://gitee.com/johng/gf. // 文件监控. -// 使用时需要注意的是,一旦一个文件被删除,那么对其的监控将会失效。 +// 使用时需要注意的是,一旦一个文件被删除,那么对其的监控将会失效;如果删除的是目录,那么该目录及其下的文件都将被递归删除监控。 package gfsnotify import ( @@ -29,8 +29,9 @@ type Watcher struct { // 监听事件对象 type Event struct { - Path string // 文件绝对路径 - Op Op // 触发监听的文件操作 + Path string // 文件绝对路径 + Op Op // 触发监听的文件操作 + Watcher *Watcher // 时间对应的监听对象 } // 按位进行识别的操作集合 @@ -200,8 +201,8 @@ func (w *Watcher) startEventLoop() { // 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”, // 并重新添加监控(底层fsnotify会自动删除掉监控,这里重新添加回去) w.watcher.Add(event.Path) - // 修改时间操作为写入 - event.Op = WRITE + // 修改时间操作为重命名(相当于重命名为自身名称,最终名称没变) + event.Op = RENAME } else { // 如果是真实删除,那么递归删除监控信息 w.Remove(event.Path) diff --git a/g/os/gfsnotify/gfsnotify_event.go b/g/os/gfsnotify/gfsnotify_event.go index 2950e1dcb..4f3352224 100644 --- a/g/os/gfsnotify/gfsnotify_event.go +++ b/g/os/gfsnotify/gfsnotify_event.go @@ -29,4 +29,4 @@ func (e *Event) IsRename() bool { // 文件/目录修改权限 func (e *Event) IsChmod() bool { return e.Op & CHMOD == CHMOD -} \ No newline at end of file +} diff --git a/geg/os/gfpool/gfpool.go b/geg/os/gfpool/gfpool.go new file mode 100644 index 000000000..fd2b2315c --- /dev/null +++ b/geg/os/gfpool/gfpool.go @@ -0,0 +1,19 @@ +package main + +import ( + "fmt" + "gitee.com/johng/gf/g/os/gfpool" + "os" + "time" +) + +func main() { + for { + time.Sleep(time.Second) + if f, err := gfpool.Open("/home/john/temp/log.log", os.O_RDONLY, 0666, 60000000*1000); err == nil { + fmt.Println(f.Name()) + } else { + fmt.Println(err) + } + } +} \ No newline at end of file diff --git a/geg/other/test2.go b/geg/other/test2.go index e2811e825..4b007ae67 100644 --- a/geg/other/test2.go +++ b/geg/other/test2.go @@ -1,20 +1,20 @@ package main import ( - "gitee.com/johng/gf/g" - "gitee.com/johng/gf/g/net/ghttp" + "fmt" + "gitee.com/johng/gf/g/os/gfpool" + "os" + "time" ) - func main() { - s := g.Server() - s.Domain("www.a.com").BindHandler("/*", func(r *ghttp.Request) { - r.Response.ServeFile("/home/john/www1" + r.URL.Path) - }) - s.Domain("www.b.com").BindHandler("/*", func(r *ghttp.Request) { - r.Response.ServeFile("/home/john/www2" + r.URL.Path) - }) - s.SetIndexFolder(true) - s.SetPort(8080) - s.Run() + for { + time.Sleep(time.Second) + if f, err := gfpool.Open("/home/john/temp/log.log", os.O_RDONLY, 0666, 60000000*1000); err == nil { + s, _ := f.Stat() + fmt.Println(f.Name(), s.Name()) + } else { + fmt.Println(err) + } + } } \ No newline at end of file