[processor/interval] Refactor with time-based partitioning strategy #34948

Closed
9 changes: 7 additions & 2 deletions processor/intervalprocessor/config.go
@@ -11,7 +11,8 @@ import (
)

var (
ErrInvalidIntervalValue = errors.New("invalid interval value")
ErrInvalidIntervalValue = errors.New("invalid interval value")
ErrIntervalLowestGranularityIsSecond = errors.New("interval should not contain milliseconds or nanoseconds")
)

var _ component.Config = (*Config)(nil)
@@ -37,9 +38,13 @@ type PassThrough struct {
// Validate checks whether the input configuration has all of the required fields for the processor.
// An error is returned if there are any invalid inputs.
func (config *Config) Validate() error {
if config.Interval <= 0 {
if config.Interval <= time.Second {
return ErrInvalidIntervalValue
}

if config.Interval%time.Second != 0 {

Contributor:

I mean, choosing an interval of like 16s345ms is odd for sure. But I don't think it's necessarily a problem for the algorithm

return ErrIntervalLowestGranularityIsSecond
}

return nil
}
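
As a quick illustration of the new Validate rules, here is a small sketch against the Config shown above; the specific durations are made up for the example:

```go
// Illustrative only; uses the Config type and Interval field from the diff above.
cfg := &Config{Interval: 16*time.Second + 345*time.Millisecond}
err := cfg.Validate() // 16.345s is not a whole number of seconds -> ErrIntervalLowestGranularityIsSecond

cfg = &Config{Interval: 15 * time.Minute}
err = cfg.Validate() // 900s is a whole number of seconds (and > 1s) -> nil
_ = err
```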
211 changes: 117 additions & 94 deletions processor/intervalprocessor/processor.go
@@ -29,6 +29,14 @@ type Processor struct {

stateLock sync.Mutex

partitions []*Partition
numPartitions int // Store number of partitions to avoid len(partitions) calls all the time.
config *Config

nextConsumer consumer.Metrics
}

type Partition struct {
md pmetric.Metrics
rmLookup map[identity.Resource]pmetric.ResourceMetrics
smLookup map[identity.Scope]pmetric.ScopeMetrics
@@ -37,47 +45,52 @@ type Processor struct {
histogramLookup map[identity.Stream]pmetric.HistogramDataPoint
expHistogramLookup map[identity.Stream]pmetric.ExponentialHistogramDataPoint
summaryLookup map[identity.Stream]pmetric.SummaryDataPoint

config *Config

nextConsumer consumer.Metrics
}

func newProcessor(config *Config, log *zap.Logger, nextConsumer consumer.Metrics) *Processor {
ctx, cancel := context.WithCancel(context.Background())

return &Processor{
ctx: ctx,
cancel: cancel,
logger: log,

stateLock: sync.Mutex{},

md: pmetric.NewMetrics(),
rmLookup: map[identity.Resource]pmetric.ResourceMetrics{},
smLookup: map[identity.Scope]pmetric.ScopeMetrics{},
mLookup: map[identity.Metric]pmetric.Metric{},
numberLookup: map[identity.Stream]pmetric.NumberDataPoint{},
histogramLookup: map[identity.Stream]pmetric.HistogramDataPoint{},
expHistogramLookup: map[identity.Stream]pmetric.ExponentialHistogramDataPoint{},
summaryLookup: map[identity.Stream]pmetric.SummaryDataPoint{},
numPartitions := int(config.Interval.Seconds())

Contributor:

Oh, I think this should either be hardcoded to something like 60, or made configurable. If someone chooses a large interval like 15 minutes, we don't want to create 900 partitions as a result.

Member Author:

This might sound like a dumb question, but why are 900 partitions a problem? 😅

Contributor:

I think it's just overkill. Also, I don't think we want to export every second. It's a lot of work to be doing every second.

I guess this highlights the issue with hardcoding or such. If the interval is short, like 15 seconds, we probably don't need to partition at all. But if it's 15 minutes or an hour, then partitioning will be very impactful. But again, we don't want to export every second.

IE, maybe we calculate the number of intervals to have, based on some "ideal" export frequency. (no idea what value to choose for that).

For example, say we choose the "ideal" export interval to be 15 seconds. If the "processor interval" is less than 15 seconds, we only have one partition. Between 15 and 30 seconds, two intervals. Between 30 and 45 seconds, three intervals. Etc.

Member Author:

> I think it's just overkill. Also, I don't think we want to export every second. It's a lot of work to be doing every second.

Ok I see your point. I guess our approach here is switching where the heavy computation load is happening.

• No partitions = heavy load on whoever is receiving the aggregated metrics, since everything arrives at the same time
• More partitions = smoother load on the next consumer, but higher load on the intervalprocessor

> IE, maybe we calculate the number of intervals to have, based on some "ideal" export frequency. (no idea what value to choose for that).

Not sure if we'll be able to find a one-size-fits-all here; it all depends on the characteristics of the incoming metrics 😕.
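
For concreteness, here is a minimal sketch of the "ideal export frequency" idea from the comments above. It is not code from this PR, and the 15-second ideal export interval is an assumed placeholder value:

```go
package main

import (
	"fmt"
	"time"
)

// Assumed placeholder for the "ideal" export frequency discussed above.
const idealExportInterval = 15 * time.Second

// partitionCount derives the number of partitions from the processor interval:
// intervals up to 15s get 1 partition, 15s-30s get 2, 30s-45s get 3, and so on.
func partitionCount(interval time.Duration) int {
	n := int((interval + idealExportInterval - 1) / idealExportInterval) // ceiling division
	if n < 1 {
		n = 1
	}
	return n
}

func main() {
	fmt.Println(partitionCount(10 * time.Second)) // 1
	fmt.Println(partitionCount(60 * time.Second)) // 4
	fmt.Println(partitionCount(15 * time.Minute)) // 60
}
```

With a 15s target, a 15-minute processor interval yields 60 partitions instead of 900, while short intervals collapse to a single partition.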


partitions := make([]*Partition, numPartitions)
for i := range partitions {
partitions[i] = &Partition{
md: pmetric.NewMetrics(),
rmLookup: make(map[identity.Resource]pmetric.ResourceMetrics, 0),
smLookup: make(map[identity.Scope]pmetric.ScopeMetrics, 0),
mLookup: make(map[identity.Metric]pmetric.Metric, 0),
numberLookup: make(map[identity.Stream]pmetric.NumberDataPoint, 0),
histogramLookup: make(map[identity.Stream]pmetric.HistogramDataPoint, 0),
expHistogramLookup: make(map[identity.Stream]pmetric.ExponentialHistogramDataPoint, 0),
summaryLookup: make(map[identity.Stream]pmetric.SummaryDataPoint, 0),
}
}

config: config,
return &Processor{
ctx: ctx,
cancel: cancel,
logger: log,
stateLock: sync.Mutex{},
partitions: partitions,
numPartitions: numPartitions,
config: config,

nextConsumer: nextConsumer,
}
}

func (p *Processor) Start(_ context.Context, _ component.Host) error {
exportTicker := time.NewTicker(p.config.Interval)
exportTicker := time.NewTicker(time.Second)

Contributor (@RichieSams, Sep 18, 2024):

I think this should be p.config.Interval / numPartitions instead of hardcoded to 1 second

i := 0
go func() {
for {
select {
case <-p.ctx.Done():
exportTicker.Stop()
return
case <-exportTicker.C:
p.exportMetrics()
p.exportMetrics(i)
i = (i + 1) % p.numPartitions
}
}
}()
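
A self-contained sketch of the suggestion above: derive the ticker period from the configured interval and the partition count rather than hardcoding one second. The concrete values are illustrative and not taken from the PR:

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Example values only: a 15-minute interval split across 60 partitions.
	interval := 15 * time.Minute
	numPartitions := 60

	// One tick per partition slot, so a full round-robin cycle spans exactly one interval.
	tickPeriod := interval / time.Duration(numPartitions) // 15s
	ticker := time.NewTicker(tickPeriod)
	defer ticker.Stop()

	// Same round-robin index rotation as the Start goroutine in this diff.
	i := 0
	for n := 0; n < 3; n++ {
		<-ticker.C
		fmt.Printf("exporting partition %d\n", i)
		i = (i + 1) % numPartitions
	}
}
```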
@@ -109,16 +122,16 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.summaryLookup)
mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)

Contributor:

Do we want to partition on metricID or streamID? (cc @sh0rez)

The current code partitions on metricID, which is much simpler code-wise. But I do worry that it won't be enough. I.e., if all the data are the same metrics, only differentiated by datapoint labels, they'll all be partitioned into the same bucket, and thus defeat the purpose.

Member Author:

friendly ping @sh0rez :)
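
To make the trade-off concrete, here is a hypothetical sketch (not part of this PR) of partitioning on the stream rather than the metric. It assumes identity.OfStream exists and that Stream exposes the same Hash().Sum64() shape as the identity.OfMetric / Hash().Sum64() calls already used in this diff:

```go
// Hypothetical: choose the partition per data point (stream), so series of the
// same metric that differ only by their attributes spread across partitions.
func (p *Processor) partitionForStream(metricID identity.Metric, dp pmetric.NumberDataPoint) uint64 {
	streamID := identity.OfStream(metricID, dp) // assumed helper from the identity package
	return streamID.Hash().Sum64() % uint64(p.numPartitions)
}
```

The cost, as noted above, is complexity: getOrCloneMetric could no longer return a single partition per metric, because one metric's data points may land in several partitions and each would need its own metric clone.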

aggregateDataPoints(m.Summary().DataPoints(), mClone.Summary().DataPoints(), metricID, p.partitions[partition].summaryLookup)
return true
case pmetric.MetricTypeGauge:
if p.config.PassThrough.Gauge {
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.numberLookup)
mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
aggregateDataPoints(m.Gauge().DataPoints(), mClone.Gauge().DataPoints(), metricID, p.partitions[partition].numberLookup)
return true
case pmetric.MetricTypeSum:
// Check if we care about this value
@@ -132,10 +145,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
cloneSum := mClone.Sum()

aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.numberLookup)
aggregateDataPoints(sum.DataPoints(), cloneSum.DataPoints(), metricID, p.partitions[partition].numberLookup)
return true
case pmetric.MetricTypeHistogram:
histogram := m.Histogram()
@@ -144,10 +157,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
cloneHistogram := mClone.Histogram()

aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.histogramLookup)
aggregateDataPoints(histogram.DataPoints(), cloneHistogram.DataPoints(), metricID, p.partitions[partition].histogramLookup)
return true
case pmetric.MetricTypeExponentialHistogram:
expHistogram := m.ExponentialHistogram()
@@ -156,10 +169,10 @@ func (p *Processor) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro
return false
}

mClone, metricID := p.getOrCloneMetric(rm, sm, m)
mClone, metricID, partition := p.getOrCloneMetric(rm, sm, m)
cloneExpHistogram := mClone.ExponentialHistogram()

aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.expHistogramLookup)
aggregateDataPoints(expHistogram.DataPoints(), cloneExpHistogram.DataPoints(), metricID, p.partitions[partition].expHistogramLookup)
return true
default:
errs = errors.Join(fmt.Errorf("invalid MetricType %d", m.Type()))
@@ -201,24 +214,24 @@ func aggregateDataPoints[DPS metrics.DataPointSlice[DP], DP metrics.DataPoint[DP
}
}

func (p *Processor) exportMetrics() {
func (p *Processor) exportMetrics(partition int) {
md := func() pmetric.Metrics {
p.stateLock.Lock()
defer p.stateLock.Unlock()

// ConsumeMetrics() has prepared our own pmetric.Metrics instance ready for us to use
// Take it and replace it with a new empty one
out := p.md
p.md = pmetric.NewMetrics()
out := p.partitions[partition].md
p.partitions[partition].md = pmetric.NewMetrics()

// Clear all the lookup references
clear(p.rmLookup)
clear(p.smLookup)
clear(p.mLookup)
clear(p.numberLookup)
clear(p.histogramLookup)
clear(p.expHistogramLookup)
clear(p.summaryLookup)
clear(p.partitions[partition].rmLookup)
clear(p.partitions[partition].smLookup)
clear(p.partitions[partition].mLookup)
clear(p.partitions[partition].numberLookup)
clear(p.partitions[partition].histogramLookup)
clear(p.partitions[partition].expHistogramLookup)
clear(p.partitions[partition].summaryLookup)

return out
}()
Expand All @@ -228,64 +241,74 @@ func (p *Processor) exportMetrics() {
}
}

func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric) {
func (p *Processor) getOrCloneMetric(rm pmetric.ResourceMetrics, sm pmetric.ScopeMetrics, m pmetric.Metric) (pmetric.Metric, identity.Metric, uint64) {
// Find the ResourceMetrics
resID := identity.OfResource(rm.Resource())
rmClone, ok := p.rmLookup[resID]
if !ok {
// We need to clone it *without* the ScopeMetricsSlice data
rmClone = p.md.ResourceMetrics().AppendEmpty()
rm.Resource().CopyTo(rmClone.Resource())
rmClone.SetSchemaUrl(rm.SchemaUrl())
p.rmLookup[resID] = rmClone
}

// Find the ScopeMetrics
scopeID := identity.OfScope(resID, sm.Scope())
smClone, ok := p.smLookup[scopeID]
if !ok {
// We need to clone it *without* the MetricSlice data
smClone = rmClone.ScopeMetrics().AppendEmpty()
sm.Scope().CopyTo(smClone.Scope())
smClone.SetSchemaUrl(sm.SchemaUrl())
p.smLookup[scopeID] = smClone
}

// Find the Metric
metricID := identity.OfMetric(scopeID, m)
mClone, ok := p.mLookup[metricID]
if !ok {
// We need to clone it *without* the datapoint data
mClone = smClone.Metrics().AppendEmpty()
mClone.SetName(m.Name())
mClone.SetDescription(m.Description())
mClone.SetUnit(m.Unit())

switch m.Type() {
case pmetric.MetricTypeGauge:
mClone.SetEmptyGauge()
case pmetric.MetricTypeSummary:
mClone.SetEmptySummary()
case pmetric.MetricTypeSum:
src := m.Sum()

dest := mClone.SetEmptySum()
dest.SetAggregationTemporality(src.AggregationTemporality())
dest.SetIsMonotonic(src.IsMonotonic())
case pmetric.MetricTypeHistogram:
src := m.Histogram()

dest := mClone.SetEmptyHistogram()
dest.SetAggregationTemporality(src.AggregationTemporality())
case pmetric.MetricTypeExponentialHistogram:
src := m.ExponentialHistogram()

dest := mClone.SetEmptyExponentialHistogram()
dest.SetAggregationTemporality(src.AggregationTemporality())
var mClone pmetric.Metric
var ok bool
for i, partition := range p.partitions {
mClone, ok = partition.mLookup[metricID]
if ok {
// int -> uint64 theoretically can lead to overflow.
// The only way we get an overflow here is if the interval is greater than 18446744073709551615 seconds.
// That's 584942417355 years. I think we're safe.
return mClone, metricID, uint64(i) //nolint
}
}

p.mLookup[metricID] = mClone
var rmClone pmetric.ResourceMetrics
var smClone pmetric.ScopeMetrics
// Getting here means the metric isn't stored in any partition, so we need to create it.

// int -> uint64 theoretically can lead to overflow.
// The only way we get an overflow here is if the interval is greater than 18446744073709551615 seconds.
// That's 584942417355 years. I think we're safe.
partition := metricID.Hash().Sum64() % uint64(p.numPartitions) //nolint

// We need to clone resourceMetrics *without* the ScopeMetricsSlice data
rmClone = p.partitions[partition].md.ResourceMetrics().AppendEmpty()
rm.Resource().CopyTo(rmClone.Resource())
rmClone.SetSchemaUrl(rm.SchemaUrl())

// We need to clone scopeMetrics *without* the Metric
smClone = rmClone.ScopeMetrics().AppendEmpty()
sm.Scope().CopyTo(smClone.Scope())
smClone.SetSchemaUrl(sm.SchemaUrl())

// We need to clone it *without* the datapoint data
mClone = smClone.Metrics().AppendEmpty()
mClone.SetName(m.Name())
mClone.SetDescription(m.Description())
mClone.SetUnit(m.Unit())

switch m.Type() {
case pmetric.MetricTypeGauge:
mClone.SetEmptyGauge()
case pmetric.MetricTypeSummary:
mClone.SetEmptySummary()
case pmetric.MetricTypeSum:
src := m.Sum()

dest := mClone.SetEmptySum()
dest.SetAggregationTemporality(src.AggregationTemporality())
dest.SetIsMonotonic(src.IsMonotonic())
case pmetric.MetricTypeHistogram:
src := m.Histogram()

dest := mClone.SetEmptyHistogram()
dest.SetAggregationTemporality(src.AggregationTemporality())
case pmetric.MetricTypeExponentialHistogram:
src := m.ExponentialHistogram()

dest := mClone.SetEmptyExponentialHistogram()
dest.SetAggregationTemporality(src.AggregationTemporality())
}

return mClone, metricID
p.partitions[partition].rmLookup[resID] = rmClone
p.partitions[partition].smLookup[scopeID] = smClone
p.partitions[partition].mLookup[metricID] = mClone

return mClone, metricID, partition
}