
Aggregate metric datapoints over time period #29461

Closed
2 tasks
0x006EA1E5 opened this issue Nov 23, 2023 · 30 comments
Labels
Accepted Component New component has been sponsored Stale


@0x006EA1E5

0x006EA1E5 commented Nov 23, 2023

The purpose and use-cases of the new component

This processor would receive a stream of data points for a metric timeseries, and periodically emit an "aggregate" at a set interval.

We can achieve something similar already by exporting metrics to the Prometheus exporter, then periodically scraping the prom endpoint with a Prometheus receiver. However this is clunky and somewhat less efficient than using a processor.

One concrete use case is where we want to send high-frequency metric datapoints to the Prometheus remote write exporter, for example datapoints produced by the count connector. When counting spans, the count connector will produce a single delta datapoint (increment value 1) for each counted span, which could of course be many times per second. However, typically we would only want to remote write to Prometheus periodically, as we would if we were scraping, perhaps once every 30 seconds. This is especially true for downstream metric sinks that charge for datapoints per minute.

This proposed processor would be stateful, tracking metrics by identity, maintaining a single aggregate value. This aggregate will be output every interval.

For example, if the processor received the following delta datapoints: 1, 3, 5, then at the next "tick" of the interval clock, a single delta datapoint of 9 would be emitted.

Similarly, if the processor received the following cumulative datapoints: 7, 9, 11, then at the next "tick" of the interval clock, a single cumulative datapoint of 11 would be emitted.
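A minimal Go sketch of the two examples above, not the processor's actual implementation: `Point`, `Aggregator`, and the string stream IDs are hypothetical stand-ins for pdata types. For simplicity the sketch clears its state at every tick; keeping state across ticks is where the max_staleness option comes in.

```go
package main

// Point is a hypothetical, simplified datapoint; a real processor would
// operate on pdata NumberDataPoint values instead.
type Point struct {
	Value      float64
	Cumulative bool // false means delta temporality
}

// Aggregator keeps one aggregate per stream identity until the next tick.
type Aggregator struct {
	state map[string]Point
}

// NewAggregator returns an Aggregator with empty state.
func NewAggregator() *Aggregator {
	return &Aggregator{state: make(map[string]Point)}
}

// Consume folds an incoming datapoint into the stored aggregate: delta
// values are summed, while a cumulative value replaces the previous one.
func (a *Aggregator) Consume(id string, p Point) {
	if prev, ok := a.state[id]; ok && !p.Cumulative {
		p.Value += prev.Value
	}
	a.state[id] = p
}

// Tick returns one datapoint per stream seen since the last tick and then
// starts a fresh interval with empty state.
func (a *Aggregator) Tick() map[string]Point {
	out := a.state
	a.state = make(map[string]Point)
	return out
}
```

Feeding deltas 1, 3, 5 into one stream and cumulative values 7, 9, 11 into another during a single interval yields one datapoint each at the tick: 9 and 11.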

There would be a "max_staleness" config option so that we can stop tracking metrics which don't receive any data for a given time.

Example configuration for the component

  • max_staleness
  • Include/exclude MatchMetrics

Telemetry data types supported

Metrics

Is this a vendor-specific component?

  • This is a vendor-specific component
  • If this is a vendor-specific component, I am proposing to contribute and support it as a representative of the vendor.

Code Owner(s)

No response

Sponsor (optional)

@djaglowski

Additional context

No response

@0x006EA1E5 0x006EA1E5 added needs triage New item requiring triage Sponsor Needed New component seeking sponsor labels Nov 23, 2023
@crobert-1 crobert-1 added Accepted Component New component has been sponsored and removed Sponsor Needed New component seeking sponsor needs triage New item requiring triage labels Nov 23, 2023
@atoulme
Contributor

atoulme commented Dec 13, 2023

@crobert-1 who is the sponsor for this issue?

@crobert-1
Member

crobert-1 commented Dec 13, 2023

> @crobert-1 who is the sponsor for this issue?

@djaglowski volunteered to be the sponsor here.

@verejoel

This is something that is on our radar, and we would like to support it as much as possible. Our use case is to enable remote writing of metrics from the count connector to Thanos.

@RichieSams
Contributor

Some clarifications of the spec for this one, given that #30479 will exist:

  • All non-cumulative metrics will be dropped. If you have delta metrics, use new component: deltatocumulative processor #30479 to convert them to cumulative
  • The aggregator will only store the "last" value received per unique combo of metric name + labels

@djaglowski Do you have any thoughts for a name? timeaggregationprocessor?

@djaglowski
Member

@RichieSams, this all looks good to me.

@sh0rez
Copy link
Member

sh0rez commented Jan 23, 2024

other name idea: intervalprocessor (because it emits at fixed intervals)

for implementing this, you may want to take a look at streams.Ident from #30707, which can be used as a map[streams.Ident]T for storing last values for each distinct sample stream
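For illustration only, a hypothetical `streamKey` function in Go standing in for `streams.Ident` (whose real API is not reproduced here). It builds a map key from the metric name plus attributes, matching the "metric name + labels" identity described above. A fuller identity would likely also account for resource and scope attributes, and a production key would need to escape `|` and `=` inside names and values to avoid collisions.

```go
package main

import (
	"sort"
	"strings"
)

// streamKey builds a hypothetical identity string from a metric name and
// its attributes. Sorting the attribute keys makes the result independent
// of Go's randomized map iteration order, so one series maps to one entry.
func streamKey(metric string, attrs map[string]string) string {
	keys := make([]string, 0, len(attrs))
	for k := range attrs {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	b.WriteString(metric)
	for _, k := range keys {
		b.WriteString("|")
		b.WriteString(k)
		b.WriteString("=")
		b.WriteString(attrs[k])
	}
	return b.String()
}
```

A `map[string]T` keyed this way then holds the last value for each distinct series, analogous to the `map[streams.Ident]T` suggested above.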

@RichieSams
Contributor

Thanks for the pointer! I like that name as well; it's less of a mouthful.

@verejoel

Just so I understand the current situation:

Is this correct?

@RichieSams
Contributor

Not quite, #30479 will only convert delta to cumulative. I.e., in ideal settings, there will be a one-to-one mapping of input metrics to output metrics.

#29461 (this processor) will only do aggregation.

@RichieSams
Contributor

As an added comment: I have started working on this issue, PRs to follow in the coming days

@djaglowski
Copy link
Member

Thanks @RichieSams!

@0x006EA1E5
Copy link
Author

There is no reason why we can't aggregate deltas over time too, though, right?

@RichieSams
Copy link
Contributor

> There is no reason why we can't aggregate deltas over time too, though, right?

It just duplicates the code of the new deltatocumulativeprocessor. Unless you mean something else like:

  1. Accumulate delta metrics
  2. At each interval export the current sum for each delta
  3. Reset all the deltas back to zero

This could work. But I'd be curious about the use cases for that, versus just converting to cumulative and aggregating those.
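The three steps above, sketched in Go with hypothetical names (`deltaAccumulator`, `Add`, `Flush`) and plain float sums in place of pdata types. This is one possible shape, not a proposed implementation.

```go
package main

// deltaAccumulator sums delta values per stream between exports.
type deltaAccumulator struct {
	sums map[string]float64
}

func newDeltaAccumulator() *deltaAccumulator {
	return &deltaAccumulator{sums: make(map[string]float64)}
}

// Add implements step 1: accumulate each incoming delta for its stream.
func (d *deltaAccumulator) Add(id string, delta float64) {
	d.sums[id] += delta
}

// Flush implements steps 2 and 3: export the current sum for each stream,
// then reset it to zero while keeping the stream tracked.
func (d *deltaAccumulator) Flush() map[string]float64 {
	out := make(map[string]float64, len(d.sums))
	for id, sum := range d.sums {
		out[id] = sum
		d.sums[id] = 0 // updating existing keys during range is allowed
	}
	return out
}
```

With this shape, 10,000 deltas of 1 received during one interval leave as a single delta of 10,000, and the stream stays tracked with a zero sum for the next interval.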

@0x006EA1E5
Copy link
Author

> > There is no reason why we can't aggregate deltas over time too, though, right?
>
> It just duplicates the code of the new deltatocumulativeprocessor. Unless you mean something else like:
>
>   1. Accumulate delta metrics
>   2. At each interval export the current sum for each delta
>   3. Reset all the deltas back to zero
>
> This could work. But I'd be curious about the use cases for that, versus just converting to cumulative and aggregating those.

Yes exactly, many deltas in, one delta out. I think the batch processor does something similar, although the output trigger is batch size, not a clock period.

I guess one use case would be where deltas are preferred/required downstream, but you want to reduce the data rate, e.g., sending a single delta of 10,000 over the wire after 15s is much more efficient than sending 10,000 deltas of 1.

I'm not saying this has to be in scope for the initial implementation if there is no immediate need, but just trying to think about how all this might work.

@0x006EA1E5
Copy link
Author

> • All non-cumulative metrics will be dropped.

I'm assuming that the config across all these processors will be somewhat consistent.

The Cumulative to Delta Processor can be configured with metric include and exclude rules.

Isn't it more appropriate to simply pass through metric data that isn't matched?

If a user wants to drop a certain metric, they should configure a filter processor, no?

This gives maximum flexibility, and the pipeline is nice and explicit.

Well, I guess it's best to align with what other processors do in comparable situations...

@RichieSams
Copy link
Contributor

> I think the batch processor does something similar, although the output trigger is batch size, not a clock period.

It looks like the batchprocessor doesn't do any aggregation. It collects groups of metrics, and then sends them all at once in a single go. https://github.com/open-telemetry/opentelemetry-collector/blob/main/processor/batchprocessor/batch_processor.go#L425

I.e., it would collect 10,000 deltas, and send them all at once to the next component in the pipeline, whereas without it, the next component would get 10,000 individual requests, each with a single metric. So it's optimizing for things like TCP connection / request overhead.

> I'm not saying this has to be in scope for the initial implementation if there is no immediate need, but just trying to think about how all this might work.

For sure. I think it could be added in a future scope if the need arises. I don't think that would impact the immediate design.

@RichieSams
Copy link
Contributor

> Isn't it more appropriate to simply pass through metric data that isn't matched?

I don't have a strong opinion either way. I'd go with whatever the convention is for other processors.

So for delta metrics, we'd "consume" all the metrics and then periodically export on the interval. For cumulative metrics, we'd pass them to the next component in the chain untouched. Yes?

@0x006EA1E5
Copy link
Author

> > Isn't it more appropriate to simply pass through metric data that isn't matched?
>
> I don't have a strong opinion either way. I'd go with whatever the convention is for other processors.
>
> So for delta metrics, we'd "consume" all the metrics and then periodically export on the interval. For cumulative metrics, we'd pass them to the next component in the chain untouched. Yes?

I think it's the other way round😅.

The principal use case is for cumulative metrics, as produced by the new delta to cumulative processor.

But otherwise, yes, exactly 👍
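A rough Go sketch of the routing agreed on here, assuming a simplified `Metric` type and a hypothetical `include` predicate in place of the real include/exclude MatchMetrics config: matching cumulative metrics are held for interval aggregation, and everything else passes through to the next component untouched.

```go
package main

// Temporality mirrors the OTLP aggregation temporality of a Sum metric.
type Temporality int

const (
	Delta Temporality = iota
	Cumulative
)

// Metric is a hypothetical, simplified stand-in for a pdata metric.
type Metric struct {
	Name        string
	Temporality Temporality
}

// route splits incoming metrics: cumulative metrics accepted by include are
// returned for interval aggregation; all others are passed through as-is.
func route(in []Metric, include func(name string) bool) (aggregate, passthrough []Metric) {
	for _, m := range in {
		if m.Temporality == Cumulative && include(m.Name) {
			aggregate = append(aggregate, m)
		} else {
			passthrough = append(passthrough, m)
		}
	}
	return aggregate, passthrough
}
```

Passing unmatched data through, rather than dropping it, leaves any dropping to an explicit filter processor, as suggested earlier in the thread.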

@RichieSams
Copy link
Contributor

Right right, my brain is fried today. lol

@0x006EA1E5
Copy link
Author

0x006EA1E5 commented Feb 23, 2024

> However, I realized there is another potential approach. The processor could aggregate metrics over time, and then on an interval, export the aggregate once, flushing the state to empty.

I think the first approach is correct: the processor publishes the latest / current value every interval.

> It exports all metrics currently stored in the state (Should it update all their timestamps to "now()"?)

Yes, the timestamp should be updated to now(). Or at least, to be precise, the exported datapoint should have the timestamp of the instant it was exported. Looking at the spec, cumulative datapoints can also have a start_time timestamp, which in our case would be the timestamp of the previous publish (or, if this is the first publish for the timeseries, then I think the spec describes what should happen in the various edge cases). The output is then a series of contiguous, non-overlapping intervals with no gaps, starting from the first datapoint received.

This is then similar to how things work if we were to publish a metric to a Prometheus exporter, and then scrape that back in with a Prometheus receiver. We can export datapoints to a Prometheus exporter at any rate, but as the Prometheus receiver scrapes at a set, steady interval, it just produces one datapoint per timeseries every scrape interval. This includes the case where no new datapoints were sent to the Prometheus exporter: the next scrape will just output a new datapoint with the now() timestamp (as I understand it).

Doing it this way meets a key use-case of working with native OTLP metrics in the whole pipeline, until eventually sending to a prometheusremotewrite exporter.

@0x006EA1E5
Copy link
Author

> The processor could aggregate metrics over time, and then on an interval, export the aggregate once, flushing the state to empty.

I think if/when we have the ability to work with delta datapoints, we would do something similar, or at least set the value to zero (we would still keep state for the timeseries).

But as I understand it, we are just looking at cumulative datapoints for now, and will pass-through deltas, right?

@RichieSams
Copy link
Contributor

> cumulative datapoints can also have a start_time timestamp, which in our case would be the timestamp of the previous publish

Wouldn't start_timestamp just be inherited? I.e., if the incoming metrics have it, we leave it as-is.

@0x006EA1E5
Copy link
Author

0x006EA1E5 commented Feb 28, 2024

Looking at the spec, for cumulative datapoints the start_time_unix_nano should be the start of series, so the same value for each subsequent datapoint

> Contrast with cumulative aggregation temporality where we expect to report the full sum since 'start' (where usually start means a process/application start):

(I will edit my comment above to correct it)

So I suppose in our case, that is going to mean the cumulative datapoints' start_time_unix_nano will be the start_time_unix_nano of the first datapoint received, as you suggest :)

@0x006EA1E5
Copy link
Author

It is not clear to me, but it looks like a Sum consists of NumberDataPoints, and start_time_unix_nano is optional for those https://github.com/open-telemetry/opentelemetry-proto/blob/v0.9.0/opentelemetry/proto/metrics/v1/metrics.proto#L395

@RichieSams
Copy link
Contributor

@sh0rez Can I get your opinion on the two export options I presented above? We have one vote for each atm :P

@sh0rez
Copy link
Member

sh0rez commented Mar 1, 2024

imo the reason for this processor to exist is to limit the flowrate of datapoints (datapoints per minute, etc).
my gut feeling how it should work:

  1. Let $I$ be the aggregation interval.
  2. All incoming samples during $\Delta I$ are collected.
  3. If delta:
    1. accumulate the values into one big delta
    2. use min(start_timestamp). this is important because the start->time interval defines what the delta accounts for.
    3. use max(timestamp)
  4. If cumulative:
    1. export the latest value unaltered
    2. start will not change per spec. if it does, it signals a restart and that needs to be retained.
    3. time is important to keep, as we shall not "lie values" into the future.

In the case where no new datapoints are received within $I$, nothing shall be exported either.
For delta this is correct because zero delta carries no information over no delta at all.
For cumulative this is correct because there is nothing new to report. This matches the behavior of Prometheus: If a scrape fails / the target goes away, no new points are written. If it comes back, there's a gap in the series that signals what happened.

We can leave the delta case out for now.

@0x006EA1E5
Copy link
Author

> time is important to keep, as we shall not "lie values" into the future.

Yes this makes sense 👍

@sh0rez

> imo the reason for this processor to exist is to limit the flowrate of datapoints (datapoints per minute, etc).

I would say the reason is to fix the flow rate, rather than just limit it.

I think we have an open question as to whether we should publish a new cumulative datapoint at every interval in the case where the cumulative value has not changed, e.g., would we publish a stream of 0s every interval, or just publish the first 0 and then only publish another datapoint when there is a new / different value.

I detailed my use case in more detail here: #30827 (comment)
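To make the open question concrete, a hypothetical Go helper that supports both policies: re-publishing every interval, or publishing only the first value and subsequent changes. `changeFilter` and the `everyInterval` flag are illustrative names, not existing config options.

```go
package main

// changeFilter remembers the last value published per series so that an
// unchanged cumulative value can be suppressed.
type changeFilter struct {
	published map[string]float64
}

func newChangeFilter() *changeFilter {
	return &changeFilter{published: make(map[string]float64)}
}

// shouldPublish reports whether the value for id should be emitted this
// interval. With everyInterval set, every tick re-publishes the value;
// otherwise only the first value and later changes are published.
func (c *changeFilter) shouldPublish(id string, value float64, everyInterval bool) bool {
	prev, seen := c.published[id]
	if everyInterval || !seen || prev != value {
		c.published[id] = value
		return true
	}
	return false
}
```

For a series reporting 0, 0, 0, 2, the on-change policy publishes the first 0 and the 2, while the every-interval policy publishes all four values.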

RichieSams added a commit to RichieSams/opentelemetry-collector-contrib that referenced this issue Apr 24, 2024
djaglowski pushed a commit that referenced this issue Apr 30, 2024
Description:

This PR implements the main logic of this new processor.

Link to tracking Issue:

#29461
This is a re-opening of
[30827](#30827)

Testing:
I added a test for the main aggregation / export behavior, but I need to
add more to test state expiry

Documentation:

I updated the README with the updated configs, but I should add some
examples as well.
Contributor

github-actions bot commented May 1, 2024

This issue has been inactive for 60 days. It will be closed in 60 days if there is no activity. To ping code owners by adding a component label, see Adding Labels via Comments, or if you are unsure of which component this issue relates to, please ping @open-telemetry/collector-contrib-triagers. If this issue is still relevant, please ping the code owners or leave a comment explaining why it is still relevant. Otherwise, please close it.

@github-actions github-actions bot added the Stale label May 1, 2024
@RichieSams
Contributor

Closed by: #32054

jpkrohling added a commit that referenced this issue May 21, 2024
…code owners (#33019)

**Description:**
@jpkrohling and @djaglowski volunteered to be sponsors of the delta to
cumulative processor, and @djaglowski also volunteered to be sponsor of
the interval processor in relation to this. They should also be code
owners.

From
[CONTRIBUTING.md](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#adding-new-components):
```
A sponsor is an approver who will be in charge of being the official reviewer of the code and become a code owner for the component.
```

**Link to tracking Issue:**

#30479
- Delta to cumulative processor

#29461
- Interval processor

---------

Co-authored-by: Juraci Paixão Kröhling <[email protected]>
7 participants