Skip to content

Commit

Permalink
open-telemetry#36524 but with mutex
Browse files Browse the repository at this point in the history
Signed-off-by: Arthur Silva Sens <[email protected]>
  • Loading branch information
ArthurSens committed Nov 26, 2024
1 parent 5698c9d commit b40881d
Showing 1 changed file with 19 additions and 0 deletions.
19 changes: 19 additions & 0 deletions exporter/prometheusremotewriteexporter/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"errors"
"math"
"sort"
"sync"

"github.com/prometheus/prometheus/prompb"
)

type batchTimeSeriesState struct {
rwMutex sync.Mutex
// Track batch sizes sent to avoid over allocating huge buffers.
// This helps in the case where large batches are sent to avoid allocating too much unused memory
nextTimeSeriesBufferSize int
Expand All @@ -24,6 +26,7 @@ func newBatchTimeSericesState() batchTimeSeriesState {
nextTimeSeriesBufferSize: math.MaxInt,
nextMetricMetadataBufferSize: math.MaxInt,
nextRequestBufferSize: 0,
rwMutex: sync.Mutex{},
}
}

Expand All @@ -34,22 +37,30 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
}

// Allocate a buffer size of at least 10, or twice the last # of requests we sent
state.rwMutex.Lock()
requests := make([]*prompb.WriteRequest, 0, max(10, state.nextRequestBufferSize))
state.rwMutex.Unlock()

// Allocate a time series buffer 2x the last time series batch size or the length of the input if smaller
state.rwMutex.Lock()
tsArray := make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)))
state.rwMutex.Unlock()
sizeOfCurrentBatch := 0

i := 0
for _, v := range tsMap {
sizeOfSeries := v.Size()

if sizeOfCurrentBatch+sizeOfSeries >= maxBatchByteSize {
state.rwMutex.Lock()
state.nextTimeSeriesBufferSize = max(10, 2*len(tsArray))
state.rwMutex.Unlock()
wrapped := convertTimeseriesToRequest(tsArray)
requests = append(requests, wrapped)

state.rwMutex.Lock()
tsArray = make([]prompb.TimeSeries, 0, min(state.nextTimeSeriesBufferSize, len(tsMap)-i))
state.rwMutex.Unlock()
sizeOfCurrentBatch = 0
}

Expand All @@ -64,18 +75,24 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
}

// Allocate a metric metadata buffer 2x the last metric metadata batch size or the length of the input if smaller
state.rwMutex.Lock()
mArray := make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)))
state.rwMutex.Unlock()
sizeOfCurrentBatch = 0
i = 0
for _, v := range m {
sizeOfM := v.Size()

if sizeOfCurrentBatch+sizeOfM >= maxBatchByteSize {
state.rwMutex.Lock()
state.nextMetricMetadataBufferSize = max(10, 2*len(mArray))
state.rwMutex.Unlock()
wrapped := convertMetadataToRequest(mArray)
requests = append(requests, wrapped)

state.rwMutex.Lock()
mArray = make([]prompb.MetricMetadata, 0, min(state.nextMetricMetadataBufferSize, len(m)-i))
state.rwMutex.Unlock()
sizeOfCurrentBatch = 0
}

Expand All @@ -89,7 +106,9 @@ func batchTimeSeries(tsMap map[string]*prompb.TimeSeries, maxBatchByteSize int,
requests = append(requests, wrapped)
}

state.rwMutex.Lock()
state.nextRequestBufferSize = 2 * len(requests)
state.rwMutex.Unlock()
return requests, nil
}

Expand Down

0 comments on commit b40881d

Please sign in to comment.