Skip to content

Commit

Permalink
tidy up accumulator comments and logs
Browse files Browse the repository at this point in the history
  • Loading branch information
hkfgo committed Dec 6, 2023
1 parent 5d6f49d commit 2c906ef
Showing 1 changed file with 7 additions and 8 deletions.
15 changes: 7 additions & 8 deletions exporter/prometheusexporter/accumulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,19 +161,19 @@ func (a *lastValueAccumulator) accumulateGauge(metric pmetric.Metric, il pcommon
}

func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.InstrumentationScope, resourceAttrs pcommon.Map, now time.Time) (n int) {
sum := metric.Sum()
doubleSum := metric.Sum()

// Drop metrics with unspecified aggregations
if sum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityUnspecified {
return
}

// Drop non-monotonic and non-cumulative metrics
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !sum.IsMonotonic() {
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && !doubleSum.IsMonotonic() {
return
}

dps := sum.DataPoints()
dps := doubleSum.DataPoints()
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

Expand Down Expand Up @@ -201,7 +201,7 @@ func (a *lastValueAccumulator) accumulateSum(metric pmetric.Metric, il pcommon.I
}

// Delta-to-Cumulative
if sum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
if doubleSum.AggregationTemporality() == pmetric.AggregationTemporalityDelta && ip.StartTimestamp() == mv.value.Sum().DataPoints().At(0).Timestamp() {
ip.SetStartTimestamp(mv.value.Sum().DataPoints().At(0).StartTimestamp())
switch ip.ValueType() {
case pmetric.NumberDataPointValueTypeInt:
Expand Down Expand Up @@ -230,7 +230,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
for i := 0; i < dps.Len(); i++ {
ip := dps.At(i)

signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely idenity this time series you are accumulating for
signature := timeseriesSignature(il.Name(), metric, ip.Attributes(), resourceAttrs) // uniquely identify this time series you are accumulating for
if ip.Flags().NoRecordedValue() {
a.registeredMetrics.Delete(signature)
return 0
Expand All @@ -239,7 +239,6 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
v, ok := a.registeredMetrics.Load(signature) // a accumulates metric values for all times series. Get value for particular time series
if !ok {
// first data point
a.logger.Debug("Accumulate first histogram data point")
m := copyMetricMetadata(metric)
ip.CopyTo(m.SetEmptyHistogram().DataPoints().AppendEmpty())
m.Histogram().SetAggregationTemporality(pmetric.AggregationTemporalityCumulative)
Expand All @@ -256,7 +255,7 @@ func (a *lastValueAccumulator) accumulateHistogram(metric pmetric.Metric, il pco
case pmetric.AggregationTemporalityDelta:
pp := mv.value.Histogram().DataPoints().At(0) // previous aggregated value for time range
if ip.StartTimestamp().AsTime() != pp.Timestamp().AsTime() {
// treat misalgnment as restart and reset, or violation of single-writer principle and drop
// treat misalignment as restart and reset, or violation of single-writer principle and drop
a.logger.With(
zap.String("ip_start_time", ip.StartTimestamp().String()),
zap.String("pp_start_time", pp.StartTimestamp().String()),
Expand Down

0 comments on commit 2c906ef

Please sign in to comment.