Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "[exporter/awsemfexporter]Split EMF log with larger than 100 buckets." #36763

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 0 additions & 27 deletions .chloggen/split-emf-log-when-buckets-larger-than-100.yaml

This file was deleted.

221 changes: 42 additions & 179 deletions exporter/awsemfexporter/datapoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,33 +109,6 @@ type summaryMetricEntry struct {
count uint64
}

// dataPointSplit is a structure used to manage segments of data points split from a histogram.
// It is not safe for concurrent use.
type dataPointSplit struct {
cWMetricHistogram *cWMetricHistogram
length int
capacity int
}

func (split *dataPointSplit) isFull() bool {
return split.length >= split.capacity
}

func (split *dataPointSplit) setMax(maxVal float64) {
split.cWMetricHistogram.Max = maxVal
}

func (split *dataPointSplit) setMin(minVal float64) {
split.cWMetricHistogram.Min = minVal
}

func (split *dataPointSplit) appendMetricData(metricVal float64, count uint64) {
split.cWMetricHistogram.Values = append(split.cWMetricHistogram.Values, metricVal)
split.cWMetricHistogram.Counts = append(split.cWMetricHistogram.Counts, float64(count))
split.length++
split.cWMetricHistogram.Count += count
}

// CalculateDeltaDatapoints retrieves the NumberDataPoint at the given index and performs rate/delta calculation if necessary.
func (dps numberDataPointSlice) CalculateDeltaDatapoints(i int, instrumentationScopeName string, _ bool, calculators *emfCalculators) ([]dataPoint, bool) {
metric := dps.NumberDataPointSlice.At(i)
Expand Down Expand Up @@ -220,195 +193,85 @@ func (dps histogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
}

// CalculateDeltaDatapoints retrieves the ExponentialHistogramDataPoint at the given index.
// As CloudWatch EMF logs allows in maximum of 100 target members, the exponential histogram metric are split into multiple data points as needed,
// each containing a maximum of 100 buckets, to comply with CloudWatch EMF log constraints.
// Note that the number of values and counts in each split may not be less than splitThreshold as we are only adding non-zero bucket counts.
//
// For each split data point:
// - Min and Max values are recalculated based on the bucket boundary within that specific split.
// - Sum is only assigned to the first split to ensure the total sum of the datapoints after aggregation is correct.
// - Count is accumulated based on the bucket counts within each split.
func (dps exponentialHistogramDataPointSlice) CalculateDeltaDatapoints(idx int, instrumentationScopeName string, _ bool, _ *emfCalculators) ([]dataPoint, bool) {
metric := dps.ExponentialHistogramDataPointSlice.At(idx)

const splitThreshold = 100
currentBucketIndex := 0
currentPositiveIndex := metric.Positive().BucketCounts().Len() - 1
currentZeroIndex := 0
currentNegativeIndex := 0
var datapoints []dataPoint
totalBucketLen := metric.Positive().BucketCounts().Len() + metric.Negative().BucketCounts().Len()
if metric.ZeroCount() > 0 {
totalBucketLen++
}

for currentBucketIndex < totalBucketLen {
// Create a new dataPointSplit with a capacity of up to splitThreshold buckets
capacity := min(splitThreshold, totalBucketLen-currentBucketIndex)

sum := 0.0
// Only assign `Sum` if this is the first split to make sure the total sum of the datapoints after aggregation is correct.
if currentBucketIndex == 0 {
sum = metric.Sum()
}

split := dataPointSplit{
cWMetricHistogram: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Max: metric.Max(),
Min: metric.Min(),
Count: 0,
Sum: sum,
},
length: 0,
capacity: capacity,
}

// Set collect values from positive buckets and save into split.
currentBucketIndex, currentPositiveIndex = collectDatapointsWithPositiveBuckets(&split, metric, currentBucketIndex, currentPositiveIndex)
// Set collect values from zero buckets and save into split.
currentBucketIndex, currentZeroIndex = collectDatapointsWithZeroBucket(&split, metric, currentBucketIndex, currentZeroIndex)
// Set collect values from negative buckets and save into split.
currentBucketIndex, currentNegativeIndex = collectDatapointsWithNegativeBuckets(&split, metric, currentBucketIndex, currentNegativeIndex)

if split.length > 0 {
// Add the current split to the datapoints list
datapoints = append(datapoints, dataPoint{
name: dps.metricName,
value: split.cWMetricHistogram,
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
})
}
}

if len(datapoints) == 0 {
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: []float64{},
Counts: []float64{},
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

// Override the min and max values of the first and last splits with the raw data of the metric.
datapoints[0].value.(*cWMetricHistogram).Max = metric.Max()
datapoints[len(datapoints)-1].value.(*cWMetricHistogram).Min = metric.Min()

return datapoints, true
}

func collectDatapointsWithPositiveBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentPositiveIndex int) (int, int) {
if split.isFull() || currentPositiveIndex < 0 {
return currentBucketIndex, currentPositiveIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
arrayValues := []float64{}
arrayCounts := []float64{}
var bucketBegin float64
var bucketEnd float64

// Set mid-point of positive buckets in values/counts array.
positiveBuckets := metric.Positive()
positiveOffset := positiveBuckets.Offset()
positiveBucketCounts := positiveBuckets.BucketCounts()
bucketBegin := 0.0
bucketEnd := 0.0

for !split.isFull() && currentPositiveIndex >= 0 {
index := currentPositiveIndex + int(positiveOffset)
if bucketEnd == 0 {
bucketEnd = math.Pow(base, float64(index+1))
bucketBegin = 0
bucketEnd = 0
for i := 0; i < positiveBucketCounts.Len(); i++ {
index := i + int(positiveOffset)
if bucketBegin == 0 {
bucketBegin = math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
bucketBegin = bucketEnd
}
bucketBegin = math.Pow(base, float64(index))
bucketEnd = math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := positiveBucketCounts.At(currentPositiveIndex)
count := positiveBucketCounts.At(i)
if count > 0 {
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.isFull() {
split.setMin(bucketBegin)
}
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
}
currentBucketIndex++
currentPositiveIndex--
}

return currentBucketIndex, currentPositiveIndex
}

func collectDatapointsWithZeroBucket(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentZeroIndex int) (int, int) {
if metric.ZeroCount() > 0 && !split.isFull() && currentZeroIndex == 0 {
split.appendMetricData(0, metric.ZeroCount())

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(0)
}
if split.isFull() {
split.setMin(0)
}
currentZeroIndex++
currentBucketIndex++
// Set count of zero bucket in values/counts array.
if metric.ZeroCount() > 0 {
arrayValues = append(arrayValues, 0)
arrayCounts = append(arrayCounts, float64(metric.ZeroCount()))
}

return currentBucketIndex, currentZeroIndex
}

func collectDatapointsWithNegativeBuckets(split *dataPointSplit, metric pmetric.ExponentialHistogramDataPoint, currentBucketIndex int, currentNegativeIndex int) (int, int) {
// Set mid-point of negative buckets in values/counts array.
// According to metrics spec, the value in histogram is expected to be non-negative.
// https://opentelemetry.io/docs/specs/otel/metrics/api/#histogram
// However, the negative support is defined in metrics data model.
// https://opentelemetry.io/docs/specs/otel/metrics/data-model/#exponentialhistogram
// The negative is also supported but only verified with unit test.
if split.isFull() || currentNegativeIndex >= metric.Negative().BucketCounts().Len() {
return currentBucketIndex, currentNegativeIndex
}

scale := metric.Scale()
base := math.Pow(2, math.Pow(2, float64(-scale)))
negativeBuckets := metric.Negative()
negativeOffset := negativeBuckets.Offset()
negativeBucketCounts := negativeBuckets.BucketCounts()
bucketBegin := 0.0
bucketEnd := 0.0

for !split.isFull() && currentNegativeIndex < metric.Negative().BucketCounts().Len() {
index := currentNegativeIndex + int(negativeOffset)
bucketBegin = 0
bucketEnd = 0
for i := 0; i < negativeBucketCounts.Len(); i++ {
index := i + int(negativeOffset)
if bucketEnd == 0 {
bucketEnd = -math.Pow(base, float64(index))
} else {
bucketEnd = bucketBegin
}
bucketBegin = -math.Pow(base, float64(index+1))
metricVal := (bucketBegin + bucketEnd) / 2
count := negativeBucketCounts.At(currentNegativeIndex)
count := negativeBucketCounts.At(i)
if count > 0 {
split.appendMetricData(metricVal, count)

// The value are append from high to low, set Max from the first bucket (highest value) and Min from the last bucket (lowest value)
if split.length == 1 {
split.setMax(bucketEnd)
}
if split.isFull() {
split.setMin(bucketBegin)
}
arrayValues = append(arrayValues, metricVal)
arrayCounts = append(arrayCounts, float64(count))
}
currentBucketIndex++
currentNegativeIndex++
}

return currentBucketIndex, currentNegativeIndex
return []dataPoint{{
name: dps.metricName,
value: &cWMetricHistogram{
Values: arrayValues,
Counts: arrayCounts,
Count: metric.Count(),
Sum: metric.Sum(),
Max: metric.Max(),
Min: metric.Min(),
},
labels: createLabels(metric.Attributes(), instrumentationScopeName),
timestampMs: unixNanoToMilliseconds(metric.Timestamp()),
}}, true
}

func (dps exponentialHistogramDataPointSlice) IsStaleNaNInf(i int) (bool, pcommon.Map) {
Expand Down
Loading
Loading