diff --git a/net_sink.go b/net_sink.go index 12a181e..720fbe3 100644 --- a/net_sink.go +++ b/net_sink.go @@ -278,12 +278,15 @@ func (s *netSink) run() { var reconnectFailed bool // true if last reconnect failed - batchSize := GetSettings().BatchSize + settings := GetSettings() + + batchSize := settings.BatchSize isBatchEnabled := batchSize > 0 batch := make([]bytes.Buffer, 0, batchSize+cap(s.outc)) // expand allocation to allow for draining outc. todo change to an array to optimize sendBatch := false t := time.NewTicker(flushInterval) + bt := time.NewTicker(time.Duration(settings.FlushIntervalS) * time.Second) // todo maybe use a new configuration for batch flush interval defer t.Stop() // flush all/any buffered stats (on specified ticker) and send loop @@ -322,11 +325,11 @@ func (s *netSink) run() { } // send batched outc data anytime indicated or the batch is full - if sendBatch || len(batch) == batchSize { + if sendBatch || len(batch) >= batchSize { var err error batch, err = s.sendBatch(batch) if err != nil { - sendBatch = true // guarantee we conitnue to complete sending the batch after retrying and before procecessing anything else + sendBatch = true // guarentee for all cases that we complete sending the batch after retrying and before procecessing anything else continue // cut the iteration to process retries (nothing can be sent anyway since writeToConn failure with set s.conn to nil) } sendBatch = false @@ -356,6 +359,7 @@ func (s *netSink) run() { if err := s.send(buf); err == nil { putBuffer(buf) } + // if an error occurs we send the metric to the retry channel and make s.conn nil, breaking this loop and preventing further batch processing in this stack } close(done) @@ -372,6 +376,11 @@ func (s *netSink) run() { } } + select { + case <-bt.C: + sendBatch = true + default: + } } }