Skip to content

Commit

Permalink
support nonblocking submit and max blocking limit setting
Browse files Browse the repository at this point in the history
Signed-off-by: Cholerae Hu <[email protected]>
  • Loading branch information
choleraehyq authored and panjf2000 committed Aug 25, 2019
1 parent 2cbd2e3 commit 444711e
Show file tree
Hide file tree
Showing 6 changed files with 242 additions and 2 deletions.
17 changes: 17 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
# Binaries for programs and plugins
*.exe
*.exe~
*.dll
*.so
*.dylib

# Test binary, built with `go test -c`
*.test

# Output of the go coverage tool, specifically when used with LiteIDE
*.out

# Dependency directories (remove the comment below to include it)
# vendor/

.idea
3 changes: 3 additions & 0 deletions ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ var (

// ErrPoolClosed will be returned when submitting task to a closed pool.
ErrPoolClosed = errors.New("this pool has been closed")

// ErrPoolOverload will be returned when the pool is full and no workers available.
ErrPoolOverload = errors.New("too many goroutines blocked on submit or Nonblocking is set")
//---------------------------------------------------------------------------

// workerChanCap determines whether the channel of a worker should be a buffered channel
Expand Down
17 changes: 17 additions & 0 deletions ants_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
package ants_test

import (
"runtime"
"sync"
"testing"
"time"
Expand All @@ -46,6 +47,22 @@ func demoPoolFunc(args interface{}) {
time.Sleep(time.Duration(n) * time.Millisecond)
}

func longRunningFunc() {
for {
runtime.Gosched()
}
}

func longRunningPoolFunc(arg interface{}) {
if ch, ok := arg.(chan struct{}); ok {
<-ch
return
}
for {
runtime.Gosched()
}
}

func BenchmarkGoroutineWithFunc(b *testing.B) {
var wg sync.WaitGroup
for i := 0; i < b.N; i++ {
Expand Down
149 changes: 149 additions & 0 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,155 @@ func TestPurgePreMalloc(t *testing.T) {
}
}

func TestNonblockingSubmit(t *testing.T) {
poolSize := 10
p, err := ants.NewPool(poolSize)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.Nonblocking = true
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Submit(longRunningFunc); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
f := func() {
<-ch
}
// p is full now.
if err := p.Submit(f); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
if err := p.Submit(demoFunc); err == nil || err != ants.ErrPoolOverload {
t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload")
}
// interrupt f to get an available worker
close(ch)
time.Sleep(1 * time.Second)
if err := p.Submit(demoFunc); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}

func TestMaxBlockingSubmit(t *testing.T) {
poolSize := 10
p, err := ants.NewPool(poolSize)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.MaxBlockingTasks = 1
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Submit(longRunningFunc); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
f := func() {
<-ch
}
// p is full now.
if err := p.Submit(f); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
go func() {
// should be blocked. blocking num == 1
if err := p.Submit(demoFunc); err != nil {
errCh <- err
}
wg.Done()
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
if err := p.Submit(demoFunc); err != ants.ErrPoolOverload {
t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload")
}
// interrupt f to make blocking submit successful.
close(ch)
wg.Wait()
select {
case <-errCh:
t.Fatalf("blocking submit when pool is full should not return error")
default:
}
}

func TestNonblockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.Nonblocking = true
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Invoke(nil); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
// p is full now.
if err := p.Invoke(ch); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
if err := p.Invoke(nil); err == nil || err != ants.ErrPoolOverload {
t.Fatalf("nonblocking submit when pool is full should get an ErrPoolOverload")
}
// interrupt f to get an available worker
close(ch)
time.Sleep(1 * time.Second)
if err := p.Invoke(nil); err != nil {
t.Fatalf("nonblocking submit when pool is not full shouldn't return error")
}
}

func TestMaxBlockingSubmitWithFunc(t *testing.T) {
poolSize := 10
p, err := ants.NewPoolWithFunc(poolSize, longRunningPoolFunc)
if err != nil {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.MaxBlockingTasks = 1
defer p.Release()
for i := 0; i < poolSize-1; i++ {
if err := p.Invoke(Param); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
}
ch := make(chan struct{})
// p is full now.
if err := p.Invoke(ch); err != nil {
t.Fatalf("submit when pool is not full shouldn't return error")
}
var wg sync.WaitGroup
wg.Add(1)
errCh := make(chan error, 1)
go func() {
// should be blocked. blocking num == 1
if err := p.Invoke(Param); err != nil {
errCh <- err
}
wg.Done()
}()
time.Sleep(1 * time.Second)
// already reached max blocking limit
if err := p.Invoke(Param); err != ants.ErrPoolOverload {
t.Fatalf("blocking submit when pool reach max blocking submit should return ErrPoolOverload: %v", err)
}
// interrupt one func to make blocking submit successful.
close(ch)
wg.Wait()
select {
case <-errCh:
t.Fatalf("blocking submit when pool is full should not return error")
default:
}
}
func TestRestCodeCoverage(t *testing.T) {
_, err := ants.NewUltimatePool(-1, -1, false)
t.Log(err)
Expand Down
29 changes: 28 additions & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,19 @@ type Pool struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})

// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int32

// goroutine already been blocked on pool.Submit
// protected by pool.lock
blockingNum int32

// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
}

// Clear expired workers periodically.
Expand Down Expand Up @@ -151,7 +164,11 @@ func (p *Pool) Submit(task func()) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
p.retrieveWorker().task <- task
if w := p.retrieveWorker(); w == nil {
return ErrPoolOverload
} else {
w.task <- task
}
return nil
}

Expand Down Expand Up @@ -237,8 +254,18 @@ func (p *Pool) retrieveWorker() *Worker {
p.lock.Unlock()
spawnWorker()
} else {
if p.Nonblocking {
p.lock.Unlock()
return nil
}
Reentry:
if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks {
p.lock.Unlock()
return nil
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
Expand Down
29 changes: 28 additions & 1 deletion pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,19 @@ type PoolWithFunc struct {
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})

// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int32

// goroutine already been blocked on pool.Submit
// protected by pool.lock
blockingNum int32

// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
}

// Clear expired workers periodically.
Expand Down Expand Up @@ -156,7 +169,11 @@ func (p *PoolWithFunc) Invoke(args interface{}) error {
if atomic.LoadInt32(&p.release) == CLOSED {
return ErrPoolClosed
}
p.retrieveWorker().args <- args
if w := p.retrieveWorker(); w == nil {
return ErrPoolOverload
} else {
w.args <- args
}
return nil
}

Expand Down Expand Up @@ -242,8 +259,18 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {
p.lock.Unlock()
spawnWorker()
} else {
if p.Nonblocking {
p.lock.Unlock()
return nil
}
Reentry:
if p.MaxBlockingTasks != 0 && p.blockingNum >= p.MaxBlockingTasks {
p.lock.Unlock()
return nil
}
p.blockingNum++
p.cond.Wait()
p.blockingNum--
if p.Running() == 0 {
p.lock.Unlock()
spawnWorker()
Expand Down

0 comments on commit 444711e

Please sign in to comment.