Skip to content

Commit

Permalink
add semaphore files
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Apr 9, 2024
1 parent e955f56 commit 9668783
Show file tree
Hide file tree
Showing 4 changed files with 323 additions and 0 deletions.
136 changes: 136 additions & 0 deletions collector/admission/boundedqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package admission

import (
"context"
"fmt"
"sync"

"github.com/google/uuid"
orderedmap "github.com/wk8/go-ordered-map/v2"
)

type boundedQueue struct {
maxLimitBytes int64
maxLimitWaiters int64
currentBytes int64
currentWaiters int64
mtx sync.Mutex
// waiters waiters
waiters *orderedmap.OrderedMap[uuid.UUID, waiter]
}

type waiter struct {
readyCh chan struct{}
pendingBytes int64
ID uuid.UUID
}

func NewBoundedQueue(maxLimitBytes, maxLimitWaiters int64) *boundedQueue {
return &boundedQueue{
maxLimitBytes: maxLimitBytes,
maxLimitWaiters: maxLimitWaiters,
currentBytes: int64(0),
currentWaiters: int64(0),
waiters: orderedmap.New[uuid.UUID, waiter](),
}
}

func (bq *boundedQueue) admit(pendingBytes int64) (bool, error) {
bq.mtx.Lock()
defer bq.mtx.Unlock()

if pendingBytes > bq.maxLimitBytes { // will never succeed
return false, fmt.Errorf("rejecting request, request size larger than configured limit")
}

if bq.currentBytes + pendingBytes <= bq.maxLimitBytes { // no need to wait to admit
bq.currentBytes += pendingBytes
return true, nil
}

// since we were unable to admit, check if we can wait.
if bq.currentWaiters + 1 > bq.maxLimitWaiters { // too many waiters
return false, fmt.Errorf("rejecting request, too many waiters")
}

// if we got to this point we need to wait to acquire bytes, so update currentWaiters before releasing mutex.
bq.currentWaiters += 1
return false, nil
}

func (bq *boundedQueue) Acquire(ctx context.Context, pendingBytes int64) error {
success, err := bq.admit(pendingBytes)
if err != nil || success {
return err
}

// otherwise we need to wait for bytes to be released
curWaiter := waiter{
pendingBytes: pendingBytes,
readyCh: make(chan struct{}),
ID: uuid.New(),
}

bq.mtx.Lock()
_, dupped := bq.waiters.Set(curWaiter.ID, curWaiter)
if dupped {
panic("duplicate keys found")
}

bq.mtx.Unlock()

select {
case <-curWaiter.readyCh:
return nil
case <-ctx.Done():
// canceled before acquired so remove waiter.
bq.mtx.Lock()
defer bq.mtx.Unlock()

_, found := bq.waiters.Delete(curWaiter.ID)
if !found {
panic("deleting key that doesn't exist")
}

bq.currentWaiters -= 1
return fmt.Errorf("context canceled: %w ", ctx.Err())
}
}

func (bq *boundedQueue) Release(pendingBytes int64) {
bq.mtx.Lock()
defer bq.mtx.Unlock()

bq.currentBytes -= pendingBytes

for {
if bq.waiters.Len() == 0 {
return
}
next := bq.waiters.Oldest()
nextWaiter := next.Value
nextKey := next.Key
if bq.currentBytes + nextWaiter.pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += nextWaiter.pendingBytes
bq.currentWaiters -= 1
close(nextWaiter.readyCh)
_, found := bq.waiters.Delete(nextKey)
if !found {
panic("deleting key that doesn't exist")
}

} else {
break
}
}
}

func (bq *boundedQueue) TryAcquire(pendingBytes int64) bool {
bq.mtx.Lock()
defer bq.mtx.Unlock()
if bq.currentBytes + pendingBytes <= bq.maxLimitBytes {
bq.currentBytes += pendingBytes
return true
}
return false
}
169 changes: 169 additions & 0 deletions collector/admission/boundedqueue_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
package admission


import (
"context"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/multierr"
)

func min(x, y int64) int64 {
if x <= y {
return x
}
return y
}

func abs(x int64) int64 {
if x < 0 {
return -x
}
return x
}
func TestAcquireSimpleNoWaiters(t *testing.T) {
maxLimitBytes := 1000
maxLimitWaiters := 10
numRequests := 40
requestSize := 21


bq := NewBoundedQueue(int64(maxLimitBytes), int64(maxLimitWaiters))

ctx, _ := context.WithTimeout(context.Background(), 10 * time.Second)
for i := 0; i < numRequests; i++ {
go func() {
err := bq.Acquire(ctx, int64(requestSize))
assert.NoError(t, err)
}()
}

require.Never(t, func() bool {
return bq.waiters.Len() > 0
}, 2*time.Second, 10*time.Millisecond)

for i := 0; i < int(numRequests); i++ {
bq.Release(int64(requestSize))
assert.Equal(t, int64(0), bq.currentWaiters)
}
}

func TestAcquireBoundedWithWaiters(t *testing.T) {
tests := []struct{
name string
maxLimitBytes int64
maxLimitWaiters int64
numRequests int64
requestSize int64
timeout time.Duration
}{
{
name: "below max waiters above max bytes",
maxLimitBytes: 1000,
maxLimitWaiters: 100,
numRequests: 100,
requestSize: 21,
timeout: 5 * time.Second,
},
{
name: "above max waiters above max bytes",
maxLimitBytes: 1000,
maxLimitWaiters: 100,
numRequests: 200,
requestSize: 21,
timeout: 5 * time.Second,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
bq := NewBoundedQueue(tt.maxLimitBytes, tt.maxLimitWaiters)
var blockedRequests int64
numReqsUntilBlocked := tt.maxLimitBytes / tt.requestSize
requestsAboveLimit := abs(tt.numRequests - numReqsUntilBlocked)
tooManyWaiters := requestsAboveLimit > tt.maxLimitWaiters

// There should never be more blocked requests than maxLimitWaiters.
blockedRequests = min(tt.maxLimitWaiters, requestsAboveLimit)

ctx, _ := context.WithTimeout(context.Background(), tt.timeout)
var errs error
for i := 0; i < int(tt.numRequests); i++ {
go func() {
err := bq.Acquire(ctx, tt.requestSize)
errs = multierr.Append(errs, err)
}()
}

require.Eventually(t, func() bool {
return bq.waiters.Len() == int(blockedRequests)
}, 3*time.Second, 10*time.Millisecond)


bq.Release(tt.requestSize)
assert.Equal(t, bq.waiters.Len(), int(blockedRequests)-1)

for i := 0; i < int(tt.numRequests)-1; i++ {
bq.Release(tt.requestSize)
}

if tooManyWaiters {
assert.ErrorContains(t, errs, "rejecting request, too many waiters")
} else {
assert.NoError(t, errs)
}

// confirm all bytes were released by acquiring maxLimitBytes.
assert.True(t, bq.TryAcquire(tt.maxLimitBytes))
})
}
}

func TestAcquireContextCanceled(t *testing.T) {
maxLimitBytes := 1000
maxLimitWaiters := 100
numRequests := 100
requestSize := 21
numReqsUntilBlocked := maxLimitBytes / requestSize
requestsAboveLimit := abs(int64(numRequests) - int64(numReqsUntilBlocked))

blockedRequests := min(int64(maxLimitWaiters), int64(requestsAboveLimit))

bq := NewBoundedQueue(int64(maxLimitBytes), int64(maxLimitWaiters))

ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
var errs error
var wg sync.WaitGroup
for i := 0; i < numRequests; i++ {
wg.Add(1)
go func() {
err := bq.Acquire(ctx, int64(requestSize))
errs = multierr.Append(errs, err)
wg.Done()
}()
}

// Wait until all calls to Acquire() happen and we have the expected number of waiters.
require.Eventually(t, func() bool {
return bq.waiters.Len() == int(blockedRequests)
}, 3*time.Second, 10*time.Millisecond)

cancel()
// assert.Equal(t, len(bq.waiters.keys), int(blockedRequests))
// time.Sleep(10 * time.Second)
wg.Wait()
assert.ErrorContains(t, errs, "context canceled")

// Now all waiters should have returned and been removed.
// time.Sleep(3 * time.Second)
assert.Equal(t, 0, bq.waiters.Len())

for i := 0; i < int(numRequests); i++ {
bq.Release(int64(requestSize))
assert.Equal(t, int64(0), bq.currentWaiters)
}
assert.True(t, bq.TryAcquire(int64(maxLimitBytes)))
}
5 changes: 5 additions & 0 deletions collector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ require (
)

require (
github.com/bahlo/generic-list-go v0.2.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/buger/jsonparser v1.1.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
Expand All @@ -33,6 +35,7 @@ require (
github.com/knadh/koanf/maps v0.1.1 // indirect
github.com/knadh/koanf/providers/confmap v0.1.0 // indirect
github.com/knadh/koanf/v2 v2.1.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
Expand All @@ -43,6 +46,8 @@ require (
github.com/prometheus/common v0.48.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rogpeppe/go-internal v1.11.0 // indirect
github.com/wk8/go-ordered-map v1.0.0 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
go.opentelemetry.io/collector/confmap v0.97.0 // indirect
go.opentelemetry.io/collector/consumer v0.97.0 // indirect
go.opentelemetry.io/collector/pdata v1.4.0 // indirect
Expand Down
13 changes: 13 additions & 0 deletions collector/go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
github.com/bahlo/generic-list-go v0.2.0 h1:5sz/EEAK+ls5wF+NeqDpk5+iNdMDXrh3z3nPnH1Wvgk=
github.com/bahlo/generic-list-go v0.2.0/go.mod h1:2KvAjgMlE5NNynlg/5iLrrCCZ2+5xWbdbCW3pNTGyYg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/buger/jsonparser v1.1.1 h1:2PnMjfWD7wBILjqQbt530v576A/cAbQvEW9gGIpYMUs=
github.com/buger/jsonparser v1.1.1/go.mod h1:6RYKKt7H4d4+iWqouImQ9R2FZql3VbhNgx27UK13J/0=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -21,6 +25,7 @@ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
Expand All @@ -37,6 +42,8 @@ github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
Expand All @@ -60,8 +67,13 @@ github.com/rogpeppe/go-internal v1.11.0 h1:cWPaGQEPrBb5/AsnsZesgZZ9yb1OQ+GOISoDN
github.com/rogpeppe/go-internal v1.11.0/go.mod h1:ddIwULY96R17DhadqLgMfk9H9tvdUzkipdSkR5nkCZA=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/wk8/go-ordered-map v1.0.0 h1:BV7z+2PaK8LTSd/mWgY12HyMAo5CEgkHqbkVq2thqr8=
github.com/wk8/go-ordered-map v1.0.0/go.mod h1:9ZIbRunKbuvfPKyBP1SIKLcXNlv74YCOZ3t3VTS6gRk=
github.com/wk8/go-ordered-map/v2 v2.1.8 h1:5h/BUHu93oj4gIdvHHHGsScSTMijfx5PeYkE/fJgbpc=
github.com/wk8/go-ordered-map/v2 v2.1.8/go.mod h1:5nJHM5DyteebpVlHnWMV0rPz6Zp7+xBAnxjb1X5vnTw=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.opentelemetry.io/collector/component v0.97.0 h1:vanKhXl5nptN8igRH4PqVYHOILif653vaPIKv6LCZCI=
Expand Down Expand Up @@ -138,5 +150,6 @@ google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHh
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

0 comments on commit 9668783

Please sign in to comment.