Skip to content

Commit

Permalink
guarentee batch doesn't starve
Browse files Browse the repository at this point in the history
add more comments (will need to cleanup later)
  • Loading branch information
maciuszek committed Dec 31, 2024
1 parent b7f0a3d commit b1ed20e
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions net_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,15 @@ func (s *netSink) run() {

var reconnectFailed bool // true if last reconnect failed

batchSize := GetSettings().BatchSize
settings := GetSettings()

batchSize := settings.BatchSize
isBatchEnabled := batchSize > 0
batch := make([]bytes.Buffer, 0, batchSize+cap(s.outc)) // expand allocation to allow for draining outc. todo change to an array to optimize
sendBatch := false

t := time.NewTicker(flushInterval)
bt := time.NewTicker(time.Duration(settings.FlushIntervalS) * time.Second) // todo maybe use a new configuration for batch flush interval
defer t.Stop()

// flush all/any buffered stats (on specified ticker) and send loop
Expand Down Expand Up @@ -322,11 +325,11 @@ func (s *netSink) run() {
}

// send batched outc data anytime indicated or the batch is full
if sendBatch || len(batch) == batchSize {
if sendBatch || len(batch) >= batchSize {
var err error
batch, err = s.sendBatch(batch)
if err != nil {
sendBatch = true // guarantee we conitnue to complete sending the batch after retrying and before procecessing anything else
sendBatch = true // guarentee for all cases that we complete sending the batch after retrying and before procecessing anything else

Check failure on line 332 in net_sink.go

View workflow job for this annotation

GitHub Actions / lint

`guarentee` is a misspelling of `guarantee` (misspell)
continue // cut the iteration to process retries (nothing can be sent anyway since writeToConn failure with set s.conn to nil)
}
sendBatch = false
Expand Down Expand Up @@ -356,6 +359,7 @@ func (s *netSink) run() {
if err := s.send(buf); err == nil {
putBuffer(buf)
}
// if an error occurs we send the metric to the retry channel and make s.conn nil, breaking this loop and preventing further batch processing in this stack
}

close(done)
Expand All @@ -372,6 +376,11 @@ func (s *netSink) run() {
}
}

select {
case <-bt.C:
sendBatch = true
default:
}
}
}

Expand Down

0 comments on commit b1ed20e

Please sign in to comment.