Skip to content

Commit

Permalink
😏 Optimization of the structure and style
Browse files Browse the repository at this point in the history
  • Loading branch information
panjf2000 committed Apr 14, 2019
1 parent cd84ab6 commit 00294fd
Show file tree
Hide file tree
Showing 6 changed files with 25 additions and 22 deletions.
13 changes: 8 additions & 5 deletions ants.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,14 @@ import (
)

const (
// DefaultAntsPoolSize is the default capacity for a default goroutine pool.
DefaultAntsPoolSize = math.MaxInt32
// DEFAULT_ANTS_POOL_SIZE is the default capacity for a default goroutine pool.
DEFAULT_ANTS_POOL_SIZE = math.MaxInt32

// DefaultCleanIntervalTime is the interval time to clean up goroutines.
DefaultCleanIntervalTime = 1
// DEFAULT_CLEAN_INTERVAL_TIME is the interval time to clean up goroutines.
DEFAULT_CLEAN_INTERVAL_TIME = 1

// CLOSED represents that the pool is closed.
CLOSED = 1
)

var (
Expand Down Expand Up @@ -65,7 +68,7 @@ var (
return 1
}()

defaultAntsPool, _ = NewPool(DefaultAntsPoolSize)
defaultAntsPool, _ = NewPool(DEFAULT_ANTS_POOL_SIZE)
)

// Init a instance pool when importing ants.
Expand Down
10 changes: 5 additions & 5 deletions ants_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func TestAntsPoolGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ {
p.Submit(demoFunc)
}
time.Sleep(2 * ants.DefaultCleanIntervalTime * time.Second)
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
p.Submit(demoFunc)
t.Logf("pool, running workers number:%d", p.Running())
mem := runtime.MemStats{}
Expand All @@ -121,7 +121,7 @@ func TestAntsPoolWithFuncGetWorkerFromCache(t *testing.T) {
for i := 0; i < AntsSize; i++ {
p.Invoke(dur)
}
time.Sleep(2 * ants.DefaultCleanIntervalTime * time.Second)
time.Sleep(2 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
p.Invoke(dur)
t.Logf("pool with func, running workers number:%d", p.Running())
mem := runtime.MemStats{}
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestPurge(t *testing.T) {
t.Fatalf("create TimingPool failed: %s", err.Error())
}
p.Submit(demoFunc)
time.Sleep(3 * ants.DefaultCleanIntervalTime * time.Second)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
if p.Running() != 0 {
t.Error("all p should be purged")
}
Expand All @@ -261,7 +261,7 @@ func TestPurge(t *testing.T) {
t.Fatalf("create TimingPoolWithFunc failed: %s", err.Error())
}
p1.Invoke(1)
time.Sleep(3 * ants.DefaultCleanIntervalTime * time.Second)
time.Sleep(3 * ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
if p.Running() != 0 {
t.Error("all p should be purged")
}
Expand Down Expand Up @@ -296,7 +296,7 @@ func TestRestCodeCoverage(t *testing.T) {
for i := 0; i < n; i++ {
p.Invoke(Param)
}
time.Sleep(ants.DefaultCleanIntervalTime * time.Second)
time.Sleep(ants.DEFAULT_CLEAN_INTERVAL_TIME * time.Second)
t.Logf("pool with func, capacity:%d", p.Cap())
t.Logf("pool with func, running workers number:%d", p.Running())
t.Logf("pool with func, free workers number:%d", p.Free())
Expand Down
10 changes: 5 additions & 5 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ type Pool struct {
PanicHandler func(interface{})
}

// clear expired workers periodically.
// Clear expired workers periodically.
func (p *Pool) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop()
Expand All @@ -72,7 +72,7 @@ func (p *Pool) periodicallyPurge() {
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
if atomic.LoadInt32(&p.release) == 1 {
if CLOSED == atomic.LoadInt32(&p.release) {
p.lock.Unlock()
return
}
Expand All @@ -98,7 +98,7 @@ func (p *Pool) periodicallyPurge() {

// NewPool generates an instance of ants pool.
func NewPool(size int) (*Pool, error) {
return NewTimingPool(size, DefaultCleanIntervalTime)
return NewTimingPool(size, DEFAULT_CLEAN_INTERVAL_TIME)
}

// NewTimingPool generates an instance of ants pool with a custom timed task.
Expand All @@ -122,7 +122,7 @@ func NewTimingPool(size, expiry int) (*Pool, error) {

// Submit submits a task to this pool.
func (p *Pool) Submit(task func()) error {
if 1 == atomic.LoadInt32(&p.release) {
if CLOSED == atomic.LoadInt32(&p.release) {
return ErrPoolClosed
}
p.retrieveWorker().task <- task
Expand Down Expand Up @@ -226,7 +226,7 @@ func (p *Pool) retrieveWorker() *Worker {

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) revertWorker(worker *Worker) bool {
if 1 == atomic.LoadInt32(&p.release) {
if CLOSED == atomic.LoadInt32(&p.release) {
return false
}
worker.recycleTime = time.Now()
Expand Down
10 changes: 5 additions & 5 deletions pool_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ type PoolWithFunc struct {
PanicHandler func(interface{})
}

// clear expired workers periodically.
// Clear expired workers periodically.
func (p *PoolWithFunc) periodicallyPurge() {
heartbeat := time.NewTicker(p.expiryDuration)
defer heartbeat.Stop()
Expand All @@ -75,7 +75,7 @@ func (p *PoolWithFunc) periodicallyPurge() {
currentTime := time.Now()
p.lock.Lock()
idleWorkers := p.workers
if atomic.LoadInt32(&p.release) == 1 {
if CLOSED == atomic.LoadInt32(&p.release) {
p.lock.Unlock()
return
}
Expand All @@ -101,7 +101,7 @@ func (p *PoolWithFunc) periodicallyPurge() {

// NewPoolWithFunc generates an instance of ants pool with a specific function.
func NewPoolWithFunc(size int, pf func(interface{})) (*PoolWithFunc, error) {
return NewTimingPoolWithFunc(size, DefaultCleanIntervalTime, pf)
return NewTimingPoolWithFunc(size, DEFAULT_CLEAN_INTERVAL_TIME, pf)
}

// NewTimingPoolWithFunc generates an instance of ants pool with a specific function and a custom timed task.
Expand All @@ -126,7 +126,7 @@ func NewTimingPoolWithFunc(size, expiry int, pf func(interface{})) (*PoolWithFun

// Invoke submits a task to pool.
func (p *PoolWithFunc) Invoke(args interface{}) error {
if 1 == atomic.LoadInt32(&p.release) {
if CLOSED == atomic.LoadInt32(&p.release) {
return ErrPoolClosed
}
p.retrieveWorker().args <- args
Expand Down Expand Up @@ -230,7 +230,7 @@ func (p *PoolWithFunc) retrieveWorker() *WorkerWithFunc {

// revertWorker puts a worker back into free pool, recycling the goroutines.
func (p *PoolWithFunc) revertWorker(worker *WorkerWithFunc) bool {
if 1 == atomic.LoadInt32(&p.release) {
if CLOSED == atomic.LoadInt32(&p.release) {
return false
}
worker.recycleTime = time.Now()
Expand Down
2 changes: 1 addition & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (w *Worker) run() {
}()

for f := range w.task {
if f == nil {
if nil == f {
w.pool.decRunning()
w.pool.workerCache.Put(w)
return
Expand Down
2 changes: 1 addition & 1 deletion worker_func.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (w *WorkerWithFunc) run() {
}()

for args := range w.args {
if args == nil {
if nil == args {
w.pool.decRunning()
w.pool.workerCache.Put(w)
return
Expand Down

0 comments on commit 00294fd

Please sign in to comment.