From 73e67ba425481b94f51793df503b2e2b2e619514 Mon Sep 17 00:00:00 2001 From: Felipe Lopes Date: Mon, 17 Jun 2024 15:28:06 -0300 Subject: [PATCH] [processor/metricstransformprocessor] Support `count` aggregation type (#32935) **Description:** Implements https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24978. --- .chloggen/count-aggregation-type.yaml | 27 +++++++++ processor/metricstransformprocessor/README.md | 4 +- processor/metricstransformprocessor/config.go | 5 +- ...rics_transform_processor_testcases_test.go | 57 +++++++++++++++++++ .../operation_aggregate_labels.go | 4 ++ 5 files changed, 94 insertions(+), 3 deletions(-) create mode 100644 .chloggen/count-aggregation-type.yaml diff --git a/.chloggen/count-aggregation-type.yaml b/.chloggen/count-aggregation-type.yaml new file mode 100644 index 000000000000..2007319d2ed7 --- /dev/null +++ b/.chloggen/count-aggregation-type.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: metricstransformprocessor + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Adds the 'count' aggregation type to the Metrics Transform Processor." + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24978] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/processor/metricstransformprocessor/README.md b/processor/metricstransformprocessor/README.md index dd26f429902d..80f3c594a83c 100644 --- a/processor/metricstransformprocessor/README.md +++ b/processor/metricstransformprocessor/README.md @@ -86,7 +86,7 @@ processors: # new_name specifies the updated name of the metric; if action is insert or combine, new_name is required new_name: # aggregation_type defines how combined data points will be aggregated; if action is combine, aggregation_type is required - aggregation_type: {sum, mean, min, max} + aggregation_type: {sum, mean, min, max, count} # submatch_case specifies the case that should be used when adding label values based on regexp submatches when performing a combine action; leave blank to use the submatch value as is submatch_case: {lower, upper} # operations contain a list of operations that will be performed on the resulting metric(s) @@ -106,7 +106,7 @@ processors: # label_set contains a list of labels that will remain after aggregation; if action is aggregate_labels, label_set is required label_set: [labels...] # aggregation_type defines how data points will be aggregated; if action is aggregate_labels or aggregate_label_values, aggregation_type is required - aggregation_type: {sum, mean, min, max} + aggregation_type: {sum, mean, min, max, count} # experimental_scale specifies the scalar to apply to values experimental_scale: # value_actions contain a list of operations that will be performed on the selected label diff --git a/processor/metricstransformprocessor/config.go b/processor/metricstransformprocessor/config.go index b462a54a2473..62587de732ce 100644 --- a/processor/metricstransformprocessor/config.go +++ b/processor/metricstransformprocessor/config.go @@ -231,9 +231,12 @@ const ( // max indicates taking the max of the aggregated data. max aggregationType = "max" + + // count indicates taking the count of the aggregated data. + count aggregationType = "count" ) -var aggregationTypes = []aggregationType{sum, mean, min, max} +var aggregationTypes = []aggregationType{sum, mean, min, max, count} func (at aggregationType) isValid() bool { for _, aggregationType := range aggregationTypes { diff --git a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go index 0c7861732169..ae623023590d 100644 --- a/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go +++ b/processor/metricstransformprocessor/metrics_transform_processor_testcases_test.go @@ -298,6 +298,35 @@ var ( addIntDatapoint(1, 2, 3, "label1-value1").build(), }, }, + { + name: "metric_label_aggregation_count_int_update", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: aggregateLabels, + AggregationType: count, + LabelSet: []string{"label1"}, + }, + labelSetMap: map[string]bool{"label1": true}, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1", "label2"). + addIntDatapoint(1, 2, 1, "label1-value1", "label2-value1"). + addIntDatapoint(1, 2, 4, "label1-value1", "label2-value2"). + addIntDatapoint(1, 2, 2, "label1-value1", "label2-value2").build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1"). + addIntDatapoint(1, 2, 3, "label1-value1").build(), + }, + }, { name: "metric_label_aggregation_min_int_update", transforms: []internalTransform{ @@ -411,6 +440,34 @@ var ( addDoubleDatapoint(1, 2, 3, "label1-value1").build(), }, }, + { + name: "metric_label_aggregation_count_double_update", + transforms: []internalTransform{ + { + MetricIncludeFilter: internalFilterStrict{include: "metric1"}, + Action: Update, + Operations: []internalOperation{ + { + configOperation: Operation{ + Action: aggregateLabels, + AggregationType: count, + LabelSet: []string{"label1"}, + }, + labelSetMap: map[string]bool{"label1": true}, + }, + }, + }, + }, + in: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1", "label2"). + addDoubleDatapoint(1, 2, 3, "label1-value1", "label2-value1"). + addDoubleDatapoint(1, 2, 1, "label1-value1", "label2-value2").build(), + }, + out: []pmetric.Metric{ + metricBuilder(pmetric.MetricTypeGauge, "metric1", "label1"). + addDoubleDatapoint(1, 2, 2, "label1-value1").build(), + }, + }, { name: "metric_label_aggregation_min_double_update", transforms: []internalTransform{ diff --git a/processor/metricstransformprocessor/operation_aggregate_labels.go b/processor/metricstransformprocessor/operation_aggregate_labels.go index 1080d0a57dcf..a0129d2a870c 100644 --- a/processor/metricstransformprocessor/operation_aggregate_labels.go +++ b/processor/metricstransformprocessor/operation_aggregate_labels.go @@ -167,6 +167,8 @@ func mergeNumberDataPoints(dpsMap map[string]pmetric.NumberDataPointSlice, agg a dp.SetDoubleValue(math.Max(dp.DoubleValue(), doubleVal(dps.At(i)))) case min: dp.SetDoubleValue(math.Min(dp.DoubleValue(), doubleVal(dps.At(i)))) + case count: + dp.SetDoubleValue(float64(dps.Len())) } if dps.At(i).StartTimestamp() < dp.StartTimestamp() { dp.SetStartTimestamp(dps.At(i).StartTimestamp()) @@ -188,6 +190,8 @@ func mergeNumberDataPoints(dpsMap map[string]pmetric.NumberDataPointSlice, agg a if dp.IntValue() > intVal(dps.At(i)) { dp.SetIntValue(intVal(dps.At(i))) } + case count: + dp.SetIntValue(int64(dps.Len())) } if dps.At(i).StartTimestamp() < dp.StartTimestamp() { dp.SetStartTimestamp(dps.At(i).StartTimestamp())