Skip to content

Commit

Permalink
Split EMF log with larger than 100 buckets.
Browse files Browse the repository at this point in the history
  • Loading branch information
zzhlogin committed Nov 12, 2024
1 parent 025cae2 commit 5eef5bd
Show file tree
Hide file tree
Showing 5 changed files with 669 additions and 47 deletions.
191 changes: 148 additions & 43 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,12 @@ type summaryMetricEntry struct {
count uint64
}

type dataPointSplit struct {
cWMetricHistogram *cWMetricHistogram
length int
capacity int
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -193,85 +199,184 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
// Note that the number of values and counts in each split may not be less than splitThreshold as we are only adding non-zero bucket counts.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

const splitThreshold = 100
var currentBucketIndex = 0
var datapoints []dataPoint
var currentPositiveIndex = metric.Positive().BucketCounts().Len() - 1
var currentZeroIndex = 0
var currentNegativeIndex = 0
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}

if totalBucketLen == 0 {
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: 0,
},
length: 0,
capacity: splitThreshold,
}

// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
split.cWMetricHistogram.Sum = metric.Sum()
}

if totalBucketLen-currentBucketIndex < splitThreshold {
split.capacity = totalBucketLen - currentBucketIndex
}

// Set mid-point of positive buckets in values/counts array.
currentBucketIndex, currentPositiveIndex = iteratePositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex, totalBucketLen)
// Set count of zero bucket in values/counts array.
currentBucketIndex, currentZeroIndex = iterateZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex, totalBucketLen)
// Set mid-point of negative buckets in values/counts array.
currentBucketIndex, currentNegativeIndex = iterateNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex, totalBucketLen)

// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: split.cWMetricHistogram,
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}
return datapoints, true
}

func iteratePositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int, totalBucketLen int) (int, int) {
scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
bucketBegin := 0.0
bucketEnd := 0.0

for split.length < split.capacity && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
} else {
bucketBegin = bucketEnd
bucketEnd = bucketBegin
}
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = math.Pow(base, float64(index))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(i)
count := positiveBucketCounts.At(currentPositiveIndex)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = bucketEnd
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = bucketBegin
}
}
currentBucketIndex++
currentPositiveIndex--
}

// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
return currentBucketIndex, currentPositiveIndex
}

func iterateZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int, totalBucketLen int) (int, int) {
if metric.ZeroCount() > 0 && split.length < split.capacity && currentZeroIndex == 0 {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, 0)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(metric.ZeroCount()))
split.length++
split.cWMetricHistogram.Count += metric.ZeroCount()
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = 0
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = 0
}
currentZeroIndex++
currentBucketIndex++
}

// Set mid-point of negative buckets in values/counts array.
return currentBucketIndex, currentZeroIndex
}

func iterateNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int, totalBucketLen int) (int, int) {
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
bucketBegin := 0.0
bucketEnd := 0.0

for split.length < split.capacity && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(i)
count := negativeBucketCounts.At(currentNegativeIndex)
if count > 0 {
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
if split.length == 1 && currentBucketIndex != 0 {
split.cWMetricHistogram.Max = bucketEnd
}
if split.length == split.capacity && currentBucketIndex != totalBucketLen-1 {
split.cWMetricHistogram.Min = bucketBegin
}
}
currentBucketIndex++
currentNegativeIndex++
}

return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
return currentBucketIndex, currentNegativeIndex
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading

0 comments on commit 5eef5bd

Please sign in to comment.