From e400a94ffb9ffb935bc38b3d681404242f2e6146 Mon Sep 17 00:00:00 2001 From: John Date: Sun, 9 Jun 2019 10:33:16 +0800 Subject: [PATCH] improving grpool --- g/container/gchan/gchan.go | 8 +++++--- g/container/gqueue/gqueue.go | 6 +++--- g/os/grpool/grpool.go | 12 +++++++----- 3 files changed, 15 insertions(+), 11 deletions(-) diff --git a/g/container/gchan/gchan.go b/g/container/gchan/gchan.go index 6b6be6da3..9e762162f 100644 --- a/g/container/gchan/gchan.go +++ b/g/container/gchan/gchan.go @@ -4,7 +4,7 @@ // If a copy of the MIT was not distributed with this file, // You can obtain one at https://github.com/gogf/gf. -// Package gchan provides graceful channel for safe operations. +// Package gchan provides graceful channel for no panic operations. // // It's safe to call Chan.Push/Close functions repeatedly. package gchan @@ -14,12 +14,13 @@ import ( "github.com/gogf/gf/g/container/gtype" ) +// Graceful channel. type Chan struct { channel chan interface{} closed *gtype.Bool } -// New creates a graceful channel with given limit. +// New creates a graceful channel with given . func New(limit int) *Chan { return &Chan { channel : make(chan interface{}, limit), @@ -31,7 +32,7 @@ func New(limit int) *Chan { // It is safe to be called repeatedly. func (c *Chan) Push(value interface{}) error { if c.closed.Val() { - return errors.New("closed") + return errors.New("channel is closed") } c.channel <- value return nil @@ -39,6 +40,7 @@ func (c *Chan) Push(value interface{}) error { // Pop pops value from channel. // If there's no value in channel, it would block to wait. +// If the channel is closed, it will return a nil value immediately. func (c *Chan) Pop() interface{} { return <- c.channel } diff --git a/g/container/gqueue/gqueue.go b/g/container/gqueue/gqueue.go index ea99c0c6f..9ec7b12c7 100644 --- a/g/container/gqueue/gqueue.go +++ b/g/container/gqueue/gqueue.go @@ -37,8 +37,8 @@ const ( ) // New returns an empty queue object. -// Optional parameter is used to limit the size of the queue, which is unlimited by default. -// When is given, the queue will be static and high performance which is comparable with stdlib chan. +// Optional parameter is used to limit the size of the queue, which is unlimited in default. +// When is given, the queue will be static and high performance which is comparable with stdlib channel. func New(limit...int) *Queue { q := &Queue { closed : make(chan struct{}, 0), @@ -103,7 +103,7 @@ func (q *Queue) Pop() interface{} { // Close closes the queue. // Notice: It would notify all goroutines return immediately, -// which are being blocked reading by Pop method. +// which are being blocked reading using Pop method. func (q *Queue) Close() { close(q.C) close(q.events) diff --git a/g/os/grpool/grpool.go b/g/os/grpool/grpool.go index 8d2bf116c..675e15193 100644 --- a/g/os/grpool/grpool.go +++ b/g/os/grpool/grpool.go @@ -14,10 +14,11 @@ import ( // Goroutine Pool type Pool struct { - limit int // Max goroutine count limit. - count *gtype.Int // Current running goroutine count. - list *glist.List // Job list. - closed *gtype.Bool // Is pool closed or not. + limit int // Max goroutine count limit. + count *gtype.Int // Current running goroutine count. + list *glist.List // Job list for asynchronous job adding purpose. + closed *gtype.Bool // Is pool closed or not. + workers chan struct{} // Goroutine workers using channel to implements blocking feature. } // Default goroutine pool. @@ -33,7 +34,7 @@ func New(limit...int) *Pool { list : glist.New(), closed : gtype.NewBool(), } - if len(limit) > 0 { + if len(limit) > 0 && limit[0] > 0 { p.limit = limit[0] } return p @@ -72,6 +73,7 @@ func (p *Pool) Add(f func()) { p.fork() } + // Size returns current goroutine count of the pool. func (p *Pool) Size() int { return p.count.Val()