diff --git a/utils/dbutil/threads/pool.go b/utils/dbutil/threads/pool.go index e1511773d..e1586c851 100644 --- a/utils/dbutil/threads/pool.go +++ b/utils/dbutil/threads/pool.go @@ -1,63 +1,70 @@ package threads import ( + "math" "runtime/debug" - "sync" + "sync/atomic" ) const GoroutinesPerThread = 0.8 // threadPool counts threads in use type ThreadPool struct { - mu sync.Mutex - cap int - left int + cap int32 + left int32 } var GlobalPool ThreadPool // init ThreadPool only on demand to give time to other packages -// call debug.SetMaxThreads() if they need +// call debug.SetMaxThreads() if they need. +// Note: in datarace case the other participants will see no free threads but +// enough capacity. It's ok. func (p *ThreadPool) init() { - if p.cap == 0 { - p.cap = int(getMaxThreads() * GoroutinesPerThread) - p.left = p.cap + initialized := !atomic.CompareAndSwapInt32(&p.cap, 0, math.MaxInt32) + if initialized { + return } + cap := int32(getMaxThreads() * GoroutinesPerThread) + atomic.StoreInt32(&p.left, cap) + atomic.StoreInt32(&p.cap, cap) } -// Capacity of pool +// Capacity of pool. +// Note: first call may return greater value than nexts. Don't cache it. func (p *ThreadPool) Cap() int { - if p.cap == 0 { - p.mu.Lock() - defer p.mu.Unlock() - p.init() - } - return p.cap + p.init() + cap := atomic.LoadInt32(&p.cap) + return int(cap) } func (p *ThreadPool) Lock(want int) (got int, release func(count int)) { - p.mu.Lock() - defer p.mu.Unlock() - p.init() - if want < 1 { + if want < 0 { want = 0 } + if want > math.MaxInt32 { + want = math.MaxInt32 + } - got = min(p.left, want) - p.left -= got + left := atomic.AddInt32(&p.left, -int32(want)) + got = want + if left < 0 { + if left < -int32(got) { + left = -int32(got) + } + atomic.AddInt32(&p.left, -left) + got += int(left) + } release = func(count int) { - p.mu.Lock() - defer p.mu.Unlock() - if 0 > count || count > got { count = got } got -= count - p.left += count + atomic.AddInt32(&p.left, int32(count)) } return @@ -68,10 +75,3 @@ func getMaxThreads() float64 { debug.SetMaxThreads(was) return float64(was) } - -func min(a, b int) int { - if a < b { - return a - } - return b -}