diff --git a/ants_test.go b/ants_test.go index 5dcb654e..bb4104fb 100644 --- a/ants_test.go +++ b/ants_test.go @@ -563,129 +563,116 @@ func TestInfinitePool(t *testing.T) { } } -func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int) { +func testPoolWithDisablePurge(t *testing.T, p *Pool, numWorker int, waitForPurge time.Duration) { sig := make(chan struct{}) - wg := sync.WaitGroup{} - - wg.Add(numWorker) + var wg1, wg2 sync.WaitGroup + wg1.Add(numWorker) + wg2.Add(numWorker) for i := 0; i < numWorker; i++ { _ = p.Submit(func() { - wg.Done() - sig <- struct{}{} + wg1.Done() + <-sig + wg2.Done() }) } - wg.Wait() - runCnt := p.Running() - assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) + wg1.Wait() + runningCnt := p.Running() + assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt) freeCnt := p.Free() - assert.EqualValuesf(t, 0, freeCnt, "expect % free workers, but got %d", 0, freeCnt) + assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt) - newCap := 10 - - p.Tune(newCap) - capacity := p.Cap() - assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity) - <-sig - time.Sleep(time.Millisecond * 10) - - wg.Add(1) - _ = p.Submit(func() { - wg.Done() - sig <- struct{}{} - }) - wg.Wait() - - runCnt = p.Running() - assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) - - <-sig - <-sig + // Finish all tasks and sleep for a while to wait for purging, since we've disabled purge mechanism, + // we should see that all workers are still running after the sleep. + close(sig) + wg2.Wait() + time.Sleep(waitForPurge + waitForPurge/2) + runningCnt = p.Running() + assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt) freeCnt = p.Free() - assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect % free workers, but got %d", newCap-numWorker, freeCnt) + assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt) - p.Release() - p.Reboot() + err := p.ReleaseTimeout(waitForPurge + waitForPurge/2) + assert.NoErrorf(t, err, "release pool failed: %v", err) - runCnt = p.Running() - assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) + runningCnt = p.Running() + assert.EqualValuesf(t, 0, runningCnt, "expect %d workers running, but got %d", 0, runningCnt) + freeCnt = p.Free() + assert.EqualValuesf(t, numWorker, freeCnt, "expect %d free workers, but got %d", numWorker, freeCnt) } -func TestWithDisablePurge(t *testing.T) { - numWorker := 2 +func TestWithDisablePurgePool(t *testing.T) { + numWorker := 10 p, _ := NewPool(numWorker, WithDisablePurge(true)) - testPoolWithDisablePurge(t, p, numWorker) + testPoolWithDisablePurge(t, p, numWorker, DefaultCleanIntervalTime) } -func TestWithDisablePurgeAndWithExpiration(t *testing.T) { - numWorker := 2 - p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100)) - testPoolWithDisablePurge(t, p, numWorker) +func TestWithDisablePurgeAndWithExpirationPool(t *testing.T) { + numWorker := 10 + expiredDuration := time.Millisecond * 100 + p, _ := NewPool(numWorker, WithDisablePurge(true), WithExpiryDuration(expiredDuration)) + testPoolWithDisablePurge(t, p, numWorker, expiredDuration) } -func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg *sync.WaitGroup, sig chan struct{}) { - wg.Add(numWorker) +func testPoolFuncWithDisablePurge(t *testing.T, p *PoolWithFunc, numWorker int, wg1, wg2 *sync.WaitGroup, sig chan struct{}, waitForPurge time.Duration) { for i := 0; i < numWorker; i++ { _ = p.Invoke(i) } - wg.Wait() - runCnt := p.Running() - assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) + wg1.Wait() + runningCnt := p.Running() + assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt) freeCnt := p.Free() - assert.EqualValuesf(t, 0, freeCnt, "expect % free workers, but got %d", 0, freeCnt) - - newCap := 10 - p.Tune(newCap) - capacity := p.Cap() - assert.EqualValuesf(t, newCap, capacity, "expect capacity: %d but got %d", newCap, capacity) - <-sig - - time.Sleep(time.Millisecond * 200) - - wg.Add(1) - _ = p.Invoke(10) - wg.Wait() - - runCnt = p.Running() - assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) + assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt) - <-sig - <-sig + // Finish all tasks and sleep for a while to wait for purging, since we've disabled purge mechanism, + // we should see that all workers are still running after the sleep. + close(sig) + wg2.Wait() + time.Sleep(waitForPurge + waitForPurge/2) + runningCnt = p.Running() + assert.EqualValuesf(t, numWorker, runningCnt, "expect %d workers running, but got %d", numWorker, runningCnt) freeCnt = p.Free() - assert.EqualValuesf(t, newCap-numWorker, freeCnt, "expect % free workers, but got %d", newCap-numWorker, freeCnt) + assert.EqualValuesf(t, 0, freeCnt, "expect %d free workers, but got %d", 0, freeCnt) - p.Release() - p.Reboot() + err := p.ReleaseTimeout(waitForPurge + waitForPurge/2) + assert.NoErrorf(t, err, "release pool failed: %v", err) - runCnt = p.Running() - assert.EqualValuesf(t, numWorker, runCnt, "expect %d workers running, but got %d", numWorker, runCnt) + runningCnt = p.Running() + assert.EqualValuesf(t, 0, runningCnt, "expect %d workers running, but got %d", 0, runningCnt) + freeCnt = p.Free() + assert.EqualValuesf(t, numWorker, freeCnt, "expect %d free workers, but got %d", numWorker, freeCnt) } -func TestPoolFuncWithDisablePurge(t *testing.T) { - numWorker := 2 +func TestWithDisablePurgePoolFunc(t *testing.T) { + numWorker := 10 sig := make(chan struct{}) - wg := sync.WaitGroup{} - + var wg1, wg2 sync.WaitGroup + wg1.Add(numWorker) + wg2.Add(numWorker) p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { - wg.Done() - sig <- struct{}{} + wg1.Done() + <-sig + wg2.Done() }, WithDisablePurge(true)) - testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig) + testPoolFuncWithDisablePurge(t, p, numWorker, &wg1, &wg2, sig, DefaultCleanIntervalTime) } -func TestPoolFuncWithDisablePurgeAndWithExpiration(t *testing.T) { +func TestWithDisablePurgeAndWithExpirationPoolFunc(t *testing.T) { numWorker := 2 sig := make(chan struct{}) - wg := sync.WaitGroup{} - + var wg1, wg2 sync.WaitGroup + wg1.Add(numWorker) + wg2.Add(numWorker) + expiredDuration := time.Millisecond * 100 p, _ := NewPoolWithFunc(numWorker, func(i interface{}) { - wg.Done() - sig <- struct{}{} - }, WithDisablePurge(true), WithExpiryDuration(time.Millisecond*100)) - testPoolFuncWithDisablePurge(t, p, numWorker, &wg, sig) + wg1.Done() + <-sig + wg2.Done() + }, WithDisablePurge(true), WithExpiryDuration(expiredDuration)) + testPoolFuncWithDisablePurge(t, p, numWorker, &wg1, &wg2, sig, expiredDuration) } func TestInfinitePoolWithFunc(t *testing.T) { diff --git a/pool.go b/pool.go index 407f9a12..d5e7b7b1 100644 --- a/pool.go +++ b/pool.go @@ -84,6 +84,7 @@ func (p *Pool) purgePeriodically(ctx context.Context) { if p.IsClosed() { break } + p.lock.Lock() expiredWorkers := p.workers.retrieveExpiry(p.options.ExpiryDuration) p.lock.Unlock() @@ -248,7 +249,7 @@ func (p *Pool) ReleaseTimeout(timeout time.Duration) error { endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 { + if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { return nil } time.Sleep(10 * time.Millisecond) diff --git a/pool_func.go b/pool_func.go index 08c21815..c72cf34f 100644 --- a/pool_func.go +++ b/pool_func.go @@ -86,6 +86,7 @@ func (p *PoolWithFunc) purgePeriodically(ctx context.Context) { if p.IsClosed() { break } + currentTime := time.Now() p.lock.Lock() idleWorkers := p.workers @@ -269,7 +270,7 @@ func (p *PoolWithFunc) ReleaseTimeout(timeout time.Duration) error { endTime := time.Now().Add(timeout) for time.Now().Before(endTime) { - if p.Running() == 0 && atomic.LoadInt32(&p.heartbeatDone) == 1 { + if p.Running() == 0 && (p.options.DisablePurge || atomic.LoadInt32(&p.heartbeatDone) == 1) { return nil } time.Sleep(10 * time.Millisecond) diff --git a/worker_loop_queue_test.go b/worker_loop_queue_test.go index 1a2e17c1..2c6e9f0f 100644 --- a/worker_loop_queue_test.go +++ b/worker_loop_queue_test.go @@ -29,8 +29,7 @@ func TestLoopQueue(t *testing.T) { } } assert.EqualValues(t, 5, q.len(), "Len error") - v := q.detach() - t.Log(v) + _ = q.detach() assert.EqualValues(t, 4, q.len(), "Len error") time.Sleep(time.Second)