Skip to content

Commit

Permalink
Merge pull request #70 from marcoferrer/lifo-improvements
Browse files Browse the repository at this point in the history
Misc improvements to lifo queue impl
  • Loading branch information
platinummonkey authored Nov 1, 2021
2 parents 5b2ca18 + f875284 commit 49798f1
Show file tree
Hide file tree
Showing 2 changed files with 97 additions and 119 deletions.
141 changes: 67 additions & 74 deletions limiter/lifo_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,45 @@ import (
)

type lifoElement struct {
id uint64
ctx context.Context
releaseChan chan core.Listener
next, prev *lifoElement
evicted bool
}

func (e *lifoElement) setListener(listener core.Listener) {
func (e *lifoElement) setListener(listener core.Listener) bool {
select {
case e.releaseChan <- listener:
// noop
close(e.releaseChan)
return true
default:
// timeout has expired
return false
}
}

func (q *lifoQueue) evictionFunc(e *lifoElement) func() {
return func() {
q.mu.Lock()
defer q.mu.Unlock()

// Prevent multiple invocations from
// corrupting the queue state.
if e.evicted {
return
}

e.evicted = true

if e.prev == nil {
q.top = e.next
} else {
e.prev.next = e.next
}
if e.next != nil {
e.next.prev = e.prev
}
q.size--
}
}

Expand All @@ -37,98 +64,49 @@ func (q *lifoQueue) len() uint64 {
return q.size
}

func (q *lifoQueue) push(ctx context.Context) (uint64, chan core.Listener) {
func (q *lifoQueue) push(ctx context.Context) (func(), chan core.Listener) {
q.mu.Lock()
defer q.mu.Unlock()
releaseChan := make(chan core.Listener, 1)
releaseChan := make(chan core.Listener)

if q.top != nil {
id := q.top.id + 1
if id == 0 { // on overflow, roll back to 1
id = 1
}
q.top = &lifoElement{id: id, next: q.top, ctx: ctx, releaseChan: releaseChan}
q.top = &lifoElement{next: q.top, ctx: ctx, releaseChan: releaseChan}
q.top.next.prev = q.top
q.size++
return id, releaseChan
} else {
q.top = &lifoElement{ctx: ctx, releaseChan: releaseChan}
}

q.size++
q.top = &lifoElement{id: 1, ctx: ctx, releaseChan: releaseChan}
return 1, releaseChan
return q.evictionFunc(q.top), releaseChan
}

func (q *lifoQueue) pop() *lifoElement {
q.mu.Lock()
defer q.mu.Unlock()
if q.size > 0 {
prev := lifoElement(*q.top)
prev := *q.top
next := q.top.next
if next != nil {
next.prev = nil
}
q.top.next = nil
q.top.prev = nil
q.top.evicted = true

q.top = next
q.size--
return &prev
}
return nil
}

func (q *lifoQueue) peek() (uint64, context.Context) {
func (q *lifoQueue) peek() (func(), *lifoElement) {
q.mu.RLock()
defer q.mu.RUnlock()
if q.size > 0 {
return q.top.id, q.top.ctx
}
return 0, nil
}

func (q *lifoQueue) remove(id uint64) {
q.mu.Lock()
defer q.mu.Unlock()
if q.size == 0 || q.size < id {
return
}
// remove the item, worst case O(n)
var prev *lifoElement
cur := q.top
for {
if cur == nil {
return
}
if cur.id == id {
next := cur.next
if prev == nil {
// at the top, just re-assign
if next != nil {
next.prev = nil
}
q.top.next = nil
q.top.prev = nil
q.top = next
q.size--
return
}
if next != nil {
next.prev = nil
}
prev.next = next
q.size--
return

// fix all id's above
/*cur = prev
for {
cur.id--
if cur.prev == nil {
return
}
}*/
}
cur.id--
prev = cur
cur = cur.next
return q.evictionFunc(q.top), q.top
}
return nil, nil
}

// LifoBlockingListener implements a blocking listener for the LifoBlockingListener
Expand All @@ -140,12 +118,24 @@ type LifoBlockingListener struct {
func (l *LifoBlockingListener) unblock() {
l.limiter.mu.Lock()
defer l.limiter.mu.Unlock()

if l.limiter.backlog.len() > 0 {
_, nextEventCtx := l.limiter.backlog.peek()
listener, ok := l.limiter.delegate.Acquire(nextEventCtx)

evict, nextEvent := l.limiter.backlog.peek()
listener, ok := l.limiter.delegate.Acquire(nextEvent.ctx)

if ok && listener != nil {
nextEvent := l.limiter.backlog.pop()
nextEvent.setListener(listener)
// We successfully acquired a listener from the
// delegate. Now we can evict the element from
// the queue
evict()

// If the listener is not accepted due to subtle timings
// between setListener being invoked and the element
// expiration elapsing we need to be sure to release it.
if accepted := nextEvent.setListener(listener); !accepted {
listener.OnIgnore()
}
}
// otherwise: still can't acquire the limit. unblock will be called again next time the limit is released.
}
Expand Down Expand Up @@ -232,14 +222,17 @@ func (l *LifoBlockingLimiter) tryAcquire(ctx context.Context) core.Listener {

// Create a holder for a listener and block until a listener is released by another
// operation. Holders will be unblocked in LIFO order
eventID, eventReleaseChan := l.backlog.push(ctx)
evict, eventReleaseChan := l.backlog.push(ctx)

select {
case listener = <-eventReleaseChan:
// If we have received a listener then that means
// that 'unblock' has already evicted this element
// from the queue for us.
return listener
case <-time.After(l.maxBacklogTimeout):
// Remove the holder from the backlog. This item is likely to be at the end of the
// list so do a remove to minimize the number of items to traverse
l.backlog.remove(eventID)
// Remove the holder from the backlog.
evict()
return nil
}
}
Expand Down
75 changes: 30 additions & 45 deletions limiter/lifo_blocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,115 +21,100 @@ func TestLifoQueue(t *testing.T) {
q := lifoQueue{}

asrt.Equal(uint64(0), q.len())
size, ctx := q.peek()
asrt.Equal(uint64(0), size)
_, ctx := q.peek()
asrt.Equal(uint64(0), q.len())
asrt.Nil(ctx)
asrt.Nil(q.pop())

ctx1 := context.WithValue(context.Background(), testLifoQueueContextKey(1), 1)
q.push(ctx1)

size, ctx = q.peek()
_, element := q.peek()
asrt.Equal(uint64(1), q.len())
asrt.Equal(uint64(1), size)
asrt.NotNil(ctx)
asrt.Equal(ctx1, ctx)
asrt.NotNil(element.ctx)
asrt.Equal(ctx1, element.ctx)

// add a 2nd
ctx2 := context.WithValue(context.Background(), testLifoQueueContextKey(2), 2)
q.push(ctx2)

// make sure it's still LIFO
size, ctx = q.peek()
_, element = q.peek()
asrt.Equal(uint64(2), q.len())
asrt.Equal(uint64(2), size)
asrt.NotNil(ctx)
asrt.Equal(ctx2, ctx)
asrt.NotNil(element.ctx)
asrt.Equal(ctx2, element.ctx)
asrt.Equal(ctx2, q.top.ctx)

// pop off
element := q.pop()
element = q.pop()
asrt.NotNil(element)
asrt.Equal(ctx2, element.ctx)

// check that we only have one again
size, ctx = q.peek()
_, element = q.peek()
asrt.Equal(uint64(1), q.len())
asrt.Equal(uint64(1), size)
asrt.NotNil(ctx)
asrt.Equal(ctx1, ctx)
asrt.NotNil(element.ctx)
asrt.Equal(ctx1, element.ctx)

// add a 2nd & 3rd
ctx3 := context.WithValue(context.Background(), testLifoQueueContextKey(3), 3)
q.push(ctx3)
ctx4 := context.WithValue(context.Background(), testLifoQueueContextKey(4), 4)
q.push(ctx4)

// remove the middle
asrt.Equal(uint64(3), q.top.id)
q.remove(2)
size, ctx = q.peek()
asrt.Equal(uint64(2), q.len())
asrt.Equal(uint64(2), size)
asrt.NotNil(ctx)
asrt.Equal(ctx4, ctx)
asrt.Equal(ctx4, q.top.ctx)

// check sanity on id's for regression
for i := 2; i > 0; i-- {
element := q.pop()
asrt.Equal(uint64(i), element.id)
}
}

func TestLifoQueue_Remove(t *testing.T) {
func TestLifoQueue_Evict(t *testing.T) {
t.Parallel()
asrt := assert.New(t)
q := lifoQueue{}

asrt.Equal(uint64(0), q.len())
size, ctx := q.peek()
asrt.Equal(uint64(0), size)
asrt.Nil(ctx)
_, e := q.peek()
asrt.Equal(uint64(0), q.len())
asrt.Nil(e)
asrt.Nil(q.pop())

var evictFunc []func()
for i := 1; i <= 10; i++ {
ctx := context.WithValue(context.Background(), testLifoQueueContextKey(i), i)
q.push(ctx)
ctx := context.WithValue(context.Background(), testLifoQueueContextKey(1), i)
e, _ := q.push(ctx)
evictFunc = append(evictFunc, e)
}

// remove last
q.remove(1)
evictFunc[0]()
asrt.Equal(uint64(9), q.len())

// remove first
q.remove(q.len())
evictFunc[9]()
asrt.Equal(uint64(8), q.len())

// remove middle
q.remove(q.len() / 2)
evictFunc[4]()
asrt.Equal(uint64(7), q.len())

seenElements := make(map[uint64]struct{}, q.len())
seenElements := make(map[int]struct{}, q.len())
var element *lifoElement
for {
element = q.pop()
if element == nil {
break
}
_, seen := seenElements[element.id]
id := element.ctx.Value(testLifoQueueContextKey(1)).(int)
_, seen := seenElements[id]
asrt.False(seen, "no duplicate element ids allowed")
seenElements[element.id] = struct{}{}
seenElements[id] = struct{}{}
}
asrt.Equal(uint64(0), q.len())
asrt.Equal(7, len(seenElements))

q = lifoQueue{}
ctx = context.WithValue(context.Background(), testLifoQueueContextKey(1), 1)
q.push(ctx)
ctx := context.WithValue(context.Background(), testLifoQueueContextKey(1), 1)
evict, _ := q.push(ctx)

// Remove very last item leaving queue empty
q.remove(1)
evict()
asrt.Equal(uint64(0), q.len())
}

Expand Down

0 comments on commit 49798f1

Please sign in to comment.