diff --git a/limiter/lifo_blocking.go b/limiter/lifo_blocking.go index 5a5ed67..ed9f98f 100644 --- a/limiter/lifo_blocking.go +++ b/limiter/lifo_blocking.go @@ -10,18 +10,45 @@ import ( ) type lifoElement struct { - id uint64 ctx context.Context releaseChan chan core.Listener next, prev *lifoElement + evicted bool } -func (e *lifoElement) setListener(listener core.Listener) { +func (e *lifoElement) setListener(listener core.Listener) bool { select { case e.releaseChan <- listener: - // noop + close(e.releaseChan) + return true default: // timeout has expired + return false + } +} + +func (q *lifoQueue) evictionFunc(e *lifoElement) func() { + return func() { + q.mu.Lock() + defer q.mu.Unlock() + + // Prevent multiple invocations from + // corrupting the queue state. + if e.evicted { + return + } + + e.evicted = true + + if e.prev == nil { + q.top = e.next + } else { + e.prev.next = e.next + } + if e.next != nil { + e.next.prev = e.prev + } + q.size-- } } @@ -37,36 +64,35 @@ func (q *lifoQueue) len() uint64 { return q.size } -func (q *lifoQueue) push(ctx context.Context) (uint64, chan core.Listener) { +func (q *lifoQueue) push(ctx context.Context) (func(), chan core.Listener) { q.mu.Lock() defer q.mu.Unlock() - releaseChan := make(chan core.Listener, 1) + releaseChan := make(chan core.Listener) + if q.top != nil { - id := q.top.id + 1 - if id == 0 { // on overflow, roll back to 1 - id = 1 - } - q.top = &lifoElement{id: id, next: q.top, ctx: ctx, releaseChan: releaseChan} + q.top = &lifoElement{next: q.top, ctx: ctx, releaseChan: releaseChan} q.top.next.prev = q.top - q.size++ - return id, releaseChan + } else { + q.top = &lifoElement{ctx: ctx, releaseChan: releaseChan} } + q.size++ - q.top = &lifoElement{id: 1, ctx: ctx, releaseChan: releaseChan} - return 1, releaseChan + return q.evictionFunc(q.top), releaseChan } func (q *lifoQueue) pop() *lifoElement { q.mu.Lock() defer q.mu.Unlock() if q.size > 0 { - prev := lifoElement(*q.top) + prev := *q.top next := q.top.next if next != nil { next.prev = nil } q.top.next = nil q.top.prev = nil + q.top.evicted = true + q.top = next q.size-- return &prev @@ -74,61 +100,13 @@ func (q *lifoQueue) pop() *lifoElement { return nil } -func (q *lifoQueue) peek() (uint64, context.Context) { +func (q *lifoQueue) peek() (func(), *lifoElement) { q.mu.RLock() defer q.mu.RUnlock() if q.size > 0 { - return q.top.id, q.top.ctx - } - return 0, nil -} - -func (q *lifoQueue) remove(id uint64) { - q.mu.Lock() - defer q.mu.Unlock() - if q.size == 0 || q.size < id { - return - } - // remove the item, worst case O(n) - var prev *lifoElement - cur := q.top - for { - if cur == nil { - return - } - if cur.id == id { - next := cur.next - if prev == nil { - // at the top, just re-assign - if next != nil { - next.prev = nil - } - q.top.next = nil - q.top.prev = nil - q.top = next - q.size-- - return - } - if next != nil { - next.prev = nil - } - prev.next = next - q.size-- - return - - // fix all id's above - /*cur = prev - for { - cur.id-- - if cur.prev == nil { - return - } - }*/ - } - cur.id-- - prev = cur - cur = cur.next + return q.evictionFunc(q.top), q.top } + return nil, nil } // LifoBlockingListener implements a blocking listener for the LifoBlockingListener @@ -140,12 +118,24 @@ type LifoBlockingListener struct { func (l *LifoBlockingListener) unblock() { l.limiter.mu.Lock() defer l.limiter.mu.Unlock() + if l.limiter.backlog.len() > 0 { - _, nextEventCtx := l.limiter.backlog.peek() - listener, ok := l.limiter.delegate.Acquire(nextEventCtx) + + evict, nextEvent := l.limiter.backlog.peek() + listener, ok := l.limiter.delegate.Acquire(nextEvent.ctx) + if ok && listener != nil { - nextEvent := l.limiter.backlog.pop() - nextEvent.setListener(listener) + // We successfully acquired a listener from the + // delegate. Now we can evict the element from + // the queue + evict() + + // If the listener is not accepted due to subtle timings + // between setListener being invoked and the element + // expiration elapsing we need to be sure to release it. + if accepted := nextEvent.setListener(listener); !accepted { + listener.OnIgnore() + } } // otherwise: still can't acquire the limit. unblock will be called again next time the limit is released. } @@ -232,14 +222,17 @@ func (l *LifoBlockingLimiter) tryAcquire(ctx context.Context) core.Listener { // Create a holder for a listener and block until a listener is released by another // operation. Holders will be unblocked in LIFO order - eventID, eventReleaseChan := l.backlog.push(ctx) + evict, eventReleaseChan := l.backlog.push(ctx) + select { case listener = <-eventReleaseChan: + // If we have received a listener then that means + // that 'unblock' has already evicted this element + // from the queue for us. return listener case <-time.After(l.maxBacklogTimeout): - // Remove the holder from the backlog. This item is likely to be at the end of the - // list so do a remove to minimize the number of items to traverse - l.backlog.remove(eventID) + // Remove the holder from the backlog. + evict() return nil } } diff --git a/limiter/lifo_blocking_test.go b/limiter/lifo_blocking_test.go index 527d941..d8c5e40 100644 --- a/limiter/lifo_blocking_test.go +++ b/limiter/lifo_blocking_test.go @@ -21,43 +21,40 @@ func TestLifoQueue(t *testing.T) { q := lifoQueue{} asrt.Equal(uint64(0), q.len()) - size, ctx := q.peek() - asrt.Equal(uint64(0), size) + _, ctx := q.peek() + asrt.Equal(uint64(0), q.len()) asrt.Nil(ctx) asrt.Nil(q.pop()) ctx1 := context.WithValue(context.Background(), testLifoQueueContextKey(1), 1) q.push(ctx1) - size, ctx = q.peek() + _, element := q.peek() asrt.Equal(uint64(1), q.len()) - asrt.Equal(uint64(1), size) - asrt.NotNil(ctx) - asrt.Equal(ctx1, ctx) + asrt.NotNil(element.ctx) + asrt.Equal(ctx1, element.ctx) // add a 2nd ctx2 := context.WithValue(context.Background(), testLifoQueueContextKey(2), 2) q.push(ctx2) // make sure it's still LIFO - size, ctx = q.peek() + _, element = q.peek() asrt.Equal(uint64(2), q.len()) - asrt.Equal(uint64(2), size) - asrt.NotNil(ctx) - asrt.Equal(ctx2, ctx) + asrt.NotNil(element.ctx) + asrt.Equal(ctx2, element.ctx) asrt.Equal(ctx2, q.top.ctx) // pop off - element := q.pop() + element = q.pop() asrt.NotNil(element) asrt.Equal(ctx2, element.ctx) // check that we only have one again - size, ctx = q.peek() + _, element = q.peek() asrt.Equal(uint64(1), q.len()) - asrt.Equal(uint64(1), size) - asrt.NotNil(ctx) - asrt.Equal(ctx1, ctx) + asrt.NotNil(element.ctx) + asrt.Equal(ctx1, element.ctx) // add a 2nd & 3rd ctx3 := context.WithValue(context.Background(), testLifoQueueContextKey(3), 3) @@ -65,71 +62,59 @@ func TestLifoQueue(t *testing.T) { ctx4 := context.WithValue(context.Background(), testLifoQueueContextKey(4), 4) q.push(ctx4) - // remove the middle - asrt.Equal(uint64(3), q.top.id) - q.remove(2) - size, ctx = q.peek() - asrt.Equal(uint64(2), q.len()) - asrt.Equal(uint64(2), size) - asrt.NotNil(ctx) - asrt.Equal(ctx4, ctx) - asrt.Equal(ctx4, q.top.ctx) - - // check sanity on id's for regression - for i := 2; i > 0; i-- { - element := q.pop() - asrt.Equal(uint64(i), element.id) - } } -func TestLifoQueue_Remove(t *testing.T) { +func TestLifoQueue_Evict(t *testing.T) { t.Parallel() asrt := assert.New(t) q := lifoQueue{} asrt.Equal(uint64(0), q.len()) - size, ctx := q.peek() - asrt.Equal(uint64(0), size) - asrt.Nil(ctx) + _, e := q.peek() + asrt.Equal(uint64(0), q.len()) + asrt.Nil(e) asrt.Nil(q.pop()) + var evictFunc []func() for i := 1; i <= 10; i++ { - ctx := context.WithValue(context.Background(), testLifoQueueContextKey(i), i) - q.push(ctx) + ctx := context.WithValue(context.Background(), testLifoQueueContextKey(1), i) + e, _ := q.push(ctx) + evictFunc = append(evictFunc, e) } // remove last - q.remove(1) + evictFunc[0]() asrt.Equal(uint64(9), q.len()) // remove first - q.remove(q.len()) + evictFunc[9]() asrt.Equal(uint64(8), q.len()) // remove middle - q.remove(q.len() / 2) + evictFunc[4]() asrt.Equal(uint64(7), q.len()) - seenElements := make(map[uint64]struct{}, q.len()) + seenElements := make(map[int]struct{}, q.len()) var element *lifoElement for { element = q.pop() if element == nil { break } - _, seen := seenElements[element.id] + id := element.ctx.Value(testLifoQueueContextKey(1)).(int) + _, seen := seenElements[id] asrt.False(seen, "no duplicate element ids allowed") - seenElements[element.id] = struct{}{} + seenElements[id] = struct{}{} } asrt.Equal(uint64(0), q.len()) asrt.Equal(7, len(seenElements)) q = lifoQueue{} - ctx = context.WithValue(context.Background(), testLifoQueueContextKey(1), 1) - q.push(ctx) + ctx := context.WithValue(context.Background(), testLifoQueueContextKey(1), 1) + evict, _ := q.push(ctx) // Remove very last item leaving queue empty - q.remove(1) + evict() asrt.Equal(uint64(0), q.len()) }