diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index fa6cf25a9..3eb3f090f 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -8,6 +8,7 @@ // 优点: // 1、队列初始化速度快; // 2、可以向队头/队尾进行Push/Pop操作; +// 3、取数据时如果队列为空那么会阻塞等待; package gqueue import ( diff --git a/g/os/gfsnotify/gfsnotify.go b/g/os/gfsnotify/gfsnotify.go index cbed56312..e3414fdc3 100644 --- a/g/os/gfsnotify/gfsnotify.go +++ b/g/os/gfsnotify/gfsnotify.go @@ -13,7 +13,6 @@ import ( "gitee.com/johng/gf/g/os/glog" "github.com/fsnotify/fsnotify" "gitee.com/johng/gf/g/os/gfile" - "gitee.com/johng/gf/g/os/grpool" "gitee.com/johng/gf/g/container/gmap" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gqueue" @@ -152,11 +151,11 @@ func (w *Watcher) startEventLoop() { continue } if l := w.callbacks.Get(event.Path); l != nil { - grpool.Add(func() { - for _, v := range l.(*glist.List).FrontAll() { + go func(list interface{}) { + for _, v := range list.(*glist.List).FrontAll() { v.(func(event *Event))(event) } - }) + }(l) } } else { break diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go index 5b0fc14d5..8a4153ced 100644 --- a/g/os/grpool/grpool_api.go +++ b/g/os/grpool/grpool_api.go @@ -1,4 +1,4 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. // // This Source Code Form is subject to the terms of the MIT License. // If a copy of the MIT was not distributed with this file, @@ -11,7 +11,6 @@ package grpool import ( "math" - "errors" "gitee.com/johng/gf/g/container/glist" "gitee.com/johng/gf/g/container/gtype" ) @@ -28,7 +27,7 @@ type Pool struct { workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数) blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量 queue *glist.List // 空闲任务队列(*PoolJob) - funcs *glist.List // 待处理任务操作队列 + jobs *glist.List // 待处理任务操作队列 jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!) freeEvents chan struct{} // 空闲协程通知事件 stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) @@ -58,7 +57,7 @@ func New(expire int, size...int) *Pool { workerNum : gtype.NewInt(), blockedNum : gtype.NewInt(), queue : glist.New(), - funcs : glist.New(), + jobs : glist.New(), jobEvents : make(chan struct{}, math.MaxInt32), freeEvents : make(chan struct{}, math.MaxInt32), stopEvents : make(chan struct{}, 0), @@ -95,10 +94,7 @@ func SetExpire(expire int) { // 添加异步任务 func (p *Pool) Add(f func()) error { - if len(p.stopEvents) > 0 { - return errors.New("pool closed") - } - p.funcs.PushBack(f) + p.jobs.PushBack(f) p.jobEvents <- struct{}{} return nil } diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go index c29d78923..b5bba9fb6 100644 --- a/g/os/grpool/grpool_pool.go +++ b/g/os/grpool/grpool_pool.go @@ -20,7 +20,7 @@ func (p *Pool) startSchedLoop() { for { select { case <-p.jobEvents: - p.getWorker().setJob(p.funcs.PopFront().(func())) + p.getWorker().setJob(p.jobs.PopFront().(func())) case <-p.stopEvents: return }