Skip to content

Commit

Permalink
bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
Gennaro Del Sorbo committed Jan 30, 2022
1 parent 639e2d8 commit 66de109
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions roundrobin_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,9 +125,15 @@ func (p *RoundRobinHandler) HandleInRoundRobin(handler Handler) {
// key: message key
func (p *RoundRobinHandler) HandleDebounced(handler Handler, key string) {
// Backoff if currentInFlight > nPart * maxMessagesPerPartition
// this means messages are in error and or buffer is full
// this means messages are in error and/or buffer is full
buffer := func() int64 {
if p.buffer > 0 {
return int64(p.buffer)
}
return int64(p.nPart)
}()
if p.buffer > 0 {
for atomic.LoadInt64(&p.messagesInFlight) >= int64(p.nPart)*int64(p.buffer) {
for atomic.LoadInt64(&p.messagesInFlight) >= buffer {
time.Sleep(2 * time.Millisecond)
}
}
Expand Down

0 comments on commit 66de109

Please sign in to comment.