gfpool改进中

This commit is contained in:
john
2018-10-31 19:00:28 +08:00
parent 371ab48d7a
commit 6fcf17140d
6 changed files with 119 additions and 52 deletions

View File

@ -93,6 +93,11 @@ func (p *Pool) Close() {
p.closed.Set(true)
}
// 池是否已关闭
func (p *Pool) IsClosed() bool {
return p.closed.Val()
}
// 超时检测循环
func (p *Pool) expireCheckingLoop() {
for !p.closed.Val() {

View File

@ -8,16 +8,22 @@
package gfpool
import (
"os"
"sync"
"fmt"
"gitee.com/johng/gf/g/container/gmap"
"gitee.com/johng/gf/g/container/gpool"
"fmt"
"gitee.com/johng/gf/g/container/gtype"
"gitee.com/johng/gf/third/github.com/fsnotify/fsnotify"
"os"
"sync"
)
// 文件指针池
type Pool struct {
pool *gpool.Pool // 底层对象池
pool *gpool.Pool // 底层对象池
inited *gtype.Bool // 是否初始化(在执行第一次File方法后初始化)
watcher *fsnotify.Watcher // 文件监控对象
closeChan chan struct{} // 关闭事件
expire int // 过期时间
}
// 文件指针池指针
@ -39,13 +45,9 @@ func Open(path string, flag int, perm os.FileMode, expire...int) (*File, error)
if len(expire) > 0 {
fpExpire = expire[0]
}
key := fmt.Sprintf("%s&%d&%d&%d", path, flag, expire, perm)
result := pools.Get(key)
if result != nil {
return result.(*Pool).File()
}
pool := New(path, flag, perm, fpExpire)
pools.Set(key, pool)
pool := pools.GetOrSetFuncLock(fmt.Sprintf("%s&%d&%d&%d", path, flag, expire, perm), func() interface{} {
return New(path, flag, perm, fpExpire)
}).(*Pool)
return pool.File()
}
@ -60,8 +62,23 @@ func New(path string, flag int, perm os.FileMode, expire...int) *Pool {
if len(expire) > 0 {
fpExpire = expire[0]
}
p := &Pool {}
p.pool = gpool.New(fpExpire, func() (interface{}, error) {
p := &Pool {
expire : fpExpire,
inited : gtype.NewBool(),
closeChan : make(chan struct{}),
}
p.pool = newFilePool(p, path, flag, perm, fpExpire)
if watcher, err := fsnotify.NewWatcher(); err == nil {
p.watcher = watcher
} else {
return nil
}
return p
}
// 创建文件指针池
func newFilePool(p *Pool, path string, flag int, perm os.FileMode, expire int) *gpool.Pool {
pool := gpool.New(expire, func() (interface{}, error) {
file, err := os.OpenFile(path, flag, perm)
if err != nil {
return nil, err
@ -74,10 +91,10 @@ func New(path string, flag int, perm os.FileMode, expire...int) *Pool {
path : path,
}, nil
})
p.pool.SetExpireFunc(func(i interface{}) {
pool.SetExpireFunc(func(i interface{}) {
i.(*File).File.Close()
})
return p
return pool
}
// 获得一个文件打开指针
@ -87,29 +104,53 @@ func (p *Pool) File() (*File, error) {
} else {
f := v.(*File)
if f.flag & os.O_CREATE > 0 {
if _, err := os.Stat(f.path); os.IsNotExist(err) {
if file, err := os.OpenFile(f.path, f.flag, f.perm); err != nil {
return nil, err
} else {
f.File = *file
}
}
if _, err := os.Stat(f.path); os.IsNotExist(err) {
if file, err := os.OpenFile(f.path, f.flag, f.perm); err != nil {
return nil, err
} else {
f.File = *file
}
}
}
if f.flag & os.O_TRUNC > 0 {
if stat, err := f.Stat(); err == nil {
if stat.Size() > 0 {
if err := f.Truncate(0); err != nil {
return nil, err
}
}
}
if stat, err := f.Stat(); err == nil {
if stat.Size() > 0 {
if err := f.Truncate(0); err != nil {
return nil, err
}
}
}
}
if f.flag & os.O_APPEND > 0 {
if _, err := f.Seek(0, 2); err != nil {
return nil, err
}
if _, err := f.Seek(0, 2); err != nil {
return nil, err
}
} else {
f.Seek(0, 0)
f.Seek(0, 0)
}
if !p.inited.Set(true) {
if err := p.watcher.Add(f.path); err != nil {
p.inited.Set(false)
}
go func() {
for {
select {
// 关闭事件
case <- p.closeChan:
return
// 监听事件
case ev := <- p.watcher.Events:
// 如果文件被删除或者重命名,重建指针池
if ev.Op & fsnotify.Remove == fsnotify.Remove || ev.Op & fsnotify.Rename == fsnotify.Rename {
p.pool.Close()
p.pool = newFilePool(p, f.Name(), f.flag, f.perm, f.pool.expire)
p.watcher.Remove(ev.Name)
}
}
}
}()
}
return f, nil
}
@ -117,6 +158,7 @@ func (p *Pool) File() (*File, error) {
// 关闭指针池(返回error是标准库io.ReadWriteCloser接口实现)
func (p *Pool) Close() error {
close(p.closeChan)
p.pool.Close()
return nil
}

View File

@ -5,7 +5,7 @@
// You can obtain one at https://gitee.com/johng/gf.
// 文件监控.
// 使用时需要注意的是,一旦一个文件被删除,那么对其的监控将会失效。
// 使用时需要注意的是,一旦一个文件被删除,那么对其的监控将会失效;如果删除的是目录,那么该目录及其下的文件都将被递归删除监控
package gfsnotify
import (
@ -29,8 +29,9 @@ type Watcher struct {
// 监听事件对象
type Event struct {
Path string // 文件绝对路径
Op Op // 触发监听的文件操作
Path string // 文件绝对路径
Op Op // 触发监听的文件操作
Watcher *Watcher // 时间对应的监听对象
}
// 按位进行识别的操作集合
@ -200,8 +201,8 @@ func (w *Watcher) startEventLoop() {
// 如果是文件删除事件,判断该文件是否存在,如果存在,那么将此事件认为“假删除”,
// 并重新添加监控(底层fsnotify会自动删除掉监控这里重新添加回去)
w.watcher.Add(event.Path)
// 修改时间操作为写入
event.Op = WRITE
// 修改时间操作为重命名(相当于重命名为自身名称,最终名称没变)
event.Op = RENAME
} else {
// 如果是真实删除,那么递归删除监控信息
w.Remove(event.Path)

View File

@ -29,4 +29,4 @@ func (e *Event) IsRename() bool {
// 文件/目录修改权限
func (e *Event) IsChmod() bool {
return e.Op & CHMOD == CHMOD
}
}

19
geg/os/gfpool/gfpool.go Normal file
View File

@ -0,0 +1,19 @@
package main
import (
"fmt"
"gitee.com/johng/gf/g/os/gfpool"
"os"
"time"
)
func main() {
for {
time.Sleep(time.Second)
if f, err := gfpool.Open("/home/john/temp/log.log", os.O_RDONLY, 0666, 60000000*1000); err == nil {
fmt.Println(f.Name())
} else {
fmt.Println(err)
}
}
}

View File

@ -1,20 +1,20 @@
package main
import (
"gitee.com/johng/gf/g"
"gitee.com/johng/gf/g/net/ghttp"
"fmt"
"gitee.com/johng/gf/g/os/gfpool"
"os"
"time"
)
func main() {
s := g.Server()
s.Domain("www.a.com").BindHandler("/*", func(r *ghttp.Request) {
r.Response.ServeFile("/home/john/www1" + r.URL.Path)
})
s.Domain("www.b.com").BindHandler("/*", func(r *ghttp.Request) {
r.Response.ServeFile("/home/john/www2" + r.URL.Path)
})
s.SetIndexFolder(true)
s.SetPort(8080)
s.Run()
for {
time.Sleep(time.Second)
if f, err := gfpool.Open("/home/john/temp/log.log", os.O_RDONLY, 0666, 60000000*1000); err == nil {
s, _ := f.Stat()
fmt.Println(f.Name(), s.Name())
} else {
fmt.Println(err)
}
}
}