mirror of
https://gitee.com/johng/gf
synced 2026-06-29 02:26:29 +08:00
improving grpool
This commit is contained in:
@ -4,7 +4,7 @@
|
||||
// If a copy of the MIT was not distributed with this file,
|
||||
// You can obtain one at https://github.com/gogf/gf.
|
||||
|
||||
// Package gchan provides graceful channel for safe operations.
|
||||
// Package gchan provides graceful channel for no panic operations.
|
||||
//
|
||||
// It's safe to call Chan.Push/Close functions repeatedly.
|
||||
package gchan
|
||||
@ -14,12 +14,13 @@ import (
|
||||
"github.com/gogf/gf/g/container/gtype"
|
||||
)
|
||||
|
||||
// Graceful channel.
|
||||
type Chan struct {
|
||||
channel chan interface{}
|
||||
closed *gtype.Bool
|
||||
}
|
||||
|
||||
// New creates a graceful channel with given limit.
|
||||
// New creates a graceful channel with given <limit>.
|
||||
func New(limit int) *Chan {
|
||||
return &Chan {
|
||||
channel : make(chan interface{}, limit),
|
||||
@ -31,7 +32,7 @@ func New(limit int) *Chan {
|
||||
// It is safe to be called repeatedly.
|
||||
func (c *Chan) Push(value interface{}) error {
|
||||
if c.closed.Val() {
|
||||
return errors.New("closed")
|
||||
return errors.New("channel is closed")
|
||||
}
|
||||
c.channel <- value
|
||||
return nil
|
||||
@ -39,6 +40,7 @@ func (c *Chan) Push(value interface{}) error {
|
||||
|
||||
// Pop pops value from channel.
|
||||
// If there's no value in channel, it would block to wait.
|
||||
// If the channel is closed, it will return a nil value immediately.
|
||||
func (c *Chan) Pop() interface{} {
|
||||
return <- c.channel
|
||||
}
|
||||
|
||||
@ -37,8 +37,8 @@ const (
|
||||
)
|
||||
|
||||
// New returns an empty queue object.
|
||||
// Optional parameter <limit> is used to limit the size of the queue, which is unlimited by default.
|
||||
// When <limit> is given, the queue will be static and high performance which is comparable with stdlib chan.
|
||||
// Optional parameter <limit> is used to limit the size of the queue, which is unlimited in default.
|
||||
// When <limit> is given, the queue will be static and high performance which is comparable with stdlib channel.
|
||||
func New(limit...int) *Queue {
|
||||
q := &Queue {
|
||||
closed : make(chan struct{}, 0),
|
||||
@ -103,7 +103,7 @@ func (q *Queue) Pop() interface{} {
|
||||
|
||||
// Close closes the queue.
|
||||
// Notice: It would notify all goroutines return immediately,
|
||||
// which are being blocked reading by Pop method.
|
||||
// which are being blocked reading using Pop method.
|
||||
func (q *Queue) Close() {
|
||||
close(q.C)
|
||||
close(q.events)
|
||||
|
||||
@ -14,10 +14,11 @@ import (
|
||||
|
||||
// Goroutine Pool
|
||||
type Pool struct {
|
||||
limit int // Max goroutine count limit.
|
||||
count *gtype.Int // Current running goroutine count.
|
||||
list *glist.List // Job list.
|
||||
closed *gtype.Bool // Is pool closed or not.
|
||||
limit int // Max goroutine count limit.
|
||||
count *gtype.Int // Current running goroutine count.
|
||||
list *glist.List // Job list for asynchronous job adding purpose.
|
||||
closed *gtype.Bool // Is pool closed or not.
|
||||
workers chan struct{} // Goroutine workers using channel to implements blocking feature.
|
||||
}
|
||||
|
||||
// Default goroutine pool.
|
||||
@ -33,7 +34,7 @@ func New(limit...int) *Pool {
|
||||
list : glist.New(),
|
||||
closed : gtype.NewBool(),
|
||||
}
|
||||
if len(limit) > 0 {
|
||||
if len(limit) > 0 && limit[0] > 0 {
|
||||
p.limit = limit[0]
|
||||
}
|
||||
return p
|
||||
@ -72,6 +73,7 @@ func (p *Pool) Add(f func()) {
|
||||
p.fork()
|
||||
}
|
||||
|
||||
|
||||
// Size returns current goroutine count of the pool.
|
||||
func (p *Pool) Size() int {
|
||||
return p.count.Val()
|
||||
|
||||
Reference in New Issue
Block a user