[prometheusremotewriteexporter] reduce allocations in createAttributes #35184
```go
// best to keep it around for the lifetime of the Go process. Due to this shared
// state, PrometheusConverter is NOT thread-safe and is only intended to be used by
// a single go-routine at a time.
// Each FromMetrics call should be followed by a Reset when the metrics can be safely
```
Should we emit a warning log or something if someone calls FromMetrics without Resetting?
Moved the reset to always be called inside FromMetrics so this is no longer a user concern.
Now that the user doesn't need to call reset, should we remove this part of the comment?
// Each FromMetrics call should be followed by a Reset.....
We don't plan to keep this forever, right? Ideally we'll be able to shard this to improve throughput; we're just hardcoding this to 1 because OTel's exporter helper doesn't ensure ordering. On the other hand, I agree that we shouldn't block optimizations based on something we want to do in the future 😬. @edma2, knowing that we'll eventually shard the output, any suggestions on how to do this without sacrificing your optimization?

I also wonder, since you can have multiple pipelines with multiple remote write exporters (i.e. sending data from a dev cluster to 2 destinations, dev and prod), whether that would break this too.
@ArthurSens my initial thought here is maybe wrap things in a

@jmichalek132 Each exporter would have its own instance of
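The pooling approach the implementation eventually adopted (the `sync.Pool` mentioned in a later comment) could look roughly like this. This is a minimal sketch: the `converter` type, its `reset` method, and the `convert` helper are hypothetical stand-ins for the real `PrometheusConverter`, not the actual exporter code.

```go
package main

import (
	"fmt"
	"sync"
)

// converter is a hypothetical stand-in for PrometheusConverter: it owns
// reusable buffers that must never be shared between goroutines.
type converter struct {
	labels []string
}

// reset truncates the buffer but keeps its capacity for reuse.
func (c *converter) reset() { c.labels = c.labels[:0] }

// converterPool lets each worker borrow a converter for the duration of
// one conversion, so concurrent workers never share mutable state.
var converterPool = sync.Pool{
	New: func() any { return new(converter) },
}

// convert borrows a converter, uses it, and returns it (reset) to the pool.
func convert(attrs []string) int {
	c := converterPool.Get().(*converter)
	defer func() {
		c.reset()
		converterPool.Put(c)
	}()
	c.labels = append(c.labels, attrs...)
	return len(c.labels)
}

func main() {
	fmt.Println(convert([]string{"job", "instance"}))
}
```

Because the converter is only borrowed for a single call, this shape also sidesteps the multiple-pipelines concern: each concurrent caller gets its own instance.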
This PR was marked stale due to lack of activity. It will be closed in 14 days.
@ArthurSens @dashpole @jmichalek132 I addressed comments and also changed the implementation so it's now in a sync.Pool. This now supports concurrent access from the exporter class in case it ever supports more than 1 worker at a time. Please take a look!
Awesome edma! I'm struggling a bit to find time to review this one, just wanted to let you know that this is on my list :)
createAttributes was allocating a new label slice for every series, which generates mucho garbage (~30-40% of all allocations). Keep around a re-usable underlying array of labels to reduce allocations on the hot path. (open-telemetry#57)
```go
l := c.labelsMap
clear(l)
```
Could we also clear `labelMap` when we call `reset()`? I think it would be cleaner if we reset the state in a single place instead, or is there any particular reason to do it here?
For the same reason as https://github.com/open-telemetry/opentelemetry-collector-contrib/pull/35184/files/928529a1cf587e8e5b29bd4880f2c36157eb8194#r1829677356, we want to isolate the contents of this map between calls to `createAttributes`, so we do that by clearing it.
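The map-reuse trick being discussed can be sketched in isolation. This is illustrative only: `scratch` and `collectLabels` are hypothetical names standing in for `c.labelsMap` and the clearing step inside `createAttributes`.

```go
package main

import "fmt"

// scratch is a reusable map, like c.labelsMap: clear() empties it while
// keeping its allocated buckets, so repeated calls avoid reallocating.
var scratch = map[string]string{}

// collectLabels copies attrs into the shared scratch map. Clearing at
// the top isolates each call's contents from the previous call's,
// which is why the clear happens here rather than only in reset().
func collectLabels(attrs map[string]string) int {
	clear(scratch)
	for k, v := range attrs {
		scratch[k] = v
	}
	return len(scratch)
}

func main() {
	fmt.Println(collectLabels(map[string]string{"job": "a", "instance": "b"}))
	// Stale keys from the previous call are gone:
	fmt.Println(collectLabels(map[string]string{"job": "c"}))
}
```

Without the `clear`, the second call would still see `instance` left over from the first, which is exactly the cross-call leakage the review comment is about.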
That makes sense, thank you! Could we add a comment explaining it?
```diff
-	labels = labels[:0]
+	startIndex := len(c.labels)
 	for k, v := range l {
-		labels = append(labels, prompb.Label{Name: k, Value: v})
+		c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
 	}
-	return labels
+	return c.labels[startIndex:]
```
Is there any possibility that `len(c.labels)` is not 0 here? It's reset every time we call FromMetrics, and I couldn't find any other place in the code where we write to this array, so why not just return `c.labels` and not worry about `startIndex`? I might be missing something, but it feels like we're overcomplicating things here:

```go
for k, v := range l {
	c.labels = append(c.labels, prompb.Label{Name: k, Value: v})
}
return c.labels
```
`startIndex` is important for keeping the returned slices isolated from each other while sharing the same underlying array within a single `FromMetrics` call. It is 0 only for the first series of a batch.

Here is how it works: `FromMetrics` is called once per batch, and `createAttributes` for every series within the batch. We want to re-use the backing array of the `labels` slice for all series within a single batch. We do that by appending the labels of each series to the end of the slice. Finally, we return only from `startIndex` onward, so the caller doesn't see labels from other series (while reusing the same backing array, which naturally grows up to the size needed to fit a single `FromMetrics` call).

For example, if X1...X4 are labels from series X and Y1...Y3 are labels from series Y, then the backing array of `c.labels` will look like `[X1, X2, X3, X4, Y1, Y2, Y3]` after calling `createAttributes` twice (this is a simplification, as the backing array will probably have excess capacity from resizing or previous calls). Meanwhile, the first call to `createAttributes` will have returned `[X1, X2, X3, X4]` and the second call `[Y1, Y2, Y3]`. On the next `FromMetrics` call the index is reset to 0 and we can re-use the entire array with zero allocations.
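The scheme above can be reduced to a stripped-down sketch. The names here (`batchLabels`, `appendSeries`) are illustrative stand-ins, not the actual converter code, but the slicing mechanics are the same.

```go
package main

import "fmt"

// batchLabels is a simplified stand-in for c.labels: one backing array
// reused by every series in a batch.
type batchLabels struct {
	labels []string
}

// appendSeries appends one series' labels to the shared array and
// returns only the newly-appended window, so the caller never sees
// labels belonging to other series.
func (b *batchLabels) appendSeries(labels ...string) []string {
	start := len(b.labels)
	b.labels = append(b.labels, labels...)
	return b.labels[start:]
}

// reset runs once per batch (as FromMetrics does): the length drops to
// 0 but the grown capacity is kept, so the next batch allocates nothing.
func (b *batchLabels) reset() { b.labels = b.labels[:0] }

func main() {
	var b batchLabels
	x := b.appendSeries("X1", "X2", "X3", "X4")
	y := b.appendSeries("Y1", "Y2", "Y3")
	fmt.Println(x, y, len(b.labels))
	b.reset() // next batch reuses the same backing array
}
```

Note one subtlety: if an `append` grows the array mid-batch, earlier returned windows keep pointing at the old backing array; their contents stay valid, which is why this is safe within a batch but only within one.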
Perfect, thank you for the explanation :)
Now what I'm wondering is whether we have tests that ensure concurrency works, mostly to make sure we don't break the non-thread-safe promise by accident.
Alright, thanks for the patience and for kindly explaining everything here :)
Regarding all the code so far, it LGTM! Let's just add an extra test that certifies we're not calling FromMetrics concurrently in other areas of the codebase. This test will probably live in the exporter that uses the translator pkg, e.g. prometheusremotewriteexporter.
Maybe something like this (pseudo-code):

```go
func generateMetric(i int) pmetric.Metrics {
	m := GenerateMetricsOneMetric()
	m.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().AppendEmpty().SetIntValue(int64(i))
	return m
}

func TestPushMetricsConcurrency(t *testing.T) {
	// Create mock where prwexporter sends metrics to
	// Create prwexporter
	n := 1000
	metrics := make([]pmetric.Metrics, 0, n)
	for i := 0; i < n; i++ {
		metrics = append(metrics, generateMetric(i))
	}
	var wg sync.WaitGroup
	wg.Add(n)
	for i := range metrics {
		go func(i int) {
			prwexporter.PushMetrics(context.Background(), metrics[i])
			wg.Done()
		}(i)
	}
	wg.Wait()
	// assert metrics arrived in the mock.
}
```
```yaml
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'breaking'
```
That's an enhancement, right? Not a breaking change
@dashpole suggested this should be considered breaking, unless I misread the comment: #35184 (comment).

I agree it's more of an enhancement, though it is technically changing a public function (FromMetrics).
`FromMetrics` doesn't need to change to achieve this, does it? Rather than storing a converter pool in the PRW exporter, it could be stored in the translator package and used within `FromMetrics`, since a single invocation of that function encompasses a complete usage of the converter. Alternately, we can complete the effort that was started to formalize that interface and add a `Reset()` method to it, if required.

I think that should probably happen separately from optimization efforts, though. It already seems like there are several different threads being pulled at in this area. Can we work through what this API should look like to enable those desired changes, change the API, then deal with concurrency within the exporter and optimization within the translator independently?
@ArthurSens thank you for the suggestion. Added a test, PTAL
```go
//go:build !race
// +build !race

// note: this test doesn't pass currently due to a race condition in batchTimeSeries
```
@ArthurSens I discovered a race condition when multiple goroutines run which seems unrelated to my change. The batching logic uses shared state which won't work with multiple goroutines. I added some build tags above to exclude this test until it's fixed.
Hmmm, I've run your test on the main branch, and indeed, we have a data race like you said :O

I'm not sure how to proceed here. To be honest, I think we need to fix the race first, so we're safe to merge this one, too.
I've opened a PR trying to address the issue: #36524
Wanna share your opinion? :)
Sorry, there was a holiday and I didn't get a chance to look at this. I'll take a look this week.
This is on my review list; I'll get to it hopefully tomorrow or early next week 🤞
```go
// Only call this function when the result of the previous call is no longer needed
// (i.e. has been exported out of process). The return values of this function depend on
// internal state which is reset on every call.
func (c *PrometheusConverter) FromMetrics(md pmetric.Metrics, settings Settings) (map[string]*prompb.TimeSeries, error) {
```
This function was retained to minimize interface changes while adding the `prometheusConverter` type to improve efficiency of label comparison. See #31385 for the history. I don't think it should be added to the API of the `prometheusConverter` type in this manner, as it was a shim to maintain compatibility with the PRW exporter and not the intended interface of this type. Given that the original author has seemingly found a different solution to their internal problem elsewhere and abandoned the proposed API changes, we should probably take this opportunity to review the API that an exported `PrometheusConverter` type should have.
Although I like the idea of reimagining the API, we have way too much activity happening in this component at the moment, and it is mostly coming from new members like myself, @edma2, and @jmichalek132.
The OTel-Prometheus SIG group is relatively small, and I'd love to optimize contributions for community growth. Once we have enough hands, we can plan the refactoring you are proposing. By optimizing contributions for community growth, I mean fast interactions, try our best to keep PRs open for max 2 weeks (to avoid rebases), accepting imperfect PRs while encouraging improvements in follow-ups, etc.
What do you think?
I never said anything about reimagining an API. I said that if we're going to expose the `PrometheusConverter` type we need to think about what that exposed API should be, and I submit that simply taking this helper method and making it the public API ain't it. The API is already there, simply not exported. If a `Reset()` method needs to be added, that's fine. What we shouldn't do is export this helper method, which has the wrong signature and does more work than is needed in order to keep a separation between a performance optimization and the public API of this package, simply because it's the "efficient" thing to do from the view of how long PRs remain open. That will mean a breaking change now and another breaking change when a sensible API is actually exposed.

An alternative that wouldn't involve considering the shape of a public API at all, and wouldn't involve any breaking changes to any public API, would be to move the `converterPool` out of the exporter and into the translation package. The existing `FromMetrics` function is the only thing that needs it, and it only needs to obtain a converter, use it, and return it to the pool. As this PR currently stands, the exporter never uses a `PrometheusConverter` instance for anything more than a single invocation of `FromMetrics`. Doing it this way should make this PR considerably smaller and eliminates any public API changes.

What do you think?
> An alternative that wouldn't involve considering the shape of a public API at all, and wouldn't involve any breaking changes to any public API, would be to move the converterPool out of the exporter and into the translation package. The existing FromMetrics function is the only thing that needs it and it only needs to obtain a converter, use it, and return it to the pool. As this PR currently stands the exporter never uses a PrometheusConverter instance for anything more than a single invocation of FromMetrics. Doing it this way should make this PR considerably smaller and eliminates any public API changes.
@Aneurysm9 This wouldn't work because the translator package (i.e. FromMetrics) doesn't know when to put the converter back in the pool. It can't do it at the end of FromMetrics because the metrics may not have been shipped off yet; putting it back in the pool may prematurely reset its state and send invalid attributes.
Then we're back to what I said initially: expose the correct API for the `PrometheusConverter` type, don't simply shove this shim into it. If we're going to export that type, it should have the following interface:

```go
type PrometheusConverter interface {
	FromMetrics(pmetric.Metrics, Settings) error
	TimeSeries() []prompb.TimeSeries
	Reset()
}
```

It doesn't necessarily need to be an interface, and `TimeSeries()` returning a `prompb` type complicates the use case where an interface would actually make sense, but that's the shape of the exported methods I would expect. They're all there. Let's export them rather than perpetuating the stopgap that's in place and making it take multiple breaking changes to get there.
Description:
While profiling the collector, we found that the `createAttributes` function was responsible for a significant chunk of allocations (30-40%), which was leading to high CPU usage spent in GC. `createAttributes` is responsible for converting attributes of a given data point to Prometheus labels. For simplicity, it allocates a new labels slice for every data point. We found that reducing allocations here significantly reduced GC time in our environment (in some deployments by as much as ~50%).

The strategy in this PR is to reuse the slice array as much as possible. The backing array will automatically resize as needed (batching with a batch processor will effectively set an upper bound). Note: we don't need to synchronize access to this (e.g. with a `sync.Pool`) since the exporter is configured with 1 consumer.

Link to tracking Issue:

Testing:
Modified unit tests and ran benchmarks locally.
Works in our production environment.

benchstat output

Documentation: