Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DB thread pool improvement #481

Open
wants to merge 5 commits into
base: develop
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 32 additions & 32 deletions utils/dbutil/threads/pool.go
Original file line number Diff line number Diff line change
@@ -1,63 +1,70 @@
package threads

import (
"math"
"runtime/debug"
"sync"
"sync/atomic"
)

const GoroutinesPerThread = 0.8

// threadPool counts threads in use
type ThreadPool struct {
mu sync.Mutex
cap int
left int
cap int32
left int32
}

var GlobalPool ThreadPool

// init ThreadPool only on demand to give time to other packages
// call debug.SetMaxThreads() if they need
// call debug.SetMaxThreads() if they need.
// Note: in datarace case the other participants will see no free threads but
// enough capacity. It's ok.
func (p *ThreadPool) init() {
if p.cap == 0 {
p.cap = int(getMaxThreads() * GoroutinesPerThread)
p.left = p.cap
initialized := !atomic.CompareAndSwapInt32(&p.cap, 0, math.MaxInt32)
if initialized {
return
}
cap := int32(getMaxThreads() * GoroutinesPerThread)
atomic.StoreInt32(&p.left, cap)
atomic.StoreInt32(&p.cap, cap)
}

// Capacity of pool
// Capacity of pool.
// Note: first call may return greater value than nexts. Don't cache it.
func (p *ThreadPool) Cap() int {
if p.cap == 0 {
p.mu.Lock()
defer p.mu.Unlock()
p.init()
}
return p.cap
p.init()
cap := atomic.LoadInt32(&p.cap)
return int(cap)
}

func (p *ThreadPool) Lock(want int) (got int, release func(count int)) {
p.mu.Lock()
defer p.mu.Unlock()

p.init()

if want < 1 {
if want < 0 {
want = 0
}
if want > math.MaxInt32 {
want = math.MaxInt32
}

got = min(p.left, want)
p.left -= got
left := atomic.AddInt32(&p.left, -int32(want))
got = want
if left < 0 {
if left < -int32(got) {
left = -int32(got)
}
atomic.AddInt32(&p.left, -left)
got += int(left)
}

release = func(count int) {
p.mu.Lock()
defer p.mu.Unlock()

if 0 > count || count > got {
count = got
}

got -= count
p.left += count
atomic.AddInt32(&p.left, int32(count))
}

return
Expand All @@ -68,10 +75,3 @@ func getMaxThreads() float64 {
debug.SetMaxThreads(was)
return float64(was)
}

func min(a, b int) int {
if a < b {
return a
}
return b
}