From 42e27dd14c131d76c050d37ed7da8d455e0b3fcc Mon Sep 17 00:00:00 2001 From: John Guo Date: Sat, 30 Oct 2021 18:09:58 +0800 Subject: [PATCH] add context parameter for package grpool --- os/gcache/gcache_z_unit_basic_test.go | 4 +-- os/glog/glog_logger.go | 2 +- os/grpool/grpool.go | 44 +++++++++++++++++++-------- os/grpool/grpool_bench_1_test.go | 11 +++++-- os/grpool/grpool_bench_2_test.go | 4 +-- os/grpool/grpool_unit_test.go | 41 ++++++++++++++----------- 6 files changed, 68 insertions(+), 38 deletions(-) diff --git a/os/gcache/gcache_z_unit_basic_test.go b/os/gcache/gcache_z_unit_basic_test.go index 1414b3f53..61b02a1ad 100644 --- a/os/gcache/gcache_z_unit_basic_test.go +++ b/os/gcache/gcache_z_unit_basic_test.go @@ -451,7 +451,7 @@ func TestCache_SetConcurrency(t *testing.T) { pool := grpool.New(4) go func() { for { - pool.Add(func() { + pool.Add(ctx, func(ctx context.Context) { cache.SetIfNotExist(ctx, 1, 11, 10) }) } @@ -463,7 +463,7 @@ func TestCache_SetConcurrency(t *testing.T) { go func() { for { - pool.Add(func() { + pool.Add(ctx, func(ctx context.Context) { cache.SetIfNotExist(ctx, 1, nil, 10) }) } diff --git a/os/glog/glog_logger.go b/os/glog/glog_logger.go index 0dd0842e1..2b273c10e 100644 --- a/os/glog/glog_logger.go +++ b/os/glog/glog_logger.go @@ -216,7 +216,7 @@ func (l *Logger) print(ctx context.Context, level int, values ...interface{}) { } if l.config.Flags&F_ASYNC > 0 { input.IsAsync = true - err := asyncPool.Add(func() { + err := asyncPool.Add(ctx, func(ctx context.Context) { input.Next() }) if err != nil { diff --git a/os/grpool/grpool.go b/os/grpool/grpool.go index 30a0089d6..d463dd0ec 100644 --- a/os/grpool/grpool.go +++ b/os/grpool/grpool.go @@ -8,6 +8,7 @@ package grpool import ( + "context" "github.com/gogf/gf/v2/errors/gcode" "github.com/gogf/gf/v2/errors/gerror" @@ -15,6 +16,9 @@ import ( "github.com/gogf/gf/v2/container/gtype" ) +// Func is the pool function which contains context parameter. +type Func func(ctx context.Context) + // Pool manages the goroutines using pool. type Pool struct { limit int // Max goroutine count limit. @@ -23,8 +27,15 @@ type Pool struct { closed *gtype.Bool // Is pool closed or not. } +type internalPoolItem struct { + Ctx context.Context + Func Func +} + // Default goroutine pool. -var pool = New() +var ( + pool = New() +) // New creates and returns a new goroutine pool object. // The parameter `limit` is used to limit the max goroutine count, @@ -44,16 +55,16 @@ func New(limit ...int) *Pool { // Add pushes a new job to the pool using default goroutine pool. // The job will be executed asynchronously. -func Add(f func()) error { - return pool.Add(f) +func Add(ctx context.Context, f Func) error { + return pool.Add(ctx, f) } // AddWithRecover pushes a new job to the pool with specified recover function. // The optional `recoverFunc` is called when any panic during executing of `userFunc`. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. // The job will be executed asynchronously. -func AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error { - return pool.AddWithRecover(userFunc, recoverFunc...) +func AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error { + return pool.AddWithRecover(ctx, userFunc, recoverFunc...) } // Size returns current goroutine count of default goroutine pool. @@ -68,11 +79,14 @@ func Jobs() int { // Add pushes a new job to the pool. // The job will be executed asynchronously. -func (p *Pool) Add(f func()) error { +func (p *Pool) Add(ctx context.Context, f Func) error { for p.closed.Val() { return gerror.NewCode(gcode.CodeInvalidOperation, "pool closed") } - p.list.PushFront(f) + p.list.PushFront(&internalPoolItem{ + Ctx: ctx, + Func: f, + }) // Check whether fork new goroutine or not. var n int for { @@ -94,8 +108,8 @@ func (p *Pool) Add(f func()) error { // The optional `recoverFunc` is called when any panic during executing of `userFunc`. // If `recoverFunc` is not passed or given nil, it ignores the panic from `userFunc`. // The job will be executed asynchronously. -func (p *Pool) AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error { - return p.Add(func() { +func (p *Pool) AddWithRecover(ctx context.Context, userFunc Func, recoverFunc ...func(err error)) error { + return p.Add(ctx, func(ctx context.Context) { defer func() { if exception := recover(); exception != nil { if len(recoverFunc) > 0 && recoverFunc[0] != nil { @@ -107,7 +121,7 @@ func (p *Pool) AddWithRecover(userFunc func(), recoverFunc ...func(err error)) e } } }() - userFunc() + userFunc(ctx) }) } @@ -135,10 +149,14 @@ func (p *Pool) fork() { go func() { defer p.count.Add(-1) - var job interface{} + var ( + listItem interface{} + poolItem *internalPoolItem + ) for !p.closed.Val() { - if job = p.list.PopBack(); job != nil { - job.(func())() + if listItem = p.list.PopBack(); listItem != nil { + poolItem = listItem.(*internalPoolItem) + poolItem.Func(poolItem.Ctx) } else { return } diff --git a/os/grpool/grpool_bench_1_test.go b/os/grpool/grpool_bench_1_test.go index a188227ca..7c576da97 100644 --- a/os/grpool/grpool_bench_1_test.go +++ b/os/grpool/grpool_bench_1_test.go @@ -9,24 +9,29 @@ package grpool_test import ( + "context" "testing" "github.com/gogf/gf/v2/os/grpool" ) -func increment() { +var ( + ctx = context.TODO() +) + +func increment(ctx context.Context) { for i := 0; i < 1000000; i++ { } } func BenchmarkGrpool_1(b *testing.B) { for i := 0; i < b.N; i++ { - grpool.Add(increment) + grpool.Add(ctx, increment) } } func BenchmarkGoroutine_1(b *testing.B) { for i := 0; i < b.N; i++ { - go increment() + go increment(ctx) } } diff --git a/os/grpool/grpool_bench_2_test.go b/os/grpool/grpool_bench_2_test.go index f2d77ce0c..1dc02e18f 100644 --- a/os/grpool/grpool_bench_2_test.go +++ b/os/grpool/grpool_bench_2_test.go @@ -19,13 +19,13 @@ var n = 500000 func BenchmarkGrpool2(b *testing.B) { b.N = n for i := 0; i < b.N; i++ { - grpool.Add(increment) + grpool.Add(ctx, increment) } } func BenchmarkGoroutine2(b *testing.B) { b.N = n for i := 0; i < b.N; i++ { - go increment() + go increment(ctx) } } diff --git a/os/grpool/grpool_unit_test.go b/os/grpool/grpool_unit_test.go index 918067981..00568b35f 100644 --- a/os/grpool/grpool_unit_test.go +++ b/os/grpool/grpool_unit_test.go @@ -9,6 +9,7 @@ package grpool_test import ( + "context" "sync" "testing" "time" @@ -20,12 +21,14 @@ import ( func Test_Basic(t *testing.T) { gtest.C(t, func(t *gtest.T) { - wg := sync.WaitGroup{} - array := garray.NewArray(true) - size := 100 + var ( + wg = sync.WaitGroup{} + array = garray.NewArray(true) + size = 100 + ) wg.Add(size) for i := 0; i < size; i++ { - grpool.Add(func() { + grpool.Add(ctx, func(ctx context.Context) { array.Append(1) wg.Done() }) @@ -40,13 +43,15 @@ func Test_Basic(t *testing.T) { func Test_Limit1(t *testing.T) { gtest.C(t, func(t *gtest.T) { - wg := sync.WaitGroup{} - array := garray.NewArray(true) - size := 100 - pool := grpool.New(10) + var ( + wg = sync.WaitGroup{} + array = garray.NewArray(true) + size = 100 + pool = grpool.New(10) + ) wg.Add(size) for i := 0; i < size; i++ { - pool.Add(func() { + pool.Add(ctx, func(ctx context.Context) { array.Append(1) wg.Done() }) @@ -66,7 +71,7 @@ func Test_Limit2(t *testing.T) { ) wg.Add(size) for i := 0; i < size; i++ { - pool.Add(func() { + pool.Add(ctx, func(ctx context.Context) { defer wg.Done() array.Append(1) }) @@ -78,12 +83,14 @@ func Test_Limit2(t *testing.T) { func Test_Limit3(t *testing.T) { gtest.C(t, func(t *gtest.T) { - array := garray.NewArray(true) - size := 1000 - pool := grpool.New(100) + var ( + array = garray.NewArray(true) + size = 1000 + pool = grpool.New(100) + ) t.Assert(pool.Cap(), 100) for i := 0; i < size; i++ { - pool.Add(func() { + pool.Add(ctx, func(ctx context.Context) { array.Append(1) time.Sleep(2 * time.Second) }) @@ -98,20 +105,20 @@ func Test_Limit3(t *testing.T) { t.Assert(pool.Jobs(), 900) t.Assert(array.Len(), 100) t.Assert(pool.IsClosed(), true) - t.AssertNE(pool.Add(func() {}), nil) + t.AssertNE(pool.Add(ctx, func(ctx context.Context) {}), nil) }) } func Test_AddWithRecover(t *testing.T) { gtest.C(t, func(t *gtest.T) { array := garray.NewArray(true) - grpool.AddWithRecover(func() { + grpool.AddWithRecover(ctx, func(ctx context.Context) { array.Append(1) panic(1) }, func(err error) { array.Append(1) }) - grpool.AddWithRecover(func() { + grpool.AddWithRecover(ctx, func(ctx context.Context) { panic(1) array.Append(1) })