diff --git a/g/container/gpool/gpool.go b/g/container/gpool/gpool.go index a8332b5da..5d2a18449 100644 --- a/g/container/gpool/gpool.go +++ b/g/container/gpool/gpool.go @@ -105,6 +105,8 @@ func (p *Pool) expireCheckingLoop() { if p.ExpireFunc != nil { p.ExpireFunc(item.value) } + } else { + break } } time.Sleep(time.Second) diff --git a/g/database/gdb/gdb.go b/g/database/gdb/gdb.go index 9abcda604..8c44a28f9 100644 --- a/g/database/gdb/gdb.go +++ b/g/database/gdb/gdb.go @@ -18,6 +18,7 @@ import ( "gitee.com/johng/gf/g/container/gring" "gitee.com/johng/gf/g/os/gcache" "gitee.com/johng/gf/g/container/gmap" + "time" ) const ( @@ -49,6 +50,7 @@ type Link interface { // 连接属性设置 SetMaxIdleConns(n int) SetMaxOpenConns(n int) + SetConnMaxLifetime(d time.Duration) // 开启事务操作 Begin() (*Tx, error) @@ -229,6 +231,16 @@ func newDb (masterNode *ConfigNode, slaveNode *ConfigNode, groupName string) (*D charr : link.getQuoteCharRight(), debug : gtype.NewBool(), } + // 设置连接属性,master和slave必须是一致的,所以这里使用的是master的属性设置 + if masterNode.MaxIdleConnCount > 0 { + db.SetMaxIdleConns(masterNode.MaxIdleConnCount) + } + if masterNode.MaxOpenConnCount > 0 { + db.SetMaxOpenConns(masterNode.MaxOpenConnCount) + } + if masterNode.MaxConnLifetime > 0 { + db.SetConnMaxLifetime(time.Duration(masterNode.MaxConnLifetime)*time.Second) + } if v := dbCaches.Get(groupName); v == nil { dbCaches.LockFunc(func(m map[string]interface{}) { if v, ok := m[groupName]; !ok { diff --git a/g/database/gdb/gdb_base.go b/g/database/gdb/gdb_base.go index d787e9693..260d784b7 100644 --- a/g/database/gdb/gdb_base.go +++ b/g/database/gdb/gdb_base.go @@ -17,6 +17,7 @@ import ( "gitee.com/johng/gf/g/util/gconv" "gitee.com/johng/gf/g/container/gring" "gitee.com/johng/gf/g/os/gtime" + "time" ) const ( @@ -237,24 +238,42 @@ func (db *Db) Prepare(query string) (*sql.Stmt, error) { // ping一下,判断或保持数据库链接(master) func (db *Db) PingMaster() error { - err := db.master.Ping(); + err := db.master.Ping() return err } // ping一下,判断或保持数据库链接(slave) func (db *Db) PingSlave() error { - err := db.slave.Ping(); + err := db.slave.Ping() return err } // 设置数据库连接池中空闲链接的大小 func (db *Db) SetMaxIdleConns(n int) { - db.master.SetMaxIdleConns(n); + db.master.SetMaxIdleConns(n) + // 比较的是指向的变量地址 + if db.master != db.slave { + db.slave.SetMaxIdleConns(n) + } } // 设置数据库连接池最大打开的链接数量 func (db *Db) SetMaxOpenConns(n int) { - db.master.SetMaxOpenConns(n); + db.master.SetMaxOpenConns(n) + // 比较的是指向的变量地址 + if db.master != db.slave { + db.slave.SetMaxOpenConns(n) + } +} + +// 设置数据库连接可重复利用的时间,超过该时间则被关闭废弃 +// 如果 d <= 0 表示该链接会一直重复利用 +func (db *Db) SetConnMaxLifetime(d time.Duration) { + db.master.SetConnMaxLifetime(d) + // 比较的是指向的变量地址 + if db.master != db.slave { + db.slave.SetConnMaxLifetime(d) + } } // 事务操作,开启,会返回一个底层的事务操作对象链接如需要嵌套事务,那么可以使用该对象,否则请忽略 diff --git a/g/database/gdb/gdb_config.go b/g/database/gdb/gdb_config.go index a19cb1a23..a753ed2d2 100644 --- a/g/database/gdb/gdb_config.go +++ b/g/database/gdb/gdb_config.go @@ -30,16 +30,19 @@ type ConfigGroup []ConfigNode // 数据库单项配置 type ConfigNode struct { - Host string // 地址 - Port string // 端口 - User string // 账号 - Pass string // 密码 - Name string // 数据库名称 - Type string // 数据库类型:mysql, sqlite, mssql, pgsql, oracle(目前仅支持mysql) - Role string // (可选,默认为master)数据库的角色,用于主从操作分离,至少需要有一个master,参数值:master, slave - Charset string // (可选,默认为 utf8)编码,默认为 utf8 - Priority int // (可选)用于负载均衡的权重计算,当集群中只有一个节点时,权重没有任何意义 - Linkinfo string // (可选)自定义链接信息,当该字段被设置值时,以上链接字段(Host,Port,User,Pass,Name)将失效(该字段是一个扩展功能) + Host string // 地址 + Port string // 端口 + User string // 账号 + Pass string // 密码 + Name string // 数据库名称 + Type string // 数据库类型:mysql, sqlite, mssql, pgsql, oracle(目前仅支持mysql) + Role string // (可选,默认为master)数据库的角色,用于主从操作分离,至少需要有一个master,参数值:master, slave + Charset string // (可选,默认为 utf8)编码,默认为 utf8 + Priority int // (可选)用于负载均衡的权重计算,当集群中只有一个节点时,权重没有任何意义 + Linkinfo string // (可选)自定义链接信息,当该字段被设置值时,以上链接字段(Host,Port,User,Pass,Name)将失效(该字段是一个扩展功能) + MaxIdleConnCount int // (可选)连接池最大限制的连接数 + MaxOpenConnCount int // (可选)连接池最大打开的连接数 + MaxConnLifetime int // (可选,单位秒)连接对象可重复使用的时间长度 } // 数据库集群配置示例,支持主从处理,多数据库集群支持 diff --git a/g/g.go b/g/g.go index aacde4597..fb3890a5a 100644 --- a/g/g.go +++ b/g/g.go @@ -107,6 +107,15 @@ func Database(name...string) *gdb.Db { if value, ok := nodem["priority"]; ok { node.Priority = gconv.Int(value) } + if value, ok := nodem["max-idle"]; ok { + node.MaxIdleConnCount = gconv.Int(value) + } + if value, ok := nodem["max-open"]; ok { + node.MaxOpenConnCount = gconv.Int(value) + } + if value, ok := nodem["max-lifetime"]; ok { + node.MaxConnLifetime = gconv.Int(value) + } cg = append(cg, node) } } diff --git a/g/os/grpool/grpool.go b/g/os/grpool/grpool.go new file mode 100644 index 000000000..f9ef097de --- /dev/null +++ b/g/os/grpool/grpool.go @@ -0,0 +1,120 @@ +// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. +// +// This Source Code Form is subject to the terms of the MIT License. +// If a copy of the MIT was not distributed with this file, +// You can obtain one at https://gitee.com/johng/gf. + +// Goroutine池. +// 用于goroutine复用,提升异步操作执行效率. +// 需要注意的是,grpool提供给的公共池不提供关闭方法,自创建的池可以手动关闭掉。 +package grpool + +import ( + "math" + "gitee.com/johng/gf/g/container/glist" + "gitee.com/johng/gf/g/container/gtype" +) + +// goroutine池对象 +type Pool struct { + workerChan chan struct{} // 使用channel限制最大的goroutine数量 + workerNum *gtype.Int // 当前正在运行的worker/goroutine数量 + jobQueue *glist.List // 待处理任务操作队列 + jobEvents chan struct{} // 任务添加事件(jobQueue+jobEvents结合使用) + closed *gtype.Bool +} + +// 默认的goroutine池管理对象 +// 该对象与进程同生命周期,无需Close +var defaultPool = New() + +// 创建goroutine池管理对象,给定过期时间(秒) +// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制 +func New(size...int) *Pool { + s := 0 + if len(size) > 0 { + s = size[0] + } + p := &Pool { + workerNum : gtype.NewInt(), + jobQueue : glist.New(), + jobEvents : make(chan struct{}, math.MaxInt32), + workerChan : make(chan struct{}, s), + closed : gtype.NewBool(), + } + return p +} + +// 添加异步任务(使用默认的池对象) +func Add(f func()) error { + return defaultPool.Add(f) +} + +// 查询当前goroutine总数 +func Size() int { + return defaultPool.workerNum.Val() +} + +// 查询当前等待处理的任务总数 +func Jobs() int { + return len(defaultPool.jobEvents) +} + +// 添加异步任务 +func (p *Pool) Add(f func()) error { + p.jobQueue.PushBack(f) + p.jobEvents <- struct{}{} + // 判断是否创建新的worker + if p.Jobs() > 1 || p.workerNum.Val() == 0 { + p.ForkWorker() + } + return nil +} + +// 查询当前goroutine worker总数 +func (p *Pool) Size() int { + return p.workerNum.Val() +} + +// 查询当前等待处理的任务总数 +func (p *Pool) Jobs() int { + return p.jobQueue.Len() +} + +// 创建新的worker执行任务 +func (p *Pool) ForkWorker() { + if cap(p.workerChan) > 0 { + // 如果worker数量已经达到限制,那么不创建新worker,直接返回 + if p.workerNum.Val() == cap(p.workerChan) { + return + } + p.workerNum.Add(1) + p.workerChan <- struct{}{} + } else { + p.workerNum.Add(1) + } + go func() { + for !p.closed.Val() { + select { + case <- p.jobEvents: + if job := p.jobQueue.PopFront(); job != nil { + job.(func())() + } else { + goto WorkerDone + } + default: + goto WorkerDone + } + } +WorkerDone: + p.workerNum.Add(-1) + if cap(p.workerChan) > 0 { + <- p.workerChan + } + }() +} + +// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 +func (p *Pool) Close() { + p.closed.Set(true) +} \ No newline at end of file diff --git a/g/os/grpool/grpool_api.go b/g/os/grpool/grpool_api.go deleted file mode 100644 index 8a4153ced..000000000 --- a/g/os/grpool/grpool_api.go +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2017-2018 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -// Goroutine池. -// 用于goroutine复用,提升异步操作执行效率. -// 需要注意的是,grpool提供给的公共池不提供关闭方法(但可以修改公共属性),自创建的池可以手动关闭掉。 -package grpool - -import ( - "math" - "gitee.com/johng/gf/g/container/glist" - "gitee.com/johng/gf/g/container/gtype" -) - -const ( - gDEFAULT_EXPIRE_TIME = 60 // 默认goroutine过期时间(秒) - gDEFAULT_CLEAR_INTERVAL = 60 // 定期检查任务过期时间间隔(秒) -) - -// goroutine池对象 -type Pool struct { - size *gtype.Int // 限制最大的goroutine数量/协程数/worker数量 - expire *gtype.Int // goroutine过期时间(秒) - workerNum *gtype.Int // 当前正在运行的goroutine数量(非任务数) - blockedNum *gtype.Int // 当前被阻塞运行的goroutine数量 - queue *glist.List // 空闲任务队列(*PoolJob) - jobs *glist.List // 待处理任务操作队列 - jobEvents chan struct{} // 任务添加事件(兄弟们该干活了!) - freeEvents chan struct{} // 空闲协程通知事件 - stopEvents chan struct{} // 池关闭事件(用于池相关异步协程通知) -} - -// 一个worker对应一个goroutine -type PoolWorker struct { - job chan func() // 当前任务(当为nil时表示关闭) - pool *Pool // 所属协程池 - update int64 // 更新时间 -} - -// 默认的goroutine池管理对象 -// 该对象与进程同生命周期,无需Close -var defaultPool = New(gDEFAULT_EXPIRE_TIME) - -// 创建goroutine池管理对象,给定过期时间(秒) -// 第二个参数用于限制限制最大的goroutine数量/线程数/worker数量,非必需参数,默认不做限制 -func New(expire int, size...int) *Pool { - s := math.MaxInt32 - if len(size) > 0 { - s = size[0] - } - p := &Pool { - size : gtype.NewInt(s), - expire : gtype.NewInt(expire), - workerNum : gtype.NewInt(), - blockedNum : gtype.NewInt(), - queue : glist.New(), - jobs : glist.New(), - jobEvents : make(chan struct{}, math.MaxInt32), - freeEvents : make(chan struct{}, math.MaxInt32), - stopEvents : make(chan struct{}, 0), - } - p.startSchedLoop() - p.startClearLoop() - return p -} - -// 添加异步任务(使用默认的池对象) -func Add(f func()) error { - return defaultPool.Add(f) -} - -// 查询当前goroutine总数 -func Size() int { - return defaultPool.workerNum.Val() -} - -// 查询当前等待处理的任务总数 -func Jobs() int { - return len(defaultPool.jobEvents) -} - -// 动态改变默认池中goroutine的上线数量 -func SetSize(size int) { - defaultPool.SetSize(size) -} - -// 动态改变默认池中goroutine的过期时间 -func SetExpire(expire int) { - defaultPool.SetExpire(expire) -} - -// 添加异步任务 -func (p *Pool) Add(f func()) error { - p.jobs.PushBack(f) - p.jobEvents <- struct{}{} - return nil -} - -// 查询当前goroutine worker总数 -func (p *Pool) Size() int { - return p.workerNum.Val() -} - -// 查询当前等待处理的任务总数 -func (p *Pool) Jobs() int { - return len(p.jobEvents) -} - -// 动态改变当前池中goroutine的上线数量 -func (p *Pool) SetSize(size int) { - p.size.Set(size) -} - -// 动态改变当前池中goroutine的过期时间 -func (p *Pool) SetExpire(expire int) { - p.expire.Set(expire) -} - -// 关闭池,所有的任务将会停止,此后继续添加的任务将不会被执行 -func (p *Pool) Close() { - // 必须首先标识让任务过期自动关闭 - p.SetExpire(-1) - // 使用stopEvents事件通知所有的异步协程及清理协程自动退出 - close(p.stopEvents) -} \ No newline at end of file diff --git a/g/os/grpool/grpool_pool.go b/g/os/grpool/grpool_pool.go deleted file mode 100644 index b5bba9fb6..000000000 --- a/g/os/grpool/grpool_pool.go +++ /dev/null @@ -1,121 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package grpool - -import ( - "time" - "runtime" - "gitee.com/johng/gf/g/os/gtime" -) - -// 任务分配循环协程,使用基于 runtime.GOMAXPROCS 数量的协程来实现抢占调度, -// 使用抢占调度的目的是使得任务能够并发地快速被分配出去执行 -func (p *Pool) startSchedLoop() { - for i := 0; i < runtime.GOMAXPROCS(-1); i++ { - go func() { - for { - select { - case <-p.jobEvents: - p.getWorker().setJob(p.jobs.PopFront().(func())) - case <-p.stopEvents: - return - } - } - }() - } -} - -// 定时清理过期任务,单协程处理 -func (p *Pool) startClearLoop() { - go func() { - for { - select { - case <-p.stopEvents: - // 如果接收到关闭通知(池已经关闭),关闭所有worker后退出 - for { - if r := p.queue.PopFront(); r != nil { - // 主动关闭所有worker,防止goroutine泄露 - r.(*PoolWorker).stop() - } else { - break - } - } - return - - default: - time.Sleep(gDEFAULT_CLEAR_INTERVAL*time.Second) - // 保证没有工作任务的情况下,执行worker清理操作 - if len(p.jobEvents) == 0 { - var w *PoolWorker - for { - if r := p.queue.PopFront(); r != nil { - w = r.(*PoolWorker) - if gtime.Second() - int64(p.expire.Val()) > w.update { - w.stop() - } else { - p.queue.PushFront(w) - break - } - } else { - break - } - } - } - } - } - }() -} - -// 获取过期时间 -func (p *Pool) getExpire() int { - return p.expire.Val() -} - -// 创建一个空的任务对象 -func (p *Pool) newWorker() *PoolWorker { - // 如果达到goroutine数限制,那么阻塞等待有空闲goroutine后继续 - // 需要注意的是在高并发下workerNum的值可能会高于size, - // 从效率上考虑没有将workerNum和size都放到一个互斥锁中进行准确度控制, - // 精准是要付出代价的 - if p.workerNum.Val() >= p.size.Val() { - // (非精准控制)阻塞等待空闲的协程资源, - // 这是一个递归循环,因为该流程中存在协程抢占机制, - // 如果进入getJob方法没有抢占到协程资源,那么该任务执行会继续等待下一个freeEvents事件产生 - p.blockedNum.Add(1) - <- p.freeEvents - return p.getWorker() - } - w := &PoolWorker { - job : make(chan func(), 1), - pool : p, - } - w.start() - p.workerNum.Add(1) - return w -} - -// 添加worker对象到空闲队列 -func (p *Pool) addWorker(w *PoolWorker) bool { - if p.workerNum.Val() > p.size.Val() || w.pool.getExpire() == -1 { - return false - } - p.queue.PushBack(w) - // 如果当前的goroutine数量达到上线,那么需要使用空闲goroutine通知事件 - if p.blockedNum.Val() > 0 { - p.blockedNum.Add(-1) - p.freeEvents <- struct{}{} - } - return true -} - -// 获取/创建任务 -func (p *Pool) getWorker() *PoolWorker { - if r := p.queue.PopFront(); r != nil { - return r.(*PoolWorker) - } - return p.newWorker() -} diff --git a/g/os/grpool/grpool_pool_worker.go b/g/os/grpool/grpool_pool_worker.go deleted file mode 100644 index 65e363f04..000000000 --- a/g/os/grpool/grpool_pool_worker.go +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2017 gf Author(https://gitee.com/johng/gf). All Rights Reserved. -// -// This Source Code Form is subject to the terms of the MIT License. -// If a copy of the MIT was not distributed with this file, -// You can obtain one at https://gitee.com/johng/gf. - -package grpool - -import "gitee.com/johng/gf/g/os/gtime" - -// 开始任务 -func (w *PoolWorker) start() { - go func() { - for { - if f := <- w.job; f != nil { - // 执行任务 - f() - // 更新活动时间(不存在并发安全问题) - w.update = gtime.Second() - // 执行完毕后添加到空闲队列 - if !w.pool.addWorker(w) { - break - } - } else { - break - } - } - w.pool.workerNum.Add(-1) - }() -} - -// 关闭当前任务 -func (w *PoolWorker) stop() { - w.setJob(nil) -} - -// 设置当前任务的执行函数 -func (w *PoolWorker) setJob(f func()) { - w.job <- f -} - diff --git a/g/os/grpool/grpool_test.go b/g/os/grpool/grpool_test.go index 5ddd7fb8a..1107499c2 100644 --- a/g/os/grpool/grpool_test.go +++ b/g/os/grpool/grpool_test.go @@ -7,30 +7,29 @@ package grpool_test import ( - "fmt" - "runtime" "testing" - "gitee.com/johng/gf/g/os/grpool" + "runtime" + "fmt" ) func increment() { - for i := 0; i < 1000000; i++ {} + for i := 0; i < 100000; i++ {} } -func Test_GrpoolMemUsage(t *testing.T) { - for i := 0; i < n; i++ { - grpool.Add(increment) - } - mem := runtime.MemStats{} - runtime.ReadMemStats(&mem) - fmt.Println("mem usage:", mem.TotalAlloc/1024) -} - -//func Test_GroroutineMemUsage(t *testing.T) { +//func Test_GrpoolMemUsage(t *testing.T) { // for i := 0; i < n; i++ { -// go increment() +// grpool.Add(increment) // } // mem := runtime.MemStats{} // runtime.ReadMemStats(&mem) // fmt.Println("mem usage:", mem.TotalAlloc/1024) -//} \ No newline at end of file +//} + +func Test_GroroutineMemUsage(t *testing.T) { + for i := 0; i < n; i++ { + go increment() + } + mem := runtime.MemStats{} + runtime.ReadMemStats(&mem) + fmt.Println("mem usage:", mem.TotalAlloc/1024) +} \ No newline at end of file diff --git a/geg/os/grpool/goroutine.go b/geg/os/grpool/goroutine.go index b977958a9..edcf8d529 100644 --- a/geg/os/grpool/goroutine.go +++ b/geg/os/grpool/goroutine.go @@ -11,10 +11,10 @@ import ( func main() { start := gtime.Millisecond() wg := sync.WaitGroup{} - for i := 0; i < 10000000; i++ { + for i := 0; i < 100000; i++ { wg.Add(1) go func() { - time.Sleep(time.Millisecond) + time.Sleep(time.Second) wg.Done() }() } diff --git a/geg/os/grpool/grpool.go b/geg/os/grpool/grpool.go index bad4bb92c..bcb1aef6e 100644 --- a/geg/os/grpool/grpool.go +++ b/geg/os/grpool/grpool.go @@ -11,10 +11,10 @@ import ( func main() { start := gtime.Millisecond() wg := sync.WaitGroup{} - for i := 0; i < 10000000; i++ { + for i := 0; i < 100000; i++ { wg.Add(1) grpool.Add(func() { - time.Sleep(time.Millisecond) + time.Sleep(time.Second) wg.Done() }) } diff --git a/geg/os/grpool/grpool1.go b/geg/os/grpool/grpool1.go index 2a1f41f5e..2b08a94c1 100644 --- a/geg/os/grpool/grpool1.go +++ b/geg/os/grpool/grpool1.go @@ -3,8 +3,8 @@ package main import ( "time" "fmt" - "gitee.com/johng/gf/g/os/gtime" "gitee.com/johng/gf/g/os/grpool" + "gitee.com/johng/gf/g/os/gtime" ) func job() { @@ -12,21 +12,18 @@ func job() { } func main() { - grpool.SetSize(10) + pool := grpool.New(100) for i := 0; i < 1000; i++ { - grpool.Add(job) + pool.Add(job) } - fmt.Println("size:", grpool.Size()) - fmt.Println("jobs:", grpool.Jobs()) - gtime.SetInterval(2*time.Second, func() bool { - fmt.Println("size:", grpool.Size()) - fmt.Println("jobs:", grpool.Jobs()) - return true + fmt.Println("worker:", pool.Size()) + fmt.Println(" jobs:", pool.Jobs()) + gtime.SetInterval(time.Second, func() bool { + fmt.Println("worker:", pool.Size()) + fmt.Println(" jobs:", pool.Jobs()) + fmt.Println() + return true }) - gtime.SetInterval(5*time.Second, func() bool { - grpool.SetSize(2) - return true - }) select {} }