Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Processor/logratelimitprocessor #36592

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cmd/otelcontribcol/builder-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ processors:
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/intervalprocessor v0.114.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor v0.114.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor v0.114.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/logratelimitprocessor v0.114.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricsgenerationprocessor v0.114.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/metricstransformprocessor v0.114.0
- gomod: github.com/open-telemetry/opentelemetry-collector-contrib/processor/probabilisticsamplerprocessor v0.114.0
Expand Down Expand Up @@ -270,6 +271,7 @@ replaces:
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/receivercreator => ../../receiver/receivercreator
- github.com/open-telemetry/opentelemetry-collector-contrib/processor/k8sattributesprocessor => ../../processor/k8sattributesprocessor
- github.com/open-telemetry/opentelemetry-collector-contrib/processor/logdedupprocessor => ../../processor/logdedupprocessor
- github.com/open-telemetry/opentelemetry-collector-contrib/processor/logratelimitprocessor => ../../processor/logratelimitprocessor
- github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awsemfexporter => ../../exporter/awsemfexporter
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver => ../../receiver/opencensusreceiver
- github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver => ../../receiver/splunkhecreceiver
Expand Down
1 change: 1 addition & 0 deletions processor/logratelimitprocessor/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
include ../../Makefile.Common
71 changes: 71 additions & 0 deletions processor/logratelimitprocessor/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# Log Rate-Limiter Processor

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [alpha]: logs |
| Distributions | [contrib], [k8s] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Flogratelimit%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Flogratelimit) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Flogratelimit%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Flogratelimit) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@anuraj381](https://www.github.com/anuraj381) |

[alpha]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/docs/component-stability.md#alpha
[contrib]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-contrib
[k8s]: https://github.com/open-telemetry/opentelemetry-collector-releases/tree/main/distributions/otelcol-k8s
<!-- end autogenerated section -->

## Details
The logratelimit processor is useful in the situation where you are collecting logs from multiple services on every otel-collector pod and there
are one or many services which are logging too many logs such that it increases noise in the entire pipeline and storage (otel backend). So to control
the log noise/surge from any service/namespace/pod etc. you can use the rate-limit processor, you can give the fields on which you want to limit logs
and add config for allowed rate and interval.<br>
The processor caches the count of logs in the given interval for each combination of given rate_limit_fields and once logs count starts to exceed the count
the processor will start dropping the logs till the interval finish in the best effort way. There are no mutex/locks involved, only one atomic counter is used
to keep rate-limiter lightweight / easy on resources.

## Configuration
| Field | Type | Default | Description |
|-------------------|----------|-------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| conditions | []string | `[]` | A slice of [OTTL] expressions used to evaluate which logs records to be rate-limited. See [OTTL Boolean Expressions] for more details. |
| allowed_rate | int | 30000 | Allowed rate of logs per logs combination of rate_limit_fields in configured `interval` |
| interval | duration | `60s` | The interval in which rate-limit is applied after `allowed_rate`. |
| rate_limit_fields | []string | `[]` | rate-limit is applied for each cobination of values of these fields |

[OTTL]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/v0.109.0/pkg/ottl#readme
[OTTL Boolean Expressions]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/LANGUAGE.md#boolean-expressions

## Important Note
Before using this processor you should have a rough idea on how many logs lines are coming from any combination of your configured rate_limit_fields values
on the opentelemetry-collector pods as you need to add a config for allowed_rate, this might be difficult to calculate in case of a daemonset deployment of
the collector as how many pods of any service will be there on a Kubernetes node is not fixed (generally) so you might get high logs a combination of rate_limit_fields
on one collector pod and maybe low number on other, this will disturb your calculation of allowed_rate, you might false drop logs. Ideally if you have a daemonset deployment
strategy for collector then next to that in pipeline maybe add one more deployment (not daemonset, but k8s deployment) of collector and there you can have fair idea of
allowed_rate and configure accordingly.

### Example Config
The following config is an example configuration for the logratelimit processor. It is configured with an allowed_rate of 30000 in an interval of `60 seconds` for each combination of mentioned rate_limit_fields array.
```yaml
receivers:
filelog:
include: [./example/*.log]

processors:
logratelimit:
conditions:
- 'attributes["log_level"] == "error"'
- 'resource.attributes["k8s.namespace.name"] == "my-k8s-ns-name"'
allowed_rate: 30000
interval: 60s
rate_limit_fields:
- attributes.service\.name
- resource.k8s\.container\.name

exporters:
kafka:

service:
pipelines:
logs:
receivers: [filelog]
processors: [logratelimit]
exporters: [kafka]
```
90 changes: 90 additions & 0 deletions processor/logratelimitprocessor/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package logratelimitprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logratelimitprocessor"

import (
"errors"
"fmt"
"strings"
"time"

"go.opentelemetry.io/collector/component"
)

// Config defaults
const (
// defaultAllowedLogRate is default allowed logs per configured interval
defaultAllowedLogRate = 30000

// defaultInterval is default time interval for applying rate-limit after allowed logs
defaultInterval = 60 * time.Second

// bodyField is the name of the body field
bodyField = "body"

// attributeField is the name of the attribute field
attributeField = "attributes"

// resourceField is the name of the resource field
resourceField = "resource"
)

// Config errors
var (
errInvalidAllowedRate = errors.New("allowed_rate must be greater than 0")
errInvalidInterval = errors.New("interval must be greater than 0")
)

// Config is the config of the processor
type Config struct {
Conditions []string `mapstructure:"conditions"`
AllowedRate uint64 `mapstructure:"allowed_rate"`
Interval time.Duration `mapstructure:"interval"`
RateLimitFields []string `mapstructure:"rate_limit_fields"`
}

// createDefaultConfig returns the default config for the processor.
func createDefaultConfig() component.Config {
return &Config{
Conditions: []string{},
AllowedRate: defaultAllowedLogRate,
Interval: defaultInterval,
RateLimitFields: []string{},
}
}

// Validate validates the configuration
func (c Config) Validate() error {
if c.Interval <= 0 {
return errInvalidInterval
}

if c.AllowedRate < 0 {
return errInvalidAllowedRate
}

return c.validateRateLimitFields()
}

// validateRateLimitFields validates the rate_limit_fields
func (c Config) validateRateLimitFields() error {
knownExcludeFields := make(map[string]struct{})

for _, field := range c.RateLimitFields {
// Split and ensure the field starts with `body` or `attributes` or resource
parts := strings.Split(field, fieldDelimiter)
if parts[0] != bodyField && parts[0] != attributeField && parts[0] != resourceField {
return fmt.Errorf("an ratelimit field must start with %s or %s or %s", bodyField, attributeField, resourceField)
}

// If a field is valid make sure we haven't already seen it
if _, ok := knownExcludeFields[field]; ok {
return fmt.Errorf("duplicate rate_limit_field %s", field)
}

knownExcludeFields[field] = struct{}{}
}

return nil
}
8 changes: 8 additions & 0 deletions processor/logratelimitprocessor/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

//go:generate mdatagen metadata.yaml

// Package logratelimitprocessor implements a processor for rate-limiting logs based
// on a set of configurable log fields
package logratelimitprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logratelimitprocessor"
15 changes: 15 additions & 0 deletions processor/logratelimitprocessor/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# logratelimit

## Internal Telemetry

The following telemetry is emitted by this component.

### otelcol_ratelimit_processor_dropped_logs

Number of log records that were dropped per rate_limit_fields fields cardinality

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |
56 changes: 56 additions & 0 deletions processor/logratelimitprocessor/factory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package logratelimitprocessor // import "github.com/open-telemetry/opentelemetry-collector-contrib/processor/logratelimitprocessor"

import (
"context"
"fmt"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/processor"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/filter/filterottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl"
"github.com/open-telemetry/opentelemetry-collector-contrib/processor/logratelimitprocessor/internal/metadata"
)

// NewFactory creates a new factory for the processor.
func NewFactory() processor.Factory {
return processor.NewFactory(
metadata.Type,
createDefaultConfig,
processor.WithLogs(createLogsProcessor, metadata.LogsStability),
)
}

// createLogsProcessor creates a log processor.
func createLogsProcessor(_ context.Context, settings processor.Settings, cfg component.Config, consumer consumer.Logs) (processor.Logs, error) {
processorCfg, ok := cfg.(*Config)
if !ok {
return nil, fmt.Errorf("invalid config type: %+v", cfg)
}

processor, err := newProcessor(processorCfg, consumer, settings)
if err != nil {
return nil, fmt.Errorf("error creating processor: %w", err)
}

if len(processorCfg.Conditions) == 0 {
processor.conditions = nil
} else {
conditions, err := filterottl.NewBoolExprForLog(
processorCfg.Conditions,
filterottl.StandardLogFuncs(),
ottl.PropagateError,
settings.TelemetrySettings,
)
if err != nil {
return nil, fmt.Errorf("invalid condition: %w", err)
}
processor.conditions = conditions
}

return processor, nil
}

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading